# 622 lines · 29 KiB · Python
import json
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from concurrent.futures import ThreadPoolExecutor, Future
|
|
import os
|
|
import shutil
|
|
import yaml
|
|
import time
|
|
import threading
|
|
from pymongo import MongoClient
|
|
from pymongo.errors import ConnectionFailure
|
|
|
|
from datalayer.item_monitor import ItemMonitor
|
|
from datalayer.general_monitor import GeneralMonitor
|
|
from managers.worker import Worker
|
|
from managers.queue_manager import QueueManager
|
|
from managers.article_cache import create_article_cache
|
|
|
|
def initialize_config_files():
    """Create missing configuration files from their ``.sample`` templates.

    For every (config, sample) pair, the sample file located next to this
    module is copied to the config name when the config file does not exist
    yet. Existing config files are never touched.

    Note: ``workers.json`` is no longer used; workers are stored in MongoDB.

    Raises:
        FileNotFoundError: if neither the config file nor its sample exists.
    """
    here = os.path.dirname(os.path.abspath(__file__))

    # (config file, sample file) pairs. workers.json is intentionally absent:
    # workers now live in MongoDB.
    config_and_sample_names = (
        ('config.yaml', 'config.sample.yaml'),
    )

    for config_file, sample_file in config_and_sample_names:
        target = os.path.join(here, config_file)
        if os.path.exists(target):
            continue  # already configured, nothing to do

        template = os.path.join(here, sample_file)
        if not os.path.exists(template):
            raise FileNotFoundError(
                f"No se encontró ni '{config_file}' ni '{sample_file}'. "
                "Por favor, crea uno de estos archivos."
            )

        shutil.copy2(template, target)
        print(f"✓ Archivo '{config_file}' creado desde '{sample_file}'")
        print(f" Por favor, edita '{config_file}' con tu configuración antes de continuar.")
|
|
|
|
def configure_logger():
    """Configure the root logger with a console handler and a rotating file handler.

    * The ``httpx`` logger is raised to WARNING.
    * The console handler emits INFO and above.
    * The file handler emits DEBUG and above to ``/app/logs/monitor.log`` when
      the ``/app/logs`` directory exists (Docker), otherwise to ``monitor.log``
      in the current working directory.

    The log file is rotated at 10 MB and a few backups are kept. A non-zero
    ``backupCount`` is required: with the default of 0 the handler never
    rolls over and the file grows without bound.
    """
    max_log_bytes = 10_000_000
    backup_count = 5

    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # In Docker use /app/logs when it exists, otherwise the current directory.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Make sure the target directory exists (no-op for a bare file name).
    log_dir = os.path.dirname(log_path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    file_handler = RotatingFileHandler(
        log_path,
        maxBytes=max_log_bytes,
        backupCount=backup_count,
        encoding='utf-8',  # log messages contain accented characters
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Root level NOTSET lets each handler apply its own threshold.
    logging.basicConfig(level=logging.NOTSET,
                        handlers=[console_handler, file_handler])
|
|
|
|
def get_mongodb_client_for_workers():
    """Connect to MongoDB and return the database used to read workers.

    Connection settings come from the ``cache.mongodb`` section of
    ``config.yaml`` (next to this module). ``MONGODB_HOST``, ``MONGODB_PORT``,
    ``MONGODB_DATABASE``, ``MONGODB_USERNAME`` and ``MONGODB_PASSWORD``
    environment variables take precedence over the file.

    Despite the function name, the return value is the database handle
    (``mongo_client[database]``), not the client itself.

    Returns:
        The database handle, or ``None`` when ``cache.type`` is not
        ``'mongodb'`` or when loading the config / connecting fails (the
        failure is logged).
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)

    mongo_client = None
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file
            config = yaml.safe_load(f) or {}

        cache_config = config.get('cache') or {}
        if cache_config.get('type') != 'mongodb':
            logger.error("MongoDB no está configurado. Workers requieren MongoDB.")
            return None

        mongodb_config = cache_config.get('mongodb') or {}
        mongodb_host = os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost')
        mongodb_port = int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017))
        mongodb_database = os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher')
        mongodb_username = os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username')
        mongodb_password = os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password')
        mongodb_auth_source = mongodb_config.get('auth_source', 'admin')

        client_options = {
            'host': mongodb_host,
            'port': mongodb_port,
            'serverSelectionTimeoutMS': 5000,
            'connectTimeoutMS': 5000,
            'socketTimeoutMS': 5000,
        }
        # Credentials are passed as keyword arguments instead of being
        # interpolated into a URI: a password containing '@', ':' or '/'
        # would otherwise have to be percent-escaped to form a valid URI.
        if mongodb_username and mongodb_password:
            client_options.update(
                username=mongodb_username,
                password=mongodb_password,
                authSource=mongodb_auth_source,
            )

        mongo_client = MongoClient(**client_options)

        # Verify the connection before handing the database out.
        mongo_client.admin.command('ping')
        return mongo_client[mongodb_database]
    except Exception as e:
        logger.error(f"Error conectando a MongoDB para workers: {e}")
        # Do not leak the client when the ping fails.
        if mongo_client is not None:
            mongo_client.close()
        return None
|
|
|
|
def _empty_workers_config(workers_by_user=None):
    """Return the 'no workers' result, built fresh on every call."""
    return {
        'general': {'title_exclude': [], 'description_exclude': []},
        'items': [],
        'disabled': [],
        'workers_by_user': {} if workers_by_user is None else workers_by_user,
    }


def get_workers_from_mongodb(username=None):
    """Load the workers configuration from the ``workers`` MongoDB collection.

    Args:
        username: when truthy, only that user's document is loaded and
            returned (minus ``_id``, ``username``, ``updatedAt`` and
            ``createdAt``). When falsy, the documents of all users are merged:
            items and disabled entries are concatenated and the general
            exclusion lists are united.

    Returns:
        A dict. In the all-users case, and in every empty/error case, it has
        the keys ``general`` (``title_exclude`` / ``description_exclude``
        lists), ``items``, ``disabled`` and ``workers_by_user`` (username ->
        list of items). In the single-user case it is the stored document
        itself plus ``workers_by_user``, so keys such as ``items`` are present
        only if the document has them. An empty configuration is returned,
        and the problem logged, when MongoDB is unreachable, nothing is
        stored, or an error occurs.
    """
    logger = logging.getLogger(__name__)
    db = get_mongodb_client_for_workers()

    if db is None:
        logger.error("No se pudo conectar a MongoDB para obtener workers")
        return _empty_workers_config()

    try:
        workers_collection = db['workers']

        if username:
            workers_data = workers_collection.find_one({'username': username})

            if not workers_data:
                logger.warning(f"No se encontraron workers para el usuario '{username}' en MongoDB")
                return _empty_workers_config({username: []})

            # Strip MongoDB bookkeeping fields.
            for mongo_field in ('_id', 'updatedAt', 'createdAt'):
                workers_data.pop(mongo_field, None)
            username_value = workers_data.pop('username')

            workers_data['workers_by_user'] = {username_value: workers_data.get('items', [])}
            return workers_data

        all_workers = list(workers_collection.find({}))

        if not all_workers:
            logger.warning("No se encontraron workers en MongoDB")
            return _empty_workers_config()

        all_items = []
        all_disabled = []
        workers_by_user = {}
        general_title_exclude = set()
        general_description_exclude = set()

        for user_workers_doc in all_workers:
            try:
                username_from_doc = user_workers_doc.get('username')
                if not username_from_doc:
                    continue

                # Read and validate everything first so that a malformed
                # document is skipped as a whole instead of half-merged.
                user_items = user_workers_doc.get('items') or []
                user_general = user_workers_doc.get('general') or {}
                items_to_add = list(user_items)
                disabled_to_add = list(user_workers_doc.get('disabled') or [])
                titles_to_add = set(user_general.get('title_exclude') or [])
                descriptions_to_add = set(user_general.get('description_exclude') or [])

                workers_by_user[username_from_doc] = user_items
                all_items.extend(items_to_add)
                all_disabled.extend(disabled_to_add)
                general_title_exclude |= titles_to_add
                general_description_exclude |= descriptions_to_add

                logger.debug(f"Cargados {len(user_items)} workers del usuario '{username_from_doc}'")
            except Exception as e:
                logger.error(f"Error procesando workers del usuario '{user_workers_doc.get('username', 'unknown')}': {e}")
                continue

        logger.info(f"Total de workers cargados: {len(all_items)} items de {len(all_workers)} usuarios")

        return {
            'general': {
                'title_exclude': list(general_title_exclude),
                'description_exclude': list(general_description_exclude),
            },
            'items': all_items,
            'disabled': all_disabled,
            'workers_by_user': workers_by_user,
        }
    except Exception as e:
        logger.error(f"Error obteniendo workers de MongoDB: {e}")
        return _empty_workers_config()
    finally:
        # Every call opens its own client through
        # get_mongodb_client_for_workers(); all results above are plain,
        # fully materialised Python objects, so the client can be released.
        try:
            db.client.close()
        except Exception as e:
            logger.debug(f"Error cerrando cliente MongoDB: {e}")
|
|
|
|
def parse_items_to_monitor(username=None):
    """Build the monitor objects from the workers stored in MongoDB.

    Args:
        username: user whose workers are loaded; ``None`` loads all users.

    Returns:
        A ``(items, general_args)`` tuple: the list produced by
        ``ItemMonitor.load_from_json`` for every stored item, and the result
        of ``GeneralMonitor.load_from_json`` for the ``general`` section.

    Raises:
        ValueError: if the loaded configuration has no ``items`` key.
    """
    workers_config = get_workers_from_mongodb(username)

    if 'items' not in workers_config:
        raise ValueError("Missing mandatory field: items")

    monitored_items = list(map(ItemMonitor.load_from_json, workers_config['items']))
    general_monitor = GeneralMonitor.load_from_json(workers_config.get('general', {}))
    return monitored_items, general_monitor
|
|
|
|
def _coerce_mongodb_port(raw_port, logger, default=27017):
    """Return *raw_port* as an int, or *default* (with a warning) if it is not one."""
    try:
        return int(raw_port)
    except (TypeError, ValueError):
        logger.warning(f"Puerto de MongoDB inválido ({raw_port!r}), usando {default}")
        return default


def _mongodb_cache_settings(mongodb_config, logger):
    """Build the cache settings dict; MONGODB_* env vars override *mongodb_config*."""
    env = os.environ
    return {
        'cache_type': 'mongodb',
        'mongodb_host': env.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'),
        'mongodb_port': _coerce_mongodb_port(
            env.get('MONGODB_PORT') or mongodb_config.get('port', 27017), logger
        ),
        'mongodb_database': env.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'),
        'mongodb_username': env.get('MONGODB_USERNAME') or mongodb_config.get('username'),
        'mongodb_password': env.get('MONGODB_PASSWORD') or mongodb_config.get('password'),
        'mongodb_auth_source': mongodb_config.get('auth_source', 'admin'),
    }


def load_cache_config():
    """Load the cache configuration from ``config.yaml``.

    Only the ``mongodb`` cache type is supported. The ``MONGODB_HOST``,
    ``MONGODB_PORT``, ``MONGODB_DATABASE``, ``MONGODB_USERNAME`` and
    ``MONGODB_PASSWORD`` environment variables override the file. They are
    also honoured on the fallback paths (unreadable config or unknown cache
    type), so a deployment configured purely through the environment keeps
    working without a valid config file.

    Returns:
        A dict with the keys ``cache_type``, ``mongodb_host``,
        ``mongodb_port``, ``mongodb_database``, ``mongodb_username``,
        ``mongodb_password`` and ``mongodb_auth_source``. Never raises for a
        missing or malformed config file; defaults are used and a warning is
        logged instead.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file
            config = yaml.safe_load(f) or {}

        cache_config = config.get('cache') or {}
        cache_type = cache_config.get('type', 'mongodb')

        if cache_type == 'mongodb':
            return _mongodb_cache_settings(cache_config.get('mongodb') or {}, logger)

        logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'mongodb' por defecto")
    except Exception as e:
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (mongodb): {e}")

    # Fallback: built-in defaults, still overridable through the environment.
    return _mongodb_cache_settings({}, logger)
|
|
|
|
class WorkerManager:
    """Manage workers dynamically, starting and stopping them to match the
    configuration stored in MongoDB.

    Bookkeeping is keyed by worker ID (``item.get_id()``). ``sync_workers``
    mutates it while holding ``self._lock``. ``start_worker`` and
    ``stop_worker`` do not acquire the lock themselves; ``sync_workers``
    calls ``start_worker`` with the lock already held.
    """

    # Getters whose value change requires restarting a running worker.
    _RESTART_FIELDS = (
        'get_search_query',
        'get_min_price',
        'get_max_price',
        'get_thread_id',
        'get_platform',
        'get_check_every',
        'get_latitude',
        'get_longitude',
        'get_max_distance',
    )

    def __init__(self, general_args, queue_manager):
        """Create the manager.

        Args:
            general_args: general monitor settings handed to every worker.
            queue_manager: queue manager handed to every worker.
        """
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        self._executor = ThreadPoolExecutor(max_workers=1000)
        # worker_id -> {'worker': Worker, 'future': Future, 'item': ItemMonitor, 'username': str}
        self._workers = {}
        self._worker_to_username = {}  # worker_id -> username
        self._worker_id_to_name = {}  # worker_id -> worker name (for compatibility)
        self._running = True
        self._lock = threading.Lock()
        self._db = get_mongodb_client_for_workers()

    def load_workers_config(self):
        """Load the workers configuration of all users from MongoDB.

        Returns:
            ``(items, general, disabled, worker_id_to_username)`` where
            ``items`` is a list of ``(ItemMonitor, enabled, username)``
            tuples, ``general`` a ``GeneralMonitor``, ``disabled`` the set of
            disabled worker IDs/names and ``worker_id_to_username`` a dict.
            On failure the error is logged and empty values are returned.
        """
        try:
            config = get_workers_from_mongodb(None)  # None = all users
            disabled = set(config.get('disabled', []))
            workers_by_user = config.get('workers_by_user', {})

            # worker_id -> username, plus name -> username for compatibility
            # with entries that are identified by name only.
            worker_id_to_username = {}
            worker_name_to_username = {}
            for username, user_items in workers_by_user.items():
                for item_data in user_items:
                    worker_name = item_data.get('name')
                    worker_id = item_data.get('id') or item_data.get('worker_id')
                    if worker_id:
                        worker_id_to_username[worker_id] = username
                    if worker_name:
                        worker_name_to_username[worker_name] = username

            items = []
            for item_data in config.get('items', []):
                try:
                    item = ItemMonitor.load_from_json(item_data)
                except Exception as e:
                    # One malformed item must not discard every other
                    # worker's configuration.
                    self.logger.error(f"Error cargando worker, se omite: {e}")
                    continue
                worker_id = item.get_id()
                worker_name = item.get_name()
                # Prefer the ID mapping, fall back to the name mapping.
                username = worker_id_to_username.get(worker_id) or worker_name_to_username.get(worker_name)
                # Disabled entries may be listed by ID or by name.
                is_disabled = worker_id in disabled or worker_name in disabled
                items.append((item, not is_disabled, username))

            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled, worker_id_to_username
        except Exception as e:
            self.logger.error(f"Error cargando workers de MongoDB: {e}")
            return [], GeneralMonitor([], [], [], [], []), set(), {}

    def start_worker(self, item, general_args, username=None):
        """Create a worker for *item* and submit it to the executor.

        Does not acquire ``self._lock``.

        Returns:
            ``True`` if the worker was submitted, ``False`` if creating or
            submitting it raised (the error is logged).
        """
        try:
            worker = Worker(item, general_args, self._queue_manager, username=username)
            future = self._executor.submit(worker.run)
            worker_id = item.get_id()
            worker_name = item.get_name()
            self._workers[worker_id] = {
                'worker': worker,
                'future': future,
                'item': item,
                'username': username,
            }
            if username:
                self._worker_to_username[worker_id] = username
            self._worker_id_to_name[worker_id] = worker_name
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) iniciado para usuario '{username or 'unknown'}'")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}' (ID: {item.get_id()}): {e}")
            return False

    def _halt_and_forget(self, worker_id):
        """Ask a registered worker to stop and drop all bookkeeping for it.

        The caller is responsible for locking and for checking that
        *worker_id* is registered. The bookkeeping entries are removed even
        if stopping raises; the exception is then propagated.

        Returns:
            The worker's display name (or its ID when no name is recorded).
        """
        entry = self._workers[worker_id]
        worker_name = self._worker_id_to_name.get(worker_id, worker_id)
        try:
            entry['worker'].stop()
            # cancel() only has an effect while the future has not started.
            future = entry['future']
            if not future.done():
                future.cancel()
        finally:
            # The worker is not awaited; it ends on its own after stop().
            self._workers.pop(worker_id, None)
            self._worker_to_username.pop(worker_id, None)
            self._worker_id_to_name.pop(worker_id, None)
        return worker_name

    def stop_worker(self, worker_id):
        """Stop a worker by ID.

        Does not acquire ``self._lock``.

        Returns:
            ``True`` if the worker was stopped; ``False`` if the ID is
            unknown or stopping raised (bookkeeping is removed regardless).
        """
        if worker_id not in self._workers:
            return False

        try:
            worker_name = self._halt_and_forget(worker_id)
        except Exception as e:
            self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
            return False

        self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")
        return True

    def sync_workers(self):
        """Synchronise running workers with the MongoDB configuration (all users).

        Stops workers that disappeared or were disabled, starts new or
        re-enabled ones, restarts those whose relevant fields or owner
        changed, and pushes the current general settings to the rest.

        NOTE(review): ``load_workers_config`` returns an empty item list when
        loading fails, in which case every running worker is stopped here —
        confirm this is the intended behaviour.
        """
        items, general_args, _disabled, worker_id_to_username = self.load_workers_config()

        with self._lock:
            self._worker_to_username = worker_id_to_username
            self._general_args = general_args
            # Snapshot under the lock: another thread may be mutating
            # self._workers concurrently.
            current_worker_ids = set(self._workers)
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)

        enabled_worker_ids = {item.get_id() for item, enabled, _ in items if enabled}

        # Stop workers that no longer exist or are disabled.
        for worker_id in current_worker_ids - enabled_worker_ids:
            with self._lock:
                if worker_id not in self._workers:
                    continue
                try:
                    worker_name = self._halt_and_forget(worker_id)
                except Exception as e:
                    # Keep syncing the remaining workers.
                    self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
                    continue
            self.logger.info(f"Worker '{worker_name}' (ID: {worker_id}) detenido")

        # Start new / re-enabled workers and restart changed ones.
        for item, enabled, username in items:
            if not enabled:
                continue
            worker_id = item.get_id()

            if worker_id not in current_worker_ids:
                with self._lock:
                    if worker_id not in self._workers:
                        self.start_worker(item, general_args, username=username)
                continue

            needs_restart = False
            with self._lock:
                entry = self._workers.get(worker_id)
                if entry is not None:
                    if self._has_changes(entry['item'], item) or entry.get('username') != username:
                        self.logger.info(f"Reiniciando worker '{item.get_name()}' (ID: {worker_id}) por cambios en configuración")
                        try:
                            self._halt_and_forget(worker_id)
                        except Exception as e:
                            self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")
                        needs_restart = True
                    else:
                        # No restart needed: just refresh the stored references.
                        entry['item'] = item
                        entry['username'] = username
                        entry['worker'].update_general_args(general_args)

            # Restart outside the lock so the pause does not block other threads.
            if needs_restart:
                time.sleep(0.5)  # give the old worker time to stop
                with self._lock:
                    # Another thread may have started it in the meantime.
                    if worker_id not in self._workers:
                        self.start_worker(item, general_args, username=username)

    def _has_changes(self, old_item, new_item):
        """Return True if any field that requires a restart differs between the items."""
        return any(
            getattr(old_item, getter)() != getattr(new_item, getter)()
            for getter in self._RESTART_FIELDS
        )

    @staticmethod
    def _config_hash(all_workers):
        """Return an MD5 hex digest of the worker documents, or None if there are none.

        The digest is used only for change detection. Datetimes are
        serialised with ``isoformat()``; any other non-JSON value falls back
        to ``str()`` so an unexpected field type cannot break polling.
        """
        if not all_workers:
            return None

        import hashlib
        from datetime import datetime

        def json_serial(obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            return str(obj)

        all_configs_json = json.dumps(all_workers, sort_keys=True, default=json_serial)
        return hashlib.md5(all_configs_json.encode()).hexdigest()

    def _release_db(self):
        """Forget the current database handle, closing its client if possible."""
        db, self._db = self._db, None
        if db is None:
            return
        try:
            db.client.close()
        except Exception as e:
            self.logger.debug(f"Error cerrando cliente MongoDB: {e}")

    def monitor_workers_mongodb(self):
        """Poll the ``workers`` collection and call ``sync_workers`` when it changes.

        Runs until ``self._running`` becomes False. Every 2 seconds all
        documents are hashed; a hash different from the previous one triggers
        a sync. On a connection failure the database handle is released and a
        reconnection is attempted on a later iteration.
        """
        self.logger.info("Iniciando monitor de workers en MongoDB (todos los usuarios)...")
        last_config_hash = None

        while self._running:
            try:
                time.sleep(2)  # polling interval

                if self._db is None:
                    self._db = get_mongodb_client_for_workers()
                    if self._db is None:
                        time.sleep(5)  # back off while MongoDB is unavailable
                        continue

                try:
                    all_workers = list(self._db['workers'].find({}, {'_id': 0}))
                    current_config_hash = self._config_hash(all_workers)

                    if current_config_hash != last_config_hash:
                        if last_config_hash is not None:  # do not log the first pass
                            self.logger.info(f"Detectado cambio en workers de MongoDB (usuarios: {len(all_workers)}), sincronizando workers...")
                        time.sleep(0.5)  # short pause before reading the config again
                        self.sync_workers()
                        last_config_hash = current_config_hash
                except ConnectionFailure as e:
                    self.logger.error(f"Error leyendo workers de MongoDB: {e}")
                    time.sleep(5)
                    self._release_db()  # force a reconnect on the next iteration
            except Exception as e:
                self.logger.error(f"Error monitoreando workers en MongoDB: {e}")
                time.sleep(5)

    def start_monitoring(self):
        """Start ``monitor_workers_mongodb`` in a daemon thread and return the thread."""
        monitor_thread = threading.Thread(target=self.monitor_workers_mongodb, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker, end the monitor loop and shut the executor down.

        The executor is shut down without waiting for running workers.
        """
        self._running = False
        with self._lock:
            worker_ids = list(self._workers)

        for worker_id in worker_ids:
            try:
                with self._lock:
                    if worker_id in self._workers:
                        self._halt_and_forget(worker_id)
            except Exception as e:
                self.logger.error(f"Error deteniendo worker ID '{worker_id}': {e}")

        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")
|
|
|
|
def main():
    """Run the monitoring system until interrupted.

    Prepares the config files and logging, builds the article cache and the
    queue manager, then starts a ``WorkerManager`` that follows the workers
    of all users stored in MongoDB. Workers are re-synchronised every 60
    seconds in addition to the manager's own polling. All workers are stopped
    on exit.
    """
    initialize_config_files()
    configure_logger()
    log = logging.getLogger(__name__)

    # Cache settings: 'cache_type' selects the backend, the remaining keys
    # are passed through as keyword arguments.
    cache_settings = load_cache_config()
    backend_kwargs = dict(cache_settings)
    backend_type = backend_kwargs.pop('cache_type')
    article_cache = create_article_cache(backend_type, **backend_kwargs)

    # MongoDB settings in the nested shape QueueManager receives.
    mongodb_config = {
        'mongodb': {
            'host': cache_settings.get('mongodb_host', 'localhost'),
            'port': cache_settings.get('mongodb_port', 27017),
            'database': cache_settings.get('mongodb_database', 'wallabicher'),
            'username': cache_settings.get('mongodb_username'),
            'password': cache_settings.get('mongodb_password'),
            'auth_source': cache_settings.get('mongodb_auth_source', 'admin'),
        }
    }
    queue_manager = QueueManager(article_cache, mongodb_config)

    # Only the general settings are needed here; the items themselves are
    # loaded again by sync_workers(). None = all users.
    _, general_args = parse_items_to_monitor(None)
    worker_manager = WorkerManager(general_args, queue_manager)

    worker_manager.sync_workers()  # initial sync
    worker_manager.start_monitoring()  # background polling of MongoDB

    try:
        log.info("Sistema de monitoreo iniciado (todos los usuarios). Esperando cambios en MongoDB...")
        # Keep the process alive, with a periodic safety-net sync.
        while True:
            time.sleep(60)
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        log.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()


if __name__ == "__main__":
    main()
|
|
|