Files
wallabicher/managers/queue_manager.py
Omar Sánchez Pizarro 9939c4d9ed Enhance caching mechanism and logging configuration
- Updated .gitignore to include additional IDE and OS files, as well as log and web build directories.
- Expanded config.sample.yaml to include cache configuration options for memory and Redis.
- Modified wallamonitor.py to load cache configuration and initialize ArticleCache.
- Refactored QueueManager to utilize ArticleCache for tracking notified articles.
- Improved logging setup to dynamically determine log file path based on environment.
2026-01-19 19:42:12 +01:00

74 lines
3.0 KiB
Python

import queue
import threading
import time
import logging
from managers.telegram_manager import TelegramManager
MESSAGE_DELAY = 3.0  # Pause, in seconds, applied after every Telegram send attempt
RETRY_TIMES = 3  # Default number of times a failed send is put back on the queue


class QueueManager:
    """Deliver article notifications to Telegram through a background worker.

    ``add_to_queue`` pushes articles onto a thread-safe queue; a daemon
    thread running ``_process_queue`` sends them one at a time through the
    Telegram manager, pausing ``MESSAGE_DELAY`` seconds after each attempt.
    The article cache given to the constructor is consulted so that an
    article already recorded as notified is not queued again.
    """

    def __init__(self, article_cache):
        """Build the queue and start the worker thread.

        Args:
            article_cache: Object providing ``is_article_notified``,
                ``mark_article_as_notified`` and
                ``mark_articles_as_notified``.
        """
        self.logger = logging.getLogger(__name__)
        self._queue = queue.Queue()  # Thread-safe FIFO shared with the worker
        self._telegram_manager = TelegramManager()
        self._article_cache = article_cache
        self._running = True
        # Set by stop() so the worker's pauses end immediately instead of
        # running for their full duration.
        self._stop_event = threading.Event()
        # Start the worker last, once every attribute it reads exists.
        self._processor_thread = threading.Thread(target=self._process_queue, daemon=True)
        self._processor_thread.start()

    def add_to_queue(self, article, search_name=None, thread_id=None, retry_times=RETRY_TIMES):
        """Queue *article* for delivery unless the cache reports it as notified.

        Args:
            article: Article to send; must provide ``get_title()``.
            search_name: Label of the originating search. ``None`` is
                replaced by ``"Unknown"``.
            thread_id: Passed through unchanged to the Telegram manager.
            retry_times: How many times a failed send may be re-queued.
        """
        # Skip articles that have already been sent (or queued) before.
        if self._article_cache.is_article_notified(article):
            return
        if search_name is None:
            search_name = "Unknown"
        self._queue.put((search_name, article, thread_id, retry_times))
        self.logger.debug(f"Artículo añadido a la cola: {article.get_title()}")
        # The article is recorded at enqueue time, not after a successful
        # send, so later calls will not queue it a second time.
        self._article_cache.mark_article_as_notified(article)

    def add_to_notified_articles(self, articles):
        """Record *articles* in the cache as already notified, without sending."""
        self._article_cache.mark_articles_as_notified(articles)

    def _process_queue(self):
        """Worker loop: send queued articles until ``stop()`` clears ``_running``.

        A failed send is put back on the queue while its retry budget is
        positive. After every send attempt the worker pauses
        ``MESSAGE_DELAY`` seconds, so two consecutive sends are always spaced
        out even when the second article arrives just after the queue drained.
        """
        self.logger.info("Procesador de cola: Iniciado")
        while self._running:
            try:
                # Block for at most one second so the loop re-checks
                # ``_running`` regularly while the queue is idle.
                try:
                    search_name, article, thread_id, retry_times = self._queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                try:
                    self._telegram_manager.send_telegram_article(search_name, article, thread_id)
                except Exception as e:
                    self.logger.error(f"Error al enviar artículo a Telegram: {e}")
                    if retry_times > 0:
                        self._queue.put((search_name, article, thread_id, retry_times - 1))
                    else:
                        self.logger.error(f"Artículo no enviado después de {RETRY_TIMES} intentos")
                finally:
                    # Balance the get() above; a retry was added by its own put().
                    self._queue.task_done()

                # Always pause after an attempt. Waiting on the event rather
                # than sleeping lets stop() cut the pause short.
                self.logger.debug(f"Esperando {MESSAGE_DELAY}s antes del próximo mensaje")
                self._stop_event.wait(MESSAGE_DELAY)
            except Exception as e:
                # Unexpected failure in the loop itself: keep the traceback,
                # back off briefly and carry on.
                self.logger.error(f"Error en el procesador de cola: {e}", exc_info=True)
                self._stop_event.wait(1.0)

    def stop(self):
        """Signal the worker to exit and wait up to five seconds for it."""
        self.logger.info("Deteniendo procesador de cola...")
        self._running = False
        self._stop_event.set()  # Wake the worker if it is pausing
        self._processor_thread.join(timeout=5.0)