feat: default values, general item excludes, images, queueManager to manage multi worker messaging to telegram to prevent too many connections

Signed-off-by: Omar Sánchez Pizarro <omar.sanchez@pistacero.net>
This commit is contained in:
Omar Sánchez Pizarro
2025-10-10 00:03:44 +02:00
parent 08c1577b2a
commit 0245b603b2
9 changed files with 275 additions and 114 deletions

79
managers/queue_manager.py Normal file
View File

@@ -0,0 +1,79 @@
import queue
import threading
import time
import logging
from managers.telegram_manager import TelegramManager
MESSAGE_DELAY = 3.0  # Seconds to wait between consecutive Telegram messages
NOTIFIED_ARTICLES_LIMIT = 300  # Maximum number of notified articles kept in memory


class QueueManager:
    """Funnel Telegram notifications through a single background sender.

    Callers enqueue articles with :meth:`add_to_queue`; one daemon thread
    drains the queue and hands each article to ``TelegramManager``, pausing
    ``MESSAGE_DELAY`` seconds between messages while more are pending.

    The manager also remembers which articles were already queued so the same
    article is not announced twice. Only the most recent
    ``NOTIFIED_ARTICLES_LIMIT`` entries are retained.
    """

    def __init__(self):
        """Create the queue and start the background processor thread."""
        self.logger = logging.getLogger(__name__)
        self._queue = queue.Queue()  # Thread-safe FIFO of (search_name, article)
        self._notified_articles = []  # Oldest first, newest last
        # Guards _notified_articles: it is modified by the processor thread
        # and by whoever calls the public methods (presumably worker threads).
        self._notified_lock = threading.Lock()
        self._telegram_manager = TelegramManager()
        self._running = True
        # Daemon thread so a forgotten stop() does not block interpreter exit.
        self._processor_thread = threading.Thread(target=self._process_queue, daemon=True)
        self._processor_thread.start()

    def add_to_queue(self, article, search_name=None):
        """Queue ``article`` for notification unless it was already seen.

        Args:
            article: Object to notify; must support equality comparison and
                expose ``get_title()``.
            search_name: Label of the search that produced the article.
                Defaults to ``"Unknown"`` when omitted.
        """
        if search_name is None:
            search_name = "Unknown"
        # Check and mark atomically so two callers cannot both enqueue the
        # same article.
        with self._notified_lock:
            if article in self._notified_articles:
                return
            self._notified_articles.append(article)
        self._queue.put((search_name, article))
        self.logger.debug(f"Artículo añadido a la cola: {article.get_title()}")

    def add_to_notified_articles(self, articles):
        """Mark one article, or a list of articles, as already notified."""
        with self._notified_lock:
            if isinstance(articles, list):
                self._notified_articles.extend(articles)
            else:
                self._notified_articles.append(articles)

    def _trim_notified_articles(self):
        """Drop the oldest entries so at most NOTIFIED_ARTICLES_LIMIT remain."""
        with self._notified_lock:
            overflow = len(self._notified_articles) - NOTIFIED_ARTICLES_LIMIT
            if overflow > 0:
                # Entries are appended, so the oldest ones sit at the front.
                # Deleting in place (instead of rebinding) keeps the newest
                # articles, which are the ones still likely to reappear.
                del self._notified_articles[:overflow]

    def _process_queue(self):
        """Background loop: send queued articles to Telegram one at a time."""
        self.logger.info("Procesador de cola: Iniciado")
        while self._running:
            try:
                # Wake up every second so a stop() request is noticed promptly.
                try:
                    search_name, article = self._queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                try:
                    self._telegram_manager.send_telegram_article(search_name, article)
                    self.logger.info(f"Artículo enviado a Telegram: {article.get_title()} de {search_name}")
                except Exception as e:
                    # A failed send must not kill the processor thread.
                    self.logger.error(f"Error al enviar artículo a Telegram: {e}")
                finally:
                    self._queue.task_done()
                    # Bound memory use even when sends keep failing.
                    self._trim_notified_articles()

                # Throttle only when another message is already waiting.
                if not self._queue.empty():
                    self.logger.debug(f"Esperando {MESSAGE_DELAY}s antes del próximo mensaje")
                    time.sleep(MESSAGE_DELAY)
            except Exception as e:
                self.logger.error(f"Error en el procesador de cola: {e}")
                time.sleep(1.0)

    def stop(self):
        """Ask the processor thread to finish and wait up to 5 seconds for it."""
        self.logger.info("Deteniendo procesador de cola...")
        self._running = False
        self._processor_thread.join(timeout=5.0)

View File

@@ -1,21 +1,38 @@
import asyncio
import yaml
import telegram
import re
import html
import telegram.ext
ITEM_TEXT = "- *Artículo*: {}\n" \
"- *Descripción*: {}\n" \
"- *Localidad*: {}\n" \
"- *Precio*: {} {}\n" \
"- *Acepta envíos*: {}\n" \
"[Ir al anuncio](https://es.wallapop.com/item/{})"
ITEM_HTML = """
<b>Resultados para:</b> {search_name}
<b>Artículo:</b> {title}
{description}
<b>Localidad:</b> {location}
<b>Acepta envíos:</b> {shipping}
<b>Modificado el:</b> {modified_at}
<b>{price} {currency}</b>
<a href="https://es.wallapop.com/item/{url}">Ir al anuncio</a>
"""
class TelegramManager:
def __init__(self):
token, channel = self.get_config()
token, channel, chat_id = self.get_config()
self._channel = channel
self._bot = telegram.Bot(token=token)
self._chat_id = chat_id
# Use ApplicationBuilder to create the bot application with increased timeouts
self._application = telegram.ext.ApplicationBuilder() \
.token(token) \
.connect_timeout(60) \
.read_timeout(60) \
.write_timeout(60) \
.build()
self._bot = self._application.bot
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
@@ -25,21 +42,47 @@ class TelegramManager:
config = yaml.safe_load(file)
token = config['telegram_token']
telegram_channel = config['telegram_channel']
return token, telegram_channel
telegram_chat_id = config['telegram_chat_id']
return token, telegram_channel, telegram_chat_id
def escape_markdown(self, text):
special_chars = r'_[\]()~`>#\+\-=|{}.!]'
escaped_text = re.sub(f'([{re.escape(special_chars)}])', r'\\\1', text)
return escaped_text
def escape_html(self, text):
return html.escape(str(text))
def send_telegram_article(self, article):
self._loop.run_until_complete(self.send_telegram_article_async(article))
def send_telegram_article(self, search_name, article):
self._loop.run_until_complete(self.send_telegram_article_async(search_name, article))
async def send_telegram_article_async(self, article):
message = ITEM_TEXT.format(article.get_title(), self.escape_markdown(article.get_description()),
self.escape_markdown(article.get_location()), article.get_price(),
article.get_currency(), article.get_allows_shipping(),
article.get_url())
escaped_message = self.escape_markdown(message)
await self._bot.send_message(self._channel, text=escaped_message, parse_mode="MarkdownV2")
async def send_telegram_article_async(self, search_name, article):
    """Send one article to the configured chat as an HTML-formatted post.

    The text is built from ``ITEM_HTML`` with every field HTML-escaped.
    How it is delivered depends on how many images the article has:

    * no images  -> plain text message
    * one image  -> single photo with the text as caption
    * 2+ images  -> album whose first photo carries the caption

    Args:
        search_name: Label of the search that matched the article.
        article: Object exposing the ``get_*`` accessors used below.
    """
    # Telegram's sendMediaGroup is documented as accepting 2-10 items.
    max_album_size = 10

    message = ITEM_HTML.format(
        search_name=self.escape_html(search_name),
        title=self.escape_html(article.get_title()),
        description=self.escape_html(article.get_description()),
        location=self.escape_html(article.get_location()),
        price=self.escape_html(article.get_price()),
        currency=self.escape_html(article.get_currency()),
        shipping=self.escape_html(article.get_allows_shipping()),
        modified_at=self.escape_html(article.get_modified_at()),
        url=self.escape_html(article.get_url())
    )

    # Tolerate a missing image list and cap it at what one album can hold.
    image_urls = list(article.get_images() or [])[:max_album_size]

    if not image_urls:
        # An empty album is rejected by Telegram; fall back to text only.
        await self._bot.send_message(
            chat_id=self._chat_id,
            text=message,
            parse_mode="HTML"
        )
        return

    if len(image_urls) == 1:
        # A one-item album is below the documented minimum; send a photo.
        await self._bot.send_photo(
            chat_id=self._chat_id,
            photo=image_urls[0],
            caption=message,
            parse_mode="HTML"
        )
        return

    # Only the first photo carries the caption, as in the original loop.
    media = [
        telegram.InputMediaPhoto(
            media=image_urls[0],
            caption=message,
            parse_mode="HTML"
        )
    ]
    media.extend(telegram.InputMediaPhoto(media=image_url) for image_url in image_urls[1:])

    await self._bot.send_media_group(
        chat_id=self._chat_id,
        media=media
    )

View File

@@ -2,21 +2,19 @@ import time
import requests
import logging
from datalayer.wallapop_article import WallapopArticle
from managers.telegram_manager import TelegramManager
import traceback
REQUEST_SLEEP_TIME = 15
REQUEST_RETRY_TIME = 3
ERROR_SLEEP_TIME = 30
NOTIFIED_ARTICLES_LIMIT = 300
REQUEST_RETRY_TIME = 5
ERROR_SLEEP_TIME = 60
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
class Worker:
def __init__(self, item_to_monitor):
def __init__(self, item_to_monitor, general_args, queue_manager):
self.logger = logging.getLogger(__name__)
self._item_monitoring = item_to_monitor
self._notified_articles = self._request_articles()
self.telegram_manager = TelegramManager()
self._general_args = general_args
self._queue_manager = queue_manager
self._queue_manager.add_to_notified_articles(self._request_articles())
def _create_url(self):
url = (
@@ -24,15 +22,24 @@ class Worker:
f"?source=search_box"
f"&keywords={self._item_monitoring._search_query}"
f"&order_by=newest"
f"&latitude={self._item_monitoring._latitude}"
f"&longitude={self._item_monitoring._longitude}"
f"&min_sale_price={self._item_monitoring._min_price}"
f"&max_sale_price={self._item_monitoring._max_price}"
f"&language=es_ES"
)
if self._item_monitoring._max_distance != "0":
url += f"&distance={self._item_monitoring._max_distance}"
# Only include latitude and longitude if both are not 0
if self._item_monitoring._latitude != 0 and self._item_monitoring._longitude != 0:
url += (
f"&latitude={self._item_monitoring._latitude}"
f"&longitude={self._item_monitoring._longitude}"
)
if self._item_monitoring._min_price != 0:
url += f"&min_sale_price={self._item_monitoring._min_price}"
if self._item_monitoring._max_price != 0:
url += f"&max_sale_price={self._item_monitoring._max_price}"
if self._item_monitoring._max_distance != 0:
url += f"&distance_in_km={self._item_monitoring._max_distance}"
if self._item_monitoring.get_condition() != "all":
url += f"&condition={self._item_monitoring.get_condition()}" # new, as_good_as_new, good, fair, has_given_it_all
@@ -70,18 +77,18 @@ class Worker:
return any(word in text for word in word_list)
def _title_has_excluded_words(self, article_title):
return self._has_words(article_title, self._item_monitoring.get_title_exclude())
return self._has_words(article_title, self._item_monitoring.get_title_exclude() + self._general_args.get_title_exclude())
def _description_has_excluded_words(self, article_description):
return self._has_words(article_description, self._item_monitoring.get_description_exclude())
return self._has_words(article_description, self._item_monitoring.get_description_exclude() + self._general_args.get_description_exclude())
def _title_has_required_words(self, article_title):
return not self._item_monitoring.get_title_must_include() \
or self._has_words(article_title, self._item_monitoring.get_title_must_include())
or self._has_words(article_title, self._item_monitoring.get_title_must_include() + self._general_args.get_title_must_include())
def _description_has_required_words(self, article_description):
return not self._item_monitoring.get_description_must_include() \
or self._has_words(article_description, self._item_monitoring.get_description_must_include())
or self._has_words(article_description, self._item_monitoring.get_description_must_include() + self._general_args.get_description_must_include())
def _title_first_word_is_excluded(self, article_title):
first_word = article_title.split()[0]
@@ -91,9 +98,6 @@ class Worker:
return False
def _meets_item_conditions(self, article):
if article in self._notified_articles:
return False
article_title = article.get_title().lower()
article_description = article.get_description().lower()
if (
@@ -105,7 +109,7 @@ class Worker:
):
return True
else:
self.logger.info(f"Excluded article: {article}")
self.logger.debug(f"Excluded article: {article}")
return False
def work(self):
@@ -114,20 +118,16 @@ class Worker:
while True:
start_time = time.time()
articles = self._request_articles()
new_articles = 0
for article in articles:
if self._meets_item_conditions(article):
try:
self.telegram_manager.send_telegram_article(article)
new_articles += 1
self._queue_manager.add_to_queue(article, self._item_monitoring.get_name())
except Exception as e:
self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed: {e}")
self._notified_articles.insert(0, article)
self._notified_articles = self._notified_articles[:NOTIFIED_ARTICLES_LIMIT]
time.sleep(REQUEST_SLEEP_TIME)
exec_times.append(time.time() - start_time)
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}")
time.sleep(self._item_monitoring.get_check_every())
exec_times.append(time.time() - start_time - self._item_monitoring.get_check_every())
self.logger.info(
f"Worker '{self._item_monitoring.get_search_query()}': {new_articles} new articles found. "
f"Worker '{self._item_monitoring.get_name()}', "
f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, "
f"Average: {sum(exec_times) / len(exec_times):.2f}s."
)
@@ -135,9 +135,9 @@ class Worker:
def run(self):
while True:
try:
self.logger.info(f"Wallapop monitor worker started - {self._item_monitoring.get_search_query()}")
self.logger.info(f"Wallapop monitor worker started - {self._item_monitoring.get_name()}")
self.work()
except Exception as e:
self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed. Restarting worker...")
self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...")
time.sleep(ERROR_SLEEP_TIME)