import json
import logging
from logging.handlers import RotatingFileHandler
from concurrent.futures import ThreadPoolExecutor, Future
import os
import shutil
import yaml
import time
import threading
from datalayer.item_monitor import ItemMonitor
from datalayer.general_monitor import GeneralMonitor
from managers.worker import Worker
from managers.queue_manager import QueueManager
from managers.article_cache import create_article_cache


def initialize_config_files():
    """Create missing config files by copying their .sample counterparts.

    Looks next to this file for each (target, sample) pair; copies the
    sample when the target is absent.

    Raises:
        FileNotFoundError: if neither the target nor its sample exists.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_files = [
        ('config.yaml', 'config.sample.yaml'),
        ('workers.json', 'workers.sample.json')
    ]

    for config_file, sample_file in config_files:
        config_path = os.path.join(base_dir, config_file)
        sample_path = os.path.join(base_dir, sample_file)

        if not os.path.exists(config_path):
            if os.path.exists(sample_path):
                # copy2 preserves metadata (mtime), matching the sample file.
                shutil.copy2(sample_path, config_path)
                print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'")
                print(f"  Por favor, edita '{config_file}' con tu configuración antes de continuar.")
            else:
                raise FileNotFoundError(
                    f"No se encontró ni '{config_file}' ni '{sample_file}'. "
                    f"Por favor, crea uno de estos archivos."
                )


def configure_logger():
    """Configure root logging: INFO console handler + DEBUG rotating file handler."""
    # httpx is chatty at INFO; keep only warnings and above.
    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # Log file location: prefer /app/logs when present (Docker volume),
    # otherwise fall back to the current working directory.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Make sure the target directory exists.
    log_dir = os.path.dirname(log_path)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir, exist_ok=True)

    # BUGFIX: the original used maxBytes=10e6 (a float) and left backupCount
    # at its default of 0 — per the logging.handlers docs, rollover never
    # occurs when backupCount is 0, so the log file grew without bound.
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000, backupCount=3)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Route everything through the root logger; handlers do the level filtering.
    logging.basicConfig(level=logging.NOTSET, handlers=[console_handler, file_handler])


def parse_items_to_monitor():
    """Load workers.json (next to this file) and build monitor objects.

    Returns:
        tuple: (list[ItemMonitor], GeneralMonitor)

    Raises:
        ValueError: if the mandatory 'items' field is missing.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    workers_path = os.path.join(base_dir, "workers.json")
    with open(workers_path) as f:
        args = json.load(f)
    if 'items' not in args:
        raise ValueError("Missing mandatory field: items")
    items = [ItemMonitor.load_from_json(item) for item in args['items']]
    general_args = GeneralMonitor.load_from_json(args['general'])
    return items, general_args


def load_cache_config():
    """Load the cache section of config.yaml.

    Returns a flat dict with 'cache_type' plus redis connection keys.
    Any problem (missing/empty/unreadable file, unknown cache type) falls
    back to local redis defaults with a warning — this function never raises.
    """
    defaults = {
        'cache_type': 'redis',
        'redis_host': 'localhost',
        'redis_port': 6379,
        'redis_db': 0,
        'redis_password': None
    }
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)

    try:
        with open(config_path, 'r') as f:
            # safe_load returns None for an empty file; normalize to a dict
            # so the .get() chain below cannot raise AttributeError.
            config = yaml.safe_load(f) or {}

        cache_config = config.get('cache', {})
        cache_type = cache_config.get('type', 'redis')

        if cache_type == 'redis':
            redis_config = cache_config.get('redis', {})
            return {
                'cache_type': 'redis',
                'redis_host': redis_config.get('host', 'localhost'),
                'redis_port': redis_config.get('port', 6379),
                'redis_db': redis_config.get('db', 0),
                'redis_password': redis_config.get('password')
            }
        else:
            logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto")
            return dict(defaults)
    except Exception as e:
        # Best-effort: a broken config must not prevent startup.
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}")
        return dict(defaults)


class WorkerManager:
    """Manages workers dynamically, starting and stopping them per workers.json."""

    def __init__(self, general_args, queue_manager):
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        self._executor = ThreadPoolExecutor(max_workers=1000)
        # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor}
        self._workers = {}
        self._running = True
        # Guards every read/write of self._workers from the monitor thread vs. main.
        self._lock = threading.Lock()
        base_dir = os.path.dirname(os.path.abspath(__file__))
        self._workers_path = os.path.join(base_dir, "workers.json")

    def load_workers_config(self):
        """Load worker configuration from workers.json.

        Returns:
            tuple: (list[(ItemMonitor, enabled: bool)], GeneralMonitor, set of disabled names).
            On any error returns an empty configuration so the caller keeps running.
        """
        try:
            with open(self._workers_path, 'r') as f:
                config = json.load(f)

            disabled = set(config.get('disabled', []))
            items = []
            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                # A worker is enabled unless its name appears in 'disabled'.
                items.append((item, item.get_name() not in disabled))

            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled
        except Exception as e:
            self.logger.error(f"Error cargando workers.json: {e}")
            return [], GeneralMonitor([], [], [], [], []), set()

    def start_worker(self, item, general_args):
        """Start a worker on the thread pool; returns True on success."""
        try:
            worker = Worker(item, general_args, self._queue_manager)
            future = self._executor.submit(worker.run)
            self._workers[item.get_name()] = {
                'worker': worker,
                'future': future,
                'item': item
            }
            self.logger.info(f"Worker '{item.get_name()}' iniciado")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}")
            return False

    def stop_worker(self, worker_name):
        """Stop a worker by name; returns True if it was known and stop was requested."""
        if worker_name not in self._workers:
            return False

        try:
            worker_data = self._workers[worker_name]
            worker = worker_data['worker']
            future = worker_data['future']

            # Ask the worker loop to stop.
            worker.stop()

            # Cancel the future if it has not started running yet.
            if not future.done():
                future.cancel()

            # We don't wait for termination; the worker exits on its own.
            del self._workers[worker_name]
            self.logger.info(f"Worker '{worker_name}' detenido")
            return True
        except Exception as e:
            self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")
            # Drop the entry even on error so we don't leak a dead worker.
            if worker_name in self._workers:
                del self._workers[worker_name]
            return False

    def sync_workers(self):
        """Synchronize running workers with the current content of workers.json."""
        items, general_args, disabled = self.load_workers_config()

        self._general_args = general_args

        current_worker_names = set(self._workers.keys())
        enabled_worker_names = {item.get_name() for item, enabled in items if enabled}

        # Push the (possibly changed) general args to every running worker.
        with self._lock:
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)

        # Stop workers that were removed from the file or are now disabled.
        to_stop = current_worker_names - enabled_worker_names
        for worker_name in list(to_stop):
            with self._lock:
                if worker_name in self._workers:
                    worker = self._workers[worker_name]['worker']
                    worker.stop()
                    future = self._workers[worker_name]['future']
                    if not future.done():
                        future.cancel()
                    del self._workers[worker_name]
                    self.logger.info(f"Worker '{worker_name}' detenido")

        # Start new / re-enabled workers and restart changed ones.
        for item, enabled in items:
            worker_name = item.get_name()

            if enabled and worker_name not in current_worker_names:
                # Brand-new worker, or one that was just enabled.
                with self._lock:
                    if worker_name not in self._workers:
                        self.start_worker(item, general_args)
            elif enabled and worker_name in current_worker_names:
                # Existing worker: restart only on significant config changes.
                needs_restart = False
                with self._lock:
                    if worker_name in self._workers:
                        old_item = self._workers[worker_name]['item']
                        if self._has_changes(old_item, item):
                            self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración")
                            worker = self._workers[worker_name]['worker']
                            worker.stop()
                            future = self._workers[worker_name]['future']
                            if not future.done():
                                future.cancel()
                            del self._workers[worker_name]
                            needs_restart = True
                        else:
                            # Refresh references without restarting.
                            self._workers[worker_name]['item'] = item
                            self._workers[worker_name]['worker'].update_general_args(general_args)

                # Restart outside the lock to avoid deadlocks.
                if needs_restart:
                    time.sleep(0.5)  # Give the old worker time to stop.
                    with self._lock:
                        # Re-check: another thread may have recreated it meanwhile.
                        if worker_name not in self._workers:
                            self.start_worker(item, general_args)

    def _has_changes(self, old_item, new_item):
        """Return True if any restart-worthy field differs between the two items."""
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_file(self):
        """Poll workers.json for mtime changes and resync workers when it changes."""
        self.logger.info("Iniciando monitor de workers.json...")

        last_mtime = 0
        try:
            if os.path.exists(self._workers_path):
                last_mtime = os.path.getmtime(self._workers_path)
        except Exception as e:
            self.logger.warning(f"Error obteniendo mtime inicial: {e}")

        while self._running:
            try:
                time.sleep(2)  # Poll every 2 seconds.

                if not os.path.exists(self._workers_path):
                    continue

                current_mtime = os.path.getmtime(self._workers_path)
                if current_mtime != last_mtime:
                    self.logger.info("Detectado cambio en workers.json, sincronizando workers...")
                    time.sleep(0.5)  # Let the writer finish flushing the file.
                    self.sync_workers()
                    last_mtime = current_mtime
            except Exception as e:
                self.logger.error(f"Error monitoreando workers.json: {e}")
                time.sleep(5)  # Back off on errors.

    def start_monitoring(self):
        """Start the file-watch loop on a daemon thread and return the thread."""
        monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop the monitor loop and every running worker, then shut the pool down."""
        self._running = False

        worker_names = list(self._workers.keys())
        for worker_name in worker_names:
            try:
                with self._lock:
                    if worker_name in self._workers:
                        worker = self._workers[worker_name]['worker']
                        worker.stop()
                        future = self._workers[worker_name]['future']
                        if not future.done():
                            future.cancel()
                        del self._workers[worker_name]
            except Exception as e:
                self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")

        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")


if __name__ == "__main__":
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Build the ArticleCache from config.yaml.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)

    # QueueManager shares the cache across workers.
    queue_manager = QueueManager(article_cache)

    # Validate workers.json up front and seed the manager's general args.
    items, general_args = parse_items_to_monitor()
    worker_manager = WorkerManager(general_args, queue_manager)

    # Launch the initial worker set.
    worker_manager.sync_workers()

    # Watch workers.json for live changes.
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...")
        while True:
            time.sleep(60)
            # Periodic safety resync in case an mtime change was missed.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()