import asyncio
import yaml
import telegram
import html
import telegram.ext
from telegram.ext import CommandHandler, CallbackQueryHandler
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import os
import json
import logging
import threading
from datetime import datetime
from managers.article_cache import create_article_cache

# Template for the article notification; every placeholder is filled with an
# HTML-escaped value by TelegramManager._format_article.
ITEM_HTML = """
Artículo: {title}
Plataforma: {platform}
{description}
Localidad: {location}
Acepta envíos: {shipping}
Modificado el: {modified_at}
{price} {currency}
"""

# Telegram Bot API limits (see the Bot API reference).
CAPTION_LIMIT = 1024        # characters in a photo caption
MESSAGE_LIMIT = 4096        # characters in a text message
CALLBACK_DATA_LIMIT = 64    # bytes in InlineKeyboardButton.callback_data
# Soft limit used when splitting the /favs listing into several messages.
FAVS_CHUNK_LIMIT = 4000


class TelegramManager:
    """Send article notifications to a Telegram channel and manage favourites.

    On construction the manager reads ``config.yaml``, builds the bot
    application, optionally connects the Redis-backed article cache,
    registers the command/callback handlers and starts polling in a daemon
    thread.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        token, channel = self.get_config()
        self._channel = channel

        # Build the bot application with increased network timeouts.
        self._application = (
            telegram.ext.ApplicationBuilder()
            .token(token)
            .connect_timeout(60)
            .read_timeout(60)
            .write_timeout(60)
            .build()
        )
        self._bot = self._application.bot

        # Loop used by the synchronous send_telegram_article() wrapper.
        # NOTE(review): polling runs on a different loop in its own thread
        # while sharing the same bot object; confirm the HTTP client copes
        # with being driven from two event loops.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

        # Redis-backed cache used to store favourites (None if unavailable).
        self._article_cache = self._init_redis_cache()

        # Register command and callback handlers, then poll in the background.
        self._add_handlers()
        self._start_polling()

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    @staticmethod
    def _load_config():
        """Parse and return ``config.yaml`` from the parent of this file's directory."""
        base_dir = os.path.dirname(os.path.abspath(__file__))
        config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml')
        with open(config_file, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def get_config(self):
        """Return ``(telegram_token, telegram_channel)`` from the config file.

        Raises:
            KeyError: if either key is missing from the configuration.
            OSError: if the config file cannot be opened.
        """
        config = self._load_config()
        token = config['telegram_token']
        telegram_channel = config['telegram_channel']
        return token, telegram_channel

    def _init_redis_cache(self):
        """Create the Redis article cache used for favourites.

        Returns the cache built by ``create_article_cache`` when the config
        declares ``cache.type == 'redis'``; otherwise, or on any error,
        logs the problem and returns ``None``.
        """
        try:
            config = self._load_config()
            cache_config = config.get('cache', {})
            if cache_config.get('type') == 'redis':
                redis_config = cache_config.get('redis', {})
                return create_article_cache(
                    cache_type='redis',
                    redis_host=redis_config.get('host', 'localhost'),
                    redis_port=redis_config.get('port', 6379),
                    redis_db=redis_config.get('db', 0),
                    redis_password=redis_config.get('password'),
                )
            self.logger.warning("Redis no configurado para favoritos, se requiere Redis")
            return None
        except Exception as e:
            # Favourites are optional: degrade to "no cache" instead of crashing.
            self.logger.error(f"Error inicializando Redis para favoritos: {e}")
            return None

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------
    def escape_html(self, text):
        """Return ``str(text)`` with HTML special characters escaped."""
        return html.escape(str(text))

    def _format_article(self, search_name, article, limit):
        """Render ITEM_HTML for *article*, shortening the description to fit *limit*.

        The length is measured on the unescaped text, which is at least as
        long as what Telegram counts after parsing entities.
        """
        values = {
            "search_name": str(search_name),
            "title": str(article.get_title()),
            "platform": str(article.get_platform()),
            "description": str(article.get_description()),
            "location": str(article.get_location()),
            "price": str(article.get_price()),
            "currency": str(article.get_currency()),
            "shipping": str(article.get_allows_shipping()),
            "modified_at": str(article.get_modified_at()),
            "url": str(article.get_url()),
        }
        overhead = len(ITEM_HTML.format(**{**values, "description": ""}))
        room = max(limit - overhead, 0)
        if len(values["description"]) > room:
            if room:
                values["description"] = values["description"][:room - 1] + "…"
            else:
                values["description"] = ""
        escaped = {name: self.escape_html(value) for name, value in values.items()}
        return ITEM_HTML.format(**escaped)

    @staticmethod
    def _fav_callback_data(platform, article_id, search_name):
        """Build ``fav_<platform>_<id>_<search_name>`` within the 64-byte limit.

        Only the trailing search name is trimmed; the handler never reads it
        back, so shortening it loses nothing.
        """
        prefix = f"fav_{platform}_{article_id}_"
        room = max(CALLBACK_DATA_LIMIT - len(prefix.encode("utf-8")), 0)
        # Slice on bytes and drop any multi-byte character cut in half.
        trimmed = str(search_name).encode("utf-8")[:room].decode("utf-8", "ignore")
        return prefix + trimmed

    def _not_favorite_markup(self, platform, article_id, search_name, url):
        """Keyboard for an article that is not a favourite (single row)."""
        row = [
            InlineKeyboardButton(
                "⭐ Añadir a favoritos",
                callback_data=self._fav_callback_data(platform, article_id, search_name),
            )
        ]
        if url:
            row.append(InlineKeyboardButton("Ir al anuncio", url=str(url)))
        return InlineKeyboardMarkup([row])

    @staticmethod
    def _favorite_markup(platform, article_id, url):
        """Keyboard for an article already in favourites (link on a second row)."""
        rows = [
            [
                InlineKeyboardButton(
                    "✅ En favoritos",
                    callback_data=f"already_fav_{platform}_{article_id}",
                ),
                InlineKeyboardButton(
                    "🗑️ Quitar",
                    callback_data=f"unfav_{platform}_{article_id}",
                ),
            ]
        ]
        if url:
            rows.append([InlineKeyboardButton("Ir al anuncio", url=str(url))])
        return InlineKeyboardMarkup(rows)

    # ------------------------------------------------------------------
    # Sending articles
    # ------------------------------------------------------------------
    def send_telegram_article(self, search_name, article, thread_id=None):
        """Blocking wrapper around :meth:`send_telegram_article_async`."""
        self._loop.run_until_complete(
            self.send_telegram_article_async(search_name, article, thread_id)
        )

    async def send_telegram_article_async(self, search_name, article, thread_id=None):
        """Post *article* to the configured channel with favourite/link buttons.

        Only the first image is sent (as a photo with caption). When the
        article has no image, the notification is sent as a plain text
        message instead of failing.
        """
        images_url = article.get_images()
        first_image_url = images_url[0] if images_url else None

        platform = article.get_platform()
        article_id = article.get_id()
        url = article.get_url()

        # Choose the keyboard according to the current favourite state.
        is_favorite = False
        if self._article_cache:
            is_favorite = self._article_cache.is_favorite(platform, article_id)
        if is_favorite:
            reply_markup = self._favorite_markup(platform, article_id, url)
        else:
            reply_markup = self._not_favorite_markup(platform, article_id, search_name, url)

        if first_image_url:
            await self._bot.send_photo(
                chat_id=self._channel,
                photo=first_image_url,
                caption=self._format_article(search_name, article, CAPTION_LIMIT),
                parse_mode="HTML",
                reply_markup=reply_markup,
                message_thread_id=thread_id,
            )
        else:
            await self._bot.send_message(
                chat_id=self._channel,
                text=self._format_article(search_name, article, MESSAGE_LIMIT),
                parse_mode="HTML",
                reply_markup=reply_markup,
                message_thread_id=thread_id,
            )

    # ------------------------------------------------------------------
    # Bot wiring
    # ------------------------------------------------------------------
    def _add_handlers(self):
        """Register the /favs command and the favourite-button callbacks."""
        self._application.add_handler(CommandHandler("favs", self.handle_favs_command))
        self._application.add_handler(
            CallbackQueryHandler(self.handle_favorite_callback, pattern="^fav_")
        )
        self._application.add_handler(
            CallbackQueryHandler(self.handle_unfavorite_callback, pattern="^unfav_")
        )
        # Without this the "✅ En favoritos" button is never answered.
        self._application.add_handler(
            CallbackQueryHandler(self.handle_already_favorite_callback, pattern="^already_fav_")
        )
        self.logger.info("Handlers de comandos y callbacks añadidos")

    def _start_polling(self):
        """Start the bot in polling mode on a dedicated daemon thread."""

        async def startup():
            # Awaited sequentially so a failure in any step reaches the
            # except clause below instead of dying inside an orphan task.
            await self._application.initialize()
            await self._application.start()
            await self._application.updater.start_polling(
                allowed_updates=["message", "callback_query"]
            )

        def run_polling():
            try:
                self.logger.info("Iniciando polling de Telegram bot...")
                # This thread needs its own event loop.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(startup())
                loop.run_forever()
            except Exception as e:
                self.logger.exception(f"Error en polling: {e}")

        polling_thread = threading.Thread(target=run_polling, daemon=True)
        polling_thread.start()
        self.logger.info("Thread de polling iniciado")

    # ------------------------------------------------------------------
    # Callback helpers
    # ------------------------------------------------------------------
    async def _reply(self, query, text):
        """Send *text* as a reply to the message that carries the pressed button.

        Replying (rather than editing) works for photo messages, which have
        no text to edit, and keeps the original article visible.
        """
        if query.message is not None:
            await query.message.reply_text(text)

    def _lookup_article_url(self, platform, article_id):
        """Return the URL stored under ``notified:<platform>:<id>`` or ``""``.

        Best effort: any lookup/parsing error is logged at debug level.
        """
        try:
            redis_client = self._article_cache._redis_client
            value = redis_client.get(f"notified:{platform}:{article_id}")
            if value:
                return json.loads(value).get('url', '')
        except Exception as e:
            self.logger.debug(f"Error obteniendo URL: {e}")
        return ""

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------
    async def handle_favorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
        """Handle a press on the "add to favourites" button.

        Expected callback data: ``fav_<platform>_<id>_<search_name>``.
        """
        query = update.callback_query
        await query.answer()

        if not self._article_cache:
            await self._reply(query, "❌ Redis no está disponible para favoritos")
            return

        parts = query.data.split("_", 3)
        if len(parts) < 4:
            await self._reply(query, "❌ Error al procesar favorito")
            return
        # parts[3] (search name) is carried in the payload but not needed here.
        platform = parts[1]
        article_id = parts[2]

        if self._article_cache.is_favorite(platform, article_id):
            await self._reply(query, "ℹ️ Este artículo ya está en favoritos")
            return

        url = self._lookup_article_url(platform, article_id)

        success = self._article_cache.set_favorite(platform, article_id, is_favorite=True)
        if not success:
            await self._reply(query, "❌ No se pudo encontrar el artículo en Redis")
            return

        # Swap the keyboard to the "already in favourites" layout.
        await query.edit_message_reply_markup(
            reply_markup=self._favorite_markup(platform, article_id, url)
        )
        self.logger.info(f"Artículo {article_id} ({platform}) marcado como favorito")

    async def handle_already_favorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
        """Answer presses on the informational "✅ En favoritos" button."""
        await update.callback_query.answer("ℹ️ Este artículo ya está en favoritos")

    async def handle_favs_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
        """Handle ``/favs``: list the stored favourites, split across messages if long."""
        target = update.effective_message
        if target is None:
            return

        if not self._article_cache:
            await target.reply_text("❌ Redis no está disponible para favoritos")
            return

        favorites = self._article_cache.get_favorites()
        if not favorites:
            await target.reply_text("📭 No tienes favoritos guardados aún")
            return

        entries = []
        for idx, fav in enumerate(favorites, 1):
            title = fav.get('title', 'Sin título')
            platform = str(fav.get('platform') or 'N/A').upper()
            price = fav.get('price', 'N/A')
            currency = fav.get('currency', '€')
            url = fav.get('url', '#')

            # Only real http(s) URLs become links; anything else stays plain text.
            if isinstance(url, str) and url.startswith(("http://", "https://")):
                link = f'<a href="{self.escape_html(url)}">Ver anuncio</a>'
            else:
                link = "Ver anuncio"

            # Every interpolated value is escaped because the message is
            # sent with parse_mode="HTML".
            entry = f"{idx}. {self.escape_html(title)}\n"
            entry += (
                f" 🏷️ {self.escape_html(platform)} - "
                f"{self.escape_html(price)} {self.escape_html(currency)}\n"
            )
            entry += f" 🔗 {link}\n\n"
            entries.append(entry)

        # Split on entry boundaries so no HTML entity or tag is cut in half.
        chunks = []
        current = f"⭐ Tus Favoritos ({len(favorites)})\n\n"
        for entry in entries:
            if current and len(current) + len(entry) > FAVS_CHUNK_LIMIT:
                chunks.append(current)
                current = ""
            current += entry
        if current:
            chunks.append(current)

        for chunk in chunks:
            await target.reply_text(chunk, parse_mode="HTML", disable_web_page_preview=True)

    async def handle_unfavorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
        """Handle a press on the "remove from favourites" button.

        Expected callback data: ``unfav_<platform>_<id>``.
        """
        query = update.callback_query
        await query.answer()

        if not self._article_cache:
            await self._reply(query, "❌ Redis no está disponible para favoritos")
            return

        parts = query.data.split("_", 2)
        if len(parts) < 3:
            await self._reply(query, "❌ Error al procesar")
            return
        platform = parts[1]
        article_id = parts[2]

        # Fetch the URL before unmarking so the link button can be restored.
        url = self._lookup_article_url(platform, article_id)

        success = self._article_cache.set_favorite(platform, article_id, is_favorite=False)
        if not success:
            await self._reply(query, "ℹ️ Este artículo no estaba en favoritos")
            return

        # Restore the original "add to favourites" keyboard; the search name
        # is no longer known at this point.
        await query.edit_message_reply_markup(
            reply_markup=self._not_favorite_markup(platform, article_id, "Unknown", url)
        )
        self.logger.info(f"Artículo {article_id} ({platform}) eliminado de favoritos")