Files
wallabicher/managers/queue_manager.py
Omar Sánchez Pizarro fa79e53950 inline keyboard and move to channel with threads
Signed-off-by: Omar Sánchez Pizarro <omar.sanchez@pistacero.net>
2025-10-10 02:45:55 +02:00

78 lines
3.1 KiB
Python

import queue
import threading
import time
import logging
from managers.telegram_manager import TelegramManager
MESSAGE_DELAY = 3.0 # Tiempo de espera entre mensajes en segundos
NOTIFIED_ARTICLES_LIMIT = 300 # Límite de artículos notificados a mantener en memoria
class QueueManager:
def __init__(self):
self.logger = logging.getLogger(__name__)
self._queue = queue.Queue() # Cola thread-safe
self._notified_articles = []
self._telegram_manager = TelegramManager()
self._running = True
# Iniciar el thread de procesamiento
self._processor_thread = threading.Thread(target=self._process_queue, daemon=True)
self._processor_thread.start()
def add_to_queue(self, article, search_name=None, thread_id=None):
# Verificar si el artículo ya ha sido enviado
if article in self._notified_articles:
return
if search_name is None:
search_name = "Unknown"
self._queue.put((search_name, article, thread_id))
self.logger.debug(f"Artículo añadido a la cola: {article.get_title()}")
self.add_to_notified_articles(article)
def add_to_notified_articles(self, articles):
"""Añade artículos a la lista de artículos ya notificados"""
if isinstance(articles, list):
self._notified_articles.extend(articles)
else:
self._notified_articles.append(articles)
def _process_queue(self):
self.logger.info("Procesador de cola: Iniciado")
while self._running:
try:
# Esperar hasta que haya un elemento en la cola (timeout de 1 segundo)
try:
search_name, article, thread_id = self._queue.get(timeout=1.0)
except queue.Empty:
continue
# Procesar el artículo
try:
self._telegram_manager.send_telegram_article(search_name, article, thread_id)
# Mantener solo los primeros NOTIFIED_ARTICLES_LIMIT artículos después de enviar
if len(self._notified_articles) > NOTIFIED_ARTICLES_LIMIT:
self._notified_articles = self._notified_articles[:NOTIFIED_ARTICLES_LIMIT]
except Exception as e:
self.logger.error(f"Error al enviar artículo a Telegram: {e}")
finally:
self._queue.task_done()
# Esperar antes de procesar el siguiente mensaje
if not self._queue.empty():
self.logger.debug(f"Esperando {MESSAGE_DELAY}s antes del próximo mensaje")
time.sleep(MESSAGE_DELAY)
except Exception as e:
self.logger.error(f"Error en el procesador de cola: {e}")
time.sleep(1.0)
def stop(self):
"""Detiene el procesador de cola"""
self.logger.info("Deteniendo procesador de cola...")
self._running = False
self._processor_thread.join(timeout=5.0)