diff --git a/managers/article_cache.py b/managers/article_cache.py deleted file mode 100644 index d2b57d5..0000000 --- a/managers/article_cache.py +++ /dev/null @@ -1,267 +0,0 @@ -import logging -import json - -# Importar MongoDBArticleCache desde mongodb_manager -from managers.mongodb_manager import MongoDBArticleCache - -NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 # TTL de 7 días en segundos para artículos notificados (mantener para compatibilidad) - -class RedisArticleCache: - """Maneja el cache de artículos notificados usando Redis""" - - def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0, redis_password=None): - self.logger = logging.getLogger(__name__) - - # Inicializar conexión Redis - try: - self._redis_client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - password=redis_password, - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5 - ) - # Verificar conexión - self._redis_client.ping() - self.logger.info(f"Conectado a Redis en {redis_host}:{redis_port} (db={redis_db})") - except (redis.ConnectionError, redis.TimeoutError) as e: - self.logger.error(f"Error conectando a Redis: {e}") - self.logger.error("Redis no está disponible. El sistema no podrá evitar duplicados sin Redis.") - raise - except Exception as e: - self.logger.error(f"Error inesperado inicializando Redis: {e}") - raise - - def _get_article_key(self, article): - """Genera una clave única para un artículo en Redis""" - return f"notified:{article.get_platform()}:{article.get_id()}" - - def is_article_notified(self, article): - """Verifica si un artículo ya ha sido notificado""" - try: - key = self._get_article_key(article) - return self._redis_client.exists(key) > 0 - except Exception as e: - self.logger.error(f"Error verificando artículo en Redis: {e}") - return False - - def mark_article_as_notified(self, article, username=None, worker_name=None): - """Marca un artículo como notificado en Redis con TTL, guardando toda la información del artículo""" - try: - key = self._get_article_key(article) - # Guardar toda la información del artículo como JSON - # Verificar si el artículo ya existe para mantener el estado de favorito, username y worker_name - existing_value = self._redis_client.get(key) - is_favorite = False - existing_username = None - existing_worker_name = None - if existing_value: - try: - existing_data = json.loads(existing_value) - is_favorite = existing_data.get('is_favorite', False) - existing_username = existing_data.get('username') - existing_worker_name = existing_data.get('worker_name') - except json.JSONDecodeError: - pass - - # Mantener username y worker_name existentes si ya existen, o usar los nuevos si se proporcionan - final_username = existing_username if existing_username else username - final_worker_name = existing_worker_name if existing_worker_name else worker_name - - article_data = { - 'id': article.get_id(), - 'title': article.get_title(), - 'description': article._description, # Acceder al campo privado para obtener la descripción completa - 'price': article.get_price(), - 'currency': article.get_currency(), - 'location': article.get_location(), - 'allows_shipping': article._allows_shipping, # Acceder al campo privado para obtener el valor booleano - 'url': article.get_url(), - 'images': article.get_images(), - 'modified_at': article.get_modified_at(), - 'platform': article.get_platform(), - 'is_favorite': is_favorite, # Mantener el estado de favorito - } - - # Añadir username y worker_name si están disponibles - if final_username: - article_data['username'] = final_username - if final_worker_name: - article_data['worker_name'] = final_worker_name - - self._redis_client.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) - except Exception as e: - self.logger.error(f"Error marcando artículo como notificado en Redis: {e}") - - def mark_articles_as_notified(self, articles, username=None, worker_name=None): - """Añade múltiples artículos a la lista de artículos ya notificados en Redis""" - article_list = articles if isinstance(articles, list) else [articles] - - try: - # Verificar qué artículos ya existen antes de añadirlos - # Usar pipeline para mejor rendimiento al verificar múltiples artículos - pipe = self._redis_client.pipeline() - keys_to_check = [] - for article in article_list: - key = self._get_article_key(article) - keys_to_check.append((article, key)) - pipe.exists(key) - - # Ejecutar las verificaciones - exists_results = pipe.execute() - - # Ahora añadir solo los artículos que no existen - pipe = self._redis_client.pipeline() - added_count = 0 - skipped_count = 0 - - for (article, key), exists in zip(keys_to_check, exists_results): - if exists > 0: - # El artículo ya existe, no hacer nada - skipped_count += 1 - continue - - # El artículo no existe, añadirlo - # Guardar toda la información del artículo como JSON - article_data = { - 'id': article.get_id(), - 'title': article.get_title(), - 'description': article._description, # Acceder al campo privado para obtener la descripción completa - 'price': article.get_price(), - 'currency': article.get_currency(), - 'location': article.get_location(), - 'allows_shipping': article._allows_shipping, # Acceder al campo privado para obtener el valor booleano - 'url': article.get_url(), - 'images': article.get_images(), - 'modified_at': article.get_modified_at(), - 'platform': article.get_platform(), - 'is_favorite': False, # Por defecto no es favorito - } - - # Añadir username y worker_name si están disponibles - if username: - article_data['username'] = username - if worker_name: - article_data['worker_name'] = worker_name - - pipe.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) - added_count += 1 - - # Ejecutar solo si hay artículos para añadir - if added_count > 0: - pipe.execute() - - self.logger.debug(f"{added_count} artículos añadidos, {skipped_count} ya existían en Redis") - except Exception as e: - self.logger.error(f"Error añadiendo artículos a Redis: {e}") - - def set_favorite(self, platform, article_id, is_favorite=True): - """Marca o desmarca un artículo como favorito en Redis""" - try: - key = f"notified:{platform}:{article_id}" - value = self._redis_client.get(key) - if value: - article_data = json.loads(value) - article_data['is_favorite'] = is_favorite - # Mantener el TTL existente o usar el default - ttl = self._redis_client.ttl(key) - if ttl > 0: - self._redis_client.setex(key, ttl, json.dumps(article_data)) - else: - self._redis_client.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) - return True - return False - except Exception as e: - self.logger.error(f"Error marcando favorito en Redis: {e}") - return False - - def get_favorites(self): - """Obtiene todos los artículos marcados como favoritos""" - try: - keys = self._redis_client.keys('notified:*') - favorites = [] - for key in keys: - value = self._redis_client.get(key) - if value: - try: - article_data = json.loads(value) - if article_data.get('is_favorite', False): - favorites.append(article_data) - except json.JSONDecodeError: - continue - return favorites - except Exception as e: - self.logger.error(f"Error obteniendo favoritos de Redis: {e}") - return [] - - def is_favorite(self, platform, article_id): - """Verifica si un artículo es favorito""" - try: - key = f"notified:{platform}:{article_id}" - value = self._redis_client.get(key) - if value: - article_data = json.loads(value) - return article_data.get('is_favorite', False) - return False - except Exception as e: - self.logger.error(f"Error verificando favorito en Redis: {e}") - return False - - def clear_cache(self): - """Elimina toda la caché de artículos notificados en Redis""" - try: - # Obtener todas las claves que empiezan con 'notified:' - keys = self._redis_client.keys('notified:*') - - if not keys: - self.logger.info("Cache de Redis ya está vacío") - return 0 - - # Eliminar todas las claves usando pipeline para mejor rendimiento - count = len(keys) - pipe = self._redis_client.pipeline() - for key in keys: - pipe.delete(key) - pipe.execute() - - self.logger.info(f"Cache de Redis limpiado: {count} artículos eliminados") - return count - except Exception as e: - self.logger.error(f"Error limpiando cache de Redis: {e}") - return 0 - - -def create_article_cache(cache_type='mongodb', **kwargs): - """ - Factory function para crear el cache de artículos usando MongoDB. - - Args: - cache_type: 'mongodb' (solo MongoDB está soportado) - **kwargs: Argumentos para MongoDB: - - mongodb_host: host de MongoDB (default: 'localhost') - - mongodb_port: puerto de MongoDB (default: 27017) - - mongodb_database: base de datos (default: 'wallabicher') - - mongodb_username: usuario de MongoDB (opcional) - - mongodb_password: contraseña de MongoDB (opcional) - - mongodb_auth_source: base de datos para autenticación (default: 'admin') - - Returns: - MongoDBArticleCache - """ - if cache_type == 'mongodb': - return MongoDBArticleCache( - mongodb_host=kwargs.get('mongodb_host', 'localhost'), - mongodb_port=kwargs.get('mongodb_port', 27017), - mongodb_database=kwargs.get('mongodb_database', 'wallabicher'), - mongodb_username=kwargs.get('mongodb_username'), - mongodb_password=kwargs.get('mongodb_password'), - mongodb_auth_source=kwargs.get('mongodb_auth_source', 'admin') - ) - elif cache_type == 'redis': - # Mantener compatibilidad con Redis (deprecated) - raise ValueError("Redis ya no está soportado. Por favor, usa MongoDB.") - else: - raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'mongodb'") - diff --git a/managers/mongodb_manager.py b/managers/mongodb_manager.py index 98154de..bb793c6 100644 --- a/managers/mongodb_manager.py +++ b/managers/mongodb_manager.py @@ -54,7 +54,6 @@ class MongoDBArticleCache: self._articles_collection.create_index([('platform', 1), ('id', 1)], unique=True) self._articles_collection.create_index([('username', 1)]) self._articles_collection.create_index([('worker_name', 1)]) - self._articles_collection.create_index([('notifiedAt', -1)]) self._articles_collection.create_index([('expiresAt', 1)], expireAfterSeconds=0) # Índices para workers @@ -82,95 +81,8 @@ class MongoDBArticleCache: def mark_article_as_notified(self, article, username=None, worker_name=None): """Marca un artículo como notificado en MongoDB con TTL usando estructura user_info""" - try: - expires_at = datetime.utcnow() + timedelta(days=NOTIFIED_ARTICLE_TTL_DAYS) - - # Obtener artículo existente si existe - existing = self._articles_collection.find_one({ - 'platform': article.get_platform(), - 'id': str(article.get_id()) - }) - - # Preparar datos del artículo (sin user_info) - article_data = { - 'platform': article.get_platform(), - 'id': str(article.get_id()), - 'title': article.get_title(), - 'description': article._description, - 'price': article.get_price(), - 'currency': article.get_currency(), - 'location': article.get_location(), - 'allows_shipping': article._allows_shipping, - 'url': article.get_url(), - 'images': article.get_images(), - 'modified_at': article.get_modified_at(), - 'expiresAt': expires_at, - 'updatedAt': datetime.utcnow(), - } - - # Preparar user_info para este usuario/worker - user_info_entry = { - 'username': username, - 'worker_name': worker_name, - 'notified': True, - 'notified_at': datetime.utcnow(), - 'is_favorite': False, - } - - if existing: - # Artículo existe, actualizar o añadir user_info - existing_user_info = existing.get('user_info', []) - - # Buscar si ya existe un user_info para este usuario - user_info_index = None - for i, ui in enumerate(existing_user_info): - if ui.get('username') == username: - user_info_index = i - break - - if user_info_index is not None: - # Actualizar user_info existente pero mantener notified_at original - existing_user_info[user_info_index].update({ - 'worker_name': worker_name or existing_user_info[user_info_index].get('worker_name'), - 'notified': True, - # NO actualizar notified_at, mantener el valor existente - # Mantener is_favorite existente - 'is_favorite': existing_user_info[user_info_index].get('is_favorite', False), - }) - else: - # Añadir nuevo user_info - existing_user_info.append(user_info_entry) - - article_data['user_info'] = existing_user_info - - # Solo actualizar precio si es diferente, no actualizar fechas de notificación - existing_price = existing.get('price') - new_price = article.get_price() - if existing_price == new_price: - # Si el precio es el mismo, no actualizar el artículo - # Solo actualizar user_info si cambió - self._articles_collection.update_one( - {'platform': article.get_platform(), 'id': str(article.get_id())}, - {'$set': {'user_info': existing_user_info}} - ) - else: - # Precio diferente, actualizar artículo completo - # Mantener updatedAt para saber cuándo cambió el precio - # pero NO actualizar notified_at (ya se mantiene arriba) - self._articles_collection.update_one( - {'platform': article.get_platform(), 'id': str(article.get_id())}, - {'$set': article_data} - ) - else: - # Artículo nuevo, crear con user_info - article_data['user_info'] = [user_info_entry] - article_data['createdAt'] = datetime.utcnow() - - # Insertar nuevo artículo - self._articles_collection.insert_one(article_data) - - except Exception as e: - self.logger.error(f"Error marcando artículo como notificado en MongoDB: {e}") + # Delegar a mark_articles_as_notified para evitar duplicación de código + self.mark_articles_as_notified([article], username=username, worker_name=worker_name) def mark_articles_as_notified(self, articles, username=None, worker_name=None): """Añade múltiples artículos a la lista de artículos ya notificados en MongoDB usando estructura user_info""" @@ -230,13 +142,14 @@ class MongoDBArticleCache: if user_info_index is not None: # Actualizar user_info existente pero mantener notified_at original - existing_user_info[user_info_index].update({ - 'worker_name': worker_name or existing_user_info[user_info_index].get('worker_name'), + existing_user_info_entry = existing_user_info[user_info_index] + existing_user_info[user_info_index] = { + 'username': existing_user_info_entry.get('username'), + 'worker_name': worker_name or existing_user_info_entry.get('worker_name'), 'notified': True, - # NO actualizar notified_at, mantener el valor existente - # Mantener is_favorite existente - 'is_favorite': existing_user_info[user_info_index].get('is_favorite', False), - }) + 'notified_at': existing_user_info_entry.get('notified_at'), # Preservar notified_at original + 'is_favorite': existing_user_info_entry.get('is_favorite', False), + } else: # Añadir nuevo user_info existing_user_info.append({ @@ -251,22 +164,28 @@ class MongoDBArticleCache: existing_price = existing.get('price') new_price = article.get_price() if existing_price == new_price: - # Si el precio es el mismo, solo actualizar user_info + # Si el precio es el mismo, solo actualizar user_info y eliminar notifiedAt a nivel de artículo operations.append( UpdateOne( {'platform': platform, 'id': article_id}, - {'$set': {'user_info': existing_user_info}} + { + '$set': {'user_info': existing_user_info}, + '$unset': {'notifiedAt': '', 'notified_at': ''} + } ) ) else: # Precio diferente, actualizar artículo completo # Mantener updatedAt para saber cuándo cambió el precio - # pero NO actualizar notified_at (ya se mantiene arriba) + # Eliminar notifiedAt/notified_at a nivel de artículo (solo debe estar en user_info) article_data['user_info'] = existing_user_info operations.append( UpdateOne( {'platform': platform, 'id': article_id}, - {'$set': article_data} + { + '$set': article_data, + '$unset': {'notifiedAt': '', 'notified_at': ''} + } ) ) else: diff --git a/managers/telegram_manager.py b/managers/telegram_manager.py index 9c43854..8d12fc7 100644 --- a/managers/telegram_manager.py +++ b/managers/telegram_manager.py @@ -10,7 +10,7 @@ import json import logging import threading from datetime import datetime -from managers.article_cache import create_article_cache +from managers.mongodb_manager import create_article_cache ITEM_HTML = """ Artículo: {title} diff --git a/wallabicher.py b/wallabicher.py index 231207b..d148ab9 100644 --- a/wallabicher.py +++ b/wallabicher.py @@ -14,7 +14,7 @@ from datalayer.item_monitor import ItemMonitor from datalayer.general_monitor import GeneralMonitor from managers.worker import Worker from managers.queue_manager import QueueManager -from managers.article_cache import create_article_cache +from managers.mongodb_manager import create_article_cache def initialize_config_files(): """ diff --git a/web/backend/services/mongodb.js b/web/backend/services/mongodb.js index b1fd94b..c492744 100644 --- a/web/backend/services/mongodb.js +++ b/web/backend/services/mongodb.js @@ -15,9 +15,6 @@ let config = null; // Duración de sesión en milisegundos (24 horas) const SESSION_DURATION = 24 * 60 * 60 * 1000; -// TTL de artículos notificados en milisegundos (7 días) -const NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 * 1000; - // Inicializar MongoDB si está configurado export async function initMongoDB() { try { @@ -105,7 +102,6 @@ async function createIndexes() { // Índices de compatibilidad con estructura antigua await db.collection('articles').createIndex({ username: 1 }); await db.collection('articles').createIndex({ worker_name: 1 }); - await db.collection('articles').createIndex({ notifiedAt: -1 }); console.log('✅ Índices de MongoDB creados'); } catch (error) { @@ -704,107 +700,6 @@ export async function deleteUserSessions(username) { } // Funciones para artículos -export async function saveArticle(articleData) { - if (!db) { - throw new Error('MongoDB no está disponible'); - } - - try { - const articlesCollection = db.collection('articles'); - const expiresAt = new Date(Date.now() + NOTIFIED_ARTICLE_TTL); - - // Extraer datos del artículo (sin user_info) - const { - platform, - id, - username, - worker_name, - ...articleFields - } = articleData; - - // Buscar artículo existente - const existing = await articlesCollection.findOne({ platform, id }); - - // Preparar user_info para este usuario/worker - const userInfoEntry = { - username: username || null, - worker_name: worker_name || null, - notified: true, - notified_at: new Date(), - is_favorite: false, - }; - - if (existing) { - // Artículo existe, actualizar o añadir user_info - const existingUserInfo = existing.user_info || []; - - // Buscar si ya existe un user_info para este usuario - const existingUserInfoIndex = existingUserInfo.findIndex( - ui => ui.username === username - ); - - if (existingUserInfoIndex >= 0) { - // Actualizar user_info existente pero mantener notified_at original - existingUserInfo[existingUserInfoIndex] = { - ...existingUserInfo[existingUserInfoIndex], - worker_name: worker_name || existingUserInfo[existingUserInfoIndex].worker_name, - notified: true, - // NO actualizar notified_at, mantener el valor existente - }; - } else { - // Añadir nuevo user_info - existingUserInfo.push(userInfoEntry); - } - - // Solo actualizar precio si es diferente, no actualizar fechas de notificación - const existingPrice = existing.price; - const newPrice = articleFields.price; - if (existingPrice === newPrice) { - // Si el precio es el mismo, solo actualizar user_info - await articlesCollection.updateOne( - { platform, id }, - { - $set: { - user_info: existingUserInfo, - } - } - ); - } else { - // Precio diferente, actualizar artículo completo - // Mantener updatedAt para saber cuándo cambió el precio - // pero NO actualizar notified_at (ya se mantiene arriba) - await articlesCollection.updateOne( - { platform, id }, - { - $set: { - ...articleFields, - user_info: existingUserInfo, - expiresAt, - updatedAt: new Date(), - } - } - ); - } - } else { - // Artículo nuevo, crear con user_info - await articlesCollection.insertOne({ - platform, - id, - ...articleFields, - user_info: [userInfoEntry], - expiresAt, - createdAt: new Date(), - updatedAt: new Date(), - }); - } - - return true; - } catch (error) { - console.error('Error guardando artículo:', error.message); - throw error; - } -} - export async function getArticle(platform, id) { if (!db) { return null;