Files
wallabicher/managers/worker.py

61 lines
2.8 KiB
Python

import time
import logging
import traceback
from platforms.platform_factory import PlatformFactory
from managers.worker_conditions import WorkerConditions
ERROR_SLEEP_TIME = 60
class Worker:
def __init__(self, item_to_monitor, general_args, queue_manager):
self.logger = logging.getLogger(__name__)
self._item_monitoring = item_to_monitor
self._general_args = general_args
self._queue_manager = queue_manager
self._worker_conditions = WorkerConditions(item_to_monitor, general_args)
# Initialize the platform based on item_to_monitor configuration
platform_name = self._item_monitoring.get_platform()
try:
self._platform = PlatformFactory.create_platform(platform_name, item_to_monitor)
self.logger.info(f"Initialized platform: {platform_name}")
except ValueError as e:
self.logger.error(f"Failed to initialize platform: {e}")
raise
# Initialize the queue with existing articles
self._queue_manager.add_to_notified_articles(self._request_articles())
def _request_articles(self):
return self._platform.fetch_articles()
def work(self):
exec_times = []
while True:
start_time = time.time()
articles = self._request_articles()
for article in articles:
if self._worker_conditions.meets_item_conditions(article):
try:
self._queue_manager.add_to_queue(article, self._item_monitoring.get_name(), self._item_monitoring.get_thread_id())
except Exception as e:
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")
time.sleep(self._item_monitoring.get_check_every())
exec_times.append(time.time() - start_time - self._item_monitoring.get_check_every())
self.logger.info(
f"Worker '{self._item_monitoring.get_name()}', "
f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, "
f"Average: {sum(exec_times) / len(exec_times):.2f}s."
)
def run(self):
while True:
try:
platform_name = self._platform.get_platform_name()
self.logger.info(f"{platform_name.capitalize()} monitor worker started - {self._item_monitoring.get_name()}")
self.work()
except Exception as e:
self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...")
time.sleep(ERROR_SLEEP_TIME)