import time import logging import traceback import threading from platforms.platform_factory import PlatformFactory from managers.worker_conditions import WorkerConditions ERROR_SLEEP_TIME = 60 class Worker: def __init__(self, item_to_monitor, general_args, queue_manager, username=None): self.logger = logging.getLogger(__name__) self._item_monitoring = item_to_monitor self._general_args = general_args self._queue_manager = queue_manager self._username = username # Usuario propietario del worker self._worker_conditions = WorkerConditions(item_to_monitor, general_args) self._running = True self._stop_event = threading.Event() # Initialize the platform based on item_to_monitor configuration platform_name = self._item_monitoring.get_platform() try: self._platform = PlatformFactory.create_platform(platform_name, item_to_monitor) self.logger.info(f"Initialized platform: {platform_name}") except ValueError as e: self.logger.error(f"Failed to initialize platform: {e}") raise # Initialize the queue with existing articles worker_name = self._item_monitoring.get_name() self._queue_manager.add_to_notified_articles(self._request_articles(), username=self._username, worker_name=worker_name) def update_general_args(self, general_args): """Actualiza los argumentos generales del worker""" self._general_args = general_args self._worker_conditions = WorkerConditions(self._item_monitoring, general_args) def stop(self): """Detiene el worker de forma controlada""" self.logger.info(f"Deteniendo worker: {self._item_monitoring.get_name()}") self._running = False self._stop_event.set() def is_running(self): """Verifica si el worker está corriendo""" return self._running def _request_articles(self): return self._platform.fetch_articles() def work(self): exec_times = [] while self._running and not self._stop_event.is_set(): start_time = time.time() try: articles = self._request_articles() for article in articles: if not self._running or self._stop_event.is_set(): break if self._worker_conditions.meets_item_conditions(article): try: self._queue_manager.add_to_queue( article, self._item_monitoring.get_name(), self._item_monitoring.get_thread_id(), username=self._username ) except Exception as e: self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}") if not self._running or self._stop_event.is_set(): break # Sleep con posibilidad de cancelación check_every = self._item_monitoring.get_check_every() sleep_time = 0 while sleep_time < check_every and self._running and not self._stop_event.is_set(): time.sleep(min(1.0, check_every - sleep_time)) sleep_time += 1.0 if exec_times: exec_times.append(time.time() - start_time - check_every) self.logger.info( f"Worker '{self._item_monitoring.get_name()}', " f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, " f"Average: {sum(exec_times) / len(exec_times):.2f}s." ) except Exception as e: if self._running and not self._stop_event.is_set(): self.logger.error(f"Error en worker {self._item_monitoring.get_name()}: {e}") time.sleep(1) def run(self): while self._running and not self._stop_event.is_set(): try: platform_name = self._platform.get_platform_name() self.logger.info(f"{platform_name.capitalize()} monitor worker started - {self._item_monitoring.get_name()}") self.work() # Si el worker se detuvo normalmente, salir if not self._running or self._stop_event.is_set(): self.logger.info(f"Worker '{self._item_monitoring.get_name()}' detenido") break except Exception as e: if not self._running or self._stop_event.is_set(): break self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}") self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...") time.sleep(ERROR_SLEEP_TIME)