From a71efd92cd5c303ca8d08bf95a4168e0ad0fa3eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Omar=20S=C3=A1nchez=20Pizarro?= Date: Mon, 19 Jan 2026 22:10:36 +0100 Subject: [PATCH] rename main --- wallamonitor.py => wallabicher.py | 734 +++++++++++++++--------------- 1 file changed, 367 insertions(+), 367 deletions(-) rename wallamonitor.py => wallabicher.py (97%) diff --git a/wallamonitor.py b/wallabicher.py similarity index 97% rename from wallamonitor.py rename to wallabicher.py index be9c175..3fcf7c0 100644 --- a/wallamonitor.py +++ b/wallabicher.py @@ -1,367 +1,367 @@ -import json -import logging -from logging.handlers import RotatingFileHandler -from concurrent.futures import ThreadPoolExecutor, Future -import os -import shutil -import yaml -import time -import threading - -from datalayer.item_monitor import ItemMonitor -from datalayer.general_monitor import GeneralMonitor -from managers.worker import Worker -from managers.queue_manager import QueueManager -from managers.article_cache import create_article_cache - -def initialize_config_files(): - """ - Inicializa los archivos de configuración si no existen, - copiando los archivos .sample correspondientes. - """ - base_dir = os.path.dirname(os.path.abspath(__file__)) - - config_files = [ - ('config.yaml', 'config.sample.yaml'), - ('workers.json', 'workers.sample.json') - ] - - for config_file, sample_file in config_files: - config_path = os.path.join(base_dir, config_file) - sample_path = os.path.join(base_dir, sample_file) - - if not os.path.exists(config_path): - if os.path.exists(sample_path): - shutil.copy2(sample_path, config_path) - print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'") - print(f" Por favor, edita '{config_file}' con tu configuración antes de continuar.") - else: - raise FileNotFoundError( - f"No se encontró ni '{config_file}' ni '{sample_file}'. " - f"Por favor, crea uno de estos archivos." - ) - -def configure_logger(): - import os - logging.getLogger("httpx").setLevel(logging.WARNING) - - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) - console_handler.setFormatter(logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s')) - - # Determinar la ruta del archivo de log - # En Docker, usar /app/logs si existe, sino usar el directorio actual - if os.path.isdir('/app/logs'): - log_path = '/app/logs/monitor.log' - else: - log_path = 'monitor.log' - - # Asegurarse de que el directorio existe - log_dir = os.path.dirname(log_path) - if log_dir and not os.path.exists(log_dir): - os.makedirs(log_dir, exist_ok=True) - - file_handler = RotatingFileHandler(log_path, maxBytes=10e6) - file_handler.setLevel(logging.DEBUG) - file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - - # Configure the root logger with both handlers - logging.basicConfig(level=logging.NOTSET, - handlers=[console_handler, file_handler]) - -def parse_items_to_monitor(): - import os - base_dir = os.path.dirname(os.path.abspath(__file__)) - workers_path = os.path.join(base_dir, "workers.json") - with open(workers_path) as f: - args = json.load(f) - if 'items' not in args: - raise ValueError("Missing mandatory field: items") - items = [ItemMonitor.load_from_json(item) for item in args['items']] - general_args = GeneralMonitor.load_from_json(args['general']) - return items, general_args - -def load_cache_config(): - """Carga la configuración del cache desde config.yaml""" - base_dir = os.path.dirname(os.path.abspath(__file__)) - config_path = os.path.join(base_dir, "config.yaml") - logger = logging.getLogger(__name__) - - try: - with open(config_path, 'r') as f: - config = yaml.safe_load(f) - cache_config = config.get('cache', {}) - cache_type = cache_config.get('type', 'redis') - - if cache_type == 'redis': - redis_config = cache_config.get('redis', {}) - return { - 'cache_type': 'redis', - 'redis_host': redis_config.get('host', 'localhost'), - 'redis_port': redis_config.get('port', 6379), - 'redis_db': redis_config.get('db', 0), - 'redis_password': redis_config.get('password') - } - else: - logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto") - return { - 'cache_type': 'redis', - 'redis_host': 'localhost', - 'redis_port': 6379, - 'redis_db': 0, - 'redis_password': None - } - except Exception as e: - logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}") - return { - 'cache_type': 'redis', - 'redis_host': 'localhost', - 'redis_port': 6379, - 'redis_db': 0, - 'redis_password': None - } - -class WorkerManager: - """Gestiona workers dinámicamente, iniciando y deteniendo según workers.json""" - - def __init__(self, general_args, queue_manager): - self.logger = logging.getLogger(__name__) - self._general_args = general_args - self._queue_manager = queue_manager - self._executor = ThreadPoolExecutor(max_workers=1000) - self._workers = {} # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor} - self._running = True - self._lock = threading.Lock() - base_dir = os.path.dirname(os.path.abspath(__file__)) - self._workers_path = os.path.join(base_dir, "workers.json") - - def load_workers_config(self): - """Carga la configuración de workers desde workers.json""" - try: - with open(self._workers_path, 'r') as f: - config = json.load(f) - disabled = set(config.get('disabled', [])) - items = [] - for item_data in config.get('items', []): - item = ItemMonitor.load_from_json(item_data) - items.append((item, item.get_name() not in disabled)) - general = GeneralMonitor.load_from_json(config.get('general', {})) - return items, general, disabled - except Exception as e: - self.logger.error(f"Error cargando workers.json: {e}") - return [], GeneralMonitor([], [], [], [], []), set() - - def start_worker(self, item, general_args): - """Inicia un worker""" - try: - worker = Worker(item, general_args, self._queue_manager) - future = self._executor.submit(worker.run) - self._workers[item.get_name()] = { - 'worker': worker, - 'future': future, - 'item': item - } - self.logger.info(f"Worker '{item.get_name()}' iniciado") - return True - except Exception as e: - self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}") - return False - - def stop_worker(self, worker_name): - """Detiene un worker""" - if worker_name not in self._workers: - return False - - try: - worker_data = self._workers[worker_name] - worker = worker_data['worker'] - future = worker_data['future'] - - # Detener el worker - worker.stop() - - # Intentar cancelar el future si aún no está ejecutándose - if not future.done(): - future.cancel() - - # No esperamos a que termine, se detendrá automáticamente - del self._workers[worker_name] - self.logger.info(f"Worker '{worker_name}' detenido") - return True - except Exception as e: - self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") - # Asegurarse de eliminar la entrada aunque haya error - if worker_name in self._workers: - del self._workers[worker_name] - return False - - def sync_workers(self): - """Sincroniza los workers con workers.json""" - items, general_args, disabled = self.load_workers_config() - - # Actualizar general_args en todos los workers activos - old_general_args = self._general_args - self._general_args = general_args - - current_worker_names = set(self._workers.keys()) - enabled_worker_names = {item.get_name() for item, enabled in items if enabled} - - # Actualizar general_args en workers existentes - with self._lock: - for worker_data in self._workers.values(): - worker_data['worker'].update_general_args(general_args) - - # Detener workers que ya no existen o están deshabilitados - to_stop = current_worker_names - enabled_worker_names - for worker_name in list(to_stop): - with self._lock: - if worker_name in self._workers: - worker = self._workers[worker_name]['worker'] - worker.stop() - future = self._workers[worker_name]['future'] - if not future.done(): - future.cancel() - del self._workers[worker_name] - self.logger.info(f"Worker '{worker_name}' detenido") - - # Iniciar workers nuevos o que se hayan habilitado - for item, enabled in items: - worker_name = item.get_name() - if enabled and worker_name not in current_worker_names: - # Nuevo worker o recién activado - with self._lock: - if worker_name not in self._workers: - self.start_worker(item, general_args) - elif enabled and worker_name in current_worker_names: - # Worker existente, verificar si hay cambios significativos - needs_restart = False - with self._lock: - if worker_name in self._workers: - old_item = self._workers[worker_name]['item'] - # Comparar si hay cambios significativos - if self._has_changes(old_item, item): - self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración") - worker = self._workers[worker_name]['worker'] - worker.stop() - future = self._workers[worker_name]['future'] - if not future.done(): - future.cancel() - del self._workers[worker_name] - needs_restart = True - else: - # Actualizar la referencia al item sin reiniciar - self._workers[worker_name]['item'] = item - # Actualizar general_args en el worker - self._workers[worker_name]['worker'].update_general_args(general_args) - # Reiniciar fuera del lock para evitar deadlocks - if needs_restart: - time.sleep(0.5) # Dar tiempo para detener - with self._lock: - if worker_name not in self._workers: # Verificar que no se haya añadido en otro thread - self.start_worker(item, general_args) - - def _has_changes(self, old_item, new_item): - """Verifica si hay cambios significativos entre dos items""" - # Comparar campos importantes - return ( - old_item.get_search_query() != new_item.get_search_query() or - old_item.get_min_price() != new_item.get_min_price() or - old_item.get_max_price() != new_item.get_max_price() or - old_item.get_thread_id() != new_item.get_thread_id() or - old_item.get_platform() != new_item.get_platform() or - old_item.get_check_every() != new_item.get_check_every() or - old_item.get_latitude() != new_item.get_latitude() or - old_item.get_longitude() != new_item.get_longitude() or - old_item.get_max_distance() != new_item.get_max_distance() - ) - - def monitor_workers_file(self): - """Monitorea el archivo workers.json y sincroniza workers usando polling""" - self.logger.info("Iniciando monitor de workers.json...") - last_mtime = 0 - - try: - if os.path.exists(self._workers_path): - last_mtime = os.path.getmtime(self._workers_path) - except Exception as e: - self.logger.warning(f"Error obteniendo mtime inicial: {e}") - - while self._running: - try: - time.sleep(2) # Verificar cada 2 segundos - - if not os.path.exists(self._workers_path): - continue - - current_mtime = os.path.getmtime(self._workers_path) - if current_mtime != last_mtime: - self.logger.info("Detectado cambio en workers.json, sincronizando workers...") - time.sleep(0.5) # Esperar un poco para que se termine de escribir el archivo - self.sync_workers() - last_mtime = current_mtime - except Exception as e: - self.logger.error(f"Error monitoreando workers.json: {e}") - time.sleep(5) # Esperar más tiempo si hay error - - def start_monitoring(self): - """Inicia el monitoreo en un thread separado""" - monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True) - monitor_thread.start() - return monitor_thread - - def stop_all(self): - """Detiene todos los workers""" - self._running = False - worker_names = list(self._workers.keys()) - for worker_name in worker_names: - try: - with self._lock: - if worker_name in self._workers: - worker = self._workers[worker_name]['worker'] - worker.stop() - future = self._workers[worker_name]['future'] - if not future.done(): - future.cancel() - del self._workers[worker_name] - except Exception as e: - self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") - - self._executor.shutdown(wait=False) - self.logger.info("Todos los workers detenidos") - -if __name__ == "__main__": - initialize_config_files() - configure_logger() - logger = logging.getLogger(__name__) - - # Cargar configuración de cache y crear ArticleCache - cache_config = load_cache_config() - cache_type = cache_config['cache_type'] - cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'} - article_cache = create_article_cache(cache_type, **cache_kwargs) - - # Crear QueueManager con ArticleCache - queue_manager = QueueManager(article_cache) - - # Crear WorkerManager - items, general_args = parse_items_to_monitor() - worker_manager = WorkerManager(general_args, queue_manager) - - # Sincronizar workers iniciales - worker_manager.sync_workers() - - # Iniciar monitoreo del archivo workers.json - worker_manager.start_monitoring() - - try: - logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...") - # Mantener el programa corriendo - while True: - time.sleep(60) - # Sincronización periódica por si acaso - worker_manager.sync_workers() - except KeyboardInterrupt: - logger.info("Deteniendo sistema...") - finally: - worker_manager.stop_all() - +import json +import logging +from logging.handlers import RotatingFileHandler +from concurrent.futures import ThreadPoolExecutor, Future +import os +import shutil +import yaml +import time +import threading + +from datalayer.item_monitor import ItemMonitor +from datalayer.general_monitor import GeneralMonitor +from managers.worker import Worker +from managers.queue_manager import QueueManager +from managers.article_cache import create_article_cache + +def initialize_config_files(): + """ + Inicializa los archivos de configuración si no existen, + copiando los archivos .sample correspondientes. + """ + base_dir = os.path.dirname(os.path.abspath(__file__)) + + config_files = [ + ('config.yaml', 'config.sample.yaml'), + ('workers.json', 'workers.sample.json') + ] + + for config_file, sample_file in config_files: + config_path = os.path.join(base_dir, config_file) + sample_path = os.path.join(base_dir, sample_file) + + if not os.path.exists(config_path): + if os.path.exists(sample_path): + shutil.copy2(sample_path, config_path) + print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'") + print(f" Por favor, edita '{config_file}' con tu configuración antes de continuar.") + else: + raise FileNotFoundError( + f"No se encontró ni '{config_file}' ni '{sample_file}'. " + f"Por favor, crea uno de estos archivos." + ) + +def configure_logger(): + import os + logging.getLogger("httpx").setLevel(logging.WARNING) + + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s')) + + # Determinar la ruta del archivo de log + # En Docker, usar /app/logs si existe, sino usar el directorio actual + if os.path.isdir('/app/logs'): + log_path = '/app/logs/monitor.log' + else: + log_path = 'monitor.log' + + # Asegurarse de que el directorio existe + log_dir = os.path.dirname(log_path) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir, exist_ok=True) + + file_handler = RotatingFileHandler(log_path, maxBytes=10e6) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + + # Configure the root logger with both handlers + logging.basicConfig(level=logging.NOTSET, + handlers=[console_handler, file_handler]) + +def parse_items_to_monitor(): + import os + base_dir = os.path.dirname(os.path.abspath(__file__)) + workers_path = os.path.join(base_dir, "workers.json") + with open(workers_path) as f: + args = json.load(f) + if 'items' not in args: + raise ValueError("Missing mandatory field: items") + items = [ItemMonitor.load_from_json(item) for item in args['items']] + general_args = GeneralMonitor.load_from_json(args['general']) + return items, general_args + +def load_cache_config(): + """Carga la configuración del cache desde config.yaml""" + base_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(base_dir, "config.yaml") + logger = logging.getLogger(__name__) + + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + cache_config = config.get('cache', {}) + cache_type = cache_config.get('type', 'redis') + + if cache_type == 'redis': + redis_config = cache_config.get('redis', {}) + return { + 'cache_type': 'redis', + 'redis_host': redis_config.get('host', 'localhost'), + 'redis_port': redis_config.get('port', 6379), + 'redis_db': redis_config.get('db', 0), + 'redis_password': redis_config.get('password') + } + else: + logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto") + return { + 'cache_type': 'redis', + 'redis_host': 'localhost', + 'redis_port': 6379, + 'redis_db': 0, + 'redis_password': None + } + except Exception as e: + logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}") + return { + 'cache_type': 'redis', + 'redis_host': 'localhost', + 'redis_port': 6379, + 'redis_db': 0, + 'redis_password': None + } + +class WorkerManager: + """Gestiona workers dinámicamente, iniciando y deteniendo según workers.json""" + + def __init__(self, general_args, queue_manager): + self.logger = logging.getLogger(__name__) + self._general_args = general_args + self._queue_manager = queue_manager + self._executor = ThreadPoolExecutor(max_workers=1000) + self._workers = {} # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor} + self._running = True + self._lock = threading.Lock() + base_dir = os.path.dirname(os.path.abspath(__file__)) + self._workers_path = os.path.join(base_dir, "workers.json") + + def load_workers_config(self): + """Carga la configuración de workers desde workers.json""" + try: + with open(self._workers_path, 'r') as f: + config = json.load(f) + disabled = set(config.get('disabled', [])) + items = [] + for item_data in config.get('items', []): + item = ItemMonitor.load_from_json(item_data) + items.append((item, item.get_name() not in disabled)) + general = GeneralMonitor.load_from_json(config.get('general', {})) + return items, general, disabled + except Exception as e: + self.logger.error(f"Error cargando workers.json: {e}") + return [], GeneralMonitor([], [], [], [], []), set() + + def start_worker(self, item, general_args): + """Inicia un worker""" + try: + worker = Worker(item, general_args, self._queue_manager) + future = self._executor.submit(worker.run) + self._workers[item.get_name()] = { + 'worker': worker, + 'future': future, + 'item': item + } + self.logger.info(f"Worker '{item.get_name()}' iniciado") + return True + except Exception as e: + self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}") + return False + + def stop_worker(self, worker_name): + """Detiene un worker""" + if worker_name not in self._workers: + return False + + try: + worker_data = self._workers[worker_name] + worker = worker_data['worker'] + future = worker_data['future'] + + # Detener el worker + worker.stop() + + # Intentar cancelar el future si aún no está ejecutándose + if not future.done(): + future.cancel() + + # No esperamos a que termine, se detendrá automáticamente + del self._workers[worker_name] + self.logger.info(f"Worker '{worker_name}' detenido") + return True + except Exception as e: + self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") + # Asegurarse de eliminar la entrada aunque haya error + if worker_name in self._workers: + del self._workers[worker_name] + return False + + def sync_workers(self): + """Sincroniza los workers con workers.json""" + items, general_args, disabled = self.load_workers_config() + + # Actualizar general_args en todos los workers activos + old_general_args = self._general_args + self._general_args = general_args + + current_worker_names = set(self._workers.keys()) + enabled_worker_names = {item.get_name() for item, enabled in items if enabled} + + # Actualizar general_args en workers existentes + with self._lock: + for worker_data in self._workers.values(): + worker_data['worker'].update_general_args(general_args) + + # Detener workers que ya no existen o están deshabilitados + to_stop = current_worker_names - enabled_worker_names + for worker_name in list(to_stop): + with self._lock: + if worker_name in self._workers: + worker = self._workers[worker_name]['worker'] + worker.stop() + future = self._workers[worker_name]['future'] + if not future.done(): + future.cancel() + del self._workers[worker_name] + self.logger.info(f"Worker '{worker_name}' detenido") + + # Iniciar workers nuevos o que se hayan habilitado + for item, enabled in items: + worker_name = item.get_name() + if enabled and worker_name not in current_worker_names: + # Nuevo worker o recién activado + with self._lock: + if worker_name not in self._workers: + self.start_worker(item, general_args) + elif enabled and worker_name in current_worker_names: + # Worker existente, verificar si hay cambios significativos + needs_restart = False + with self._lock: + if worker_name in self._workers: + old_item = self._workers[worker_name]['item'] + # Comparar si hay cambios significativos + if self._has_changes(old_item, item): + self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración") + worker = self._workers[worker_name]['worker'] + worker.stop() + future = self._workers[worker_name]['future'] + if not future.done(): + future.cancel() + del self._workers[worker_name] + needs_restart = True + else: + # Actualizar la referencia al item sin reiniciar + self._workers[worker_name]['item'] = item + # Actualizar general_args en el worker + self._workers[worker_name]['worker'].update_general_args(general_args) + # Reiniciar fuera del lock para evitar deadlocks + if needs_restart: + time.sleep(0.5) # Dar tiempo para detener + with self._lock: + if worker_name not in self._workers: # Verificar que no se haya añadido en otro thread + self.start_worker(item, general_args) + + def _has_changes(self, old_item, new_item): + """Verifica si hay cambios significativos entre dos items""" + # Comparar campos importantes + return ( + old_item.get_search_query() != new_item.get_search_query() or + old_item.get_min_price() != new_item.get_min_price() or + old_item.get_max_price() != new_item.get_max_price() or + old_item.get_thread_id() != new_item.get_thread_id() or + old_item.get_platform() != new_item.get_platform() or + old_item.get_check_every() != new_item.get_check_every() or + old_item.get_latitude() != new_item.get_latitude() or + old_item.get_longitude() != new_item.get_longitude() or + old_item.get_max_distance() != new_item.get_max_distance() + ) + + def monitor_workers_file(self): + """Monitorea el archivo workers.json y sincroniza workers usando polling""" + self.logger.info("Iniciando monitor de workers.json...") + last_mtime = 0 + + try: + if os.path.exists(self._workers_path): + last_mtime = os.path.getmtime(self._workers_path) + except Exception as e: + self.logger.warning(f"Error obteniendo mtime inicial: {e}") + + while self._running: + try: + time.sleep(2) # Verificar cada 2 segundos + + if not os.path.exists(self._workers_path): + continue + + current_mtime = os.path.getmtime(self._workers_path) + if current_mtime != last_mtime: + self.logger.info("Detectado cambio en workers.json, sincronizando workers...") + time.sleep(0.5) # Esperar un poco para que se termine de escribir el archivo + self.sync_workers() + last_mtime = current_mtime + except Exception as e: + self.logger.error(f"Error monitoreando workers.json: {e}") + time.sleep(5) # Esperar más tiempo si hay error + + def start_monitoring(self): + """Inicia el monitoreo en un thread separado""" + monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True) + monitor_thread.start() + return monitor_thread + + def stop_all(self): + """Detiene todos los workers""" + self._running = False + worker_names = list(self._workers.keys()) + for worker_name in worker_names: + try: + with self._lock: + if worker_name in self._workers: + worker = self._workers[worker_name]['worker'] + worker.stop() + future = self._workers[worker_name]['future'] + if not future.done(): + future.cancel() + del self._workers[worker_name] + except Exception as e: + self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") + + self._executor.shutdown(wait=False) + self.logger.info("Todos los workers detenidos") + +if __name__ == "__main__": + initialize_config_files() + configure_logger() + logger = logging.getLogger(__name__) + + # Cargar configuración de cache y crear ArticleCache + cache_config = load_cache_config() + cache_type = cache_config['cache_type'] + cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'} + article_cache = create_article_cache(cache_type, **cache_kwargs) + + # Crear QueueManager con ArticleCache + queue_manager = QueueManager(article_cache) + + # Crear WorkerManager + items, general_args = parse_items_to_monitor() + worker_manager = WorkerManager(general_args, queue_manager) + + # Sincronizar workers iniciales + worker_manager.sync_workers() + + # Iniciar monitoreo del archivo workers.json + worker_manager.start_monitoring() + + try: + logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...") + # Mantener el programa corriendo + while True: + time.sleep(60) + # Sincronización periódica por si acaso + worker_manager.sync_workers() + except KeyboardInterrupt: + logger.info("Deteniendo sistema...") + finally: + worker_manager.stop_all() +