# Listing metadata (from the file viewer): 383 lines, 17 KiB, Python
import json
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import quote_plus

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, DuplicateKeyError
from pymongo.operations import UpdateOne
|
|
|
|
# Number of days a notified article is retained; added to the current time
# to compute each article document's `expiresAt` value.
NOTIFIED_ARTICLE_TTL_DAYS = 7
|
|
|
|
class MongoDBArticleCache:
    """MongoDB-backed cache of articles that have already been notified.

    Each article is stored once in the ``articles`` collection, keyed by
    ``(platform, id)``.  Per-user state lives in the document's ``user_info``
    list, whose entries carry ``username``, ``worker_name``, ``notified``,
    ``notified_at`` and ``is_favorite``.  Documents carry an ``expiresAt``
    timestamp that is covered by a TTL index.
    """

    def __init__(self, mongodb_host='localhost', mongodb_port=27017, mongodb_database='wallabicher',
                 mongodb_username=None, mongodb_password=None, mongodb_auth_source='admin'):
        """Connect to MongoDB, verify the connection and ensure indexes exist.

        Args:
            mongodb_host: MongoDB host name.
            mongodb_port: MongoDB port.
            mongodb_database: Name of the database holding the collections.
            mongodb_username: Optional user name; credentials are only used
                when both user name and password are provided.
            mongodb_password: Optional password.
            mongodb_auth_source: Database used for authentication.

        Raises:
            ConnectionFailure: If the server cannot be reached.
            Exception: Any other error raised while initialising; it is
                logged and re-raised unchanged.
        """
        self.logger = logging.getLogger(__name__)
        self._mongo_client = None

        try:
            if mongodb_username and mongodb_password:
                # Credentials embedded in a MongoDB URI must be percent-escaped,
                # otherwise characters such as '@', ':' or '/' corrupt the URI.
                escaped_user = quote_plus(str(mongodb_username))
                escaped_password = quote_plus(str(mongodb_password))
                connection_string = (
                    f"mongodb://{escaped_user}:{escaped_password}"
                    f"@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}"
                )
            else:
                connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/"

            self._mongo_client = MongoClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
                socketTimeoutMS=5000,
            )

            # Force a round trip so connection problems surface here.
            self._mongo_client.admin.command('ping')
            self._db = self._mongo_client[mongodb_database]
            self._articles_collection = self._db['articles']
            self._workers_collection = self._db['workers']

            self._create_indexes()

            self.logger.info(f"Conectado a MongoDB en {mongodb_host}:{mongodb_port} (db={mongodb_database})")
        except ConnectionFailure as e:
            self.logger.error(f"Error conectando a MongoDB: {e}")
            self.logger.error("MongoDB no está disponible. El sistema no podrá evitar duplicados sin MongoDB.")
            self._discard_client()
            raise
        except Exception as e:
            self.logger.error(f"Error inesperado inicializando MongoDB: {e}")
            self._discard_client()
            raise

    def close(self):
        """Close the underlying MongoDB client, if one is open."""
        client = getattr(self, '_mongo_client', None)
        self._mongo_client = None
        if client is not None:
            client.close()

    def _discard_client(self):
        """Best-effort close used when initialisation fails part-way."""
        try:
            self.close()
        except Exception as close_error:
            # Never let a cleanup problem mask the original initialisation error.
            self.logger.debug(f"Error closing MongoDB client after failed init: {close_error}")

    def _create_indexes(self):
        """Create the indexes used by the articles and workers collections.

        Failures are logged as warnings and otherwise ignored.
        """
        try:
            # Article indexes.
            self._articles_collection.create_index([('platform', 1), ('id', 1)], unique=True)
            # NOTE(review): this class only writes and queries username /
            # worker_name inside `user_info`; confirm whether these two
            # top-level indexes are still needed by other code.
            self._articles_collection.create_index([('username', 1)])
            self._articles_collection.create_index([('worker_name', 1)])
            # TTL index: with expireAfterSeconds=0 a document expires at the
            # moment stored in its `expiresAt` field.
            self._articles_collection.create_index([('expiresAt', 1)], expireAfterSeconds=0)

            # Worker indexes.
            self._workers_collection.create_index([('username', 1)], unique=True)

            self.logger.debug("Índices de MongoDB creados")
        except Exception as e:
            self.logger.warning(f"Error creando índices de MongoDB: {e}")

    def _get_article_key(self, article):
        """Return a unique string key for an article (kept for compatibility)."""
        return f"notified:{article.get_platform()}:{article.get_id()}"

    @staticmethod
    def _find_user_info_index(user_info_list, username):
        """Return the index of the entry for `username`, or None if absent."""
        for index, entry in enumerate(user_info_list):
            if entry.get('username') == username:
                return index
        return None

    @classmethod
    def _merge_user_info(cls, user_info_list, username, worker_name, now):
        """Return a copy of `user_info_list` with `username` marked as notified.

        An existing entry keeps its original `notified_at` and `is_favorite`.
        If the entry had no timestamp (for example it was created when the
        user favourited the article), `now` is recorded instead so a notified
        entry never ends up without a notification time.
        """
        merged = list(user_info_list or [])
        index = cls._find_user_info_index(merged, username)
        if index is None:
            merged.append({
                'username': username,
                'worker_name': worker_name,
                'notified': True,
                'notified_at': now,
                'is_favorite': False,
            })
        else:
            previous = merged[index]
            merged[index] = {
                'username': previous.get('username'),
                'worker_name': worker_name or previous.get('worker_name'),
                'notified': True,
                'notified_at': previous.get('notified_at') or now,
                'is_favorite': previous.get('is_favorite', False),
            }
        return merged

    @staticmethod
    def _build_base_fields(article, platform, article_id, expires_at):
        """Return the article-level fields (no user_info, no createdAt/updatedAt)."""
        return {
            'platform': platform,
            'id': article_id,
            'title': article.get_title(),
            'description': article._description,
            'price': article.get_price(),
            'currency': article.get_currency(),
            'location': article.get_location(),
            'allows_shipping': article._allows_shipping,
            'url': article.get_url(),
            'images': article.get_images(),
            'modified_at': article.get_modified_at(),
            'expiresAt': expires_at,
        }

    def _build_notify_operation(self, article, username, worker_name, now, expires_at):
        """Build the bulk-write operation that records one notified article."""
        platform = article.get_platform()
        article_id = str(article.get_id())
        selector = {'platform': platform, 'id': article_id}

        # Look the article up in MongoDB so the stored state, not the caller's
        # list, decides between insert and update.
        existing = self._articles_collection.find_one(dict(selector))
        base_fields = self._build_base_fields(article, platform, article_id, expires_at)

        if existing:
            user_info = self._merge_user_info(existing.get('user_info'), username, worker_name, now)
            if existing.get('price') == base_fields['price']:
                # Same price: only the per-user state changes.
                changes = {'user_info': user_info}
            else:
                # Price changed: refresh the article fields too.  createdAt and
                # updatedAt are left out of $set, so they stay untouched.
                changes = dict(base_fields)
                changes['user_info'] = user_info
            return UpdateOne(
                selector,
                {
                    '$set': changes,
                    # Drop the article-level notification timestamps.
                    '$unset': {'notifiedAt': '', 'notified_at': ''},
                },
            )

        new_article_data = dict(base_fields)
        new_article_data['user_info'] = self._merge_user_info([], username, worker_name, now)
        return UpdateOne(
            selector,
            {
                '$set': new_article_data,
                # Only applied when the upsert actually inserts a document.
                '$setOnInsert': {'createdAt': now, 'updatedAt': now},
            },
            upsert=True,
        )

    def is_article_notified(self, article):
        """Return True if the article is already stored as notified.

        Returns False when the lookup fails; the error is logged.
        """
        try:
            query = {
                'platform': article.get_platform(),
                'id': str(article.get_id()),
            }
            # limit=1: only existence matters, so stop at the first match.
            return self._articles_collection.count_documents(query, limit=1) > 0
        except Exception as e:
            self.logger.error(f"Error verificando artículo en MongoDB: {e}")
            return False

    def mark_article_as_notified(self, article, username=None, worker_name=None):
        """Mark a single article as notified for `username`.

        Delegates to `mark_articles_as_notified` to avoid duplicated logic.
        """
        self.mark_articles_as_notified([article], username=username, worker_name=worker_name)

    def mark_articles_as_notified(self, articles, username=None, worker_name=None):
        """Record one or more articles as notified for `username`.

        Args:
            articles: A list or tuple of articles, or a single article.
            username: User the notification was sent to.
            worker_name: Worker that produced the notification.

        Errors are logged and swallowed, so this is best-effort.
        """
        if not articles:
            return

        article_list = list(articles) if isinstance(articles, (list, tuple)) else [articles]

        try:
            # One timestamp for the whole batch.  PyMongo stores aware
            # datetimes as UTC, so this replaces the deprecated utcnow().
            now = datetime.now(timezone.utc)
            expires_at = now + timedelta(days=NOTIFIED_ARTICLE_TTL_DAYS)

            operations = [
                self._build_notify_operation(article, username, worker_name, now, expires_at)
                for article in article_list
            ]

            if operations:
                # Unordered: one failing operation does not stop the others.
                self._articles_collection.bulk_write(operations, ordered=False)
                self.logger.debug(f"{len(operations)} artículos añadidos/actualizados en MongoDB")
        except Exception as e:
            self.logger.error(f"Error añadiendo artículos a MongoDB: {e}")

    def set_favorite(self, platform, article_id, username, is_favorite=True):
        """Set or clear the favourite flag of an article for one user.

        Returns:
            True when the update was issued; False when `username` is empty,
            the article is not found, or an error occurs (errors are logged).
        """
        try:
            if not username:
                self.logger.error("username es requerido para marcar favoritos")
                return False

            selector = {'platform': platform, 'id': str(article_id)}
            article = self._articles_collection.find_one(dict(selector))

            if not article:
                self.logger.error(f"Artículo no encontrado: {platform}:{article_id}")
                return False

            user_info_list = article.get('user_info') or []
            index = self._find_user_info_index(user_info_list, username)

            if index is not None:
                user_info_list[index]['is_favorite'] = is_favorite
            else:
                # No entry for this user yet: create one that is not notified.
                user_info_list.append({
                    'username': username,
                    'worker_name': None,
                    'notified': False,
                    'notified_at': None,
                    'is_favorite': is_favorite,
                })

            self._articles_collection.update_one(
                selector,
                {'$set': {'user_info': user_info_list}},
            )
            return True
        except Exception as e:
            self.logger.error(f"Error marcando favorito en MongoDB: {e}")
            return False

    def get_favorites(self, username=None):
        """Return favourite articles, optionally restricted to one user.

        Each result is a copy of the article document with `_id` as a string,
        plus `username`, `worker_name` and `is_favorite` taken from the
        matching user_info entry.  `notifiedAt` and `expiresAt` are converted
        to ISO-8601 strings when they are datetimes.  An article favourited
        by several users yields one result per user.

        Returns an empty list when the query fails; the error is logged.
        """
        try:
            if username:
                # $elemMatch requires both conditions on the SAME entry.  Two
                # dotted conditions would also match documents where another
                # user set the favourite.
                query = {'user_info': {'$elemMatch': {'is_favorite': True, 'username': username}}}
            else:
                query = {'user_info.is_favorite': True}

            favorites = []
            for article in self._articles_collection.find(query):
                for user_info in article.get('user_info') or []:
                    if not user_info.get('is_favorite'):
                        continue
                    if username and user_info.get('username') != username:
                        continue

                    fav = dict(article)
                    fav['_id'] = str(fav['_id'])
                    fav['username'] = user_info.get('username')
                    fav['worker_name'] = user_info.get('worker_name')
                    fav['is_favorite'] = True

                    # Prefer the per-user timestamp, else the article-level one.
                    notified_at = user_info.get('notified_at')
                    if isinstance(notified_at, datetime):
                        fav['notifiedAt'] = notified_at.isoformat()
                    elif isinstance(fav.get('notifiedAt'), datetime):
                        fav['notifiedAt'] = fav['notifiedAt'].isoformat()

                    if isinstance(fav.get('expiresAt'), datetime):
                        fav['expiresAt'] = fav['expiresAt'].isoformat()

                    favorites.append(fav)

            return favorites
        except Exception as e:
            self.logger.error(f"Error obteniendo favoritos de MongoDB: {e}")
            return []

    def is_favorite(self, platform, article_id, username=None):
        """Return whether an article is a favourite.

        With `username`, only that user's entry is consulted.  Without it,
        any user's favourite flag counts, and the top-level `is_favorite`
        field is used as a fallback for the old document structure.

        Returns False when the article is missing or the lookup fails.
        """
        try:
            article = self._articles_collection.find_one({
                'platform': platform,
                'id': str(article_id),
            })

            if not article:
                return False

            user_info_list = article.get('user_info') or []

            if username:
                for entry in user_info_list:
                    if entry.get('username') == username:
                        return entry.get('is_favorite', False)
                return False

            if any(entry.get('is_favorite') for entry in user_info_list):
                return True
            # Compatibility with the old structure.
            return article.get('is_favorite', False)
        except Exception as e:
            self.logger.error(f"Error verificando favorito en MongoDB: {e}")
            return False

    def clear_cache(self):
        """Delete every document in the articles collection.

        Returns:
            The number of deleted documents, or 0 if the deletion failed.
        """
        try:
            result = self._articles_collection.delete_many({})
            count = result.deleted_count
            self.logger.info(f"Cache de MongoDB limpiado: {count} artículos eliminados")
            return count
        except Exception as e:
            self.logger.error(f"Error limpiando cache de MongoDB: {e}")
            return 0
|
|
|
|
|
|
def create_article_cache(cache_type='mongodb', **kwargs):
    """Factory that builds the MongoDB-backed article cache.

    Args:
        cache_type: Must be 'mongodb'; no other backend is supported.
        **kwargs: MongoDB settings. Recognised keys and their defaults:
            - mongodb_host: MongoDB host ('localhost')
            - mongodb_port: MongoDB port (27017)
            - mongodb_database: database name ('wallabicher')
            - mongodb_username: MongoDB user (optional)
            - mongodb_password: MongoDB password (optional)
            - mongodb_auth_source: authentication database ('admin')
            Any other key is ignored.

    Returns:
        MongoDBArticleCache

    Raises:
        ValueError: If `cache_type` is not 'mongodb'.
    """
    if cache_type != 'mongodb':
        raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'mongodb'")

    option_defaults = {
        'mongodb_host': 'localhost',
        'mongodb_port': 27017,
        'mongodb_database': 'wallabicher',
        'mongodb_username': None,
        'mongodb_password': None,
        'mongodb_auth_source': 'admin',
    }
    # Forward only the recognised options, using the default when one is absent.
    settings = {
        option: kwargs.get(option, fallback)
        for option, fallback in option_defaults.items()
    }
    return MongoDBArticleCache(**settings)
|
|
|