From d28710b9279dfe26f0d7eb854ef232401b3e9744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Omar=20S=C3=A1nchez=20Pizarro?= Date: Tue, 20 Jan 2026 03:22:56 +0100 Subject: [PATCH] mongodb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Omar Sánchez Pizarro --- config.sample.yaml | 19 +- datalayer/item_monitor.py | 15 +- docker-compose.yml | 50 +-- managers/article_cache.py | 77 +++-- managers/mongodb_manager.py | 434 ++++++++++++++++++++++++ managers/queue_manager.py | 41 ++- managers/telegram_manager.py | 125 ++++--- managers/telegram_manager_factory.py | 137 ++++++++ managers/worker.py | 13 +- requirements.txt | 2 +- wallabicher.py | 488 ++++++++++++++++++++------- workers.sample.json | 75 ---- 12 files changed, 1166 insertions(+), 310 deletions(-) create mode 100644 managers/mongodb_manager.py create mode 100644 managers/telegram_manager_factory.py delete mode 100644 workers.sample.json diff --git a/config.sample.yaml b/config.sample.yaml index 4302b70..0cac09c 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -1,12 +1,15 @@ telegram_token: "" telegram_channel: "@canal_o_grupo" +enable_polling: true # true para iniciar el bot en modo polling, false para desactivarlo -# Configuración del cache de artículos notificados (Redis requerido) -# Almacena en Redis con TTL de 7 días +# Configuración del cache de artículos notificados (MongoDB requerido) +# Almacena en MongoDB con expiración de 7 días cache: - type: "redis" - redis: - host: "localhost" # En Docker usar: "redis" - port: 6379 - db: 0 - password: null # null o string con la contraseña + type: "mongodb" + mongodb: + host: "localhost" # En Docker usar: "mongodb" + port: 27017 + database: "wallabicher" + username: "admin" # null o string con el usuario + password: "adminpassword" # null o string con la contraseña + auth_source: "admin" # Base de datos para autenticación (admin para usuarios root) diff --git a/datalayer/item_monitor.py b/datalayer/item_monitor.py index 42c666c..1437022 100644 --- a/datalayer/item_monitor.py +++ b/datalayer/item_monitor.py @@ -1,9 +1,11 @@ +import uuid class ItemMonitor: def __init__(self, name, search_query, latitude, longitude, max_distance, condition, min_price, max_price, title_exclude, description_exclude, title_must_include, description_must_include, - title_first_word_exclude, check_every, thread_id, platform, country): + title_first_word_exclude, check_every, thread_id, platform, country, worker_id=None): + self._worker_id = worker_id if worker_id else str(uuid.uuid4()) self._name = name self._search_query = search_query self._latitude = latitude @@ -27,6 +29,11 @@ class ItemMonitor: if 'search_query' not in json_data: raise ValueError("Missing mandatory field: search_query") + # Generar ID si no existe (para compatibilidad con workers antiguos) + worker_id = json_data.get('id') or json_data.get('worker_id') + if not worker_id: + worker_id = str(uuid.uuid4()) + return cls( json_data['name'], json_data['search_query'], @@ -44,9 +51,13 @@ class ItemMonitor: json_data.get('check_every', 30), json_data.get('thread_id', 1), json_data.get('platform', 'wallapop'), # Default to wallapop for backward compatibility - json_data.get('country', 'es') # Default country for platforms that support it (Vinted, etc.) + json_data.get('country', 'es'), # Default country for platforms that support it (Vinted, etc.) + worker_id ) + def get_id(self): + return self._worker_id + def get_name(self): return self._name diff --git a/docker-compose.yml b/docker-compose.yml index 3fa7974..3c9d83b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,18 +1,21 @@ services: - # Redis para cache de artículos - redis: - image: redis:7-alpine - container_name: wallabicher-redis + # MongoDB para almacenar datos + mongodb: + image: mongo:7.0 + container_name: wallabicher-mongodb ports: - - "6379:6379" + - "27018:27017" volumes: - - redis-data:/data - command: redis-server --appendonly yes + - mongodb-data:/data/db + environment: + - MONGO_INITDB_ROOT_USERNAME=admin + - MONGO_INITDB_ROOT_PASSWORD=adminpassword + - MONGO_INITDB_DATABASE=wallabicher healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] interval: 10s - timeout: 3s - retries: 3 + timeout: 5s + retries: 5 networks: - wallabicher-network restart: unless-stopped @@ -27,16 +30,19 @@ services: - NODE_ENV=production - PORT=3001 - PROJECT_ROOT=/data - - REDIS_HOST=redis + - MONGODB_HOST=mongodb + - MONGODB_PORT=27017 + - MONGODB_DATABASE=wallabicher + - MONGODB_USERNAME=admin + - MONGODB_PASSWORD=adminpassword volumes: # Montar archivos de configuración y datos en ubicación predecible - ./config.yaml:/data/config.yaml:ro - - ./workers.json:/data/workers.json:rw - ./logs:/data/logs:rw # Montar el directorio raíz para acceso a archivos - .:/data/project:ro depends_on: - redis: + mongodb: condition: service_healthy networks: - wallabicher-network @@ -60,11 +66,11 @@ services: restart: unless-stopped # Servicio Python principal (Wallabicher) - # NOTA: Para usar Redis, asegúrate de que config.yaml tenga: + # NOTA: Para usar MongoDB, asegúrate de que config.yaml tenga: # cache: - # type: "redis" - # redis: - # host: "redis" # Nombre del servicio en Docker + # type: "mongodb" + # mongodb: + # host: "mongodb" # Nombre del servicio en Docker wallabicher: build: context: . @@ -72,14 +78,18 @@ services: container_name: wallabicher-python environment: - PYTHONUNBUFFERED=1 + - MONGODB_HOST=mongodb + - MONGODB_PORT=27017 + - MONGODB_DATABASE=wallabicher + - MONGODB_USERNAME=admin + - MONGODB_PASSWORD=adminpassword volumes: # Montar archivos de configuración - ./config.yaml:/app/config.yaml:ro - - ./workers.json:/app/workers.json:rw # Montar directorio de logs en lugar del archivo para evitar problemas - ./logs:/app/logs:rw depends_on: - redis: + mongodb: condition: service_healthy backend: condition: service_healthy @@ -89,7 +99,7 @@ services: # El servicio Python no necesita exponer puertos, solo se comunica con Redis y Telegram volumes: - redis-data: + mongodb-data: driver: local networks: diff --git a/managers/article_cache.py b/managers/article_cache.py index a55c488..d2b57d5 100644 --- a/managers/article_cache.py +++ b/managers/article_cache.py @@ -1,8 +1,10 @@ import logging -import redis import json -NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 # TTL de 7 días en segundos para artículos notificados +# Importar MongoDBArticleCache desde mongodb_manager +from managers.mongodb_manager import MongoDBArticleCache + +NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 # TTL de 7 días en segundos para artículos notificados (mantener para compatibilidad) class RedisArticleCache: """Maneja el cache de artículos notificados usando Redis""" @@ -45,21 +47,29 @@ class RedisArticleCache: self.logger.error(f"Error verificando artículo en Redis: {e}") return False - def mark_article_as_notified(self, article): + def mark_article_as_notified(self, article, username=None, worker_name=None): """Marca un artículo como notificado en Redis con TTL, guardando toda la información del artículo""" try: key = self._get_article_key(article) # Guardar toda la información del artículo como JSON - # Verificar si el artículo ya existe para mantener el estado de favorito + # Verificar si el artículo ya existe para mantener el estado de favorito, username y worker_name existing_value = self._redis_client.get(key) is_favorite = False + existing_username = None + existing_worker_name = None if existing_value: try: existing_data = json.loads(existing_value) is_favorite = existing_data.get('is_favorite', False) + existing_username = existing_data.get('username') + existing_worker_name = existing_data.get('worker_name') except json.JSONDecodeError: pass + # Mantener username y worker_name existentes si ya existen, o usar los nuevos si se proporcionan + final_username = existing_username if existing_username else username + final_worker_name = existing_worker_name if existing_worker_name else worker_name + article_data = { 'id': article.get_id(), 'title': article.get_title(), @@ -74,11 +84,18 @@ class RedisArticleCache: 'platform': article.get_platform(), 'is_favorite': is_favorite, # Mantener el estado de favorito } + + # Añadir username y worker_name si están disponibles + if final_username: + article_data['username'] = final_username + if final_worker_name: + article_data['worker_name'] = final_worker_name + self._redis_client.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) except Exception as e: self.logger.error(f"Error marcando artículo como notificado en Redis: {e}") - def mark_articles_as_notified(self, articles): + def mark_articles_as_notified(self, articles, username=None, worker_name=None): """Añade múltiples artículos a la lista de artículos ya notificados en Redis""" article_list = articles if isinstance(articles, list) else [articles] @@ -122,6 +139,13 @@ class RedisArticleCache: 'platform': article.get_platform(), 'is_favorite': False, # Por defecto no es favorito } + + # Añadir username y worker_name si están disponibles + if username: + article_data['username'] = username + if worker_name: + article_data['worker_name'] = worker_name + pipe.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) added_count += 1 @@ -209,28 +233,35 @@ class RedisArticleCache: return 0 -def create_article_cache(cache_type='redis', **kwargs): +def create_article_cache(cache_type='mongodb', **kwargs): """ - Factory function para crear el cache de artículos usando Redis. + Factory function para crear el cache de artículos usando MongoDB. Args: - cache_type: 'redis' (solo Redis está soportado) - **kwargs: Argumentos para Redis: - - redis_host: host de Redis (default: 'localhost') - - redis_port: puerto de Redis (default: 6379) - - redis_db: base de datos de Redis (default: 0) - - redis_password: contraseña de Redis (opcional) + cache_type: 'mongodb' (solo MongoDB está soportado) + **kwargs: Argumentos para MongoDB: + - mongodb_host: host de MongoDB (default: 'localhost') + - mongodb_port: puerto de MongoDB (default: 27017) + - mongodb_database: base de datos (default: 'wallabicher') + - mongodb_username: usuario de MongoDB (opcional) + - mongodb_password: contraseña de MongoDB (opcional) + - mongodb_auth_source: base de datos para autenticación (default: 'admin') Returns: - RedisArticleCache + MongoDBArticleCache """ - if cache_type != 'redis': - raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'redis'") - - return RedisArticleCache( - redis_host=kwargs.get('redis_host', 'localhost'), - redis_port=kwargs.get('redis_port', 6379), - redis_db=kwargs.get('redis_db', 0), - redis_password=kwargs.get('redis_password') - ) + if cache_type == 'mongodb': + return MongoDBArticleCache( + mongodb_host=kwargs.get('mongodb_host', 'localhost'), + mongodb_port=kwargs.get('mongodb_port', 27017), + mongodb_database=kwargs.get('mongodb_database', 'wallabicher'), + mongodb_username=kwargs.get('mongodb_username'), + mongodb_password=kwargs.get('mongodb_password'), + mongodb_auth_source=kwargs.get('mongodb_auth_source', 'admin') + ) + elif cache_type == 'redis': + # Mantener compatibilidad con Redis (deprecated) + raise ValueError("Redis ya no está soportado. Por favor, usa MongoDB.") + else: + raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'mongodb'") diff --git a/managers/mongodb_manager.py b/managers/mongodb_manager.py new file mode 100644 index 0000000..5076e43 --- /dev/null +++ b/managers/mongodb_manager.py @@ -0,0 +1,434 @@ +import logging +from pymongo import MongoClient +from pymongo.operations import UpdateOne +from pymongo.errors import ConnectionFailure, DuplicateKeyError +from datetime import datetime, timedelta +import json + +NOTIFIED_ARTICLE_TTL_DAYS = 7 # TTL de 7 días + +class MongoDBArticleCache: + """Maneja el cache de artículos notificados usando MongoDB""" + + def __init__(self, mongodb_host='localhost', mongodb_port=27017, mongodb_database='wallabicher', + mongodb_username=None, mongodb_password=None, mongodb_auth_source='admin'): + self.logger = logging.getLogger(__name__) + + # Inicializar conexión MongoDB + try: + # Construir URL de conexión + if mongodb_username and mongodb_password: + connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}" + else: + connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/" + + self._mongo_client = MongoClient( + connection_string, + serverSelectionTimeoutMS=5000, + connectTimeoutMS=5000, + socketTimeoutMS=5000 + ) + + # Verificar conexión + self._mongo_client.admin.command('ping') + self._db = self._mongo_client[mongodb_database] + self._articles_collection = self._db['articles'] + self._workers_collection = self._db['workers'] + + # Crear índices + self._create_indexes() + + self.logger.info(f"Conectado a MongoDB en {mongodb_host}:{mongodb_port} (db={mongodb_database})") + except ConnectionFailure as e: + self.logger.error(f"Error conectando a MongoDB: {e}") + self.logger.error("MongoDB no está disponible. El sistema no podrá evitar duplicados sin MongoDB.") + raise + except Exception as e: + self.logger.error(f"Error inesperado inicializando MongoDB: {e}") + raise + + def _create_indexes(self): + """Crea los índices necesarios en MongoDB""" + try: + # Índices para artículos + self._articles_collection.create_index([('platform', 1), ('id', 1)], unique=True) + self._articles_collection.create_index([('username', 1)]) + self._articles_collection.create_index([('worker_name', 1)]) + self._articles_collection.create_index([('notifiedAt', -1)]) + self._articles_collection.create_index([('expiresAt', 1)], expireAfterSeconds=0) + + # Índices para workers + self._workers_collection.create_index([('username', 1)], unique=True) + + self.logger.debug("Índices de MongoDB creados") + except Exception as e: + self.logger.warning(f"Error creando índices de MongoDB: {e}") + + def _get_article_key(self, article): + """Genera una clave única para un artículo (solo para compatibilidad)""" + return f"notified:{article.get_platform()}:{article.get_id()}" + + def is_article_notified(self, article): + """Verifica si un artículo ya ha sido notificado""" + try: + count = self._articles_collection.count_documents({ + 'platform': article.get_platform(), + 'id': str(article.get_id()) + }) + return count > 0 + except Exception as e: + self.logger.error(f"Error verificando artículo en MongoDB: {e}") + return False + + def mark_article_as_notified(self, article, username=None, worker_name=None): + """Marca un artículo como notificado en MongoDB con TTL usando estructura user_info""" + try: + expires_at = datetime.utcnow() + timedelta(days=NOTIFIED_ARTICLE_TTL_DAYS) + + # Obtener artículo existente si existe + existing = self._articles_collection.find_one({ + 'platform': article.get_platform(), + 'id': str(article.get_id()) + }) + + # Preparar datos del artículo (sin user_info) + article_data = { + 'platform': article.get_platform(), + 'id': str(article.get_id()), + 'title': article.get_title(), + 'description': article._description, + 'price': article.get_price(), + 'currency': article.get_currency(), + 'location': article.get_location(), + 'allows_shipping': article._allows_shipping, + 'url': article.get_url(), + 'images': article.get_images(), + 'modified_at': article.get_modified_at(), + 'expiresAt': expires_at, + 'updatedAt': datetime.utcnow(), + } + + # Preparar user_info para este usuario/worker + user_info_entry = { + 'username': username, + 'worker_name': worker_name, + 'notified': True, + 'notified_at': datetime.utcnow(), + 'is_favorite': False, + } + + if existing: + # Artículo existe, actualizar o añadir user_info + existing_user_info = existing.get('user_info', []) + + # Buscar si ya existe un user_info para este usuario + user_info_index = None + for i, ui in enumerate(existing_user_info): + if ui.get('username') == username: + user_info_index = i + break + + if user_info_index is not None: + # Actualizar user_info existente + existing_user_info[user_info_index].update({ + 'worker_name': worker_name or existing_user_info[user_info_index].get('worker_name'), + 'notified': True, + 'notified_at': datetime.utcnow(), + # Mantener is_favorite existente + 'is_favorite': existing_user_info[user_info_index].get('is_favorite', False), + }) + else: + # Añadir nuevo user_info + existing_user_info.append(user_info_entry) + + article_data['user_info'] = existing_user_info + + # Upsert: actualizar + self._articles_collection.update_one( + {'platform': article.get_platform(), 'id': str(article.get_id())}, + {'$set': article_data} + ) + else: + # Artículo nuevo, crear con user_info + article_data['user_info'] = [user_info_entry] + article_data['createdAt'] = datetime.utcnow() + + # Insertar nuevo artículo + self._articles_collection.insert_one(article_data) + + except Exception as e: + self.logger.error(f"Error marcando artículo como notificado en MongoDB: {e}") + + def mark_articles_as_notified(self, articles, username=None, worker_name=None): + """Añade múltiples artículos a la lista de artículos ya notificados en MongoDB usando estructura user_info""" + if not articles: + return + + article_list = articles if isinstance(articles, list) else [articles] + + try: + expires_at = datetime.utcnow() + timedelta(days=NOTIFIED_ARTICLE_TTL_DAYS) + now = datetime.utcnow() + + # Obtener todos los artículos existentes de una vez para optimizar + article_ids = [(a.get_platform(), str(a.get_id())) for a in article_list] + existing_articles = { + (a['platform'], a['id']): a + for a in self._articles_collection.find({ + '$or': [{'platform': p, 'id': i} for p, i in article_ids] + }) + } + + operations = [] + for article in article_list: + platform = article.get_platform() + article_id = str(article.get_id()) + key = (platform, article_id) + + # Preparar datos del artículo (sin user_info) + article_data = { + 'platform': platform, + 'id': article_id, + 'title': article.get_title(), + 'description': article._description, + 'price': article.get_price(), + 'currency': article.get_currency(), + 'location': article.get_location(), + 'allows_shipping': article._allows_shipping, + 'url': article.get_url(), + 'images': article.get_images(), + 'modified_at': article.get_modified_at(), + 'expiresAt': expires_at, + 'updatedAt': now, + } + + # Preparar user_info + existing = existing_articles.get(key) + if existing: + # Artículo existe, actualizar user_info + existing_user_info = existing.get('user_info', []) + + # Buscar si ya existe user_info para este usuario + user_info_index = None + for i, ui in enumerate(existing_user_info): + if ui.get('username') == username: + user_info_index = i + break + + if user_info_index is not None: + # Actualizar user_info existente + existing_user_info[user_info_index].update({ + 'worker_name': worker_name or existing_user_info[user_info_index].get('worker_name'), + 'notified': True, + 'notified_at': now, + # Mantener is_favorite existente + 'is_favorite': existing_user_info[user_info_index].get('is_favorite', False), + }) + else: + # Añadir nuevo user_info + existing_user_info.append({ + 'username': username, + 'worker_name': worker_name, + 'notified': True, + 'notified_at': now, + 'is_favorite': False, + }) + + article_data['user_info'] = existing_user_info + + operations.append( + UpdateOne( + {'platform': platform, 'id': article_id}, + {'$set': article_data} + ) + ) + else: + # Artículo nuevo + article_data['user_info'] = [{ + 'username': username, + 'worker_name': worker_name, + 'notified': True, + 'notified_at': now, + 'is_favorite': False, + }] + article_data['createdAt'] = now + + operations.append( + UpdateOne( + {'platform': platform, 'id': article_id}, + { + '$set': article_data, + }, + upsert=True + ) + ) + + if operations: + self._articles_collection.bulk_write(operations, ordered=False) + self.logger.debug(f"{len(operations)} artículos añadidos/actualizados en MongoDB") + except Exception as e: + self.logger.error(f"Error añadiendo artículos a MongoDB: {e}") + + def set_favorite(self, platform, article_id, username, is_favorite=True): + """Marca o desmarca un artículo como favorito en MongoDB para un usuario específico""" + try: + if not username: + self.logger.error("username es requerido para marcar favoritos") + return False + + article = self._articles_collection.find_one({ + 'platform': platform, + 'id': str(article_id) + }) + + if not article: + self.logger.error(f"Artículo no encontrado: {platform}:{article_id}") + return False + + user_info_list = article.get('user_info', []) + user_info_index = None + + # Buscar user_info para este usuario + for i, ui in enumerate(user_info_list): + if ui.get('username') == username: + user_info_index = i + break + + if user_info_index is not None: + # Actualizar user_info existente + user_info_list[user_info_index]['is_favorite'] = is_favorite + else: + # Si no existe user_info para este usuario, crear uno + user_info_list.append({ + 'username': username, + 'worker_name': None, + 'notified': False, + 'notified_at': None, + 'is_favorite': is_favorite, + }) + + self._articles_collection.update_one( + {'platform': platform, 'id': str(article_id)}, + { + '$set': { + 'user_info': user_info_list, + 'updatedAt': datetime.utcnow() + } + } + ) + return True + except Exception as e: + self.logger.error(f"Error marcando favorito en MongoDB: {e}") + return False + + def get_favorites(self, username=None): + """Obtiene todos los artículos marcados como favoritos, opcionalmente filtrados por usuario""" + try: + # Buscar artículos que tengan favoritos en user_info + query = {'user_info.is_favorite': True} + if username: + query['user_info.username'] = username + + articles = list(self._articles_collection.find(query)) + + # Filtrar y transformar para devolver solo los favoritos relevantes + favorites = [] + for article in articles: + user_info_list = article.get('user_info', []) + + for user_info in user_info_list: + if user_info.get('is_favorite') and (not username or user_info.get('username') == username): + fav = dict(article) + fav['_id'] = str(fav['_id']) + fav['username'] = user_info.get('username') + fav['worker_name'] = user_info.get('worker_name') + fav['is_favorite'] = True + + # Convertir fechas a timestamps/isoformat + notified_at = user_info.get('notified_at') + if isinstance(notified_at, datetime): + fav['notifiedAt'] = notified_at.isoformat() + elif 'notifiedAt' in fav and isinstance(fav['notifiedAt'], datetime): + fav['notifiedAt'] = fav['notifiedAt'].isoformat() + + if 'expiresAt' in fav and isinstance(fav['expiresAt'], datetime): + fav['expiresAt'] = fav['expiresAt'].isoformat() + + favorites.append(fav) + + return favorites + except Exception as e: + self.logger.error(f"Error obteniendo favoritos de MongoDB: {e}") + return [] + + def is_favorite(self, platform, article_id, username=None): + """Verifica si un artículo es favorito para un usuario específico""" + try: + article = self._articles_collection.find_one({ + 'platform': platform, + 'id': str(article_id) + }) + + if not article: + return False + + # Buscar en user_info si hay username + if username: + user_info_list = article.get('user_info', []) + for ui in user_info_list: + if ui.get('username') == username: + return ui.get('is_favorite', False) + return False + else: + # Sin username, verificar si hay algún favorito (compatibilidad) + user_info_list = article.get('user_info', []) + for ui in user_info_list: + if ui.get('is_favorite'): + return True + # Compatibilidad con estructura antigua + return article.get('is_favorite', False) + except Exception as e: + self.logger.error(f"Error verificando favorito en MongoDB: {e}") + return False + + def clear_cache(self): + """Elimina toda la caché de artículos notificados en MongoDB""" + try: + result = self._articles_collection.delete_many({}) + count = result.deleted_count + self.logger.info(f"Cache de MongoDB limpiado: {count} artículos eliminados") + return count + except Exception as e: + self.logger.error(f"Error limpiando cache de MongoDB: {e}") + return 0 + + +def create_article_cache(cache_type='mongodb', **kwargs): + """ + Factory function para crear el cache de artículos usando MongoDB. + + Args: + cache_type: 'mongodb' (solo MongoDB está soportado) + **kwargs: Argumentos para MongoDB: + - mongodb_host: host de MongoDB (default: 'localhost') + - mongodb_port: puerto de MongoDB (default: 27017) + - mongodb_database: base de datos (default: 'wallabicher') + - mongodb_username: usuario de MongoDB (opcional) + - mongodb_password: contraseña de MongoDB (opcional) + - mongodb_auth_source: base de datos para autenticación (default: 'admin') + + Returns: + MongoDBArticleCache + """ + if cache_type != 'mongodb': + raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'mongodb'") + + return MongoDBArticleCache( + mongodb_host=kwargs.get('mongodb_host', 'localhost'), + mongodb_port=kwargs.get('mongodb_port', 27017), + mongodb_database=kwargs.get('mongodb_database', 'wallabicher'), + mongodb_username=kwargs.get('mongodb_username'), + mongodb_password=kwargs.get('mongodb_password'), + mongodb_auth_source=kwargs.get('mongodb_auth_source', 'admin') + ) + diff --git a/managers/queue_manager.py b/managers/queue_manager.py index af8ec8f..c45e6ec 100644 --- a/managers/queue_manager.py +++ b/managers/queue_manager.py @@ -2,38 +2,41 @@ import queue import threading import time import logging -from managers.telegram_manager import TelegramManager +from managers.telegram_manager_factory import TelegramManagerFactory MESSAGE_DELAY = 3.0 # Tiempo de espera entre mensajes en segundos RETRY_TIMES = 3 class QueueManager: - def __init__(self, article_cache): + def __init__(self, article_cache, mongodb_config=None): self.logger = logging.getLogger(__name__) - self._queue = queue.Queue() # Cola thread-safe - self._telegram_manager = TelegramManager() + self._queue = queue.Queue() # Cola thread-safe: (username, search_name, article, thread_id, retry_times) self._article_cache = article_cache self._running = True + # Inicializar factory de TelegramManager + TelegramManagerFactory.initialize(mongodb_config) + # Iniciar el thread de procesamiento self._processor_thread = threading.Thread(target=self._process_queue, daemon=True) self._processor_thread.start() - def add_to_queue(self, article, search_name=None, thread_id=None, retry_times=RETRY_TIMES): + def add_to_queue(self, article, search_name=None, thread_id=None, retry_times=RETRY_TIMES, username=None): # Verificar si el artículo ya ha sido enviado if self._article_cache.is_article_notified(article): return if search_name is None: search_name = "Unknown" - self._queue.put((search_name, article, thread_id, retry_times)) - self.logger.debug(f"Artículo añadido a la cola: {article.get_title()}") + # Añadir username a la cola para usar el bot correcto + self._queue.put((username, search_name, article, thread_id, retry_times)) + self.logger.debug(f"Artículo añadido a la cola para usuario '{username}': {article.get_title()}") - self._article_cache.mark_article_as_notified(article) + self._article_cache.mark_article_as_notified(article, username=username, worker_name=search_name) - def add_to_notified_articles(self, articles): + def add_to_notified_articles(self, articles, username=None, worker_name=None): """Añade artículos a la lista de artículos ya notificados""" - self._article_cache.mark_articles_as_notified(articles) + self._article_cache.mark_articles_as_notified(articles, username=username, worker_name=worker_name) def _process_queue(self): self.logger.info("Procesador de cola: Iniciado") @@ -42,19 +45,27 @@ class QueueManager: try: # Esperar hasta que haya un elemento en la cola (timeout de 1 segundo) try: - search_name, article, thread_id, retry_times = self._queue.get(timeout=1.0) + username, search_name, article, thread_id, retry_times = self._queue.get(timeout=1.0) except queue.Empty: continue + # Obtener TelegramManager para el usuario específico + telegram_manager = TelegramManagerFactory.get_manager(username) + + if telegram_manager is None: + self.logger.warning(f"No hay TelegramManager disponible para usuario '{username}', saltando artículo") + self._queue.task_done() + continue + # Procesar el artículo try: - self._telegram_manager.send_telegram_article(search_name, article, thread_id) + telegram_manager.send_telegram_article(search_name, article, thread_id) except Exception as e: - self.logger.error(f"Error al enviar artículo a Telegram: {e}") + self.logger.error(f"Error al enviar artículo a Telegram para usuario '{username}': {e}") if retry_times > 0: - self._queue.put((search_name, article, thread_id, retry_times - 1)) + self._queue.put((username, search_name, article, thread_id, retry_times - 1)) else: - self.logger.error(f"Artículo no enviado después de {RETRY_TIMES} intentos") + self.logger.error(f"Artículo no enviado después de {RETRY_TIMES} intentos para usuario '{username}'") finally: self._queue.task_done() diff --git a/managers/telegram_manager.py b/managers/telegram_manager.py index de72ae0..9c43854 100644 --- a/managers/telegram_manager.py +++ b/managers/telegram_manager.py @@ -26,10 +26,29 @@ ITEM_HTML = """ """ class TelegramManager: - def __init__(self): + def __init__(self, token=None, channel=None, enable_polling=True, username=None): + """ + Inicializa TelegramManager con configuración específica + + Args: + token: Token del bot de Telegram (si None, intenta leer de config.yaml) + channel: Canal de Telegram (si None, intenta leer de config.yaml) + enable_polling: Si iniciar el polling del bot + username: Usuario propietario de esta configuración (para logging) + """ self.logger = logging.getLogger(__name__) - token, channel = self.get_config() + self._username = username + + # Si no se proporcionan token/channel, intentar leer de config.yaml (compatibilidad) + if token is None or channel is None: + token, channel, enable_polling = self._get_config_from_file() + + if not token or not channel: + raise ValueError("Token y channel de Telegram son requeridos") + self._channel = channel + self._token = token + # Use ApplicationBuilder to create the bot application with increased timeouts self._application = telegram.ext.ApplicationBuilder() \ .token(token) \ @@ -42,26 +61,35 @@ class TelegramManager: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) - # Inicializar Redis para favoritos - self._article_cache = self._init_redis_cache() + # Inicializar MongoDB para favoritos + self._article_cache = self._init_mongodb_cache() # Añadir handlers para comandos y callbacks self._add_handlers() - # Iniciar polling en un thread separado - self._start_polling() + # Iniciar polling en un thread separado solo si está habilitado + if enable_polling: + self._start_polling() + else: + self.logger.info(f"Polling deshabilitado por configuración{' para usuario ' + username if username else ''}") - def get_config(self): + def _get_config_from_file(self): + """Lee configuración de config.yaml (para compatibilidad hacia atrás)""" base_dir = os.path.dirname(os.path.abspath(__file__)) config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml') - with open(config_file, 'r') as file: - config = yaml.safe_load(file) - token = config['telegram_token'] - telegram_channel = config['telegram_channel'] - return token, telegram_channel + try: + with open(config_file, 'r') as file: + config = yaml.safe_load(file) + token = config.get('telegram_token') + telegram_channel = config.get('telegram_channel') + enable_polling = config.get('enable_polling', True) + return token, telegram_channel, enable_polling + except Exception as e: + self.logger.warning(f"No se pudo leer config.yaml: {e}") + return None, None, False - def _init_redis_cache(self): - """Inicializa Redis cache para favoritos""" + def _init_mongodb_cache(self): + """Inicializa MongoDB cache para favoritos""" try: base_dir = os.path.dirname(os.path.abspath(__file__)) config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml') @@ -69,20 +97,22 @@ class TelegramManager: config = yaml.safe_load(file) cache_config = config.get('cache', {}) - if cache_config.get('type') == 'redis': - redis_config = cache_config.get('redis', {}) + if cache_config.get('type') == 'mongodb': + mongodb_config = cache_config.get('mongodb', {}) return create_article_cache( - cache_type='redis', - redis_host=redis_config.get('host', 'localhost'), - redis_port=redis_config.get('port', 6379), - redis_db=redis_config.get('db', 0), - redis_password=redis_config.get('password') + cache_type='mongodb', + mongodb_host=os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'), + mongodb_port=int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)), + mongodb_database=os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'), + mongodb_username=os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'), + mongodb_password=os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'), + mongodb_auth_source=mongodb_config.get('auth_source', 'admin') ) else: - self.logger.warning("Redis no configurado para favoritos, se requiere Redis") + self.logger.warning("MongoDB no configurado para favoritos, se requiere MongoDB") return None except Exception as e: - self.logger.error(f"Error inicializando Redis para favoritos: {e}") + self.logger.error(f"Error inicializando MongoDB para favoritos: {e}") return None def escape_html(self, text): @@ -198,7 +228,8 @@ class TelegramManager: """Inicia el bot en modo polling en un thread separado""" def run_polling(): try: - self.logger.info("Iniciando polling de Telegram bot...") + user_info = f" para usuario '{self._username}'" if self._username else "" + self.logger.info(f"Iniciando polling de Telegram bot{user_info}...") # Crear un nuevo event loop para este thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -208,11 +239,13 @@ class TelegramManager: loop.create_task(self._application.updater.start_polling(allowed_updates=["message", "callback_query"])) loop.run_forever() except Exception as e: - self.logger.error(f"Error en polling: {e}") + user_info = f" (usuario: {self._username})" if self._username else "" + self.logger.error(f"Error en polling{user_info}: {e}") polling_thread = threading.Thread(target=run_polling, daemon=True) polling_thread.start() - self.logger.info("Thread de polling iniciado") + user_info = f" para usuario '{self._username}'" if self._username else "" + self.logger.info(f"Thread de polling iniciado{user_info}") async def handle_favorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE): """Maneja el callback cuando se presiona el botón de favoritos""" @@ -220,7 +253,7 @@ class TelegramManager: await query.answer() if not self._article_cache: - await query.edit_message_text("❌ Redis no está disponible para favoritos") + await query.edit_message_text("❌ MongoDB no está disponible para favoritos") return # Extraer plataforma, ID del artículo y nombre de búsqueda del callback_data @@ -241,23 +274,23 @@ class TelegramManager: await query.message.reply_text("ℹ️ Este artículo ya está en favoritos") return - # Obtener la URL del artículo desde Redis + # Obtener la URL del artículo desde MongoDB url = "" try: - redis_client = self._article_cache._redis_client - key = f"notified:{platform}:{article_id}" - value = redis_client.get(key) - if value: - article_data = json.loads(value) - url = article_data.get('url', '') + article = self._article_cache._articles_collection.find_one({ + 'platform': platform, + 'id': str(article_id) + }) + if article: + url = article.get('url', '') except Exception as e: self.logger.debug(f"Error obteniendo URL: {e}") - # Marcar como favorito en Redis + # Marcar como favorito en MongoDB success = self._article_cache.set_favorite(platform, article_id, is_favorite=True) if not success: - await query.edit_message_text("❌ No se pudo encontrar el artículo en Redis") + await query.edit_message_text("❌ No se pudo encontrar el artículo en MongoDB") return # Actualizar el mensaje del botón @@ -320,7 +353,7 @@ class TelegramManager: async def handle_favs_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE): """Maneja el comando /favs para mostrar los favoritos""" if not self._article_cache: - await update.message.reply_text("❌ Redis no está disponible para favoritos") + await update.message.reply_text("❌ MongoDB no está disponible para favoritos") return favorites = self._article_cache.get_favorites() @@ -357,7 +390,7 @@ class TelegramManager: await query.answer() if not self._article_cache: - await query.edit_message_text("❌ Redis no está disponible para favoritos") + await query.edit_message_text("❌ MongoDB no está disponible para favoritos") return # Extraer plataforma e ID del artículo del callback_data @@ -372,19 +405,19 @@ class TelegramManager: platform = parts[1] article_id = parts[2] - # Obtener la URL del artículo desde Redis antes de desmarcar + # Obtener la URL del artículo desde MongoDB antes de desmarcar url = "" try: - redis_client = self._article_cache._redis_client - key = f"notified:{platform}:{article_id}" - value = redis_client.get(key) - if value: - article_data = json.loads(value) - url = article_data.get('url', '') + article = self._article_cache._articles_collection.find_one({ + 'platform': platform, + 'id': str(article_id) + }) + if article: + url = article.get('url', '') except Exception as e: self.logger.debug(f"Error obteniendo URL: {e}") - # Desmarcar como favorito en Redis + # Desmarcar como favorito en MongoDB success = self._article_cache.set_favorite(platform, article_id, is_favorite=False) if not success: diff --git a/managers/telegram_manager_factory.py b/managers/telegram_manager_factory.py new file mode 100644 index 0000000..aad3ff1 --- /dev/null +++ b/managers/telegram_manager_factory.py @@ -0,0 +1,137 @@ +import logging +import threading +from managers.telegram_manager import TelegramManager +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure + +class TelegramManagerFactory: + """Factory para gestionar múltiples instancias de TelegramManager (una por usuario)""" + + _instances = {} # Dict: username -> TelegramManager + _lock = threading.Lock() + _mongodb_client = None + _mongodb_config = None + + @classmethod + def initialize(cls, mongodb_config=None): + """Inicializa la factory con configuración de MongoDB""" + cls._mongodb_config = mongodb_config + cls.logger = logging.getLogger(__name__) + + @classmethod + def _get_mongodb_client(cls): + """Obtiene cliente de MongoDB para leer configuración de Telegram""" + if cls._mongodb_client is not None: + return cls._mongodb_client + + if not cls._mongodb_config: + return None + + try: + mongodb_config = cls._mongodb_config.get('mongodb', {}) + mongodb_host = mongodb_config.get('host', 'localhost') + mongodb_port = mongodb_config.get('port', 27017) + mongodb_database = mongodb_config.get('database', 'wallabicher') + mongodb_username = mongodb_config.get('username') + mongodb_password = mongodb_config.get('password') + mongodb_auth_source = mongodb_config.get('auth_source', 'admin') + + if mongodb_username and mongodb_password: + connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}" + else: + connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/" + + client = MongoClient(connection_string, serverSelectionTimeoutMS=5000) + client.admin.command('ping') + cls._mongodb_client = client[mongodb_database] + return cls._mongodb_client + except Exception as e: + cls.logger.error(f"Error conectando a MongoDB para configuración de Telegram: {e}") + return None + + @classmethod + def get_telegram_config(cls, username): + """Obtiene configuración de Telegram para un usuario desde MongoDB""" + db = cls._get_mongodb_client() + if db is None: + return None + + try: + users_collection = db['users'] + user = users_collection.find_one({'username': username}) + + if user and 'telegram' in user: + telegram_config = user['telegram'] + return { + 'token': telegram_config.get('token'), + 'channel': telegram_config.get('channel'), + 'enable_polling': telegram_config.get('enable_polling', False) + } + except Exception as e: + cls.logger.error(f"Error obteniendo configuración de Telegram para {username}: {e}") + + return None + + @classmethod + def get_manager(cls, username): + """ + Obtiene o crea un TelegramManager para un usuario específico + + Args: + username: Nombre de usuario + + Returns: + TelegramManager o None si no hay configuración + """ + if username is None: + # Si no hay username, usar configuración global (compatibilidad) + with cls._lock: + if 'global' not in cls._instances: + try: + cls._instances['global'] = TelegramManager() + except Exception as e: + cls.logger.error(f"Error creando TelegramManager global: {e}") + return None + return cls._instances.get('global') + + with cls._lock: + # Si ya existe, retornarlo + if username in cls._instances: + return cls._instances[username] + + # Obtener configuración de MongoDB + config = cls.get_telegram_config(username) + if not config or not config.get('token') or not config.get('channel'): + cls.logger.warning(f"No hay configuración de Telegram para usuario '{username}'") + return None + + # Crear nueva instancia + try: + manager = TelegramManager( + token=config['token'], + channel=config['channel'], + enable_polling=config.get('enable_polling', False), + username=username + ) + cls._instances[username] = manager + cls.logger.info(f"TelegramManager creado para usuario '{username}'") + return manager + except Exception as e: + cls.logger.error(f"Error creando TelegramManager para usuario '{username}': {e}") + return None + + @classmethod + def remove_manager(cls, username): + """Elimina un TelegramManager del cache""" + with cls._lock: + if username in cls._instances: + del cls._instances[username] + cls.logger.info(f"TelegramManager eliminado del cache para usuario '{username}'") + + @classmethod + def clear_cache(cls): + """Limpia todo el cache de managers""" + with cls._lock: + cls._instances.clear() + cls.logger.info("Cache de TelegramManager limpiado") + diff --git a/managers/worker.py b/managers/worker.py index 123c8d3..d77c20d 100644 --- a/managers/worker.py +++ b/managers/worker.py @@ -8,11 +8,12 @@ from managers.worker_conditions import WorkerConditions ERROR_SLEEP_TIME = 60 class Worker: - def __init__(self, item_to_monitor, general_args, queue_manager): + def __init__(self, item_to_monitor, general_args, queue_manager, username=None): self.logger = logging.getLogger(__name__) self._item_monitoring = item_to_monitor self._general_args = general_args self._queue_manager = queue_manager + self._username = username # Usuario propietario del worker self._worker_conditions = WorkerConditions(item_to_monitor, general_args) self._running = True self._stop_event = threading.Event() @@ -26,7 +27,8 @@ class Worker: raise # Initialize the queue with existing articles - self._queue_manager.add_to_notified_articles(self._request_articles()) + worker_name = self._item_monitoring.get_name() + self._queue_manager.add_to_notified_articles(self._request_articles(), username=self._username, worker_name=worker_name) def update_general_args(self, general_args): """Actualiza los argumentos generales del worker""" @@ -58,7 +60,12 @@ class Worker: break if self._worker_conditions.meets_item_conditions(article): try: - self._queue_manager.add_to_queue(article, self._item_monitoring.get_name(), self._item_monitoring.get_thread_id()) + self._queue_manager.add_to_queue( + article, + self._item_monitoring.get_name(), + self._item_monitoring.get_thread_id(), + username=self._username + ) except Exception as e: self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}") diff --git a/requirements.txt b/requirements.txt index 2b52220..3872a23 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ python-telegram-bot==21.6 PyYAML==6.0.2 Requests==2.32.3 Pandas -redis==5.0.1 \ No newline at end of file +pymongo==4.6.0 \ No newline at end of file diff --git a/wallabicher.py b/wallabicher.py index 3fcf7c0..231207b 100644 --- a/wallabicher.py +++ b/wallabicher.py @@ -7,6 +7,8 @@ import shutil import yaml import time import threading +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure from datalayer.item_monitor import ItemMonitor from datalayer.general_monitor import GeneralMonitor @@ -18,12 +20,13 @@ def initialize_config_files(): """ Inicializa los archivos de configuración si no existen, copiando los archivos .sample correspondientes. + Nota: workers.json ya no se usa, los workers se almacenan en MongoDB. """ base_dir = os.path.dirname(os.path.abspath(__file__)) config_files = [ ('config.yaml', 'config.sample.yaml'), - ('workers.json', 'workers.sample.json') + # workers.json ya no se usa, se almacena en MongoDB ] for config_file, sample_file in config_files: @@ -69,16 +72,172 @@ def configure_logger(): logging.basicConfig(level=logging.NOTSET, handlers=[console_handler, file_handler]) -def parse_items_to_monitor(): - import os +def get_mongodb_client_for_workers(): + """Obtiene un cliente MongoDB para leer workers""" base_dir = os.path.dirname(os.path.abspath(__file__)) - workers_path = os.path.join(base_dir, "workers.json") - with open(workers_path) as f: - args = json.load(f) - if 'items' not in args: - raise ValueError("Missing mandatory field: items") - items = [ItemMonitor.load_from_json(item) for item in args['items']] - general_args = GeneralMonitor.load_from_json(args['general']) + config_path = os.path.join(base_dir, "config.yaml") + logger = logging.getLogger(__name__) + + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + cache_config = config.get('cache', {}) + + if cache_config.get('type') == 'mongodb': + mongodb_config = cache_config.get('mongodb', {}) + mongodb_host = os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost') + mongodb_port = int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)) + mongodb_database = os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher') + mongodb_username = os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username') + mongodb_password = os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password') + mongodb_auth_source = mongodb_config.get('auth_source', 'admin') + + # Construir URL de conexión + if mongodb_username and mongodb_password: + connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}" + else: + connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/" + + mongo_client = MongoClient( + connection_string, + serverSelectionTimeoutMS=5000, + connectTimeoutMS=5000, + socketTimeoutMS=5000 + ) + + # Verificar conexión + mongo_client.admin.command('ping') + db = mongo_client[mongodb_database] + return db + else: + logger.error("MongoDB no está configurado. Workers requieren MongoDB.") + return None + except Exception as e: + logger.error(f"Error conectando a MongoDB para workers: {e}") + return None + +def get_workers_from_mongodb(username=None): + """Obtiene workers de MongoDB para un usuario específico o todos los usuarios""" + logger = logging.getLogger(__name__) + db = get_mongodb_client_for_workers() + + if db is None: + logger.error("No se pudo conectar a MongoDB para obtener workers") + return { + 'general': {'title_exclude': [], 'description_exclude': []}, + 'items': [], + 'disabled': [], + 'workers_by_user': {} # Mapeo de username -> lista de items + } + + try: + workers_collection = db['workers'] + + # Si se especifica un usuario, obtener solo sus workers + if username: + workers_data = workers_collection.find_one({ 'username': username }) + + if not workers_data: + logger.warning(f"No se encontraron workers para el usuario '{username}' en MongoDB") + return { + 'general': {'title_exclude': [], 'description_exclude': []}, + 'items': [], + 'disabled': [], + 'workers_by_user': {username: []} + } + + # Remover campos de MongoDB + workers_data.pop('_id', None) + workers_data.pop('updatedAt', None) + workers_data.pop('createdAt', None) + username_value = workers_data.pop('username') + + data = workers_data + data['workers_by_user'] = {username_value: data.get('items', [])} + return data + else: + # Obtener workers de todos los usuarios + all_workers = list(workers_collection.find({})) + + if not all_workers: + logger.warning("No se encontraron workers en MongoDB") + return { + 'general': {'title_exclude': [], 'description_exclude': []}, + 'items': [], + 'disabled': [], + 'workers_by_user': {} + } + + # Combinar workers de todos los usuarios + all_items = [] + all_disabled = [] + workers_by_user = {} # Mapeo de username -> lista de items + # Combinar las exclusiones generales (unión de todas) + general_title_exclude = set() + general_description_exclude = set() + + for user_workers_doc in all_workers: + try: + username_from_doc = user_workers_doc.get('username') + if not username_from_doc: + continue + + # Remover campos de MongoDB + user_workers = {k: v for k, v in user_workers_doc.items() + if k not in ['_id', 'username', 'updatedAt', 'createdAt']} + + user_items = user_workers.get('items', []) + + # Guardar el mapeo de usuario -> items + workers_by_user[username_from_doc] = user_items + + # Agregar items del usuario + for item in user_items: + all_items.append(item) + + # Agregar disabled del usuario + for disabled_name in user_workers.get('disabled', []): + all_disabled.append(disabled_name) + + # Combinar exclusiones generales + user_general = user_workers.get('general', {}) + general_title_exclude.update(user_general.get('title_exclude', [])) + general_description_exclude.update(user_general.get('description_exclude', [])) + + logger.debug(f"Cargados {len(user_items)} workers del usuario '{username_from_doc}'") + except Exception as e: + logger.error(f"Error procesando workers del usuario '{user_workers_doc.get('username', 'unknown')}': {e}") + continue + + logger.info(f"Total de workers cargados: {len(all_items)} items de {len(all_workers)} usuarios") + + return { + 'general': { + 'title_exclude': list(general_title_exclude), + 'description_exclude': list(general_description_exclude) + }, + 'items': all_items, + 'disabled': all_disabled, + 'workers_by_user': workers_by_user # Mapeo de username -> lista de items + } + except Exception as e: + logger.error(f"Error obteniendo workers de MongoDB: {e}") + return { + 'general': {'title_exclude': [], 'description_exclude': []}, + 'items': [], + 'disabled': [], + 'workers_by_user': {} + } + +def parse_items_to_monitor(username=None): + """Parsea items a monitorear desde MongoDB (todos los usuarios si username es None)""" + args = get_workers_from_mongodb(username) + + if 'items' not in args: + raise ValueError("Missing mandatory field: items") + + items = [ItemMonitor.load_from_json(item) for item in args.get('items', [])] + general_args = GeneralMonitor.load_from_json(args.get('general', {})) return items, general_args def load_cache_config(): @@ -91,91 +250,126 @@ def load_cache_config(): with open(config_path, 'r') as f: config = yaml.safe_load(f) cache_config = config.get('cache', {}) - cache_type = cache_config.get('type', 'redis') + cache_type = cache_config.get('type', 'mongodb') - if cache_type == 'redis': - redis_config = cache_config.get('redis', {}) + if cache_type == 'mongodb': + mongodb_config = cache_config.get('mongodb', {}) return { - 'cache_type': 'redis', - 'redis_host': redis_config.get('host', 'localhost'), - 'redis_port': redis_config.get('port', 6379), - 'redis_db': redis_config.get('db', 0), - 'redis_password': redis_config.get('password') + 'cache_type': 'mongodb', + 'mongodb_host': os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'), + 'mongodb_port': int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)), + 'mongodb_database': os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'), + 'mongodb_username': os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'), + 'mongodb_password': os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'), + 'mongodb_auth_source': mongodb_config.get('auth_source', 'admin') } else: - logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto") + logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'mongodb' por defecto") return { - 'cache_type': 'redis', - 'redis_host': 'localhost', - 'redis_port': 6379, - 'redis_db': 0, - 'redis_password': None + 'cache_type': 'mongodb', + 'mongodb_host': 'localhost', + 'mongodb_port': 27017, + 'mongodb_database': 'wallabicher', + 'mongodb_username': None, + 'mongodb_password': None, + 'mongodb_auth_source': 'admin' } except Exception as e: - logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}") + logger.warning(f"Error cargando configuración de cache, usando valores por defecto (mongodb): {e}") return { - 'cache_type': 'redis', - 'redis_host': 'localhost', - 'redis_port': 6379, - 'redis_db': 0, - 'redis_password': None + 'cache_type': 'mongodb', + 'mongodb_host': 'localhost', + 'mongodb_port': 27017, + 'mongodb_database': 'wallabicher', + 'mongodb_username': None, + 'mongodb_password': None, + 'mongodb_auth_source': 'admin' } class WorkerManager: - """Gestiona workers dinámicamente, iniciando y deteniendo según workers.json""" + """Gestiona workers dinámicamente, iniciando y deteniendo según configuración en MongoDB""" def __init__(self, general_args, queue_manager): self.logger = logging.getLogger(__name__) self._general_args = general_args self._queue_manager = queue_manager self._executor = ThreadPoolExecutor(max_workers=1000) - self._workers = {} # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor} + self._workers = {} # Dict: worker_id -> {'worker': Worker, 'future': Future, 'item': ItemMonitor, 'username': str} + self._worker_to_username = {} # Mapeo de worker_id -> username + self._worker_id_to_name = {} # Mapeo de worker_id -> worker_name (para compatibilidad) self._running = True self._lock = threading.Lock() - base_dir = os.path.dirname(os.path.abspath(__file__)) - self._workers_path = os.path.join(base_dir, "workers.json") + self._db = get_mongodb_client_for_workers() def load_workers_config(self): - """Carga la configuración de workers desde workers.json""" + """Carga la configuración de workers desde MongoDB (todos los usuarios)""" try: - with open(self._workers_path, 'r') as f: - config = json.load(f) + config = get_workers_from_mongodb(None) # None = todos los usuarios disabled = set(config.get('disabled', [])) items = [] + workers_by_user = config.get('workers_by_user', {}) + + # Crear mapeos de worker_id -> username y worker_name -> username (para compatibilidad) + worker_id_to_username = {} + worker_name_to_username = {} + for username, user_items in workers_by_user.items(): + for item_data in user_items: + worker_name = item_data.get('name') + worker_id = item_data.get('id') or item_data.get('worker_id') + if worker_id: + worker_id_to_username[worker_id] = username + if worker_name: + worker_name_to_username[worker_name] = username + for item_data in config.get('items', []): item = ItemMonitor.load_from_json(item_data) - items.append((item, item.get_name() not in disabled)) + worker_id = item.get_id() + worker_name = item.get_name() + # Obtener el username del worker (priorizar por ID, luego por nombre para compatibilidad) + username = worker_id_to_username.get(worker_id) or worker_name_to_username.get(worker_name) + # Verificar si está deshabilitado (por ID o por nombre para compatibilidad) + is_disabled = worker_id in disabled or worker_name in disabled + items.append((item, not is_disabled, username)) + general = GeneralMonitor.load_from_json(config.get('general', {})) - return items, general, disabled + return items, general, disabled, worker_id_to_username except Exception as e: - self.logger.error(f"Error cargando workers.json: {e}") - return [], GeneralMonitor([], [], [], [], []), set() + self.logger.error(f"Error cargando workers de MongoDB: {e}") + return [], GeneralMonitor([], [], [], [], []), set(), {} - def start_worker(self, item, general_args): + def start_worker(self, item, general_args, username=None): """Inicia un worker""" try: - worker = Worker(item, general_args, self._queue_manager) + worker = Worker(item, general_args, self._queue_manager, username=username) future = self._executor.submit(worker.run) - self._workers[item.get_name()] = { + worker_id = item.get_id() + worker_name = item.get_name() + self._workers[worker_id] = { 'worker': worker, 'future': future, - 'item': item + 'item': item, + 'username': username } - self.logger.info(f"Worker '{item.get_name()}' iniciado") + # Actualizar mapeos + if username: + self._worker_to_username[worker_id] = username + self._worker_id_to_name[worker_id] = worker_name + self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) iniciado para usuario '{username or 'unknown'}'") return True except Exception as e: - self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}") + self.logger.error(f"Error iniciando worker '{item.get_name()}' (ID: {item.get_id()}): {e}") return False - def stop_worker(self, worker_name): - """Detiene un worker""" - if worker_name not in self._workers: + def stop_worker(self, worker_id): + """Detiene un worker por su ID""" + if worker_id not in self._workers: return False try: - worker_data = self._workers[worker_name] + worker_data = self._workers[worker_id] worker = worker_data['worker'] future = worker_data['future'] + worker_name = self._worker_id_to_name.get(worker_id, worker_id) # Detener el worker worker.stop() @@ -185,26 +379,37 @@ class WorkerManager: future.cancel() # No esperamos a que termine, se detendrá automáticamente - del self._workers[worker_name] - self.logger.info(f"Worker '{worker_name}' detenido") + del self._workers[worker_id] + if worker_id in self._worker_to_username: + del self._worker_to_username[worker_id] + if worker_id in self._worker_id_to_name: + del self._worker_id_to_name[worker_id] + self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido") return True except Exception as e: - self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") + self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}") # Asegurarse de eliminar la entrada aunque haya error - if worker_name in self._workers: - del self._workers[worker_name] + if worker_id in self._workers: + del self._workers[worker_id] + if worker_id in self._worker_to_username: + del self._worker_to_username[worker_id] + if worker_id in self._worker_id_to_name: + del self._worker_id_to_name[worker_id] return False def sync_workers(self): - """Sincroniza los workers con workers.json""" - items, general_args, disabled = self.load_workers_config() + """Sincroniza los workers con la configuración en MongoDB (todos los usuarios)""" + items, general_args, disabled, worker_id_to_username = self.load_workers_config() + + # Actualizar mapeo global + self._worker_to_username = worker_id_to_username # Actualizar general_args en todos los workers activos old_general_args = self._general_args self._general_args = general_args - current_worker_names = set(self._workers.keys()) - enabled_worker_names = {item.get_name() for item, enabled in items if enabled} + current_worker_ids = set(self._workers.keys()) + enabled_worker_ids = {item.get_id() for item, enabled, username in items if enabled} # Actualizar general_args en workers existentes with self._lock: @@ -212,53 +417,65 @@ class WorkerManager: worker_data['worker'].update_general_args(general_args) # Detener workers que ya no existen o están deshabilitados - to_stop = current_worker_names - enabled_worker_names - for worker_name in list(to_stop): + to_stop = current_worker_ids - enabled_worker_ids + for worker_id in list(to_stop): with self._lock: - if worker_name in self._workers: - worker = self._workers[worker_name]['worker'] + if worker_id in self._workers: + worker = self._workers[worker_id]['worker'] worker.stop() - future = self._workers[worker_name]['future'] + future = self._workers[worker_id]['future'] if not future.done(): future.cancel() - del self._workers[worker_name] - self.logger.info(f"Worker '{worker_name}' detenido") + worker_name = self._worker_id_to_name.get(worker_id, worker_id) + del self._workers[worker_id] + if worker_id in self._worker_to_username: + del self._worker_to_username[worker_id] + if worker_id in self._worker_id_to_name: + del self._worker_id_to_name[worker_id] + self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido") # Iniciar workers nuevos o que se hayan habilitado - for item, enabled in items: + for item, enabled, username in items: + worker_id = item.get_id() worker_name = item.get_name() - if enabled and worker_name not in current_worker_names: + if enabled and worker_id not in current_worker_ids: # Nuevo worker o recién activado with self._lock: - if worker_name not in self._workers: - self.start_worker(item, general_args) - elif enabled and worker_name in current_worker_names: + if worker_id not in self._workers: + self.start_worker(item, general_args, username=username) + elif enabled and worker_id in current_worker_ids: # Worker existente, verificar si hay cambios significativos needs_restart = False with self._lock: - if worker_name in self._workers: - old_item = self._workers[worker_name]['item'] - # Comparar si hay cambios significativos - if self._has_changes(old_item, item): - self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración") - worker = self._workers[worker_name]['worker'] + if worker_id in self._workers: + old_item = self._workers[worker_id]['item'] + old_username = self._workers[worker_id].get('username') + # Comparar si hay cambios significativos o cambio de usuario + if self._has_changes(old_item, item) or old_username != username: + self.logger.info(f"Reiniciando worker '{worker_name}' (ID: {worker_id}) por cambios en configuración") + worker = self._workers[worker_id]['worker'] worker.stop() - future = self._workers[worker_name]['future'] + future = self._workers[worker_id]['future'] if not future.done(): future.cancel() - del self._workers[worker_name] + del self._workers[worker_id] + if worker_id in self._worker_to_username: + del self._worker_to_username[worker_id] + if worker_id in self._worker_id_to_name: + del self._worker_id_to_name[worker_id] needs_restart = True else: # Actualizar la referencia al item sin reiniciar - self._workers[worker_name]['item'] = item + self._workers[worker_id]['item'] = item + self._workers[worker_id]['username'] = username # Actualizar general_args en el worker - self._workers[worker_name]['worker'].update_general_args(general_args) + self._workers[worker_id]['worker'].update_general_args(general_args) # Reiniciar fuera del lock para evitar deadlocks if needs_restart: time.sleep(0.5) # Dar tiempo para detener with self._lock: - if worker_name not in self._workers: # Verificar que no se haya añadido en otro thread - self.start_worker(item, general_args) + if worker_id not in self._workers: # Verificar que no se haya añadido en otro thread + self.start_worker(item, general_args, username=username) def _has_changes(self, old_item, new_item): """Verifica si hay cambios significativos entre dos items""" @@ -275,56 +492,81 @@ class WorkerManager: old_item.get_max_distance() != new_item.get_max_distance() ) - def monitor_workers_file(self): - """Monitorea el archivo workers.json y sincroniza workers usando polling""" - self.logger.info("Iniciando monitor de workers.json...") - last_mtime = 0 - - try: - if os.path.exists(self._workers_path): - last_mtime = os.path.getmtime(self._workers_path) - except Exception as e: - self.logger.warning(f"Error obteniendo mtime inicial: {e}") + def monitor_workers_mongodb(self): + """Monitorea los workers en MongoDB (todos los usuarios) y sincroniza workers usando polling""" + self.logger.info("Iniciando monitor de workers en MongoDB (todos los usuarios)...") + last_config_hash = None while self._running: try: time.sleep(2) # Verificar cada 2 segundos - if not os.path.exists(self._workers_path): - continue + if self._db is None: + self._db = get_mongodb_client_for_workers() + if self._db is None: + time.sleep(5) # Esperar más tiempo si no hay MongoDB + continue - current_mtime = os.path.getmtime(self._workers_path) - if current_mtime != last_mtime: - self.logger.info("Detectado cambio en workers.json, sincronizando workers...") - time.sleep(0.5) # Esperar un poco para que se termine de escribir el archivo - self.sync_workers() - last_mtime = current_mtime + # Obtener configuración actual de MongoDB (todos los usuarios) + try: + workers_collection = self._db['workers'] + all_workers = list(workers_collection.find({}, {'_id': 0})) + + # Crear un hash combinado de todos los workers + if all_workers: + import hashlib + from datetime import datetime + # Función para convertir datetime a string para serialización JSON + def json_serial(obj): + if isinstance(obj, datetime): + return obj.isoformat() + raise TypeError(f"Type {type(obj)} not serializable") + all_configs_json = json.dumps(all_workers, sort_keys=True, default=json_serial) + current_config_hash = hashlib.md5(all_configs_json.encode()).hexdigest() + else: + current_config_hash = None + + # Si la configuración cambió, sincronizar + if current_config_hash != last_config_hash: + if last_config_hash is not None: # Solo loguear si no es la primera vez + self.logger.info(f"Detectado cambio en workers de MongoDB (usuarios: {len(all_workers)}), sincronizando workers...") + time.sleep(0.5) # Esperar un poco para asegurar consistencia + self.sync_workers() + last_config_hash = current_config_hash + except ConnectionFailure as e: + self.logger.error(f"Error leyendo workers de MongoDB: {e}") + time.sleep(5) # Esperar más tiempo si hay error + self._db = None # Intentar reconectar except Exception as e: - self.logger.error(f"Error monitoreando workers.json: {e}") + self.logger.error(f"Error monitoreando workers en MongoDB: {e}") time.sleep(5) # Esperar más tiempo si hay error def start_monitoring(self): """Inicia el monitoreo en un thread separado""" - monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True) + monitor_thread = threading.Thread(target=self.monitor_workers_mongodb, daemon=True) monitor_thread.start() return monitor_thread def stop_all(self): """Detiene todos los workers""" self._running = False - worker_names = list(self._workers.keys()) - for worker_name in worker_names: + worker_ids = list(self._workers.keys()) + for worker_id in worker_ids: try: with self._lock: - if worker_name in self._workers: - worker = self._workers[worker_name]['worker'] + if worker_id in self._workers: + worker = self._workers[worker_id]['worker'] worker.stop() - future = self._workers[worker_name]['future'] + future = self._workers[worker_id]['future'] if not future.done(): future.cancel() - del self._workers[worker_name] + del self._workers[worker_id] + if worker_id in self._worker_to_username: + del self._worker_to_username[worker_id] + if worker_id in self._worker_id_to_name: + del self._worker_id_to_name[worker_id] except Exception as e: - self.logger.error(f"Error deteniendo worker '{worker_name}': {e}") + self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}") self._executor.shutdown(wait=False) self.logger.info("Todos los workers detenidos") @@ -340,21 +582,33 @@ if __name__ == "__main__": cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'} article_cache = create_article_cache(cache_type, **cache_kwargs) - # Crear QueueManager con ArticleCache - queue_manager = QueueManager(article_cache) + # Preparar configuración de MongoDB para TelegramManagerFactory + mongodb_config = { + 'mongodb': { + 'host': cache_config.get('mongodb_host', 'localhost'), + 'port': cache_config.get('mongodb_port', 27017), + 'database': cache_config.get('mongodb_database', 'wallabicher'), + 'username': cache_config.get('mongodb_username'), + 'password': cache_config.get('mongodb_password'), + 'auth_source': cache_config.get('mongodb_auth_source', 'admin') + } + } - # Crear WorkerManager - items, general_args = parse_items_to_monitor() + # Crear QueueManager con ArticleCache y configuración de MongoDB + queue_manager = QueueManager(article_cache, mongodb_config) + + # Crear WorkerManager (carga workers de todos los usuarios) + items, general_args = parse_items_to_monitor(None) # None = todos los usuarios worker_manager = WorkerManager(general_args, queue_manager) # Sincronizar workers iniciales worker_manager.sync_workers() - # Iniciar monitoreo del archivo workers.json + # Iniciar monitoreo de workers en MongoDB (todos los usuarios) worker_manager.start_monitoring() try: - logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...") + logger.info("Sistema de monitoreo iniciado (todos los usuarios). Esperando cambios en MongoDB...") # Mantener el programa corriendo while True: time.sleep(60) diff --git a/workers.sample.json b/workers.sample.json deleted file mode 100644 index e68785c..0000000 --- a/workers.sample.json +++ /dev/null @@ -1,75 +0,0 @@ -{ - "general": { - "title_exclude": ["flexicar", "giochi", "gioco", "joco", "carta", "cartas", "coche" , "patines"], - "description_exclude": ["flexicar", "giochi", "gioco", "joco", "carta", "cartas", "coche" , "patinete"] - }, - "items": [ - { - "name": "Playstation 1", - "search_query": "playstation", - "title_exclude" : ["playstation 2", "playstation 3", "playstation 4", "playstation 5", "ps2", "ps3", "ps4", "ps5"], - "description_exclude": ["playstation 2", "playstation 3", "playstation 4", "playstation 5", "ps2", "ps3", "ps4", "ps5"], - "thread_id": 8 - }, - { - "name": "Playstation 2", - "search_query": "playstation", - "title_exclude" : ["ps1", "playstation 3", "playstation 4", "playstation 5", "ps3", "ps4", "ps5"], - "description_exclude": ["ps1", "playstation 3", "playstation 4", "playstation 5", "ps3", "ps4", "ps5"], - "thread_id": 9 - }, - { - "name": "Gameboy", - "search_query": "gameboy", - "title_exclude" : ["carcasa", "solo carcasa", "pantalla rota", "para piezas", "estropeado"], - "description_exclude": ["carcasa", "solo carcasa", "pantalla rota", "para piezas", "estropeado"], - "thread_id": 10 - }, - { - "name": "Gameboy LOCAL", - "latitude": 41.3163083, - "longitude": 2.0156034, - "search_query": "gameboy", - "max_distance":"25", - "title_exclude" : ["carcasa", "solo carcasa", "pantalla rota", "para piezas", "estropeado"], - "description_exclude": ["carcasa", "solo carcasa", "pantalla rota", "para piezas", "estropeado"], - "thread_id": 10 - }, - { - "name": "Nintendo 64", - "search_query": "nintendo 64", - "thread_id": 10 - }, - { - "name": "Digivice", - "search_query": "digivice", - "thread_id": 12 - }, - { - "name": "Toy Story", - "search_query": "toy story", - "thread_id": 12 - }, - { - "name": "PC Big Box", - "search_query": "pc big box", - "thread_id": 11 - }, - { - "name": "VHS", - "latitude": 41.3163083, - "longitude": 2.0156034, - "max_distance":"25", - "search_query": "vhs", - "thread_id": 12 - }, - { - "name": "VHS Vinted", - "platform": "vinted", - "search_query": "vhs", - "thread_id": 3150 - } - ], - "disabled": [ - ] -}