"""Entry point for the worker-monitoring service.

Initializes configuration, connects to MongoDB, and keeps a pool of
``Worker`` threads in sync with the worker definitions stored in the
``workers`` MongoDB collection (one document per user).
"""

import hashlib
import json
import logging
import os
import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime
from logging.handlers import RotatingFileHandler

import yaml
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from datalayer.item_monitor import ItemMonitor
from datalayer.general_monitor import GeneralMonitor
from managers.worker import Worker
from managers.queue_manager import QueueManager
from managers.article_cache import create_article_cache


def initialize_config_files():
    """Create missing config files by copying their ``.sample`` counterparts.

    Note: workers.json is no longer used; workers are stored in MongoDB.

    Raises:
        FileNotFoundError: if neither the config file nor its sample exists.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_files = [
        ('config.yaml', 'config.sample.yaml'),
        # workers.json is no longer used; worker definitions live in MongoDB
    ]
    for config_file, sample_file in config_files:
        config_path = os.path.join(base_dir, config_file)
        sample_path = os.path.join(base_dir, sample_file)
        if not os.path.exists(config_path):
            if os.path.exists(sample_path):
                shutil.copy2(sample_path, config_path)
                print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'")
                print(f" Por favor, edita '{config_file}' con tu configuración antes de continuar.")
            else:
                raise FileNotFoundError(
                    f"No se encontró ni '{config_file}' ni '{sample_file}'. "
                    f"Por favor, crea uno de estos archivos."
                )


def configure_logger():
    """Configure root logging: INFO console handler + DEBUG rotating file handler."""
    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # Determine the log file path: in Docker use /app/logs if it exists,
    # otherwise fall back to the current directory.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Make sure the target directory exists.
    log_dir = os.path.dirname(log_path)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir, exist_ok=True)

    # Fix: maxBytes should be an int (10e6 was a float) and without a
    # backupCount the handler truncates the log on rollover, losing history.
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000, backupCount=3)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Configure the root logger with both handlers.
    logging.basicConfig(level=logging.NOTSET, handlers=[console_handler, file_handler])


def get_mongodb_client_for_workers():
    """Open a MongoDB connection for reading worker definitions.

    Connection settings come from ``config.yaml`` (``cache.mongodb``),
    with environment variables taking precedence.

    Returns:
        The pymongo database handle, or ``None`` if MongoDB is not
        configured or the connection fails.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        if cache_config.get('type') == 'mongodb':
            mongodb_config = cache_config.get('mongodb', {})
            # Environment variables override config.yaml values.
            mongodb_host = os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost')
            mongodb_port = int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017))
            mongodb_database = os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher')
            mongodb_username = os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username')
            mongodb_password = os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password')
            mongodb_auth_source = mongodb_config.get('auth_source', 'admin')

            # Build the connection URL (with credentials when provided).
            if mongodb_username and mongodb_password:
                connection_string = (
                    f"mongodb://{mongodb_username}:{mongodb_password}"
                    f"@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}"
                )
            else:
                connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/"

            mongo_client = MongoClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
                socketTimeoutMS=5000
            )
            # Verify connectivity before handing back the handle.
            mongo_client.admin.command('ping')
            db = mongo_client[mongodb_database]
            return db
        else:
            logger.error("MongoDB no está configurado. Workers requieren MongoDB.")
            return None
    except Exception as e:
        logger.error(f"Error conectando a MongoDB para workers: {e}")
        return None


def _empty_workers_config(workers_by_user=None):
    """Build a fresh empty workers structure (never share mutable defaults)."""
    return {
        'general': {'title_exclude': [], 'description_exclude': []},
        'items': [],
        'disabled': [],
        'workers_by_user': workers_by_user if workers_by_user is not None else {}
    }


def get_workers_from_mongodb(username=None):
    """Load worker definitions from MongoDB.

    Args:
        username: if given, load only that user's workers; otherwise merge
            the workers of every user.

    Returns:
        A dict with keys ``general`` (combined exclusion lists), ``items``,
        ``disabled``, and ``workers_by_user`` (username -> list of items).
        On any failure an empty structure is returned rather than raising.
    """
    logger = logging.getLogger(__name__)
    db = get_mongodb_client_for_workers()
    if db is None:
        logger.error("No se pudo conectar a MongoDB para obtener workers")
        return _empty_workers_config()
    try:
        workers_collection = db['workers']

        # Single-user mode: return that user's document as-is.
        if username:
            workers_data = workers_collection.find_one({'username': username})
            if not workers_data:
                logger.warning(f"No se encontraron workers para el usuario '{username}' en MongoDB")
                return _empty_workers_config({username: []})
            # Strip MongoDB bookkeeping fields.
            workers_data.pop('_id', None)
            workers_data.pop('updatedAt', None)
            workers_data.pop('createdAt', None)
            username_value = workers_data.pop('username')
            data = workers_data
            data['workers_by_user'] = {username_value: data.get('items', [])}
            return data

        # All-users mode: merge every user's document.
        all_workers = list(workers_collection.find({}))
        if not all_workers:
            logger.warning("No se encontraron workers en MongoDB")
            return _empty_workers_config()

        all_items = []
        all_disabled = []
        workers_by_user = {}  # username -> list of items
        # General exclusions are the union across all users.
        general_title_exclude = set()
        general_description_exclude = set()

        for user_workers_doc in all_workers:
            try:
                username_from_doc = user_workers_doc.get('username')
                if not username_from_doc:
                    continue
                # Strip MongoDB bookkeeping fields.
                user_workers = {k: v for k, v in user_workers_doc.items()
                                if k not in ['_id', 'username', 'updatedAt', 'createdAt']}
                user_items = user_workers.get('items', [])
                workers_by_user[username_from_doc] = user_items
                for item in user_items:
                    all_items.append(item)
                for disabled_name in user_workers.get('disabled', []):
                    all_disabled.append(disabled_name)
                user_general = user_workers.get('general', {})
                general_title_exclude.update(user_general.get('title_exclude', []))
                general_description_exclude.update(user_general.get('description_exclude', []))
                logger.debug(f"Cargados {len(user_items)} workers del usuario '{username_from_doc}'")
            except Exception as e:
                logger.error(f"Error procesando workers del usuario '{user_workers_doc.get('username', 'unknown')}': {e}")
                continue

        logger.info(f"Total de workers cargados: {len(all_items)} items de {len(all_workers)} usuarios")
        return {
            'general': {
                'title_exclude': list(general_title_exclude),
                'description_exclude': list(general_description_exclude)
            },
            'items': all_items,
            'disabled': all_disabled,
            'workers_by_user': workers_by_user
        }
    except Exception as e:
        logger.error(f"Error obteniendo workers de MongoDB: {e}")
        return _empty_workers_config()


def parse_items_to_monitor(username=None):
    """Parse monitored items from MongoDB (all users when *username* is None).

    Returns:
        Tuple ``(items, general_args)`` of ``ItemMonitor`` instances and a
        ``GeneralMonitor``.

    Raises:
        ValueError: if the loaded structure lacks the mandatory ``items`` key.
    """
    args = get_workers_from_mongodb(username)
    if 'items' not in args:
        raise ValueError("Missing mandatory field: items")
    items = [ItemMonitor.load_from_json(item) for item in args.get('items', [])]
    general_args = GeneralMonitor.load_from_json(args.get('general', {}))
    return items, general_args


def _default_cache_config():
    """Fresh default cache settings (local MongoDB, no auth)."""
    return {
        'cache_type': 'mongodb',
        'mongodb_host': 'localhost',
        'mongodb_port': 27017,
        'mongodb_database': 'wallabicher',
        'mongodb_username': None,
        'mongodb_password': None,
        'mongodb_auth_source': 'admin'
    }


def load_cache_config():
    """Load cache configuration from ``config.yaml``.

    Environment variables override file values. Falls back to local
    MongoDB defaults on any error or unknown cache type.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        cache_type = cache_config.get('type', 'mongodb')
        if cache_type == 'mongodb':
            mongodb_config = cache_config.get('mongodb', {})
            return {
                'cache_type': 'mongodb',
                'mongodb_host': os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'),
                'mongodb_port': int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)),
                'mongodb_database': os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'),
                'mongodb_username': os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'),
                'mongodb_password': os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'),
                'mongodb_auth_source': mongodb_config.get('auth_source', 'admin')
            }
        else:
            logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'mongodb' por defecto")
            return _default_cache_config()
    except Exception as e:
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (mongodb): {e}")
        return _default_cache_config()


def _json_serial(obj):
    """``json.dumps`` fallback: serialize datetimes as ISO-8601 strings."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


class WorkerManager:
    """Manage workers dynamically, starting and stopping them according to
    the configuration stored in MongoDB."""

    def __init__(self, general_args, queue_manager):
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        self._executor = ThreadPoolExecutor(max_workers=1000)
        # worker_id -> {'worker': Worker, 'future': Future,
        #               'item': ItemMonitor, 'username': str}
        self._workers = {}
        # worker_id -> username
        self._worker_to_username = {}
        # worker_id -> worker_name (kept for compatibility/logging)
        self._worker_id_to_name = {}
        self._running = True
        self._lock = threading.Lock()
        self._db = get_mongodb_client_for_workers()

    def load_workers_config(self):
        """Load worker configuration from MongoDB (all users).

        Returns:
            Tuple ``(items, general, disabled, worker_id_to_username)`` where
            ``items`` is a list of ``(ItemMonitor, enabled, username)`` triples.
            On error, empty structures are returned.
        """
        try:
            config = get_workers_from_mongodb(None)  # None = all users
            disabled = set(config.get('disabled', []))
            items = []
            workers_by_user = config.get('workers_by_user', {})

            # Build worker_id -> username and worker_name -> username maps
            # (name-based lookup kept for backward compatibility).
            worker_id_to_username = {}
            worker_name_to_username = {}
            for username, user_items in workers_by_user.items():
                for item_data in user_items:
                    worker_name = item_data.get('name')
                    worker_id = item_data.get('id') or item_data.get('worker_id')
                    if worker_id:
                        worker_id_to_username[worker_id] = username
                    if worker_name:
                        worker_name_to_username[worker_name] = username

            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                worker_id = item.get_id()
                worker_name = item.get_name()
                # Resolve the owner: prefer ID, fall back to name.
                username = worker_id_to_username.get(worker_id) or worker_name_to_username.get(worker_name)
                # A worker may be disabled by ID or (legacy) by name.
                is_disabled = worker_id in disabled or worker_name in disabled
                items.append((item, not is_disabled, username))

            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled, worker_id_to_username
        except Exception as e:
            self.logger.error(f"Error cargando workers de MongoDB: {e}")
            return [], GeneralMonitor([], [], [], [], []), set(), {}

    def start_worker(self, item, general_args, username=None):
        """Start a worker thread for *item*. Returns True on success."""
        try:
            worker = Worker(item, general_args, self._queue_manager, username=username)
            future = self._executor.submit(worker.run)
            worker_id = item.get_id()
            worker_name = item.get_name()
            self._workers[worker_id] = {
                'worker': worker,
                'future': future,
                'item': item,
                'username': username
            }
            if username:
                self._worker_to_username[worker_id] = username
            self._worker_id_to_name[worker_id] = worker_name
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) iniciado para usuario '{username or 'unknown'}'")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}' (ID: {item.get_id()}): {e}")
            return False

    def _teardown_worker(self, worker_id):
        """Stop a running worker and drop all bookkeeping entries for it.

        Caller is responsible for holding ``self._lock`` where concurrent
        access is possible. Returns the worker's display name for logging.
        """
        worker_data = self._workers[worker_id]
        worker_data['worker'].stop()
        future = worker_data['future']
        # Cancel the future if it has not started yet; a running worker
        # will stop on its own after worker.stop().
        if not future.done():
            future.cancel()
        worker_name = self._worker_id_to_name.get(worker_id, worker_id)
        del self._workers[worker_id]
        self._worker_to_username.pop(worker_id, None)
        self._worker_id_to_name.pop(worker_id, None)
        return worker_name

    def stop_worker(self, worker_id):
        """Stop a worker by its ID. Returns True if it was stopped."""
        if worker_id not in self._workers:
            return False
        try:
            worker_name = self._teardown_worker(worker_id)
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
            return True
        except Exception as e:
            self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
            # Drop the bookkeeping even on failure so entries don't leak.
            self._workers.pop(worker_id, None)
            self._worker_to_username.pop(worker_id, None)
            self._worker_id_to_name.pop(worker_id, None)
            return False

    def sync_workers(self):
        """Synchronize running workers with the MongoDB configuration (all users)."""
        items, general_args, disabled, worker_id_to_username = self.load_workers_config()

        # Refresh global state from the freshly loaded configuration.
        self._worker_to_username = worker_id_to_username
        self._general_args = general_args

        current_worker_ids = set(self._workers.keys())
        enabled_worker_ids = {item.get_id() for item, enabled, username in items if enabled}

        # Push the (possibly updated) general args to every running worker.
        with self._lock:
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)

        # Stop workers that were removed or disabled.
        to_stop = current_worker_ids - enabled_worker_ids
        for worker_id in list(to_stop):
            with self._lock:
                if worker_id in self._workers:
                    worker_name = self._teardown_worker(worker_id)
                    self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")

        # Start new/re-enabled workers; restart changed ones.
        for item, enabled, username in items:
            worker_id = item.get_id()
            worker_name = item.get_name()
            if enabled and worker_id not in current_worker_ids:
                # New worker, or one that was just enabled.
                with self._lock:
                    if worker_id not in self._workers:
                        self.start_worker(item, general_args, username=username)
            elif enabled and worker_id in current_worker_ids:
                # Existing worker: restart only on significant changes.
                needs_restart = False
                with self._lock:
                    if worker_id in self._workers:
                        old_item = self._workers[worker_id]['item']
                        old_username = self._workers[worker_id].get('username')
                        if self._has_changes(old_item, item) or old_username != username:
                            self.logger.info(f"Reiniciando worker '{worker_name}' (ID: {worker_id}) por cambios en configuración")
                            self._teardown_worker(worker_id)
                            needs_restart = True
                        else:
                            # No restart needed: refresh references in place.
                            self._workers[worker_id]['item'] = item
                            self._workers[worker_id]['username'] = username
                            self._workers[worker_id]['worker'].update_general_args(general_args)
                # Restart outside the lock to avoid deadlocks.
                if needs_restart:
                    time.sleep(0.5)  # give the old worker time to stop
                    with self._lock:
                        # Another thread may have recreated it meanwhile.
                        if worker_id not in self._workers:
                            self.start_worker(item, general_args, username=username)

    def _has_changes(self, old_item, new_item):
        """Return True when any restart-worthy field differs between items."""
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_mongodb(self):
        """Poll the ``workers`` collection and resync when its content changes.

        Change detection hashes the JSON-serialized documents; a differing
        hash triggers ``sync_workers``.
        """
        self.logger.info("Iniciando monitor de workers en MongoDB (todos los usuarios)...")
        last_config_hash = None
        while self._running:
            try:
                time.sleep(2)  # poll every 2 seconds
                if self._db is None:
                    self._db = get_mongodb_client_for_workers()
                    if self._db is None:
                        time.sleep(5)  # back off while MongoDB is unreachable
                        continue
                try:
                    workers_collection = self._db['workers']
                    all_workers = list(workers_collection.find({}, {'_id': 0}))
                    # Hash the combined configuration of all users.
                    if all_workers:
                        all_configs_json = json.dumps(all_workers, sort_keys=True, default=_json_serial)
                        current_config_hash = hashlib.md5(all_configs_json.encode()).hexdigest()
                    else:
                        current_config_hash = None
                    if current_config_hash != last_config_hash:
                        # Skip the log line on the very first iteration.
                        if last_config_hash is not None:
                            self.logger.info(f"Detectado cambio en workers de MongoDB (usuarios: {len(all_workers)}), sincronizando workers...")
                        time.sleep(0.5)  # small delay to let writes settle
                        self.sync_workers()
                        last_config_hash = current_config_hash
                except ConnectionFailure as e:
                    self.logger.error(f"Error leyendo workers de MongoDB: {e}")
                    time.sleep(5)  # back off on connection errors
                    self._db = None  # force a reconnect next iteration
            except Exception as e:
                self.logger.error(f"Error monitoreando workers en MongoDB: {e}")
                time.sleep(5)  # back off on unexpected errors

    def start_monitoring(self):
        """Start the MongoDB polling loop in a daemon thread."""
        monitor_thread = threading.Thread(target=self.monitor_workers_mongodb, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker and shut down the executor."""
        self._running = False
        for worker_id in list(self._workers.keys()):
            try:
                with self._lock:
                    if worker_id in self._workers:
                        self._teardown_worker(worker_id)
            except Exception as e:
                self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")


if __name__ == "__main__":
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Load cache configuration and build the ArticleCache.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)

    # MongoDB settings for the QueueManager.
    mongodb_config = {
        'mongodb': {
            'host': cache_config.get('mongodb_host', 'localhost'),
            'port': cache_config.get('mongodb_port', 27017),
            'database': cache_config.get('mongodb_database', 'wallabicher'),
            'username': cache_config.get('mongodb_username'),
            'password': cache_config.get('mongodb_password'),
            'auth_source': cache_config.get('mongodb_auth_source', 'admin')
        }
    }

    queue_manager = QueueManager(article_cache, mongodb_config)

    # WorkerManager loads workers for all users (username=None).
    items, general_args = parse_items_to_monitor(None)
    worker_manager = WorkerManager(general_args, queue_manager)

    # Initial sync, then watch MongoDB for changes.
    worker_manager.sync_workers()
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado (todos los usuarios). Esperando cambios en MongoDB...")
        while True:
            time.sleep(60)
            # Periodic safety resync in case a change was missed.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()