From 0245b603b2fcf81afbd19044aee068231c453823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Omar=20S=C3=A1nchez=20Pizarro?= Date: Fri, 10 Oct 2025 00:03:44 +0200 Subject: [PATCH] feat: default values, general item excludes, images, queueManager to manage multi worker messaging to telegram to prevent too many connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Omar Sánchez Pizarro --- args.json | 30 ------------ datalayer/general_monitor.py | 32 ++++++++++++ datalayer/item_monitor.py | 43 +++++++++++------ datalayer/wallapop_article.py | 27 ++++++++--- managers/queue_manager.py | 79 ++++++++++++++++++++++++++++++ managers/telegram_manager.py | 91 ++++++++++++++++++++++++++--------- managers/worker.py | 66 ++++++++++++------------- requirements.txt | 1 + wallamonitor.py | 20 +++++--- 9 files changed, 275 insertions(+), 114 deletions(-) delete mode 100644 args.json create mode 100644 datalayer/general_monitor.py create mode 100644 managers/queue_manager.py diff --git a/args.json b/args.json deleted file mode 100644 index 0aa028e..0000000 --- a/args.json +++ /dev/null @@ -1,30 +0,0 @@ -[ - { - "search_query": "ps4", - "latitude": "40.4165", - "longitude": "-3.70256", - "max_distance":"0", - "condition": "all", - "min_price": "20", - "max_price": "80", - "title_exclude" : ["DualShock", "Volante"], - "description_exclude": [], - "title_must_include" : [], - "title_first_word_exclude": "juego", - "description_must_include" : [] - }, - { - "search_query": "3ds", - "max_distance":"0", - "latitude": "40.4165", - "longitude": "-3.70256", - "condition": "all", - "min_price": "15", - "max_price": "70", - "title_first_word_exclude": "juego", - "title_exclude" : ["pokemon", "Pokemon"], - "description_exclude": [], - "title_must_include" : [], - "description_must_include" : [] - } -] diff --git a/datalayer/general_monitor.py b/datalayer/general_monitor.py new file mode 100644 index 0000000..5250e1d --- /dev/null +++ b/datalayer/general_monitor.py @@ -0,0 +1,32 @@ +class GeneralMonitor: + def __init__(self, title_exclude, description_exclude, title_must_include, description_must_include, title_first_word_exclude): + self._title_exclude = title_exclude + self._description_exclude = description_exclude + self._title_must_include = title_must_include + self._description_must_include = description_must_include + self._title_first_word_exclude = title_first_word_exclude + + @classmethod + def load_from_json(cls, json_data): + return cls( + json_data.get('title_exclude', []), + json_data.get('description_exclude', []), + json_data.get('title_must_include', []), + json_data.get('description_must_include', []), + json_data.get('title_first_word_exclude', []) + ) + + def get_title_exclude(self): + return self._title_exclude + + def get_description_exclude(self): + return self._description_exclude + + def get_title_must_include(self): + return self._title_must_include + + def get_description_must_include(self): + return self._description_must_include + + def get_title_first_word_exclude(self): + return self._title_first_word_exclude \ No newline at end of file diff --git a/datalayer/item_monitor.py b/datalayer/item_monitor.py index a3ddd87..512abd2 100644 --- a/datalayer/item_monitor.py +++ b/datalayer/item_monitor.py @@ -1,9 +1,10 @@ class ItemMonitor: - def __init__(self, search_query, latitude, longitude, max_distance, + def __init__(self, name,search_query, latitude, longitude, max_distance, condition, min_price, max_price, title_exclude, description_exclude, title_must_include, description_must_include, - title_first_word_exclude): + title_first_word_exclude, check_every): + self._name = name self._search_query = search_query self._latitude = latitude self._longitude = longitude @@ -16,24 +17,33 @@ class ItemMonitor: self._title_must_include = title_must_include self._description_must_include = description_must_include self._title_first_word_exclude = title_first_word_exclude - + self._check_every = check_every @classmethod def load_from_json(cls, json_data): + # search_query is mandatory + if 'search_query' not in json_data: + raise ValueError("Missing mandatory field: search_query") + return cls( + json_data['name'], json_data['search_query'], - json_data['latitude'], - json_data['longitude'], - json_data['max_distance'], - json_data['condition'], - json_data['min_price'], - json_data['max_price'], - json_data['title_exclude'], - json_data['description_exclude'], - json_data['title_must_include'], - json_data['description_must_include'], - json_data['title_first_word_exclude'] + json_data.get('latitude', 0), + json_data.get('longitude', 0), + json_data.get('max_distance', 0), + json_data.get('condition', 'all'), + json_data.get('min_price', 0), + json_data.get('max_price', 0), + json_data.get('title_exclude', []), + json_data.get('description_exclude', []), + json_data.get('title_must_include', []), + json_data.get('description_must_include', []), + json_data.get('title_first_word_exclude', []), + json_data.get('check_every', 30) ) + def get_name(self): + return self._name + def get_search_query(self): return self._search_query @@ -68,4 +78,7 @@ class ItemMonitor: return self._description_must_include def get_title_first_word_exclude(self): - return self._title_first_word_exclude \ No newline at end of file + return self._title_first_word_exclude + + def get_check_every(self): + return self._check_every \ No newline at end of file diff --git a/datalayer/wallapop_article.py b/datalayer/wallapop_article.py index e4f83fa..2157fd1 100644 --- a/datalayer/wallapop_article.py +++ b/datalayer/wallapop_article.py @@ -1,6 +1,8 @@ +import datetime +import pandas as pd class WallapopArticle: - def __init__(self, id, title, description, price, currency, location, allows_shipping, url): + def __init__(self, id, title, description, price, currency, location, allows_shipping, url, images, modified_at): self._id = id self._title = title self._description = description @@ -9,7 +11,8 @@ class WallapopArticle: self._location = location self._allows_shipping = allows_shipping self._url = url - + self._images = images + self._modified_at = modified_at @classmethod def load_from_json(cls, json_data): return cls( @@ -20,7 +23,9 @@ class WallapopArticle: json_data['price']['currency'], json_data['location']['city'], json_data['shipping']['user_allows_shipping'], - json_data['web_slug'] + json_data['web_slug'], + json_data['images'], + json_data['modified_at'] ) def get_id(self): @@ -30,7 +35,8 @@ class WallapopArticle: return self._title def get_description(self): - return self._description + #return only 500 characters + return self._description[:500] + "..." if len(self._description) > 500 else self._description def get_price(self): return self._price @@ -42,14 +48,23 @@ class WallapopArticle: return self._location def get_allows_shipping(self): - return self._allows_shipping + return "✅" if self._allows_shipping else "❌" def get_url(self): return self._url + def get_images(self): + return [img['urls']['medium'] for img in self._images[:3]] + + def get_modified_at(self): + # Convert timestamp in milliseconds to datetime string "YYYY-MM-DD HH:MM:SS" + ts = int(self._modified_at) + dt = datetime.date.fromtimestamp(ts / 1000) + return dt.strftime("%Y-%m-%d %H:%M:%S") + def __eq__(self, article): return self.get_id() == article.get_id() def __str__(self): return f"Article(id={self._id}, title='{self._title}', " \ - f"price={self._price} {self._currency}, url='{self._url}')" \ No newline at end of file + f"price={self._price} {self._currency}, url='{self._url}', modified_at='{self._modified_at}')" \ No newline at end of file diff --git a/managers/queue_manager.py b/managers/queue_manager.py new file mode 100644 index 0000000..18aa9f8 --- /dev/null +++ b/managers/queue_manager.py @@ -0,0 +1,79 @@ +import queue +import threading +import time +import logging +from managers.telegram_manager import TelegramManager + +MESSAGE_DELAY = 3.0 # Tiempo de espera entre mensajes en segundos +NOTIFIED_ARTICLES_LIMIT = 300 # Límite de artículos notificados a mantener en memoria + +class QueueManager: + def __init__(self): + self.logger = logging.getLogger(__name__) + self._queue = queue.Queue() # Cola thread-safe + self._notified_articles = [] + self._telegram_manager = TelegramManager() + self._running = True + + # Iniciar el thread de procesamiento + self._processor_thread = threading.Thread(target=self._process_queue, daemon=True) + self._processor_thread.start() + + def add_to_queue(self, article, search_name=None): + # Verificar si el artículo ya ha sido enviado + if article in self._notified_articles: + return + + if search_name is None: + search_name = "Unknown" + self._queue.put((search_name, article)) + self.logger.debug(f"Artículo añadido a la cola: {article.get_title()}") + + self.add_to_notified_articles(article) + + def add_to_notified_articles(self, articles): + """Añade artículos a la lista de artículos ya notificados""" + if isinstance(articles, list): + self._notified_articles.extend(articles) + else: + self._notified_articles.append(articles) + + def _process_queue(self): + self.logger.info("Procesador de cola: Iniciado") + + while self._running: + try: + # Esperar hasta que haya un elemento en la cola (timeout de 1 segundo) + try: + search_name, article = self._queue.get(timeout=1.0) + except queue.Empty: + continue + + # Procesar el artículo + try: + self._telegram_manager.send_telegram_article(search_name, article) + self.logger.info(f"Artículo enviado a Telegram: {article.get_title()} de {search_name}") + + # Mantener solo los primeros NOTIFIED_ARTICLES_LIMIT artículos después de enviar + if len(self._notified_articles) > NOTIFIED_ARTICLES_LIMIT: + self._notified_articles = self._notified_articles[:NOTIFIED_ARTICLES_LIMIT] + + except Exception as e: + self.logger.error(f"Error al enviar artículo a Telegram: {e}") + finally: + self._queue.task_done() + + # Esperar antes de procesar el siguiente mensaje + if not self._queue.empty(): + self.logger.debug(f"Esperando {MESSAGE_DELAY}s antes del próximo mensaje") + time.sleep(MESSAGE_DELAY) + + except Exception as e: + self.logger.error(f"Error en el procesador de cola: {e}") + time.sleep(1.0) + + def stop(self): + """Detiene el procesador de cola""" + self.logger.info("Deteniendo procesador de cola...") + self._running = False + self._processor_thread.join(timeout=5.0) \ No newline at end of file diff --git a/managers/telegram_manager.py b/managers/telegram_manager.py index 6d7a2df..bb77177 100644 --- a/managers/telegram_manager.py +++ b/managers/telegram_manager.py @@ -1,21 +1,38 @@ import asyncio import yaml import telegram -import re +import html +import telegram.ext -ITEM_TEXT = "- *Artículo*: {}\n" \ - "- *Descripción*: {}\n" \ - "- *Localidad*: {}\n" \ - "- *Precio*: {} {}\n" \ - "- *Acepta envíos*: {}\n" \ - "[Ir al anuncio](https://es.wallapop.com/item/{})" +ITEM_HTML = """ +Resultados para: {search_name} +Artículo: {title} +{description} + +Localidad: {location} +Acepta envíos: {shipping} +Modificado el: {modified_at} + +{price} {currency} + +Ir al anuncio +""" class TelegramManager: def __init__(self): - token, channel = self.get_config() + token, channel, chat_id = self.get_config() self._channel = channel - self._bot = telegram.Bot(token=token) + self._chat_id = chat_id + # Use ApplicationBuilder to create the bot application with increased timeouts + self._application = telegram.ext.ApplicationBuilder() \ + .token(token) \ + .connect_timeout(60) \ + .read_timeout(60) \ + .write_timeout(60) \ + .build() + self._bot = self._application.bot + self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) @@ -25,21 +42,47 @@ class TelegramManager: config = yaml.safe_load(file) token = config['telegram_token'] telegram_channel = config['telegram_channel'] - return token, telegram_channel + telegram_chat_id = config['telegram_chat_id'] + return token, telegram_channel, telegram_chat_id - - def escape_markdown(self, text): - special_chars = r'_[\]()~`>#\+\-=|{}.!]' - escaped_text = re.sub(f'([{re.escape(special_chars)}])', r'\\\1', text) - return escaped_text + def escape_html(self, text): + return html.escape(str(text)) - def send_telegram_article(self, article): - self._loop.run_until_complete(self.send_telegram_article_async(article)) + def send_telegram_article(self, search_name, article): + self._loop.run_until_complete(self.send_telegram_article_async(search_name, article)) - async def send_telegram_article_async(self, article): - message = ITEM_TEXT.format(article.get_title(), self.escape_markdown(article.get_description()), - self.escape_markdown(article.get_location()), article.get_price(), - article.get_currency(), article.get_allows_shipping(), - article.get_url()) - escaped_message = self.escape_markdown(message) - await self._bot.send_message(self._channel, text=escaped_message, parse_mode="MarkdownV2") \ No newline at end of file + async def send_telegram_article_async(self, search_name, article): + message = ITEM_HTML.format( + search_name=self.escape_html(search_name), + title=self.escape_html(article.get_title()), + description=self.escape_html(article.get_description()), + location=self.escape_html(article.get_location()), + price=self.escape_html(article.get_price()), + currency=self.escape_html(article.get_currency()), + shipping=self.escape_html(article.get_allows_shipping()), + modified_at=self.escape_html(article.get_modified_at()), + url=self.escape_html(article.get_url()) + ) + images_url = article.get_images() + + media = [] + for idx, image_url in enumerate(images_url): + if idx == 0: + media.append( + telegram.InputMediaPhoto( + media=image_url, + caption=message, + parse_mode="HTML" + ) + ) + else: + media.append( + telegram.InputMediaPhoto( + media=image_url + ) + ) + + await self._bot.send_media_group( + chat_id=self._chat_id, + media=media + ) \ No newline at end of file diff --git a/managers/worker.py b/managers/worker.py index d4a6dfb..a8bafa4 100644 --- a/managers/worker.py +++ b/managers/worker.py @@ -2,21 +2,19 @@ import time import requests import logging from datalayer.wallapop_article import WallapopArticle -from managers.telegram_manager import TelegramManager import traceback -REQUEST_SLEEP_TIME = 15 -REQUEST_RETRY_TIME = 3 -ERROR_SLEEP_TIME = 30 -NOTIFIED_ARTICLES_LIMIT = 300 +REQUEST_RETRY_TIME = 5 +ERROR_SLEEP_TIME = 60 USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36' class Worker: - def __init__(self, item_to_monitor): + def __init__(self, item_to_monitor, general_args, queue_manager): self.logger = logging.getLogger(__name__) self._item_monitoring = item_to_monitor - self._notified_articles = self._request_articles() - self.telegram_manager = TelegramManager() + self._general_args = general_args + self._queue_manager = queue_manager + self._queue_manager.add_to_notified_articles(self._request_articles()) def _create_url(self): url = ( @@ -24,15 +22,24 @@ class Worker: f"?source=search_box" f"&keywords={self._item_monitoring._search_query}" f"&order_by=newest" - f"&latitude={self._item_monitoring._latitude}" - f"&longitude={self._item_monitoring._longitude}" - f"&min_sale_price={self._item_monitoring._min_price}" - f"&max_sale_price={self._item_monitoring._max_price}" f"&language=es_ES" ) - if self._item_monitoring._max_distance != "0": - url += f"&distance={self._item_monitoring._max_distance}" + # Only include latitude and longitude if both are not 0 + if self._item_monitoring._latitude != 0 and self._item_monitoring._longitude != 0: + url += ( + f"&latitude={self._item_monitoring._latitude}" + f"&longitude={self._item_monitoring._longitude}" + ) + + if self._item_monitoring._min_price != 0: + url += f"&min_sale_price={self._item_monitoring._min_price}" + + if self._item_monitoring._max_price != 0: + url += f"&max_sale_price={self._item_monitoring._max_price}" + + if self._item_monitoring._max_distance != 0: + url += f"&distance_in_km={self._item_monitoring._max_distance}" if self._item_monitoring.get_condition() != "all": url += f"&condition={self._item_monitoring.get_condition()}" # new, as_good_as_new, good, fair, has_given_it_all @@ -70,18 +77,18 @@ class Worker: return any(word in text for word in word_list) def _title_has_excluded_words(self, article_title): - return self._has_words(article_title, self._item_monitoring.get_title_exclude()) + return self._has_words(article_title, self._item_monitoring.get_title_exclude() + self._general_args.get_title_exclude()) def _description_has_excluded_words(self, article_description): - return self._has_words(article_description, self._item_monitoring.get_description_exclude()) + return self._has_words(article_description, self._item_monitoring.get_description_exclude() + self._general_args.get_description_exclude()) def _title_has_required_words(self, article_title): return not self._item_monitoring.get_title_must_include() \ - or self._has_words(article_title, self._item_monitoring.get_title_must_include()) + or self._has_words(article_title, self._item_monitoring.get_title_must_include() + self._general_args.get_title_must_include()) def _description_has_required_words(self, article_description): return not self._item_monitoring.get_description_must_include() \ - or self._has_words(article_description, self._item_monitoring.get_description_must_include()) + or self._has_words(article_description, self._item_monitoring.get_description_must_include() + self._general_args.get_description_must_include()) def _title_first_word_is_excluded(self, article_title): first_word = article_title.split()[0] @@ -91,9 +98,6 @@ class Worker: return False def _meets_item_conditions(self, article): - if article in self._notified_articles: - return False - article_title = article.get_title().lower() article_description = article.get_description().lower() if ( @@ -105,7 +109,7 @@ class Worker: ): return True else: - self.logger.info(f"Excluded article: {article}") + self.logger.debug(f"Excluded article: {article}") return False def work(self): @@ -114,20 +118,16 @@ class Worker: while True: start_time = time.time() articles = self._request_articles() - new_articles = 0 for article in articles: if self._meets_item_conditions(article): try: - self.telegram_manager.send_telegram_article(article) - new_articles += 1 + self._queue_manager.add_to_queue(article, self._item_monitoring.get_name()) except Exception as e: - self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed: {e}") - self._notified_articles.insert(0, article) - self._notified_articles = self._notified_articles[:NOTIFIED_ARTICLES_LIMIT] - time.sleep(REQUEST_SLEEP_TIME) - exec_times.append(time.time() - start_time) + self.logger.error(f"{self._item_monitoring.get_name()} worker crashed: {e}") + time.sleep(self._item_monitoring.get_check_every()) + exec_times.append(time.time() - start_time - self._item_monitoring.get_check_every()) self.logger.info( - f"Worker '{self._item_monitoring.get_search_query()}': {new_articles} new articles found. " + f"Worker '{self._item_monitoring.get_name()}', " f"Execution time stats - Last: {exec_times[-1]:.2f}s, Max: {max(exec_times):.2f}s, " f"Average: {sum(exec_times) / len(exec_times):.2f}s." ) @@ -135,9 +135,9 @@ class Worker: def run(self): while True: try: - self.logger.info(f"Wallapop monitor worker started - {self._item_monitoring.get_search_query()}") + self.logger.info(f"Wallapop monitor worker started - {self._item_monitoring.get_name()}") self.work() except Exception as e: self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}") - self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed. Restarting worker...") + self.logger.error(f"{self._item_monitoring.get_name()} worker crashed. Restarting worker...") time.sleep(ERROR_SLEEP_TIME) diff --git a/requirements.txt b/requirements.txt index 3d0c53a..3ce1447 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ python-telegram-bot==21.6 PyYAML==6.0.2 Requests==2.32.3 +Pandas \ No newline at end of file diff --git a/wallamonitor.py b/wallamonitor.py index fb2f6b6..0fd7c3c 100644 --- a/wallamonitor.py +++ b/wallamonitor.py @@ -2,9 +2,12 @@ import json import logging from logging.handlers import RotatingFileHandler from concurrent.futures import ThreadPoolExecutor +import threading from datalayer.item_monitor import ItemMonitor +from datalayer.general_monitor import GeneralMonitor from managers.worker import Worker +from managers.queue_manager import QueueManager def configure_logger(): logging.getLogger("httpx").setLevel(logging.WARNING) @@ -22,17 +25,22 @@ def configure_logger(): handlers=[console_handler, file_handler]) def parse_items_to_monitor(): - with open("args.json") as f: + with open("workers.json") as f: args = json.load(f) - items = [ItemMonitor.load_from_json(item) for item in args] - return items + if 'items' not in args: + raise ValueError("Missing mandatory field: items") + items = [ItemMonitor.load_from_json(item) for item in args['items']] + general_args = GeneralMonitor.load_from_json(args['general']) + return items, general_args if __name__ == "__main__": configure_logger() - items = parse_items_to_monitor() + items, general_args = parse_items_to_monitor() - with ThreadPoolExecutor(max_workers=10) as executor: + queue_manager = QueueManager() + + with ThreadPoolExecutor(max_workers=1000) as executor: for item in items: - worker = Worker(item) + worker = Worker(item, general_args, queue_manager) executor.submit(worker.run)