Signed-off-by: Omar Sánchez Pizarro <omar.sanchez@pistacero.net>
This commit is contained in:
Omar Sánchez Pizarro
2026-01-20 03:22:56 +01:00
parent 81bf0675ed
commit d28710b927
12 changed files with 1166 additions and 310 deletions

View File

@@ -7,6 +7,8 @@ import shutil
import yaml
import time
import threading
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from datalayer.item_monitor import ItemMonitor
from datalayer.general_monitor import GeneralMonitor
@@ -18,12 +20,13 @@ def initialize_config_files():
"""
Inicializa los archivos de configuración si no existen,
copiando los archivos .sample correspondientes.
Nota: workers.json ya no se usa, los workers se almacenan en MongoDB.
"""
base_dir = os.path.dirname(os.path.abspath(__file__))
config_files = [
('config.yaml', 'config.sample.yaml'),
('workers.json', 'workers.sample.json')
# workers.json ya no se usa, se almacena en MongoDB
]
for config_file, sample_file in config_files:
@@ -69,16 +72,172 @@ def configure_logger():
logging.basicConfig(level=logging.NOTSET,
handlers=[console_handler, file_handler])
def get_mongodb_client_for_workers():
    """Return a pymongo Database handle for reading worker definitions.

    Reads ``config.yaml`` next to this file; only the ``cache.type ==
    'mongodb'`` configuration is supported for workers. Environment variables
    (MONGODB_HOST/PORT/DATABASE/USERNAME/PASSWORD) override the YAML values.

    Returns:
        pymongo.database.Database on success, or ``None`` when MongoDB is not
        configured or the connection/ping fails (errors are logged, not raised).
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        if cache_config.get('type') == 'mongodb':
            mongodb_config = cache_config.get('mongodb', {})
            # Environment variables take precedence over config.yaml values.
            mongodb_host = os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost')
            mongodb_port = int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017))
            mongodb_database = os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher')
            mongodb_username = os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username')
            mongodb_password = os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password')
            mongodb_auth_source = mongodb_config.get('auth_source', 'admin')
            # Build the connection URI; credentials are optional.
            if mongodb_username and mongodb_password:
                connection_string = f"mongodb://{mongodb_username}:{mongodb_password}@{mongodb_host}:{mongodb_port}/?authSource={mongodb_auth_source}"
            else:
                connection_string = f"mongodb://{mongodb_host}:{mongodb_port}/"
            mongo_client = MongoClient(
                connection_string,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
                socketTimeoutMS=5000
            )
            # Fail fast: verify the server is actually reachable.
            mongo_client.admin.command('ping')
            db = mongo_client[mongodb_database]
            return db
        else:
            logger.error("MongoDB no está configurado. Workers requieren MongoDB.")
            return None
    except Exception as e:
        logger.error(f"Error conectando a MongoDB para workers: {e}")
        return None
def _empty_workers_result(workers_by_user=None):
    """Return the empty workers payload used as a fallback on errors / no data."""
    return {
        'general': {'title_exclude': [], 'description_exclude': []},
        'items': [],
        'disabled': [],
        'workers_by_user': workers_by_user if workers_by_user is not None else {}
    }


def get_workers_from_mongodb(username=None):
    """Fetch worker definitions from the MongoDB ``workers`` collection.

    Args:
        username: when given, only that user's worker document is returned;
            when ``None``, the workers of every user are merged together.

    Returns:
        dict with keys ``general`` (title/description exclusion lists),
        ``items`` (list of raw worker item dicts), ``disabled`` (list of
        disabled worker names/ids) and ``workers_by_user`` (username -> items).
        On any error an empty payload is returned; nothing is raised.
    """
    logger = logging.getLogger(__name__)
    db = get_mongodb_client_for_workers()
    if db is None:
        logger.error("No se pudo conectar a MongoDB para obtener workers")
        return _empty_workers_result()
    try:
        workers_collection = db['workers']
        if username:
            # Single-user mode: one document per username.
            workers_data = workers_collection.find_one({'username': username})
            if not workers_data:
                logger.warning(f"No se encontraron workers para el usuario '{username}' en MongoDB")
                return _empty_workers_result({username: []})
            # Strip MongoDB bookkeeping fields before handing the doc back.
            workers_data.pop('_id', None)
            workers_data.pop('updatedAt', None)
            workers_data.pop('createdAt', None)
            username_value = workers_data.pop('username')
            data = workers_data
            data['workers_by_user'] = {username_value: data.get('items', [])}
            return data
        else:
            # All-users mode: merge every user's document into one payload.
            all_workers = list(workers_collection.find({}))
            if not all_workers:
                logger.warning("No se encontraron workers en MongoDB")
                return _empty_workers_result()
            all_items = []
            all_disabled = []
            workers_by_user = {}  # username -> list of item dicts
            # General exclusions are the union across all users.
            general_title_exclude = set()
            general_description_exclude = set()
            for user_workers_doc in all_workers:
                try:
                    username_from_doc = user_workers_doc.get('username')
                    if not username_from_doc:
                        continue
                    # Drop MongoDB bookkeeping fields.
                    user_workers = {k: v for k, v in user_workers_doc.items()
                                    if k not in ['_id', 'username', 'updatedAt', 'createdAt']}
                    user_items = user_workers.get('items', [])
                    workers_by_user[username_from_doc] = user_items
                    for item in user_items:
                        all_items.append(item)
                    for disabled_name in user_workers.get('disabled', []):
                        all_disabled.append(disabled_name)
                    user_general = user_workers.get('general', {})
                    general_title_exclude.update(user_general.get('title_exclude', []))
                    general_description_exclude.update(user_general.get('description_exclude', []))
                    logger.debug(f"Cargados {len(user_items)} workers del usuario '{username_from_doc}'")
                except Exception as e:
                    # One bad document must not break the whole load.
                    who = user_workers_doc.get('username', 'unknown')
                    logger.error(f"Error procesando workers del usuario '{who}': {e}")
                    continue
            logger.info(f"Total de workers cargados: {len(all_items)} items de {len(all_workers)} usuarios")
            return {
                'general': {
                    'title_exclude': list(general_title_exclude),
                    'description_exclude': list(general_description_exclude)
                },
                'items': all_items,
                'disabled': all_disabled,
                'workers_by_user': workers_by_user
            }
    except Exception as e:
        logger.error(f"Error obteniendo workers de MongoDB: {e}")
        return _empty_workers_result()
def parse_items_to_monitor(username=None):
    """Load monitored items and general settings from MongoDB.

    With ``username=None`` the workers of every user are loaded.
    Raises ValueError when the payload lacks the mandatory ``items`` field.
    """
    raw = get_workers_from_mongodb(username)
    if 'items' not in raw:
        raise ValueError("Missing mandatory field: items")
    monitors = []
    for entry in raw.get('items', []):
        monitors.append(ItemMonitor.load_from_json(entry))
    general = GeneralMonitor.load_from_json(raw.get('general', {}))
    return monitors, general
def load_cache_config():
@@ -91,91 +250,126 @@ def load_cache_config():
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
cache_config = config.get('cache', {})
cache_type = cache_config.get('type', 'redis')
cache_type = cache_config.get('type', 'mongodb')
if cache_type == 'redis':
redis_config = cache_config.get('redis', {})
if cache_type == 'mongodb':
mongodb_config = cache_config.get('mongodb', {})
return {
'cache_type': 'redis',
'redis_host': redis_config.get('host', 'localhost'),
'redis_port': redis_config.get('port', 6379),
'redis_db': redis_config.get('db', 0),
'redis_password': redis_config.get('password')
'cache_type': 'mongodb',
'mongodb_host': os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'),
'mongodb_port': int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)),
'mongodb_database': os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'),
'mongodb_username': os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'),
'mongodb_password': os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'),
'mongodb_auth_source': mongodb_config.get('auth_source', 'admin')
}
else:
logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto")
logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'mongodb' por defecto")
return {
'cache_type': 'redis',
'redis_host': 'localhost',
'redis_port': 6379,
'redis_db': 0,
'redis_password': None
'cache_type': 'mongodb',
'mongodb_host': 'localhost',
'mongodb_port': 27017,
'mongodb_database': 'wallabicher',
'mongodb_username': None,
'mongodb_password': None,
'mongodb_auth_source': 'admin'
}
except Exception as e:
logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}")
logger.warning(f"Error cargando configuración de cache, usando valores por defecto (mongodb): {e}")
return {
'cache_type': 'redis',
'redis_host': 'localhost',
'redis_port': 6379,
'redis_db': 0,
'redis_password': None
'cache_type': 'mongodb',
'mongodb_host': 'localhost',
'mongodb_port': 27017,
'mongodb_database': 'wallabicher',
'mongodb_username': None,
'mongodb_password': None,
'mongodb_auth_source': 'admin'
}
class WorkerManager:
"""Gestiona workers dinámicamente, iniciando y deteniendo según workers.json"""
"""Gestiona workers dinámicamente, iniciando y deteniendo según configuración en MongoDB"""
def __init__(self, general_args, queue_manager):
    """Set up worker bookkeeping and obtain the MongoDB handle.

    Args:
        general_args: GeneralMonitor settings shared by all workers.
        queue_manager: QueueManager instance handed to every Worker.
    """
    self.logger = logging.getLogger(__name__)
    self._general_args = general_args
    self._queue_manager = queue_manager
    self._executor = ThreadPoolExecutor(max_workers=1000)
    # worker_id -> {'worker': Worker, 'future': Future, 'item': ItemMonitor, 'username': str}
    self._workers = {}
    self._worker_to_username = {}   # worker_id -> username
    self._worker_id_to_name = {}    # worker_id -> worker_name (compatibility)
    self._running = True
    self._lock = threading.Lock()
    # May be None when MongoDB is unreachable; the monitor loop retries.
    self._db = get_mongodb_client_for_workers()
def load_workers_config(self):
    """Load the worker configuration for all users from MongoDB.

    Returns:
        (items, general, disabled, worker_id_to_username) where items is a
        list of (ItemMonitor, enabled, username) tuples. On error an empty
        configuration is returned (logged, not raised).
    """
    try:
        config = get_workers_from_mongodb(None)  # None = all users
        disabled = set(config.get('disabled', []))
        items = []
        workers_by_user = config.get('workers_by_user', {})
        # Build id->username and name->username maps (name kept for
        # backward compatibility with pre-id worker documents).
        worker_id_to_username = {}
        worker_name_to_username = {}
        for username, user_items in workers_by_user.items():
            for item_data in user_items:
                worker_name = item_data.get('name')
                worker_id = item_data.get('id') or item_data.get('worker_id')
                if worker_id:
                    worker_id_to_username[worker_id] = username
                if worker_name:
                    worker_name_to_username[worker_name] = username
        for item_data in config.get('items', []):
            item = ItemMonitor.load_from_json(item_data)
            worker_id = item.get_id()
            worker_name = item.get_name()
            # Resolve owner: prefer the id mapping, fall back to name.
            username = worker_id_to_username.get(worker_id) or worker_name_to_username.get(worker_name)
            # A worker may be disabled by id or (legacy) by name.
            is_disabled = worker_id in disabled or worker_name in disabled
            items.append((item, not is_disabled, username))
        general = GeneralMonitor.load_from_json(config.get('general', {}))
        return items, general, disabled, worker_id_to_username
    except Exception as e:
        self.logger.error(f"Error cargando workers de MongoDB: {e}")
        return [], GeneralMonitor([], [], [], [], []), set(), {}
def start_worker(self, item, general_args, username=None):
    """Start a Worker for *item* and register it under its worker id.

    Args:
        item: ItemMonitor describing what to watch.
        general_args: shared GeneralMonitor settings.
        username: owner of the worker (may be None for legacy entries).

    Returns:
        True when the worker was submitted, False on error (logged).
    """
    try:
        worker = Worker(item, general_args, self._queue_manager, username=username)
        future = self._executor.submit(worker.run)
        worker_id = item.get_id()
        worker_name = item.get_name()
        self._workers[worker_id] = {
            'worker': worker,
            'future': future,
            'item': item,
            'username': username
        }
        # Keep the id->username / id->name maps in sync with _workers.
        if username:
            self._worker_to_username[worker_id] = username
        self._worker_id_to_name[worker_id] = worker_name
        user_label = username or 'unknown'
        self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) iniciado para usuario '{user_label}'")
        return True
    except Exception as e:
        self.logger.error(f"Error iniciando worker '{item.get_name()}' (ID: {item.get_id()}): {e}")
        return False
def stop_worker(self, worker_name):
"""Detiene un worker"""
if worker_name not in self._workers:
def stop_worker(self, worker_id):
"""Detiene un worker por su ID"""
if worker_id not in self._workers:
return False
try:
worker_data = self._workers[worker_name]
worker_data = self._workers[worker_id]
worker = worker_data['worker']
future = worker_data['future']
worker_name = self._worker_id_to_name.get(worker_id, worker_id)
# Detener el worker
worker.stop()
@@ -185,26 +379,37 @@ class WorkerManager:
future.cancel()
# No esperamos a que termine, se detendrá automáticamente
del self._workers[worker_name]
self.logger.info(f"Worker '{worker_name}' detenido")
del self._workers[worker_id]
if worker_id in self._worker_to_username:
del self._worker_to_username[worker_id]
if worker_id in self._worker_id_to_name:
del self._worker_id_to_name[worker_id]
self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
return True
except Exception as e:
self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")
self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
# Asegurarse de eliminar la entrada aunque haya error
if worker_name in self._workers:
del self._workers[worker_name]
if worker_id in self._workers:
del self._workers[worker_id]
if worker_id in self._worker_to_username:
del self._worker_to_username[worker_id]
if worker_id in self._worker_id_to_name:
del self._worker_id_to_name[worker_id]
return False
def sync_workers(self):
"""Sincroniza los workers con workers.json"""
items, general_args, disabled = self.load_workers_config()
"""Sincroniza los workers con la configuración en MongoDB (todos los usuarios)"""
items, general_args, disabled, worker_id_to_username = self.load_workers_config()
# Actualizar mapeo global
self._worker_to_username = worker_id_to_username
# Actualizar general_args en todos los workers activos
old_general_args = self._general_args
self._general_args = general_args
current_worker_names = set(self._workers.keys())
enabled_worker_names = {item.get_name() for item, enabled in items if enabled}
current_worker_ids = set(self._workers.keys())
enabled_worker_ids = {item.get_id() for item, enabled, username in items if enabled}
# Actualizar general_args en workers existentes
with self._lock:
@@ -212,53 +417,65 @@ class WorkerManager:
worker_data['worker'].update_general_args(general_args)
# Detener workers que ya no existen o están deshabilitados
to_stop = current_worker_names - enabled_worker_names
for worker_name in list(to_stop):
to_stop = current_worker_ids - enabled_worker_ids
for worker_id in list(to_stop):
with self._lock:
if worker_name in self._workers:
worker = self._workers[worker_name]['worker']
if worker_id in self._workers:
worker = self._workers[worker_id]['worker']
worker.stop()
future = self._workers[worker_name]['future']
future = self._workers[worker_id]['future']
if not future.done():
future.cancel()
del self._workers[worker_name]
self.logger.info(f"Worker '{worker_name}' detenido")
worker_name = self._worker_id_to_name.get(worker_id, worker_id)
del self._workers[worker_id]
if worker_id in self._worker_to_username:
del self._worker_to_username[worker_id]
if worker_id in self._worker_id_to_name:
del self._worker_id_to_name[worker_id]
self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
# Iniciar workers nuevos o que se hayan habilitado
for item, enabled in items:
for item, enabled, username in items:
worker_id = item.get_id()
worker_name = item.get_name()
if enabled and worker_name not in current_worker_names:
if enabled and worker_id not in current_worker_ids:
# Nuevo worker o recién activado
with self._lock:
if worker_name not in self._workers:
self.start_worker(item, general_args)
elif enabled and worker_name in current_worker_names:
if worker_id not in self._workers:
self.start_worker(item, general_args, username=username)
elif enabled and worker_id in current_worker_ids:
# Worker existente, verificar si hay cambios significativos
needs_restart = False
with self._lock:
if worker_name in self._workers:
old_item = self._workers[worker_name]['item']
# Comparar si hay cambios significativos
if self._has_changes(old_item, item):
self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración")
worker = self._workers[worker_name]['worker']
if worker_id in self._workers:
old_item = self._workers[worker_id]['item']
old_username = self._workers[worker_id].get('username')
# Comparar si hay cambios significativos o cambio de usuario
if self._has_changes(old_item, item) or old_username != username:
self.logger.info(f"Reiniciando worker '{worker_name}' (ID: {worker_id}) por cambios en configuración")
worker = self._workers[worker_id]['worker']
worker.stop()
future = self._workers[worker_name]['future']
future = self._workers[worker_id]['future']
if not future.done():
future.cancel()
del self._workers[worker_name]
del self._workers[worker_id]
if worker_id in self._worker_to_username:
del self._worker_to_username[worker_id]
if worker_id in self._worker_id_to_name:
del self._worker_id_to_name[worker_id]
needs_restart = True
else:
# Actualizar la referencia al item sin reiniciar
self._workers[worker_name]['item'] = item
self._workers[worker_id]['item'] = item
self._workers[worker_id]['username'] = username
# Actualizar general_args en el worker
self._workers[worker_name]['worker'].update_general_args(general_args)
self._workers[worker_id]['worker'].update_general_args(general_args)
# Reiniciar fuera del lock para evitar deadlocks
if needs_restart:
time.sleep(0.5) # Dar tiempo para detener
with self._lock:
if worker_name not in self._workers: # Verificar que no se haya añadido en otro thread
self.start_worker(item, general_args)
if worker_id not in self._workers: # Verificar que no se haya añadido en otro thread
self.start_worker(item, general_args, username=username)
def _has_changes(self, old_item, new_item):
"""Verifica si hay cambios significativos entre dos items"""
@@ -275,56 +492,81 @@ class WorkerManager:
old_item.get_max_distance() != new_item.get_max_distance()
)
def monitor_workers_mongodb(self):
    """Poll the MongoDB ``workers`` collection and resync on changes.

    Runs until ``self._running`` is False. Every 2s the full set of worker
    documents (all users) is hashed; when the hash differs from the last
    seen value, ``sync_workers()`` is invoked. Connection losses trigger a
    reconnect with back-off. All errors are logged, never raised.
    """
    # Hoisted out of the poll loop: these were previously re-imported /
    # re-defined on every iteration.
    import hashlib
    from datetime import datetime

    def json_serial(obj):
        # json.dumps helper: worker documents may carry datetime fields.
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Type {type(obj)} not serializable")

    self.logger.info("Iniciando monitor de workers en MongoDB (todos los usuarios)...")
    last_config_hash = None
    while self._running:
        try:
            time.sleep(2)  # Poll interval
            if self._db is None:
                self._db = get_mongodb_client_for_workers()
                if self._db is None:
                    time.sleep(5)  # Back off while MongoDB is unavailable
                    continue
            try:
                workers_collection = self._db['workers']
                all_workers = list(workers_collection.find({}, {'_id': 0}))
                # Hash the combined, canonically-serialized configuration.
                if all_workers:
                    all_configs_json = json.dumps(all_workers, sort_keys=True, default=json_serial)
                    current_config_hash = hashlib.md5(all_configs_json.encode()).hexdigest()
                else:
                    current_config_hash = None
                if current_config_hash != last_config_hash:
                    # Skip the log line on the very first (baseline) pass.
                    if last_config_hash is not None:
                        self.logger.info(f"Detectado cambio en workers de MongoDB (usuarios: {len(all_workers)}), sincronizando workers...")
                    time.sleep(0.5)  # Small grace period for consistency
                    self.sync_workers()
                    last_config_hash = current_config_hash
            except ConnectionFailure as e:
                self.logger.error(f"Error leyendo workers de MongoDB: {e}")
                time.sleep(5)
                self._db = None  # Force a reconnect on the next iteration
        except Exception as e:
            self.logger.error(f"Error monitoreando workers en MongoDB: {e}")
            time.sleep(5)
def start_monitoring(self):
    """Launch the MongoDB polling monitor in a daemon thread.

    Returns:
        The started threading.Thread (daemon, so it won't block shutdown).
    """
    monitor_thread = threading.Thread(target=self.monitor_workers_mongodb, daemon=True)
    monitor_thread.start()
    return monitor_thread
def stop_all(self):
    """Stop every running worker and shut down the executor.

    Clears the bookkeeping maps for each stopped worker; per-worker errors
    are logged and do not prevent the remaining workers from stopping.
    """
    self._running = False
    worker_ids = list(self._workers.keys())
    for worker_id in worker_ids:
        try:
            with self._lock:
                if worker_id in self._workers:
                    worker = self._workers[worker_id]['worker']
                    worker.stop()
                    future = self._workers[worker_id]['future']
                    if not future.done():
                        future.cancel()
                    del self._workers[worker_id]
                    # Drop auxiliary mappings; pop() tolerates missing keys.
                    self._worker_to_username.pop(worker_id, None)
                    self._worker_id_to_name.pop(worker_id, None)
        except Exception as e:
            self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
    # wait=False: workers are daemon-style; don't block shutdown on them.
    self._executor.shutdown(wait=False)
    self.logger.info("Todos los workers detenidos")
@@ -340,21 +582,33 @@ if __name__ == "__main__":
cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
article_cache = create_article_cache(cache_type, **cache_kwargs)
# Crear QueueManager con ArticleCache
queue_manager = QueueManager(article_cache)
# Preparar configuración de MongoDB para TelegramManagerFactory
mongodb_config = {
'mongodb': {
'host': cache_config.get('mongodb_host', 'localhost'),
'port': cache_config.get('mongodb_port', 27017),
'database': cache_config.get('mongodb_database', 'wallabicher'),
'username': cache_config.get('mongodb_username'),
'password': cache_config.get('mongodb_password'),
'auth_source': cache_config.get('mongodb_auth_source', 'admin')
}
}
# Crear WorkerManager
items, general_args = parse_items_to_monitor()
# Crear QueueManager con ArticleCache y configuración de MongoDB
queue_manager = QueueManager(article_cache, mongodb_config)
# Crear WorkerManager (carga workers de todos los usuarios)
items, general_args = parse_items_to_monitor(None) # None = todos los usuarios
worker_manager = WorkerManager(general_args, queue_manager)
# Sincronizar workers iniciales
worker_manager.sync_workers()
# Iniciar monitoreo del archivo workers.json
# Iniciar monitoreo de workers en MongoDB (todos los usuarios)
worker_manager.start_monitoring()
try:
logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...")
logger.info("Sistema de monitoreo iniciado (todos los usuarios). Esperando cambios en MongoDB...")
# Mantener el programa corriendo
while True:
time.sleep(60)