Refactor worker management and enhance configuration options
- Updated docker-compose.yml to allow read-write access for workers.json. - Introduced WorkerManager class in wallamonitor.py to manage workers dynamically based on workers.json configuration. - Enhanced Worker class to support controlled start/stop operations and updated general arguments. - Improved Workers.vue to include a general configuration modal and refined UI for active and disabled workers. - Added functionality for global filters and improved worker editing capabilities. - Implemented methods for saving general configuration and deleting workers.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import time
|
||||
import logging
|
||||
import traceback
|
||||
import threading
|
||||
from platforms.platform_factory import PlatformFactory
|
||||
from managers.worker_conditions import WorkerConditions
|
||||
|
||||
@@ -13,6 +14,8 @@ class Worker:
|
||||
self._general_args = general_args
|
||||
self._queue_manager = queue_manager
|
||||
self._worker_conditions = WorkerConditions(item_to_monitor, general_args)
|
||||
self._running = True
|
||||
self._stop_event = threading.Event()
|
||||
# Initialize the platform based on item_to_monitor configuration
|
||||
platform_name = self._item_monitoring.get_platform()
|
||||
try:
|
||||
@@ -24,6 +27,21 @@ class Worker:
|
||||
|
||||
# Initialize the queue with existing articles
|
||||
self._queue_manager.add_to_notified_articles(self._request_articles())
|
||||
|
||||
def update_general_args(self, general_args):
|
||||
"""Actualiza los argumentos generales del worker"""
|
||||
self._general_args = general_args
|
||||
self._worker_conditions = WorkerConditions(self._item_monitoring, general_args)
|
||||
|
||||
def stop(self):
|
||||
"""Detiene el worker de forma controlada"""
|
||||
self.logger.info(f"Deteniendo worker: {self._item_monitoring.get_name()}")
|
||||
self._running = False
|
||||
self._stop_event.set()
|
||||
|
||||
def is_running(self):
|
||||
"""Verifica si el worker está corriendo"""
|
||||
return self._running
|
||||
|
||||
def _request_articles(self):
|
||||
return self._platform.fetch_articles()
|
||||
@@ -31,30 +49,55 @@ class Worker:
|
||||
def work(self):
|
||||
exec_times = []
|
||||
|
||||
while True:
|
||||
while self._running and not self._stop_event.is_set():
|
||||
start_time = time.time()
|
||||
articles = self._request_articles()
|
||||
for article in articles:
|
||||
if self._worker_conditions.meets_item_conditions(article):
|
||||
try:
|
||||
self._queue_manager.add_to_queue(article, self._item_monitoring.get_name(), self._item_monitoring.get_thread_id())
|
||||
except Exception as e:
|
||||
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")
|
||||
time.sleep(self._item_monitoring.get_check_every())
|
||||
exec_times.append(time.time() - start_time - self._item_monitoring.get_check_every())
|
||||
self.logger.info(
|
||||
f"Worker '{self._item_monitoring.get_name()}', "
|
||||
f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, "
|
||||
f"Average: {sum(exec_times) / len(exec_times):.2f}s."
|
||||
)
|
||||
try:
|
||||
articles = self._request_articles()
|
||||
for article in articles:
|
||||
if not self._running or self._stop_event.is_set():
|
||||
break
|
||||
if self._worker_conditions.meets_item_conditions(article):
|
||||
try:
|
||||
self._queue_manager.add_to_queue(article, self._item_monitoring.get_name(), self._item_monitoring.get_thread_id())
|
||||
except Exception as e:
|
||||
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")
|
||||
|
||||
if not self._running or self._stop_event.is_set():
|
||||
break
|
||||
|
||||
# Sleep con posibilidad de cancelación
|
||||
check_every = self._item_monitoring.get_check_every()
|
||||
sleep_time = 0
|
||||
while sleep_time < check_every and self._running and not self._stop_event.is_set():
|
||||
time.sleep(min(1.0, check_every - sleep_time))
|
||||
sleep_time += 1.0
|
||||
|
||||
if exec_times:
|
||||
exec_times.append(time.time() - start_time - check_every)
|
||||
self.logger.info(
|
||||
f"Worker '{self._item_monitoring.get_name()}', "
|
||||
f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, "
|
||||
f"Average: {sum(exec_times) / len(exec_times):.2f}s."
|
||||
)
|
||||
except Exception as e:
|
||||
if self._running and not self._stop_event.is_set():
|
||||
self.logger.error(f"Error en worker {self._item_monitoring.get_name()}: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
while self._running and not self._stop_event.is_set():
|
||||
try:
|
||||
platform_name = self._platform.get_platform_name()
|
||||
self.logger.info(f"{platform_name.capitalize()} monitor worker started - {self._item_monitoring.get_name()}")
|
||||
self.work()
|
||||
|
||||
# Si el worker se detuvo normalmente, salir
|
||||
if not self._running or self._stop_event.is_set():
|
||||
self.logger.info(f"Worker '{self._item_monitoring.get_name()}' detenido")
|
||||
break
|
||||
except Exception as e:
|
||||
if not self._running or self._stop_event.is_set():
|
||||
break
|
||||
self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
|
||||
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...")
|
||||
time.sleep(ERROR_SLEEP_TIME)
|
||||
|
||||
Reference in New Issue
Block a user