rename main
This commit is contained in:
@@ -1,367 +1,367 @@
|
||||
import json
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from concurrent.futures import ThreadPoolExecutor, Future
|
||||
import os
|
||||
import shutil
|
||||
import yaml
|
||||
import time
|
||||
import threading
|
||||
|
||||
from datalayer.item_monitor import ItemMonitor
|
||||
from datalayer.general_monitor import GeneralMonitor
|
||||
from managers.worker import Worker
|
||||
from managers.queue_manager import QueueManager
|
||||
from managers.article_cache import create_article_cache
|
||||
|
||||
def initialize_config_files():
    """Create missing configuration files from their .sample counterparts.

    For each expected configuration file next to this module, copy the
    matching sample file when the real one does not exist yet. Raises
    FileNotFoundError when neither the file nor its sample is present.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))

    pairs = (
        ('config.yaml', 'config.sample.yaml'),
        ('workers.json', 'workers.sample.json'),
    )

    for target_name, sample_name in pairs:
        target = os.path.join(base_dir, target_name)
        if os.path.exists(target):
            continue  # already configured, nothing to do
        sample = os.path.join(base_dir, sample_name)
        if not os.path.exists(sample):
            raise FileNotFoundError(
                f"No se encontró ni '{target_name}' ni '{sample_name}'. "
                f"Por favor, crea uno de estos archivos."
            )
        shutil.copy2(sample, target)
        print(f"✓ Archivo '{target_name}' creado desde '{sample_name}'")
        print(f" Por favor, edita '{target_name}' con tu configuración antes de continuar.")
|
||||
|
||||
def configure_logger():
    """Configure the root logger with a console handler (INFO) and a
    rotating file handler (DEBUG).

    The log file goes to /app/logs/monitor.log when that directory exists
    (Docker), otherwise to ./monitor.log in the working directory.
    """
    # Silence the noisy httpx HTTP-client logger.
    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # Pick the log file location: prefer /app/logs (Docker), else cwd.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Make sure the target directory exists.
    log_dir = os.path.dirname(log_path)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir, exist_ok=True)

    # maxBytes is documented as an int; the original passed the float 10e6.
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Configure the root logger with both handlers.
    logging.basicConfig(level=logging.NOTSET,
                        handlers=[console_handler, file_handler])
|
||||
|
||||
def parse_items_to_monitor():
    """Parse workers.json located next to this module.

    Returns:
        tuple: (list of ItemMonitor, GeneralMonitor) built from the
        'items' and 'general' sections of the file.

    Raises:
        ValueError: when the mandatory 'items' field is missing.
    """
    # NOTE: the original re-imported os locally; the module-level import
    # already provides it.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    workers_path = os.path.join(base_dir, "workers.json")
    with open(workers_path) as f:
        args = json.load(f)
    if 'items' not in args:
        raise ValueError("Missing mandatory field: items")
    items = [ItemMonitor.load_from_json(item) for item in args['items']]
    general_args = GeneralMonitor.load_from_json(args['general'])
    return items, general_args
|
||||
|
||||
def load_cache_config():
    """Load the cache configuration from config.yaml (next to this file).

    Returns:
        dict: kwargs-style mapping with 'cache_type' plus redis connection
        parameters. Falls back to local-redis defaults when the file is
        missing/unreadable or declares an unknown cache type.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)

    # Single source of truth for the fallback configuration — the original
    # duplicated this literal in two branches.
    defaults = {
        'cache_type': 'redis',
        'redis_host': 'localhost',
        'redis_port': 6379,
        'redis_db': 0,
        'redis_password': None
    }

    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        cache_type = cache_config.get('type', 'redis')

        if cache_type == 'redis':
            redis_config = cache_config.get('redis', {})
            return {
                'cache_type': 'redis',
                'redis_host': redis_config.get('host', 'localhost'),
                'redis_port': redis_config.get('port', 6379),
                'redis_db': redis_config.get('db', 0),
                'redis_password': redis_config.get('password')
            }
        logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto")
        return dict(defaults)
    except Exception as e:
        # Any failure (missing file, bad YAML, None config) degrades to defaults.
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}")
        return dict(defaults)
|
||||
|
||||
class WorkerManager:
    """Dynamically manages workers, starting and stopping them according to workers.json."""

    def __init__(self, general_args, queue_manager):
        # general_args: GeneralMonitor shared by every worker, refreshed on sync.
        # queue_manager: QueueManager handed to each Worker instance.
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        # Large pool: each worker occupies one pool thread for its whole lifetime.
        self._executor = ThreadPoolExecutor(max_workers=1000)
        self._workers = {}  # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor}
        self._running = True
        # Guards _workers against concurrent access from the monitor thread.
        self._lock = threading.Lock()
        base_dir = os.path.dirname(os.path.abspath(__file__))
        self._workers_path = os.path.join(base_dir, "workers.json")

    def load_workers_config(self):
        """Load the workers configuration from workers.json.

        Returns a tuple (items, general, disabled): items is a list of
        (ItemMonitor, enabled) pairs, general a GeneralMonitor, disabled the
        set of names under 'disabled'. On any error an empty configuration
        is returned instead of raising.
        """
        try:
            with open(self._workers_path, 'r') as f:
                config = json.load(f)
            disabled = set(config.get('disabled', []))
            items = []
            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                items.append((item, item.get_name() not in disabled))
            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled
        except Exception as e:
            self.logger.error(f"Error cargando workers.json: {e}")
            return [], GeneralMonitor([], [], [], [], []), set()

    def start_worker(self, item, general_args):
        """Create and submit a worker for *item*; returns True on success."""
        try:
            worker = Worker(item, general_args, self._queue_manager)
            future = self._executor.submit(worker.run)
            self._workers[item.get_name()] = {
                'worker': worker,
                'future': future,
                'item': item
            }
            self.logger.info(f"Worker '{item.get_name()}' iniciado")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}")
            return False

    def stop_worker(self, worker_name):
        """Stop the named worker and drop it from the registry.

        Returns False when the worker is unknown or stopping failed.
        NOTE(review): this method does not take self._lock — callers appear
        responsible for synchronization; confirm before calling concurrently.
        """
        if worker_name not in self._workers:
            return False

        try:
            worker_data = self._workers[worker_name]
            worker = worker_data['worker']
            future = worker_data['future']

            # Ask the worker loop to stop.
            worker.stop()

            # Try to cancel the future if it has not started running yet.
            if not future.done():
                future.cancel()

            # We do not wait for completion; the worker exits on its own.
            del self._workers[worker_name]
            self.logger.info(f"Worker '{worker_name}' detenido")
            return True
        except Exception as e:
            self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")
            # Make sure the entry is removed even on error.
            if worker_name in self._workers:
                del self._workers[worker_name]
            return False

    def sync_workers(self):
        """Synchronize running workers with the current workers.json."""
        items, general_args, disabled = self.load_workers_config()

        # Refresh general_args for all active workers.
        old_general_args = self._general_args
        self._general_args = general_args

        current_worker_names = set(self._workers.keys())
        enabled_worker_names = {item.get_name() for item, enabled in items if enabled}

        # Push the new general_args into existing workers.
        with self._lock:
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)

        # Stop workers that no longer exist or are now disabled.
        to_stop = current_worker_names - enabled_worker_names
        for worker_name in list(to_stop):
            with self._lock:
                if worker_name in self._workers:
                    worker = self._workers[worker_name]['worker']
                    worker.stop()
                    future = self._workers[worker_name]['future']
                    if not future.done():
                        future.cancel()
                    del self._workers[worker_name]
                    self.logger.info(f"Worker '{worker_name}' detenido")

        # Start workers that are new or were just enabled.
        for item, enabled in items:
            worker_name = item.get_name()
            if enabled and worker_name not in current_worker_names:
                # New worker or just activated.
                with self._lock:
                    if worker_name not in self._workers:
                        self.start_worker(item, general_args)
            elif enabled and worker_name in current_worker_names:
                # Existing worker: check for significant changes.
                needs_restart = False
                with self._lock:
                    if worker_name in self._workers:
                        old_item = self._workers[worker_name]['item']
                        # Compare for significant changes.
                        if self._has_changes(old_item, item):
                            self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración")
                            worker = self._workers[worker_name]['worker']
                            worker.stop()
                            future = self._workers[worker_name]['future']
                            if not future.done():
                                future.cancel()
                            del self._workers[worker_name]
                            needs_restart = True
                        else:
                            # Update the item reference without restarting.
                            self._workers[worker_name]['item'] = item
                            # Refresh general_args in the worker.
                            self._workers[worker_name]['worker'].update_general_args(general_args)
                # Restart outside the lock to avoid deadlocks.
                if needs_restart:
                    time.sleep(0.5)  # Give the old worker time to stop
                    with self._lock:
                        if worker_name not in self._workers:  # Ensure another thread did not re-add it
                            self.start_worker(item, general_args)

    def _has_changes(self, old_item, new_item):
        """Return True when the two items differ in any restart-worthy field."""
        # Compare the fields whose change requires restarting the worker.
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_file(self):
        """Watch workers.json by polling its mtime and resync on change."""
        self.logger.info("Iniciando monitor de workers.json...")
        last_mtime = 0

        try:
            if os.path.exists(self._workers_path):
                last_mtime = os.path.getmtime(self._workers_path)
        except Exception as e:
            self.logger.warning(f"Error obteniendo mtime inicial: {e}")

        while self._running:
            try:
                time.sleep(2)  # Poll every 2 seconds

                if not os.path.exists(self._workers_path):
                    continue

                current_mtime = os.path.getmtime(self._workers_path)
                if current_mtime != last_mtime:
                    self.logger.info("Detectado cambio en workers.json, sincronizando workers...")
                    time.sleep(0.5)  # Wait briefly so the file finishes being written
                    self.sync_workers()
                    last_mtime = current_mtime
            except Exception as e:
                self.logger.error(f"Error monitoreando workers.json: {e}")
                time.sleep(5)  # Back off longer after an error

    def start_monitoring(self):
        """Run monitor_workers_file in a daemon thread and return the thread."""
        monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker and shut down the executor without waiting."""
        self._running = False
        worker_names = list(self._workers.keys())
        for worker_name in worker_names:
            try:
                with self._lock:
                    if worker_name in self._workers:
                        worker = self._workers[worker_name]['worker']
                        worker.stop()
                        future = self._workers[worker_name]['future']
                        if not future.done():
                            future.cancel()
                        del self._workers[worker_name]
            except Exception as e:
                self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")

        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")
|
||||
|
||||
if __name__ == "__main__":
    # Bootstrap: create config files from samples if needed, then logging.
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Load the cache configuration and create the ArticleCache.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)

    # Create the QueueManager backed by the ArticleCache.
    queue_manager = QueueManager(article_cache)

    # Create the WorkerManager.
    items, general_args = parse_items_to_monitor()
    worker_manager = WorkerManager(general_args, queue_manager)

    # Start the initial set of workers.
    worker_manager.sync_workers()

    # Begin watching workers.json for changes.
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...")
        # Keep the main thread alive.
        while True:
            time.sleep(60)
            # Periodic resync as a safety net.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()
|
||||
|
||||
import json
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from concurrent.futures import ThreadPoolExecutor, Future
|
||||
import os
|
||||
import shutil
|
||||
import yaml
|
||||
import time
|
||||
import threading
|
||||
|
||||
from datalayer.item_monitor import ItemMonitor
|
||||
from datalayer.general_monitor import GeneralMonitor
|
||||
from managers.worker import Worker
|
||||
from managers.queue_manager import QueueManager
|
||||
from managers.article_cache import create_article_cache
|
||||
|
||||
def initialize_config_files():
    """Create missing configuration files from their .sample counterparts.

    For each expected configuration file next to this module, copy the
    matching sample file when the real one does not exist yet. Raises
    FileNotFoundError when neither the file nor its sample is present.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))

    pairs = (
        ('config.yaml', 'config.sample.yaml'),
        ('workers.json', 'workers.sample.json'),
    )

    for target_name, sample_name in pairs:
        target = os.path.join(base_dir, target_name)
        if os.path.exists(target):
            continue  # already configured, nothing to do
        sample = os.path.join(base_dir, sample_name)
        if not os.path.exists(sample):
            raise FileNotFoundError(
                f"No se encontró ni '{target_name}' ni '{sample_name}'. "
                f"Por favor, crea uno de estos archivos."
            )
        shutil.copy2(sample, target)
        print(f"✓ Archivo '{target_name}' creado desde '{sample_name}'")
        print(f" Por favor, edita '{target_name}' con tu configuración antes de continuar.")
|
||||
|
||||
def configure_logger():
    """Configure the root logger with a console handler (INFO) and a
    rotating file handler (DEBUG).

    The log file goes to /app/logs/monitor.log when that directory exists
    (Docker), otherwise to ./monitor.log in the working directory.
    """
    # Silence the noisy httpx HTTP-client logger.
    logging.getLogger("httpx").setLevel(logging.WARNING)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s [%(asctime)s] %(name)s - %(message)s'))

    # Pick the log file location: prefer /app/logs (Docker), else cwd.
    if os.path.isdir('/app/logs'):
        log_path = '/app/logs/monitor.log'
    else:
        log_path = 'monitor.log'

    # Make sure the target directory exists.
    log_dir = os.path.dirname(log_path)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir, exist_ok=True)

    # maxBytes is documented as an int; the original passed the float 10e6.
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Configure the root logger with both handlers.
    logging.basicConfig(level=logging.NOTSET,
                        handlers=[console_handler, file_handler])
|
||||
|
||||
def parse_items_to_monitor():
    """Parse workers.json located next to this module.

    Returns:
        tuple: (list of ItemMonitor, GeneralMonitor) built from the
        'items' and 'general' sections of the file.

    Raises:
        ValueError: when the mandatory 'items' field is missing.
    """
    # NOTE: the original re-imported os locally; the module-level import
    # already provides it.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    workers_path = os.path.join(base_dir, "workers.json")
    with open(workers_path) as f:
        args = json.load(f)
    if 'items' not in args:
        raise ValueError("Missing mandatory field: items")
    items = [ItemMonitor.load_from_json(item) for item in args['items']]
    general_args = GeneralMonitor.load_from_json(args['general'])
    return items, general_args
|
||||
|
||||
def load_cache_config():
    """Load the cache configuration from config.yaml (next to this file).

    Returns:
        dict: kwargs-style mapping with 'cache_type' plus redis connection
        parameters. Falls back to local-redis defaults when the file is
        missing/unreadable or declares an unknown cache type.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "config.yaml")
    logger = logging.getLogger(__name__)

    # Single source of truth for the fallback configuration — the original
    # duplicated this literal in two branches.
    defaults = {
        'cache_type': 'redis',
        'redis_host': 'localhost',
        'redis_port': 6379,
        'redis_db': 0,
        'redis_password': None
    }

    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        cache_config = config.get('cache', {})
        cache_type = cache_config.get('type', 'redis')

        if cache_type == 'redis':
            redis_config = cache_config.get('redis', {})
            return {
                'cache_type': 'redis',
                'redis_host': redis_config.get('host', 'localhost'),
                'redis_port': redis_config.get('port', 6379),
                'redis_db': redis_config.get('db', 0),
                'redis_password': redis_config.get('password')
            }
        logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto")
        return dict(defaults)
    except Exception as e:
        # Any failure (missing file, bad YAML, None config) degrades to defaults.
        logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}")
        return dict(defaults)
|
||||
|
||||
class WorkerManager:
    """Dynamically manages workers, starting and stopping them according to workers.json."""

    def __init__(self, general_args, queue_manager):
        # general_args: GeneralMonitor shared by every worker, refreshed on sync.
        # queue_manager: QueueManager handed to each Worker instance.
        self.logger = logging.getLogger(__name__)
        self._general_args = general_args
        self._queue_manager = queue_manager
        # Large pool: each worker occupies one pool thread for its whole lifetime.
        self._executor = ThreadPoolExecutor(max_workers=1000)
        self._workers = {}  # Dict: worker_name -> {'worker': Worker, 'future': Future, 'item': ItemMonitor}
        self._running = True
        # Guards _workers against concurrent access from the monitor thread.
        self._lock = threading.Lock()
        base_dir = os.path.dirname(os.path.abspath(__file__))
        self._workers_path = os.path.join(base_dir, "workers.json")

    def load_workers_config(self):
        """Load the workers configuration from workers.json.

        Returns a tuple (items, general, disabled): items is a list of
        (ItemMonitor, enabled) pairs, general a GeneralMonitor, disabled the
        set of names under 'disabled'. On any error an empty configuration
        is returned instead of raising.
        """
        try:
            with open(self._workers_path, 'r') as f:
                config = json.load(f)
            disabled = set(config.get('disabled', []))
            items = []
            for item_data in config.get('items', []):
                item = ItemMonitor.load_from_json(item_data)
                items.append((item, item.get_name() not in disabled))
            general = GeneralMonitor.load_from_json(config.get('general', {}))
            return items, general, disabled
        except Exception as e:
            self.logger.error(f"Error cargando workers.json: {e}")
            return [], GeneralMonitor([], [], [], [], []), set()

    def start_worker(self, item, general_args):
        """Create and submit a worker for *item*; returns True on success."""
        try:
            worker = Worker(item, general_args, self._queue_manager)
            future = self._executor.submit(worker.run)
            self._workers[item.get_name()] = {
                'worker': worker,
                'future': future,
                'item': item
            }
            self.logger.info(f"Worker '{item.get_name()}' iniciado")
            return True
        except Exception as e:
            self.logger.error(f"Error iniciando worker '{item.get_name()}': {e}")
            return False

    def stop_worker(self, worker_name):
        """Stop the named worker and drop it from the registry.

        Returns False when the worker is unknown or stopping failed.
        NOTE(review): this method does not take self._lock — callers appear
        responsible for synchronization; confirm before calling concurrently.
        """
        if worker_name not in self._workers:
            return False

        try:
            worker_data = self._workers[worker_name]
            worker = worker_data['worker']
            future = worker_data['future']

            # Ask the worker loop to stop.
            worker.stop()

            # Try to cancel the future if it has not started running yet.
            if not future.done():
                future.cancel()

            # We do not wait for completion; the worker exits on its own.
            del self._workers[worker_name]
            self.logger.info(f"Worker '{worker_name}' detenido")
            return True
        except Exception as e:
            self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")
            # Make sure the entry is removed even on error.
            if worker_name in self._workers:
                del self._workers[worker_name]
            return False

    def sync_workers(self):
        """Synchronize running workers with the current workers.json."""
        items, general_args, disabled = self.load_workers_config()

        # Refresh general_args for all active workers.
        old_general_args = self._general_args
        self._general_args = general_args

        current_worker_names = set(self._workers.keys())
        enabled_worker_names = {item.get_name() for item, enabled in items if enabled}

        # Push the new general_args into existing workers.
        with self._lock:
            for worker_data in self._workers.values():
                worker_data['worker'].update_general_args(general_args)

        # Stop workers that no longer exist or are now disabled.
        to_stop = current_worker_names - enabled_worker_names
        for worker_name in list(to_stop):
            with self._lock:
                if worker_name in self._workers:
                    worker = self._workers[worker_name]['worker']
                    worker.stop()
                    future = self._workers[worker_name]['future']
                    if not future.done():
                        future.cancel()
                    del self._workers[worker_name]
                    self.logger.info(f"Worker '{worker_name}' detenido")

        # Start workers that are new or were just enabled.
        for item, enabled in items:
            worker_name = item.get_name()
            if enabled and worker_name not in current_worker_names:
                # New worker or just activated.
                with self._lock:
                    if worker_name not in self._workers:
                        self.start_worker(item, general_args)
            elif enabled and worker_name in current_worker_names:
                # Existing worker: check for significant changes.
                needs_restart = False
                with self._lock:
                    if worker_name in self._workers:
                        old_item = self._workers[worker_name]['item']
                        # Compare for significant changes.
                        if self._has_changes(old_item, item):
                            self.logger.info(f"Reiniciando worker '{worker_name}' por cambios en configuración")
                            worker = self._workers[worker_name]['worker']
                            worker.stop()
                            future = self._workers[worker_name]['future']
                            if not future.done():
                                future.cancel()
                            del self._workers[worker_name]
                            needs_restart = True
                        else:
                            # Update the item reference without restarting.
                            self._workers[worker_name]['item'] = item
                            # Refresh general_args in the worker.
                            self._workers[worker_name]['worker'].update_general_args(general_args)
                # Restart outside the lock to avoid deadlocks.
                if needs_restart:
                    time.sleep(0.5)  # Give the old worker time to stop
                    with self._lock:
                        if worker_name not in self._workers:  # Ensure another thread did not re-add it
                            self.start_worker(item, general_args)

    def _has_changes(self, old_item, new_item):
        """Return True when the two items differ in any restart-worthy field."""
        # Compare the fields whose change requires restarting the worker.
        return (
            old_item.get_search_query() != new_item.get_search_query() or
            old_item.get_min_price() != new_item.get_min_price() or
            old_item.get_max_price() != new_item.get_max_price() or
            old_item.get_thread_id() != new_item.get_thread_id() or
            old_item.get_platform() != new_item.get_platform() or
            old_item.get_check_every() != new_item.get_check_every() or
            old_item.get_latitude() != new_item.get_latitude() or
            old_item.get_longitude() != new_item.get_longitude() or
            old_item.get_max_distance() != new_item.get_max_distance()
        )

    def monitor_workers_file(self):
        """Watch workers.json by polling its mtime and resync on change."""
        self.logger.info("Iniciando monitor de workers.json...")
        last_mtime = 0

        try:
            if os.path.exists(self._workers_path):
                last_mtime = os.path.getmtime(self._workers_path)
        except Exception as e:
            self.logger.warning(f"Error obteniendo mtime inicial: {e}")

        while self._running:
            try:
                time.sleep(2)  # Poll every 2 seconds

                if not os.path.exists(self._workers_path):
                    continue

                current_mtime = os.path.getmtime(self._workers_path)
                if current_mtime != last_mtime:
                    self.logger.info("Detectado cambio en workers.json, sincronizando workers...")
                    time.sleep(0.5)  # Wait briefly so the file finishes being written
                    self.sync_workers()
                    last_mtime = current_mtime
            except Exception as e:
                self.logger.error(f"Error monitoreando workers.json: {e}")
                time.sleep(5)  # Back off longer after an error

    def start_monitoring(self):
        """Run monitor_workers_file in a daemon thread and return the thread."""
        monitor_thread = threading.Thread(target=self.monitor_workers_file, daemon=True)
        monitor_thread.start()
        return monitor_thread

    def stop_all(self):
        """Stop every worker and shut down the executor without waiting."""
        self._running = False
        worker_names = list(self._workers.keys())
        for worker_name in worker_names:
            try:
                with self._lock:
                    if worker_name in self._workers:
                        worker = self._workers[worker_name]['worker']
                        worker.stop()
                        future = self._workers[worker_name]['future']
                        if not future.done():
                            future.cancel()
                        del self._workers[worker_name]
            except Exception as e:
                self.logger.error(f"Error deteniendo worker '{worker_name}': {e}")

        self._executor.shutdown(wait=False)
        self.logger.info("Todos los workers detenidos")
|
||||
|
||||
if __name__ == "__main__":
    # Bootstrap: create config files from samples if needed, then logging.
    initialize_config_files()
    configure_logger()
    logger = logging.getLogger(__name__)

    # Load the cache configuration and create the ArticleCache.
    cache_config = load_cache_config()
    cache_type = cache_config['cache_type']
    cache_kwargs = {k: v for k, v in cache_config.items() if k != 'cache_type'}
    article_cache = create_article_cache(cache_type, **cache_kwargs)

    # Create the QueueManager backed by the ArticleCache.
    queue_manager = QueueManager(article_cache)

    # Create the WorkerManager.
    items, general_args = parse_items_to_monitor()
    worker_manager = WorkerManager(general_args, queue_manager)

    # Start the initial set of workers.
    worker_manager.sync_workers()

    # Begin watching workers.json for changes.
    worker_manager.start_monitoring()

    try:
        logger.info("Sistema de monitoreo iniciado. Esperando cambios en workers.json...")
        # Keep the main thread alive.
        while True:
            time.sleep(60)
            # Periodic resync as a safety net.
            worker_manager.sync_workers()
    except KeyboardInterrupt:
        logger.info("Deteniendo sistema...")
    finally:
        worker_manager.stop_all()
|
||||
|
||||
Reference in New Issue
Block a user