import asyncio import logging import requests, json, pytz from datetime import datetime from time import sleep import random from app import arg_parser, config_parser, telegram_bot, google_calendar, utils args = arg_parser.ArgParser().parse() config = config_parser.ConfigParser().loadConfig() from app import strings # Configurar logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) class timenetManager: telegram = None message_acumulator = "" headers = {} def __init__(self): try: logger.info("Inicializando timenetManager...") self.telegram = telegram_bot.telegramBot() logger.info("Telegram bot inicializado correctamente") if args.pin is None: # if pin is None, prompt for it logger.info("PIN no proporcionado, solicitando al usuario...") args.pin = int(input("Introduzca su pin: ")) logger.info("PIN recibido") self.headers = { "Content-type": "application/x-www-form-urlencoded; charset=UTF-8", "Accept": "application/json, text/plain, */*", "Referer": "https://timenet-wcp.gpisoftware.com/", "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": "\"Linux\"", "Sec-Ch-Ua": "\"Chromium\";v=\"138\", \"Not=A?Brand\";v=\"8\", \"Google Chrome\";v=\"138\"", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", strings.tnv_string: "wcp_8.0.0.2", "User-Agent": strings.user_agent, "Tn-U": strings.tn_u } logger.info("Headers configurados, iniciando login...") self.login() logger.info("timenetManager inicializado correctamente") except Exception as e: error_msg = f"Error al inicializar timenetManager: {str(e)}" logger.error(error_msg, exc_info=True) try: asyncio.run(self.telegram.sendMessage(f"馃煡馃晲 {error_msg}")) except: pass raise def get(self, url, data={}, headers={}): try: furl = config['api_url'] + url if url != "version": furl = config['api_url'] + "v1/" + url logger.info(f"[GET] {furl} con headers {headers} y data {data}") print("[GET] %s con headers %s y data %s" % (furl, headers, data)) response = requests.get(furl, data=data, headers=headers, timeout=30) logger.info(f"[GET] Respuesta recibida: status_code={response.status_code}") return response except requests.exceptions.Timeout as e: error_msg = f"Timeout al hacer GET a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise except requests.exceptions.RequestException as e: error_msg = f"Error de conexi贸n al hacer GET a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise except Exception as e: error_msg = f"Error inesperado al hacer GET a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise def post(self, url, data={}, headers={}): try: furl = config['api_url'] + url if url != "version": furl = config['api_url'] + "v1/" + url logger.info(f"[POST] {furl} con headers {headers} y data {data}") print("[POST] %s con headers %s y data %s" % (furl, headers, data)) response = requests.post(furl, data=data, headers=headers, timeout=30) logger.info(f"[POST] Respuesta recibida: status_code={response.status_code}, text={response.text[:200]}") return response except requests.exceptions.Timeout as e: error_msg = f"Timeout al hacer POST a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise except requests.exceptions.RequestException as e: error_msg = f"Error de conexi贸n al hacer POST a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise except Exception as e: error_msg = f"Error inesperado al hacer POST a {url}: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise def login(self): try: logger.info(f"Iniciando login para usuario: {config.get('user', 'N/A')}") self.updateTime() response = self.post("cp/login/" + config['user'], { "p": args.pin, }, self.headers) logger.info(f"Login response status: {response.status_code}") if response.status_code != 200: error_msg = f"Error al iniciar sesi贸n: status_code={response.status_code}, response={response.text}" logger.error(error_msg) print("Error al iniciar sesi贸n: %s" % response.text) self.addMessage(f"馃煡馃晲 Error al iniciar sesi贸n: {response.text}") self.sendReport() exit(20) token = response.text.replace("\"", "") self.headers["Token"] = token logger.info("Login exitoso, token obtenido") except Exception as e: error_msg = f"Error durante el login: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise def addMessage(self, message): self.message_acumulator += message + "\n" def sendReport(self): try: if self.message_acumulator.strip(): logger.info(f"Enviando reporte por Telegram: {self.message_acumulator[:100]}...") asyncio.run(self.telegram.sendMessage(self.message_acumulator)) logger.info("Reporte enviado correctamente") else: logger.debug("No hay mensajes para enviar") self.message_acumulator = "" except Exception as e: error_msg = f"Error al enviar reporte por Telegram: {str(e)}" logger.error(error_msg, exc_info=True) print(f"Error al enviar mensaje por Telegram: {error_msg}") # No acumulamos este error para evitar loops infinitos self.message_acumulator = "" def updateTime(self): now = datetime.now().astimezone(pytz.UTC) self.headers[strings.tnd_string] = "\"" + now.strftime("%Y-%m-%dT%H:%M:%SZ") + "\"" self.headers["dStr"] = "\"" + now.strftime("%d/%m/%Y %H:%M:%S") + "\"" def updateVersion(self): self.headers[strings.tnv_string] = self.getVersion() def getVersion(self): self.updateTime() return self.get('version', {}, self.headers) def getInfo(self): return self.get('check/info?guid=' + config['user'], {}, self.headers) def checkLastCheck(self, typ): try: logger.info(f"Verificando 煤ltimo check de tipo {typ}") date_str = datetime.now().strftime("%d/%m/%Y") url = f'cp/checks?start={date_str}&end={date_str}' logger.info(f"Consultando checks para fecha: {date_str}") response = self.get(url, {}, self.headers) if response.status_code != 200: error_msg = f"Error al obtener los checks: status_code={response.status_code}, response={response.text}" logger.error(error_msg) self.addMessage(f"馃煡馃晲 Error al obtener los checks: {response.text}") self.sendReport() return False try: today_checks = json.loads(response.text) logger.info(f"Checks obtenidos: {today_checks}") except json.JSONDecodeError as e: error_msg = f"Error al parsear JSON de checks: {str(e)}, response={response.text}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 Error al procesar los checks: {str(e)}") self.sendReport() return False if today_checks.get('C') and today_checks['C'][0] and today_checks['C'][0].get('C') and today_checks['C'][0]['C'] and today_checks['C'][0]['C'][-1] and today_checks['C'][0]['C'][-1].get('T') == typ: hora = today_checks['C'][0]['C'][-1].get('H', 'N/A') logger.info(f"Ya se ha realizado este marcaje antes a las {hora}") self.addMessage(f"馃煡馃晲 Ya se ha realizado este marcaje antes a las {hora}") self.sendReport() return True logger.info("No se encontr贸 un check previo del mismo tipo") return False except Exception as e: error_msg = f"Error inesperado al verificar 煤ltimo check: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() return False def sendUpdate(self): try: logger.info("Iniciando sendUpdate...") # Verificar calendario try: logger.info("Consultando Google Calendar...") calendar = google_calendar.GoogleCalendar() calendar_event = calendar.getEvent() logger.info(f"Resultado de calendario: {calendar_event}") if calendar_event and not args.force: logger.info(f"Evento de calendario encontrado: {calendar_event}, cancelando fichaje") self.addMessage("馃煡馃晲 Comprobaci贸n de calendario: " + calendar_event) self.sendReport() return except Exception as e: error_msg = f"Error al consultar Google Calendar: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() # Continuamos con el fichaje aunque falle el calendario # Determinar tipo de fichaje typ = 0 try: if args.basedtime and not args.type: hour = datetime.now().strftime("%H") logger.info(f"Hora actual: {hour}") hIN = config.get('in_hours', []) hOUT = config.get('out_hours', []) logger.info(f"Horas de entrada: {hIN}, Horas de salida: {hOUT}") if hour in hIN: typ = 0 logger.info("Tipo determinado: Entrada (0) por horario") elif hour in hOUT: typ = 1 logger.info("Tipo determinado: Salida (1) por horario") else: logger.warning(f"No se puede determinar el tipo de fichaje para la hora {hour}") self.addMessage("馃煡馃晲 No se ha fichado ni desfichado ya que estamos en horario laboral, fuerze el estado con el parametro --type <0 = Entrada, 1 = Salida>") self.sendReport() return else: typ = args.type logger.info(f"Tipo determinado por argumento: {typ}") except Exception as e: error_msg = f"Error al determinar tipo de fichaje: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() return # Verificar 煤ltimo check logger.info("Verificando 煤ltimo check...") if self.checkLastCheck(typ): logger.info("Check ya realizado previamente, cancelando") return # Realizar check response = None if not args.debug: try: logger.info("Iniciando proceso de fichaje...") self.addMessage("####HACIENDO CHECK####") rand = utils.numero_aleatorio_con_probabilidad(230, 0.5) logger.info(f"Esperando {rand} segundos antes de fichar") self.addMessage("Esperando %s segundos para %s en un minuto aleatorio" % (rand,strings.conditional_response[int(typ)]["text2"])) self.sendReport() sleep(rand) self.updateTime() data = { "typ": typ, "date": self.headers[strings.tnd_string], "geoLatitude": config.get('geo', {}).get('latitude', ''), "geoLongitude": config.get('geo', {}).get('longitude', ''), "geoError": "", "dStr": self.headers["dStr"].replace("\"", "") } logger.info(f"Enviando check con datos: typ={typ}, date={data['date']}") response = self.post("cp/checks", data, self.headers) logger.info(f"Respuesta del check recibida: status={response.status_code}, text={response.text[:200]}") except Exception as e: error_msg = f"Error durante el proceso de fichaje: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise else: logger.info("Modo debug activado, no se realizar谩 fichaje") self.addMessage('Corriendo en modo debug. No se realizar谩 ninguna acci贸n') self.sendReport() exit(20) # Validar respuesta if not response: error_msg = "No se recibi贸 respuesta del servidor" logger.error(error_msg) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() exit(20) if response.text == "no valid worker": error_msg = "El trabajador no ha podido ser identificado" logger.error(error_msg) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() exit(20) try: rj = json.loads(response.text) logger.info(f"JSON parseado correctamente: {rj}") except json.JSONDecodeError as e: error_msg = f"La respuesta al hacer check no es correcta... algo ha pasado :/ - response={response.text[:200]} - status_code={response.status_code} - error={str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() exit(20) except Exception as e: error_msg = f"Error al procesar respuesta del check: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() exit(20) # Procesar resultado if response.status_code == 200: if rj.get('Repeated'): try: time = datetime.strptime(rj['RepeatedTime'], "%Y-%m-%dT%H:%M:%SZ").replace( tzinfo=pytz.utc).astimezone(pytz.timezone("Europe/Madrid")).strftime("%H:%M") logger.info(f"Check repetido a las {time}") self.addMessage("馃晲 Ya se ha realizado este marcaje antes a las %s" % time) except Exception as e: error_msg = f"Error al procesar tiempo de check repetido: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") else: try: time = datetime.now().astimezone(pytz.timezone("Europe/Madrid")).strftime("%H:%M") logger.info(f"Check exitoso a las {time}") self.addMessage('%s Has %s correctamente a las %s' % (strings.conditional_response[int(typ)]['emoji'], strings.conditional_response[int(typ)]['text'], time)) except Exception as e: error_msg = f"Error al formatear tiempo de check exitoso: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") else: error_msg = f"No se ha podido fichar, la web no ha devuelto 200 OK - status_code={response.status_code}, response={response.text[:200]}" logger.error(error_msg) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() logger.info("sendUpdate completado") except Exception as e: error_msg = f"Error cr铆tico en sendUpdate: {str(e)}" logger.error(error_msg, exc_info=True) self.addMessage(f"馃煡馃晲 {error_msg}") self.sendReport() raise