class Worker:
    """Monitors one configured item on a single platform and pushes matching
    articles onto the shared queue.

    Lifecycle: ``run()`` drives ``work()`` in a loop until ``stop()`` is called
    (which also sets the internal stop event), restarting ``work()`` after
    unexpected crashes.

    NOTE(review): reconstructed from a whitespace-mangled diff; the parts of
    ``__init__`` that were not visible (logger creation, the exact platform
    factory call and its error handling) are best-effort and marked below.
    """

    def __init__(self, item_to_monitor, general_args, queue_manager):
        # NOTE(review): logger/attribute setup not fully visible in source —
        # confirm against the original managers/worker.py.
        self.logger = logging.getLogger(__name__)
        self._item_monitoring = item_to_monitor
        self._general_args = general_args
        self._queue_manager = queue_manager
        self._worker_conditions = WorkerConditions(item_to_monitor, general_args)
        # Cooperative-shutdown state: work()/run() poll these between steps.
        self._running = True
        self._stop_event = threading.Event()
        # Initialize the platform based on item_to_monitor configuration
        platform_name = self._item_monitoring.get_platform()
        try:
            # NOTE(review): exact factory call signature not visible in the
            # diff context — verify against platforms/platform_factory.py.
            self._platform = PlatformFactory.create_platform(platform_name, general_args)
        except Exception as e:
            self.logger.error(f"Failed to create platform '{platform_name}': {e}")
            raise
        # Initialize the queue with existing articles
        self._queue_manager.add_to_notified_articles(self._request_articles())

    def update_general_args(self, general_args):
        """Replace the general arguments and rebuild the matching conditions."""
        self._general_args = general_args
        self._worker_conditions = WorkerConditions(self._item_monitoring, general_args)

    def stop(self):
        """Request a graceful shutdown; the loops exit at their next check."""
        self.logger.info(f"Deteniendo worker: {self._item_monitoring.get_name()}")
        self._running = False
        self._stop_event.set()

    def is_running(self):
        """Return True while the worker has not been asked to stop."""
        return self._running

    def _should_stop(self):
        """True once stop() has been requested via either flag or event."""
        return not self._running or self._stop_event.is_set()

    def _request_articles(self):
        """Fetch the current article list from the underlying platform."""
        return self._platform.fetch_articles()

    def work(self):
        """Main polling loop: fetch articles, enqueue matches, sleep, repeat.

        Bug fix vs. previous revision: the timing sample was guarded by
        ``if exec_times:`` on a list that starts empty and was only appended
        inside that guard, so no sample was ever recorded and the stats line
        never logged.  The sample is now appended unconditionally after every
        completed cycle.  The fixed 1-second polling sleep is replaced with an
        interruptible ``Event.wait`` so stop() takes effect immediately.
        """
        exec_times = []
        while not self._should_stop():
            start_time = time.time()
            try:
                for article in self._request_articles():
                    if self._should_stop():
                        break
                    if self._worker_conditions.meets_item_conditions(article):
                        try:
                            self._queue_manager.add_to_queue(
                                article,
                                self._item_monitoring.get_name(),
                                self._item_monitoring.get_thread_id(),
                            )
                        except Exception as e:
                            self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")

                if self._should_stop():
                    break

                # Interruptible sleep: Event.wait() returns as soon as stop()
                # sets the event, instead of polling in 1-second slices.
                check_every = self._item_monitoring.get_check_every()
                self._stop_event.wait(check_every)

                exec_times.append(time.time() - start_time - check_every)
                self.logger.info(
                    f"Worker '{self._item_monitoring.get_name()}', "
                    f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, "
                    f"Average: {sum(exec_times) / len(exec_times):.2f}s."
                )
            except Exception as e:
                # Suppress noise while shutting down; otherwise log and back
                # off briefly before the next cycle.
                if not self._should_stop():
                    self.logger.error(f"Error en worker {self._item_monitoring.get_name()}: {e}")
                    time.sleep(1)

    def run(self):
        """Entry point for the executor thread: run work(), restarting it
        after crashes until a stop is requested."""
        while not self._should_stop():
            try:
                platform_name = self._platform.get_platform_name()
                self.logger.info(f"{platform_name.capitalize()} monitor worker started - {self._item_monitoring.get_name()}")
                self.work()

                # work() only returns once a stop was requested.
                if self._should_stop():
                    self.logger.info(f"Worker '{self._item_monitoring.get_name()}' detenido")
                    break
            except Exception as e:
                if self._should_stop():
                    break
                self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
                self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...")
                time.sleep(ERROR_SLEEP_TIME)
class WorkerManager:
    """Dynamically starts and stops Worker threads so the running set mirrors
    the content of workers.json.

    Refactor vs. previous revision: the stop/cancel/remove-entry sequence was
    duplicated in three places (stop_worker, sync_workers, stop_all) and
    stop_worker mutated ``self._workers`` without holding ``_lock`` while the
    other call sites did.  The sequence now lives in a single helper,
    ``_shutdown_entry_locked``, always invoked under the lock.
    """

    def __init__(self, general_args, queue_manager):
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        # One long-lived thread per worker; threads mostly sleep, so a large
        # pool is cheap.  NOTE(review): 1000 mirrors the previous hard limit.
        self._executor = ThreadPoolExecutor(max_workers=1000)
        # worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor}
        self._workers = {}
        self._running = True
        self._lock = threading.Lock()
        base_dir = os.path.dirname(os.path.abspath(__file__))
        self._workers_path = os.path.join(base_dir, "workers.json")

    def load_workers_config(self):
        """Load the worker configuration from workers.json.

        Returns:
            (items, general, disabled) where items is a list of
            (ItemMonitor, enabled) pairs, general is a GeneralMonitor and
            disabled is the set of disabled worker names.  Falls back to an
            empty configuration on any error.
        """
        try:
            with open(self._workers_path, 'r') as f:
                config = json.load(f)
            disabled = set(config.get('disabled', []))
            items = []
            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                items.append((item, item.get_name() not in disabled))
            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled
        except Exception as e:
            self.logger.error(f"Error cargando workers.json: {e}")
            return [], GeneralMonitor([], [], [], [], []), set()

    def start_worker(self, item, general_args):
        """Create a Worker for *item* and submit it to the executor.

        Returns True on success, False if construction/submission failed.
        Caller is expected to hold ``_lock`` when mutating shared state.
        """
        try:
            worker = Worker(item, general_args, self._queue_manager)
            future = self._executor.submit(worker.run)
            self._workers[item.get_name()] = {
                'worker': worker,
                'future': future,
                'item': item,
            }
            self.logger.info(f"Worker '{item.get_name()}' iniciado")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}")
            return False

    def _shutdown_entry_locked(self, worker_name):
        """Stop a worker and drop its registry entry.  Caller must hold _lock.

        Signals the worker to stop, cancels its future if it has not started,
        and removes the bookkeeping entry.  The worker thread is not joined —
        it winds down on its own via the stop event.
        """
        entry = self._workers.pop(worker_name, None)
        if entry is None:
            return False
        entry['worker'].stop()
        future = entry['future']
        if not future.done():
            future.cancel()
        self.logger.info(f"Worker '{worker_name}' detenido")
        return True

    def stop_worker(self, worker_name):
        """Stop one worker by name; returns True if an entry was removed."""
        try:
            with self._lock:
                return self._shutdown_entry_locked(worker_name)
        except Exception as e:
            self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")
            # Best effort: make sure the entry is gone even after an error.
            with self._lock:
                self._workers.pop(worker_name, None)
            return False

    def sync_workers(self):
        """Reconcile the running workers with the current workers.json:
        stop removed/disabled workers, start new ones, restart changed ones,
        and push fresh general args into unchanged ones."""
        items, general_args, disabled = self.load_workers_config()
        self._general_args = general_args

        with self._lock:
            current = set(self._workers.keys())
            # Propagate the (possibly updated) general args to every worker.
            for entry in self._workers.values():
                entry['worker'].update_general_args(general_args)

        enabled_names = {item.get_name() for item, enabled in items if enabled}

        # Stop workers that were removed from the file or disabled.
        for name in current - enabled_names:
            with self._lock:
                self._shutdown_entry_locked(name)

        for item, enabled in items:
            if not enabled:
                continue
            name = item.get_name()
            needs_restart = False
            with self._lock:
                entry = self._workers.get(name)
                if entry is None:
                    # New worker, or one that was just re-enabled.
                    self.start_worker(item, general_args)
                elif self._has_changes(entry['item'], item):
                    self.logger.info(f"Reiniciando worker '{name}' por cambios en configuración")
                    self._shutdown_entry_locked(name)
                    needs_restart = True
                else:
                    # Config unchanged: refresh references without a restart.
                    entry['item'] = item
                    entry['worker'].update_general_args(general_args)
            # Restart outside the lock to avoid holding it across the wait.
            if needs_restart:
                time.sleep(0.5)  # give the old thread a moment to unwind
                with self._lock:
                    if name not in self._workers:
                        self.start_worker(item, general_args)

    def _has_changes(self, old_item, new_item):
        """Return True when any restart-worthy field differs between items."""
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_file(self):
        """Poll workers.json every 2 s and call sync_workers() on mtime change.

        Runs until stop_all() clears ``_running``.  ``last_mtime`` is only
        advanced after a successful sync so a failed sync is retried.
        """
        self.logger.info("Iniciando monitor de workers.json...")
        last_mtime = 0
        try:
            if os.path.exists(self._workers_path):
                last_mtime = os.path.getmtime(self._workers_path)
        except Exception as e:
            self.logger.warning(f"Error obteniendo mtime inicial: {e}")

        while self._running:
            try:
                time.sleep(2)
                if not os.path.exists(self._workers_path):
                    continue
                current_mtime = os.path.getmtime(self._workers_path)
                if current_mtime != last_mtime:
                    self.logger.info("Detectado cambio en workers.json, sincronizando workers...")
                    time.sleep(0.5)  # let the writer finish the file
                    self.sync_workers()
                    last_mtime = current_mtime
            except Exception as e:
                self.logger.error(f"Error monitoreando workers.json: {e}")
                time.sleep(5)  # back off longer after an error

    def start_monitoring(self):
        """Launch monitor_workers_file() on a daemon thread and return it."""
        monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker and shut the executor down without waiting."""
        self._running = False
        with self._lock:
            for name in list(self._workers.keys()):
                try:
                    self._shutdown_entry_locked(name)
                except Exception as e:
                    self.logger.error(f"Error deteniendo worker '{name}': {e}")
        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")


if __name__ == "__main__":
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Build the article cache and the queue shared by every worker.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)
    queue_manager = QueueManager(article_cache)

    # NOTE(review): the parse_items_to_monitor() result is superseded by the
    # first sync_workers() call, which re-reads workers.json itself;
    # general_args only serves as an initial placeholder here.
    items, general_args = parse_items_to_monitor()
    worker_manager = WorkerManager(general_args, queue_manager)

    # Start the initial workers, then watch workers.json for changes.
    worker_manager.sync_workers()
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...")
        while True:
            time.sleep(60)
            # Belt-and-braces periodic reconcile in case an mtime tick was missed.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()
{{ worker.platform || 'wallapop' }}
-{{ worker.search_query }}
- {{ worker.min_price || 'N/A' }} - {{ worker.max_price || 'N/A' }} + {{ worker.min_price || '0' }}€ - {{ worker.max_price || '∞' }}€
{{ worker.thread_id || 'General' }}
+{{ worker.thread_id }}
{{ worker.latitude }}, {{ worker.longitude }}
+{{ worker.max_distance }} km
+{{ worker.check_every }}s
+{{ worker.search_query }}
+No hay workers configurados
+