Files
wallabicher/wallabicher.py
Omar Sánchez Pizarro a65bedf24d no update fechas
Signed-off-by: Omar Sánchez Pizarro <omar.sanchez@pistacero.net>
2026-01-20 12:29:22 +01:00

622 lines
29 KiB
Python

import json
import logging
from logging.handlers import RotatingFileHandler
from concurrent.futures import ThreadPoolExecutor, Future
import os
import shutil
import yaml
import time
import threading
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from datalayer.item_monitor import ItemMonitor
from datalayer.general_monitor import GeneralMonitor
from managers.worker import Worker
from managers.queue_manager import QueueManager
from managers.mongodb_manager import create_article_cache
def initialize_config_files():
    """Create missing configuration files by copying their .sample templates.

    Note: workers.json is no longer used; workers are stored in MongoDB.

    Raises:
        FileNotFoundError: if neither the config file nor its sample exists.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    pairs = (
        ('config.yaml', 'config.sample.yaml'),
        # workers.json is no longer used; it lives in MongoDB
    )
    for config_file, sample_file in pairs:
        target = os.path.join(here, config_file)
        if os.path.exists(target):
            # Already configured; nothing to do for this pair.
            continue
        template = os.path.join(here, sample_file)
        if not os.path.exists(template):
            raise FileNotFoundError(
                f"No se encontró ni '{config_file}' ni '{sample_file}'. "
                f"Por favor, crea uno de estos archivos."
            )
        shutil.copy2(template, target)
        print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'")
        print(f" Por favor, edita '{config_file}' con tu configuración antes de continuar.")
def configure_logger():
    """Configure the root logger with a console handler (INFO) and a
    rotating file handler (DEBUG).

    The log file goes to /app/logs/monitor.log when that directory exists
    (Docker), otherwise to ./monitor.log in the current directory.
    """
    # Silence the noisy httpx client logger.
    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # Determine the log file path: /app/logs when present (Docker),
    # otherwise the current working directory.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Ensure the parent directory exists (exist_ok makes this idempotent,
    # so no separate os.path.exists check is needed).
    log_dir = os.path.dirname(log_path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # maxBytes as a proper int (the original passed the float 10e6).
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Configure the root logger with both handlers.
    logging.basicConfig(level=logging.NOTSET,
                        handlers=[console_handler, file_handler])
def get_mongodb_client_for_workers():
    """Return a MongoDB database handle for reading workers, or None.

    Reads config.yaml next to this file; MONGODB_* environment variables
    override the file settings. Any failure (missing config, connection
    error) is logged and reported as None.
    """
    logger = logging.getLogger(__name__)
    config_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "config.yaml")
    try:
        with open(config_path, 'r') as fh:
            config = yaml.safe_load(fh)
        cache_cfg = config.get('cache', {})
        if cache_cfg.get('type') != 'mongodb':
            logger.error("MongoDB no está configurado. Workers requieren MongoDB.")
            return None

        mongo_cfg = cache_cfg.get('mongodb', {})
        mongodb_host = os.environ.get('MONGODB_HOST') or mongo_cfg.get('host', 'localhost')
        mongodb_port = int(os.environ.get('MONGODB_PORT') or mongo_cfg.get('port', 27017))
        mongodb_database = os.environ.get('MONGODB_DATABASE') or mongo_cfg.get('database', 'wallabicher')
        mongodb_username = os.environ.get('MONGODB_USERNAME') or mongo_cfg.get('username')
        mongodb_password = os.environ.get('MONGODB_PASSWORD') or mongo_cfg.get('password')
        mongodb_auth_source = mongo_cfg.get('auth_source', 'admin')

        # Credentials are optional; only embed them when both are present.
        if mongodb_username and mongodb_password:
            connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}"
        else:
            connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/"

        mongo_client = MongoClient(
            connection_string,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=5000,
            socketTimeoutMS=5000
        )
        # Fail fast if the server is unreachable.
        mongo_client.admin.command('ping')
        return mongo_client[mongodb_database]
    except Exception as e:
        logger.error(f"Error conectando a MongoDB para workers: {e}")
        return None
def _empty_workers_result(workers_by_user=None):
    """Build the default (empty) workers structure returned on any failure."""
    return {
        'general': {'title_exclude': [], 'description_exclude': []},
        'items': [],
        'disabled': [],
        'workers_by_user': workers_by_user if workers_by_user is not None else {}
    }


def get_workers_from_mongodb(username=None):
    """Fetch workers from MongoDB for one user, or for all users.

    Args:
        username: fetch only this user's workers; None means all users.

    Returns:
        A dict with keys 'general', 'items', 'disabled' and
        'workers_by_user' (username -> list of item dicts). On any failure
        an empty structure is returned instead of raising.
    """
    logger = logging.getLogger(__name__)
    db = get_mongodb_client_for_workers()
    if db is None:
        logger.error("No se pudo conectar a MongoDB para obtener workers")
        return _empty_workers_result()
    try:
        workers_collection = db['workers']
        if username:
            # Single-user path: return that user's document as-is.
            workers_data = workers_collection.find_one({'username': username})
            if not workers_data:
                logger.warning(f"No se encontraron workers para el usuario '{username}' en MongoDB")
                return _empty_workers_result({username: []})
            # Strip MongoDB bookkeeping fields from the document.
            workers_data.pop('_id', None)
            workers_data.pop('updatedAt', None)
            workers_data.pop('createdAt', None)
            username_value = workers_data.pop('username')
            data = workers_data
            data['workers_by_user'] = {username_value: data.get('items', [])}
            return data

        # All-users path: merge every user's items, disabled list and
        # general exclusions (union of all exclusion sets).
        all_workers = list(workers_collection.find({}))
        if not all_workers:
            logger.warning("No se encontraron workers en MongoDB")
            return _empty_workers_result()

        all_items = []
        all_disabled = []
        workers_by_user = {}  # username -> list of items
        general_title_exclude = set()
        general_description_exclude = set()

        for user_workers_doc in all_workers:
            try:
                username_from_doc = user_workers_doc.get('username')
                if not username_from_doc:
                    # Skip malformed documents with no owner.
                    continue
                # Drop MongoDB bookkeeping fields.
                user_workers = {k: v for k, v in user_workers_doc.items()
                                if k not in ['_id', 'username', 'updatedAt', 'createdAt']}
                user_items = user_workers.get('items', [])
                workers_by_user[username_from_doc] = user_items
                all_items.extend(user_items)
                all_disabled.extend(user_workers.get('disabled', []))
                user_general = user_workers.get('general', {})
                general_title_exclude.update(user_general.get('title_exclude', []))
                general_description_exclude.update(user_general.get('description_exclude', []))
                logger.debug(f"Cargados {len(user_items)} workers del usuario '{username_from_doc}'")
            except Exception as e:
                # One bad document must not prevent loading the others.
                logger.error(f"Error procesando workers del usuario '{user_workers_doc.get('username', 'unknown')}': {e}")
                continue

        logger.info(f"Total de workers cargados: {len(all_items)} items de {len(all_workers)} usuarios")
        return {
            'general': {
                'title_exclude': list(general_title_exclude),
                'description_exclude': list(general_description_exclude)
            },
            'items': all_items,
            'disabled': all_disabled,
            'workers_by_user': workers_by_user
        }
    except Exception as e:
        logger.error(f"Error obteniendo workers de MongoDB: {e}")
        return _empty_workers_result()
def parse_items_to_monitor(username=None):
    """Parse the items to monitor from MongoDB (all users when None).

    Returns a tuple (items, general_args) of ItemMonitor list and
    GeneralMonitor; raises ValueError when 'items' is missing entirely.
    """
    args = get_workers_from_mongodb(username)
    if 'items' not in args:
        raise ValueError("Missing mandatory field: items")
    monitors = []
    for entry in args.get('items', []):
        monitors.append(ItemMonitor.load_from_json(entry))
    general = GeneralMonitor.load_from_json(args.get('general', {}))
    return monitors, general
# Fallback cache configuration, shared by every error path below.
_DEFAULT_CACHE_CONFIG = {
    'cache_type': 'mongodb',
    'mongodb_host': 'localhost',
    'mongodb_port': 27017,
    'mongodb_database': 'wallabicher',
    'mongodb_username': None,
    'mongodb_password': None,
    'mongodb_auth_source': 'admin'
}


def load_cache_config():
    """Load the cache configuration from config.yaml next to this file.

    MONGODB_* environment variables override file values. Falls back to
    ``_DEFAULT_CACHE_CONFIG`` when the file is missing/unreadable or the
    configured cache type is unknown.

    Returns:
        dict with keys 'cache_type', 'mongodb_host', 'mongodb_port',
        'mongodb_database', 'mongodb_username', 'mongodb_password' and
        'mongodb_auth_source'.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        cache_type = cache_config.get('type', 'mongodb')
        if cache_type == 'mongodb':
            mongodb_config = cache_config.get('mongodb', {})
            return {
                'cache_type': 'mongodb',
                'mongodb_host': os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'),
                'mongodb_port': int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)),
                'mongodb_database': os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'),
                'mongodb_username': os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'),
                'mongodb_password': os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'),
                'mongodb_auth_source': mongodb_config.get('auth_source', 'admin')
            }
        logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'mongodb' por defecto")
        # Return a copy so callers can't mutate the shared default.
        return dict(_DEFAULT_CACHE_CONFIG)
    except Exception as e:
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (mongodb): {e}")
        return dict(_DEFAULT_CACHE_CONFIG)
class WorkerManager:
    """Manage workers dynamically, starting and stopping them to match the
    configuration stored in MongoDB.

    Workers are keyed by worker_id; a name-based mapping is kept only for
    backward compatibility with older documents. All mutations of
    ``self._workers`` happen under ``self._lock``.
    """

    def __init__(self, general_args, queue_manager):
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        # NOTE(review): 1000 is an upper bound on concurrently running
        # workers (one thread each) — confirm it matches expected load.
        self._executor = ThreadPoolExecutor(max_workers=1000)
        # worker_id -> {'worker': Worker, 'future': Future,
        #               'item': ItemMonitor, 'username': str}
        self._workers = {}
        # worker_id -> username
        self._worker_to_username = {}
        # worker_id -> worker_name (kept for compatibility with name-based lookups)
        self._worker_id_to_name = {}
        self._running = True
        self._lock = threading.Lock()
        self._db = get_mongodb_client_for_workers()

    def load_workers_config(self):
        """Load the workers configuration from MongoDB (all users).

        Returns:
            Tuple ``(items, general, disabled, worker_id_to_username)``
            where ``items`` is a list of ``(ItemMonitor, enabled, username)``
            triples. On error, empty structures are returned.
        """
        try:
            config = get_workers_from_mongodb(None)  # None = all users
            disabled = set(config.get('disabled', []))
            items = []
            workers_by_user = config.get('workers_by_user', {})
            # Build worker_id -> username and worker_name -> username maps
            # (the name-based one exists only for compatibility).
            worker_id_to_username = {}
            worker_name_to_username = {}
            for username, user_items in workers_by_user.items():
                for item_data in user_items:
                    worker_name = item_data.get('name')
                    worker_id = item_data.get('id') or item_data.get('worker_id')
                    if worker_id:
                        worker_id_to_username[worker_id] = username
                    if worker_name:
                        worker_name_to_username[worker_name] = username
            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                worker_id = item.get_id()
                worker_name = item.get_name()
                # Resolve the worker's owner (prefer ID, fall back to name).
                username = worker_id_to_username.get(worker_id) or worker_name_to_username.get(worker_name)
                # A worker may be disabled by ID or by name (compatibility).
                is_disabled = worker_id in disabled or worker_name in disabled
                items.append((item, not is_disabled, username))
            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled, worker_id_to_username
        except Exception as e:
            self.logger.error(f"Error cargando workers de MongoDB: {e}")
            return [], GeneralMonitor([], [], [], [], []), set(), {}

    def start_worker(self, item, general_args, username=None):
        """Create a Worker for *item* and submit it to the executor.

        Returns True on success, False if construction/submission failed.
        Caller is expected to hold ``self._lock`` when invoking this.
        """
        try:
            worker = Worker(item, general_args, self._queue_manager, username=username)
            future = self._executor.submit(worker.run)
            worker_id = item.get_id()
            worker_name = item.get_name()
            self._workers[worker_id] = {
                'worker': worker,
                'future': future,
                'item': item,
                'username': username
            }
            # Keep the auxiliary mappings in sync.
            if username:
                self._worker_to_username[worker_id] = username
            self._worker_id_to_name[worker_id] = worker_name
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) iniciado para usuario '{username or 'unknown'}'")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}' (ID: {item.get_id()}): {e}")
            return False

    def stop_worker(self, worker_id):
        """Stop and deregister a worker by its ID.

        Returns True if the worker was found and stopped, False otherwise.
        Does not wait for the worker thread to finish.
        """
        if worker_id not in self._workers:
            return False
        try:
            worker_data = self._workers[worker_id]
            worker = worker_data['worker']
            future = worker_data['future']
            worker_name = self._worker_id_to_name.get(worker_id, worker_id)
            # Signal the worker loop to stop.
            worker.stop()
            # Cancel the future if it has not started running yet.
            if not future.done():
                future.cancel()
            # Do not join; a running worker will exit on its own.
            del self._workers[worker_id]
            if worker_id in self._worker_to_username:
                del self._worker_to_username[worker_id]
            if worker_id in self._worker_id_to_name:
                del self._worker_id_to_name[worker_id]
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
            return True
        except Exception as e:
            self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
            # Even on error, make sure the bookkeeping entries are removed.
            if worker_id in self._workers:
                del self._workers[worker_id]
            if worker_id in self._worker_to_username:
                del self._worker_to_username[worker_id]
            if worker_id in self._worker_id_to_name:
                del self._worker_id_to_name[worker_id]
            return False

    def sync_workers(self):
        """Reconcile running workers against the MongoDB configuration.

        Stops workers that were removed/disabled, starts new/re-enabled
        ones, and restarts workers whose configuration changed
        significantly (see ``_has_changes``) or whose owner changed.
        """
        items, general_args, disabled, worker_id_to_username = self.load_workers_config()
        # Refresh the global id -> username mapping.
        self._worker_to_username = worker_id_to_username
        # Swap in the new general args for all active workers.
        # NOTE(review): old_general_args is currently unused.
        old_general_args = self._general_args
        self._general_args = general_args
        current_worker_ids = set(self._workers.keys())
        enabled_worker_ids = {item.get_id() for item, enabled, username in items if enabled}
        # Push the new general args into every existing worker.
        with self._lock:
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)
        # Stop workers that no longer exist or were disabled.
        to_stop = current_worker_ids - enabled_worker_ids
        for worker_id in list(to_stop):
            with self._lock:
                if worker_id in self._workers:
                    worker = self._workers[worker_id]['worker']
                    worker.stop()
                    future = self._workers[worker_id]['future']
                    if not future.done():
                        future.cancel()
                    worker_name = self._worker_id_to_name.get(worker_id, worker_id)
                    del self._workers[worker_id]
                    if worker_id in self._worker_to_username:
                        del self._worker_to_username[worker_id]
                    if worker_id in self._worker_id_to_name:
                        del self._worker_id_to_name[worker_id]
                    self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
        # Start workers that are new or were just re-enabled.
        for item, enabled, username in items:
            worker_id = item.get_id()
            worker_name = item.get_name()
            if enabled and worker_id not in current_worker_ids:
                # New worker, or one that was just activated.
                with self._lock:
                    if worker_id not in self._workers:
                        self.start_worker(item, general_args, username=username)
            elif enabled and worker_id in current_worker_ids:
                # Existing worker: check for significant changes.
                needs_restart = False
                with self._lock:
                    if worker_id in self._workers:
                        old_item = self._workers[worker_id]['item']
                        old_username = self._workers[worker_id].get('username')
                        # Restart on significant config change or owner change.
                        if self._has_changes(old_item, item) or old_username != username:
                            self.logger.info(f"Reiniciando worker '{worker_name}' (ID: {worker_id}) por cambios en configuración")
                            worker = self._workers[worker_id]['worker']
                            worker.stop()
                            future = self._workers[worker_id]['future']
                            if not future.done():
                                future.cancel()
                            del self._workers[worker_id]
                            if worker_id in self._worker_to_username:
                                del self._worker_to_username[worker_id]
                            if worker_id in self._worker_id_to_name:
                                del self._worker_id_to_name[worker_id]
                            needs_restart = True
                        else:
                            # No restart needed: refresh the stored metadata.
                            self._workers[worker_id]['item'] = item
                            self._workers[worker_id]['username'] = username
                            # And push the new general args into the worker.
                            self._workers[worker_id]['worker'].update_general_args(general_args)
                # Restart outside the lock to avoid deadlocks.
                if needs_restart:
                    time.sleep(0.5)  # give the old worker time to stop
                    with self._lock:
                        if worker_id not in self._workers:  # guard against another thread re-adding it
                            self.start_worker(item, general_args, username=username)

    def _has_changes(self, old_item, new_item):
        """Return True when the two items differ in any restart-worthy field."""
        # Compare the fields that affect a running worker's behavior.
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_mongodb(self):
        """Poll MongoDB for worker-config changes and resync when detected.

        Runs until ``self._running`` is cleared. Change detection hashes a
        JSON dump of all worker documents (minus ``_id``) every ~2 seconds.
        """
        self.logger.info("Iniciando monitor de workers en MongoDB (todos los usuarios)...")
        last_config_hash = None
        while self._running:
            try:
                time.sleep(2)  # polling interval
                if self._db is None:
                    # Lost (or never had) a connection: try to reconnect.
                    self._db = get_mongodb_client_for_workers()
                    if self._db is None:
                        time.sleep(5)  # back off while MongoDB is unavailable
                        continue
                # Read the current configuration of all users.
                try:
                    workers_collection = self._db['workers']
                    all_workers = list(workers_collection.find({}, {'_id': 0}))
                    # Compute a combined hash of every worker document.
                    if all_workers:
                        import hashlib
                        from datetime import datetime
                        # datetime values are not JSON-serializable by default.
                        def json_serial(obj):
                            if isinstance(obj, datetime):
                                return obj.isoformat()
                            raise TypeError(f"Type {type(obj)} not serializable")
                        all_configs_json = json.dumps(all_workers, sort_keys=True, default=json_serial)
                        current_config_hash = hashlib.md5(all_configs_json.encode()).hexdigest()
                    else:
                        current_config_hash = None
                    # Resync only when the configuration actually changed.
                    if current_config_hash != last_config_hash:
                        if last_config_hash is not None:  # skip logging on the first pass
                            self.logger.info(f"Detectado cambio en workers de MongoDB (usuarios: {len(all_workers)}), sincronizando workers...")
                            time.sleep(0.5)  # small delay for write consistency
                        self.sync_workers()
                        last_config_hash = current_config_hash
                except ConnectionFailure as e:
                    self.logger.error(f"Error leyendo workers de MongoDB: {e}")
                    time.sleep(5)  # back off on connection errors
                    self._db = None  # force a reconnect on the next pass
            except Exception as e:
                self.logger.error(f"Error monitoreando workers en MongoDB: {e}")
                time.sleep(5)  # back off on unexpected errors

    def start_monitoring(self):
        """Start ``monitor_workers_mongodb`` in a daemon thread; return it."""
        monitor_thread = threading.Thread(target=self.monitor_workers_mongodb, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker, clear the bookkeeping maps and shut down the
        executor without waiting for running tasks."""
        self._running = False
        worker_ids = list(self._workers.keys())
        for worker_id in worker_ids:
            try:
                with self._lock:
                    if worker_id in self._workers:
                        worker = self._workers[worker_id]['worker']
                        worker.stop()
                        future = self._workers[worker_id]['future']
                        if not future.done():
                            future.cancel()
                        del self._workers[worker_id]
                        if worker_id in self._worker_to_username:
                            del self._worker_to_username[worker_id]
                        if worker_id in self._worker_id_to_name:
                            del self._worker_id_to_name[worker_id]
            except Exception as e:
                self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")
if __name__ == "__main__":
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Load the cache configuration and build the ArticleCache.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)

    # MongoDB settings passed through to the queue layer.
    mongodb_config = {
        'mongodb': {
            'host': cache_config.get('mongodb_host', 'localhost'),
            'port': cache_config.get('mongodb_port', 27017),
            'database': cache_config.get('mongodb_database', 'wallabicher'),
            'username': cache_config.get('mongodb_username'),
            'password': cache_config.get('mongodb_password'),
            'auth_source': cache_config.get('mongodb_auth_source', 'admin')
        }
    }

    # Build the QueueManager with the ArticleCache and MongoDB settings.
    queue_manager = QueueManager(article_cache, mongodb_config)

    # Build the WorkerManager (loads workers of every user).
    # NOTE(review): `items` appears unused after this call — the
    # WorkerManager reloads its own items in sync_workers(); confirm.
    items, general_args = parse_items_to_monitor(None)  # None = all users
    worker_manager = WorkerManager(general_args, queue_manager)

    # Initial synchronization of workers.
    worker_manager.sync_workers()

    # Start polling MongoDB for worker changes (all users).
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado (todos los usuarios). Esperando cambios en MongoDB...")
        # Keep the main thread alive; the poller runs as a daemon.
        while True:
            time.sleep(60)
            # Periodic safety resync, in case a change was missed.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()