import logging from pymongo import MongoClient from pymongo.operations import UpdateOne from pymongo.errors import ConnectionFailure, DuplicateKeyError from datetime import datetime, timedelta import json NOTIFIED_ARTICLE_TTL_DAYS = 7 # TTL de 7 días class MongoDBArticleCache: """Maneja el cache de artículos notificados usando MongoDB""" def __init__(self, mongodb_host='localhost', mongodb_port=27017, mongodb_database='wallabicher', mongodb_username=None, mongodb_password=None, mongodb_auth_source='admin'): self.logger = logging.getLogger(__name__) # Inicializar conexión MongoDB try: # Construir URL de conexión if mongodb_username and mongodb_password: connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}" else: connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/" self._mongo_client = MongoClient( connection_string, serverSelectionTimeoutMS=5000, connectTimeoutMS=5000, socketTimeoutMS=5000 ) # Verificar conexión self._mongo_client.admin.command('ping') self._db = self._mongo_client[mongodb_database] self._articles_collection = self._db['articles'] self._workers_collection = self._db['workers'] # Crear índices self._create_indexes() self.logger.info(f"Conectado a MongoDB en {mongodb_host}:{mongodb_port} (db={mongodb_database})") except ConnectionFailure as e: self.logger.error(f"Error conectando a MongoDB: {e}") self.logger.error("MongoDB no está disponible. El sistema no podrá evitar duplicados sin MongoDB.") raise except Exception as e: self.logger.error(f"Error inesperado inicializando MongoDB: {e}") raise def _create_indexes(self): """Crea los índices necesarios en MongoDB""" try: # Índices para artículos self._articles_collection.create_index([('platform', 1), ('id', 1)], unique=True) self._articles_collection.create_index([('username', 1)]) self._articles_collection.create_index([('worker_name', 1)]) self._articles_collection.create_index([('expiresAt', 1)], expireAfterSeconds=0) # Índices para workers self._workers_collection.create_index([('username', 1)], unique=True) self.logger.debug("Índices de MongoDB creados") except Exception as e: self.logger.warning(f"Error creando índices de MongoDB: {e}") def _get_article_key(self, article): """Genera una clave única para un artículo (solo para compatibilidad)""" return f"notified:{article.get_platform()}:{article.get_id()}" def is_article_notified(self, article): """Verifica si un artículo ya ha sido notificado""" try: count = self._articles_collection.count_documents({ 'platform': article.get_platform(), 'id': str(article.get_id()) }) return count > 0 except Exception as e: self.logger.error(f"Error verificando artículo en MongoDB: {e}") return False def mark_article_as_notified(self, article, username=None, worker_name=None): """Marca un artículo como notificado en MongoDB con TTL usando estructura user_info""" # Delegar a mark_articles_as_notified para evitar duplicación de código self.mark_articles_as_notified([article], username=username, worker_name=worker_name) def mark_articles_as_notified(self, articles, username=None, worker_name=None): """Añade múltiples artículos a la lista de artículos ya notificados en MongoDB usando estructura user_info""" if not articles: return article_list = articles if isinstance(articles, list) else [articles] try: expires_at = datetime.utcnow() + timedelta(days=NOTIFIED_ARTICLE_TTL_DAYS) now = datetime.utcnow() # Obtener todos los artículos existentes de una vez para optimizar article_ids = [(a.get_platform(), str(a.get_id())) for a in article_list] existing_articles = { (a['platform'], a['id']): a for a in self._articles_collection.find({ '$or': [{'platform': p, 'id': i} for p, i in article_ids] }) } operations = [] for article in article_list: platform = article.get_platform() article_id = str(article.get_id()) key = (platform, article_id) # Preparar datos del artículo (sin user_info) article_data = { 'platform': platform, 'id': article_id, 'title': article.get_title(), 'description': article._description, 'price': article.get_price(), 'currency': article.get_currency(), 'location': article.get_location(), 'allows_shipping': article._allows_shipping, 'url': article.get_url(), 'images': article.get_images(), 'modified_at': article.get_modified_at(), 'expiresAt': expires_at, 'updatedAt': now, } # Preparar user_info existing = existing_articles.get(key) if existing: # Artículo existe, actualizar user_info existing_user_info = existing.get('user_info', []) # Buscar si ya existe user_info para este usuario user_info_index = None for i, ui in enumerate(existing_user_info): if ui.get('username') == username: user_info_index = i break if user_info_index is not None: # Actualizar user_info existente pero mantener notified_at original existing_user_info_entry = existing_user_info[user_info_index] existing_user_info[user_info_index] = { 'username': existing_user_info_entry.get('username'), 'worker_name': worker_name or existing_user_info_entry.get('worker_name'), 'notified': True, 'notified_at': existing_user_info_entry.get('notified_at'), # Preservar notified_at original 'is_favorite': existing_user_info_entry.get('is_favorite', False), } else: # Añadir nuevo user_info existing_user_info.append({ 'username': username, 'worker_name': worker_name, 'notified': True, 'notified_at': now, 'is_favorite': False, }) # Solo actualizar precio si es diferente, no actualizar fechas de notificación existing_price = existing.get('price') new_price = article.get_price() if existing_price == new_price: # Si el precio es el mismo, solo actualizar user_info y eliminar notifiedAt a nivel de artículo operations.append( UpdateOne( {'platform': platform, 'id': article_id}, { '$set': {'user_info': existing_user_info}, '$unset': {'notifiedAt': '', 'notified_at': ''} } ) ) else: # Precio diferente, actualizar artículo completo # Mantener updatedAt para saber cuándo cambió el precio # Eliminar notifiedAt/notified_at a nivel de artículo (solo debe estar en user_info) article_data['user_info'] = existing_user_info operations.append( UpdateOne( {'platform': platform, 'id': article_id}, { '$set': article_data, '$unset': {'notifiedAt': '', 'notified_at': ''} } ) ) else: # Artículo nuevo article_data['user_info'] = [{ 'username': username, 'worker_name': worker_name, 'notified': True, 'notified_at': now, 'is_favorite': False, }] article_data['createdAt'] = now operations.append( UpdateOne( {'platform': platform, 'id': article_id}, { '$set': article_data, }, upsert=True ) ) if operations: self._articles_collection.bulk_write(operations, ordered=False) self.logger.debug(f"{len(operations)} artículos añadidos/actualizados en MongoDB") except Exception as e: self.logger.error(f"Error añadiendo artículos a MongoDB: {e}") def set_favorite(self, platform, article_id, username, is_favorite=True): """Marca o desmarca un artículo como favorito en MongoDB para un usuario específico""" try: if not username: self.logger.error("username es requerido para marcar favoritos") return False article = self._articles_collection.find_one({ 'platform': platform, 'id': str(article_id) }) if not article: self.logger.error(f"Artículo no encontrado: {platform}:{article_id}") return False user_info_list = article.get('user_info', []) user_info_index = None # Buscar user_info para este usuario for i, ui in enumerate(user_info_list): if ui.get('username') == username: user_info_index = i break if user_info_index is not None: # Actualizar user_info existente user_info_list[user_info_index]['is_favorite'] = is_favorite else: # Si no existe user_info para este usuario, crear uno user_info_list.append({ 'username': username, 'worker_name': None, 'notified': False, 'notified_at': None, 'is_favorite': is_favorite, }) self._articles_collection.update_one( {'platform': platform, 'id': str(article_id)}, { '$set': { 'user_info': user_info_list, 'updatedAt': datetime.utcnow() } } ) return True except Exception as e: self.logger.error(f"Error marcando favorito en MongoDB: {e}") return False def get_favorites(self, username=None): """Obtiene todos los artículos marcados como favoritos, opcionalmente filtrados por usuario""" try: # Buscar artículos que tengan favoritos en user_info query = {'user_info.is_favorite': True} if username: query['user_info.username'] = username articles = list(self._articles_collection.find(query)) # Filtrar y transformar para devolver solo los favoritos relevantes favorites = [] for article in articles: user_info_list = article.get('user_info', []) for user_info in user_info_list: if user_info.get('is_favorite') and (not username or user_info.get('username') == username): fav = dict(article) fav['_id'] = str(fav['_id']) fav['username'] = user_info.get('username') fav['worker_name'] = user_info.get('worker_name') fav['is_favorite'] = True # Convertir fechas a timestamps/isoformat notified_at = user_info.get('notified_at') if isinstance(notified_at, datetime): fav['notifiedAt'] = notified_at.isoformat() elif 'notifiedAt' in fav and isinstance(fav['notifiedAt'], datetime): fav['notifiedAt'] = fav['notifiedAt'].isoformat() if 'expiresAt' in fav and isinstance(fav['expiresAt'], datetime): fav['expiresAt'] = fav['expiresAt'].isoformat() favorites.append(fav) return favorites except Exception as e: self.logger.error(f"Error obteniendo favoritos de MongoDB: {e}") return [] def is_favorite(self, platform, article_id, username=None): """Verifica si un artículo es favorito para un usuario específico""" try: article = self._articles_collection.find_one({ 'platform': platform, 'id': str(article_id) }) if not article: return False # Buscar en user_info si hay username if username: user_info_list = article.get('user_info', []) for ui in user_info_list: if ui.get('username') == username: return ui.get('is_favorite', False) return False else: # Sin username, verificar si hay algún favorito (compatibilidad) user_info_list = article.get('user_info', []) for ui in user_info_list: if ui.get('is_favorite'): return True # Compatibilidad con estructura antigua return article.get('is_favorite', False) except Exception as e: self.logger.error(f"Error verificando favorito en MongoDB: {e}") return False def clear_cache(self): """Elimina toda la caché de artículos notificados en MongoDB""" try: result = self._articles_collection.delete_many({}) count = result.deleted_count self.logger.info(f"Cache de MongoDB limpiado: {count} artículos eliminados") return count except Exception as e: self.logger.error(f"Error limpiando cache de MongoDB: {e}") return 0 def create_article_cache(cache_type='mongodb', **kwargs): """ Factory function para crear el cache de artículos usando MongoDB. Args: cache_type: 'mongodb' (solo MongoDB está soportado) **kwargs: Argumentos para MongoDB: - mongodb_host: host de MongoDB (default: 'localhost') - mongodb_port: puerto de MongoDB (default: 27017) - mongodb_database: base de datos (default: 'wallabicher') - mongodb_username: usuario de MongoDB (opcional) - mongodb_password: contraseña de MongoDB (opcional) - mongodb_auth_source: base de datos para autenticación (default: 'admin') Returns: MongoDBArticleCache """ if cache_type != 'mongodb': raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'mongodb'") return MongoDBArticleCache( mongodb_host=kwargs.get('mongodb_host', 'localhost'), mongodb_port=kwargs.get('mongodb_port', 27017), mongodb_database=kwargs.get('mongodb_database', 'wallabicher'), mongodb_username=kwargs.get('mongodb_username'), mongodb_password=kwargs.get('mongodb_password'), mongodb_auth_source=kwargs.get('mongodb_auth_source', 'admin') )