import time
import logging
import traceback

from platforms.platform_factory import PlatformFactory
from managers.worker_conditions import WorkerConditions

# Seconds to wait before restarting the polling loop after an unhandled error.
ERROR_SLEEP_TIME = 60


class Worker:
    """Periodically fetch articles for one monitored item and queue the matches.

    The worker builds a platform object through ``PlatformFactory``, polls it
    for articles, filters them with ``WorkerConditions`` and hands every
    accepted article to the queue manager.
    """

    def __init__(self, item_to_monitor, general_args, queue_manager):
        """Set up the platform and register the articles that already exist.

        Args:
            item_to_monitor: Configuration object of the monitored item. This
                class calls ``get_platform()``, ``get_name()``,
                ``get_thread_id()`` and ``get_check_every()`` on it.
            general_args: Global arguments, forwarded to ``WorkerConditions``.
            queue_manager: Object providing ``add_to_notified_articles()`` and
                ``add_to_queue()``.

        Raises:
            ValueError: Re-raised (after logging) when
                ``PlatformFactory.create_platform`` rejects the platform.
        """
        self.logger = logging.getLogger(__name__)
        self._item_monitoring = item_to_monitor
        self._general_args = general_args
        self._queue_manager = queue_manager
        self._worker_conditions = WorkerConditions(item_to_monitor, general_args)

        # Initialize the platform based on item_to_monitor configuration
        platform_name = self._item_monitoring.get_platform()
        try:
            self._platform = PlatformFactory.create_platform(platform_name, item_to_monitor)
            self.logger.info(f"Initialized platform: {platform_name}")
        except ValueError as e:
            self.logger.error(f"Failed to initialize platform: {e}")
            raise

        # Initialize the queue with existing articles
        # NOTE(review): presumably this keeps pre-existing articles from being
        # reported as new -- confirm against the queue manager implementation.
        self._queue_manager.add_to_notified_articles(self._request_articles())

    def _request_articles(self):
        """Return whatever the platform's ``fetch_articles()`` yields."""
        return self._platform.fetch_articles()

    def _poll_once(self):
        """Fetch articles once and queue every one that meets the conditions.

        A failure while queueing a single article is logged and does not stop
        the remaining articles from being processed.
        """
        for article in self._request_articles():
            if not self._worker_conditions.meets_item_conditions(article):
                continue
            try:
                self._queue_manager.add_to_queue(
                    article,
                    self._item_monitoring.get_name(),
                    self._item_monitoring.get_thread_id(),
                )
            except Exception as e:
                self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")

    def work(self):
        """Poll forever, sleeping ``get_check_every()`` seconds between cycles.

        After each cycle the last, maximum and average execution time are
        logged. This method never returns normally; exceptions raised while
        fetching or filtering articles propagate to the caller.
        """
        # Running aggregates instead of an ever-growing list: this loop never
        # ends, so storing every sample would leak memory and make the
        # max/average computation slower on every cycle.
        cycle_count = 0
        total_time = 0.0
        max_time = 0.0

        while True:
            # Monotonic clock: unaffected by system clock adjustments.
            started_at = time.monotonic()
            self._poll_once()
            # Measured before sleeping so the figure covers only the work
            # done, not the (possibly overshooting) sleep interval.
            last_time = time.monotonic() - started_at

            cycle_count += 1
            total_time += last_time
            max_time = max(max_time, last_time)

            time.sleep(self._item_monitoring.get_check_every())

            self.logger.info(
                f"Worker '{self._item_monitoring.get_name()}', "
                f"Execution time stats - Last: {last_time:.2f}s, Max: {max_time:.2f}s, "
                f"Average: {total_time / cycle_count:.2f}s."
            )

    def run(self):
        """Run ``work()`` forever, restarting it after any unhandled exception.

        On failure the full traceback is logged, the worker waits
        ``ERROR_SLEEP_TIME`` seconds and then starts over.
        """
        while True:
            try:
                platform_name = self._platform.get_platform_name()
                self.logger.info(f"{platform_name.capitalize()} monitor worker started - {self._item_monitoring.get_name()}")
                self.work()
            except Exception as e:
                self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
                self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...")
                time.sleep(ERROR_SLEEP_TIME)