445 lines
19 KiB
Python
445 lines
19 KiB
Python
import asyncio
|
||
import yaml
|
||
import telegram
|
||
import html
|
||
import telegram.ext
|
||
from telegram.ext import CommandHandler, CallbackQueryHandler
|
||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||
import os
|
||
import json
|
||
import logging
|
||
import threading
|
||
from datetime import datetime
|
||
from managers.article_cache import create_article_cache
|
||
|
||
ITEM_HTML = """
|
||
<b>Artículo:</b> {title}
|
||
<b>Plataforma:</b> {platform}
|
||
|
||
{description}
|
||
|
||
<b>Localidad:</b> {location}
|
||
<b>Acepta envíos:</b> {shipping}
|
||
<b>Modificado el:</b> {modified_at}
|
||
|
||
<b>{price} {currency}</b>
|
||
"""
|
||
|
||
class TelegramManager:
|
||
def __init__(self, token=None, channel=None, enable_polling=True, username=None):
|
||
"""
|
||
Inicializa TelegramManager con configuración específica
|
||
|
||
Args:
|
||
token: Token del bot de Telegram (si None, intenta leer de config.yaml)
|
||
channel: Canal de Telegram (si None, intenta leer de config.yaml)
|
||
enable_polling: Si iniciar el polling del bot
|
||
username: Usuario propietario de esta configuración (para logging)
|
||
"""
|
||
self.logger = logging.getLogger(__name__)
|
||
self._username = username
|
||
|
||
# Si no se proporcionan token/channel, intentar leer de config.yaml (compatibilidad)
|
||
if token is None or channel is None:
|
||
token, channel, enable_polling = self._get_config_from_file()
|
||
|
||
if not token or not channel:
|
||
raise ValueError("Token y channel de Telegram son requeridos")
|
||
|
||
self._channel = channel
|
||
self._token = token
|
||
|
||
# Use ApplicationBuilder to create the bot application with increased timeouts
|
||
self._application = telegram.ext.ApplicationBuilder() \
|
||
.token(token) \
|
||
.connect_timeout(60) \
|
||
.read_timeout(60) \
|
||
.write_timeout(60) \
|
||
.build()
|
||
self._bot = self._application.bot
|
||
|
||
self._loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self._loop)
|
||
|
||
# Inicializar MongoDB para favoritos
|
||
self._article_cache = self._init_mongodb_cache()
|
||
|
||
# Añadir handlers para comandos y callbacks
|
||
self._add_handlers()
|
||
|
||
# Iniciar polling en un thread separado solo si está habilitado
|
||
if enable_polling:
|
||
self._start_polling()
|
||
else:
|
||
self.logger.info(f"Polling deshabilitado por configuración{' para usuario ' + username if username else ''}")
|
||
|
||
def _get_config_from_file(self):
|
||
"""Lee configuración de config.yaml (para compatibilidad hacia atrás)"""
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml')
|
||
try:
|
||
with open(config_file, 'r') as file:
|
||
config = yaml.safe_load(file)
|
||
token = config.get('telegram_token')
|
||
telegram_channel = config.get('telegram_channel')
|
||
enable_polling = config.get('enable_polling', True)
|
||
return token, telegram_channel, enable_polling
|
||
except Exception as e:
|
||
self.logger.warning(f"No se pudo leer config.yaml: {e}")
|
||
return None, None, False
|
||
|
||
def _init_mongodb_cache(self):
|
||
"""Inicializa MongoDB cache para favoritos"""
|
||
try:
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml')
|
||
with open(config_file, 'r') as file:
|
||
config = yaml.safe_load(file)
|
||
|
||
cache_config = config.get('cache', {})
|
||
if cache_config.get('type') == 'mongodb':
|
||
mongodb_config = cache_config.get('mongodb', {})
|
||
return create_article_cache(
|
||
cache_type='mongodb',
|
||
mongodb_host=os.environ.get('MONGODB_HOST') or mongodb_config.get('host', 'localhost'),
|
||
mongodb_port=int(os.environ.get('MONGODB_PORT') or mongodb_config.get('port', 27017)),
|
||
mongodb_database=os.environ.get('MONGODB_DATABASE') or mongodb_config.get('database', 'wallabicher'),
|
||
mongodb_username=os.environ.get('MONGODB_USERNAME') or mongodb_config.get('username'),
|
||
mongodb_password=os.environ.get('MONGODB_PASSWORD') or mongodb_config.get('password'),
|
||
mongodb_auth_source=mongodb_config.get('auth_source', 'admin')
|
||
)
|
||
else:
|
||
self.logger.warning("MongoDB no configurado para favoritos, se requiere MongoDB")
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"Error inicializando MongoDB para favoritos: {e}")
|
||
return None
|
||
|
||
def escape_html(self, text):
|
||
return html.escape(str(text))
|
||
|
||
async def get_forum_topics_async(self):
|
||
"""Obtiene los topics/hilos del canal/grupo"""
|
||
try:
|
||
# Intentar obtener forum topics usando la API
|
||
try:
|
||
# Usar get_forum_topics si está disponible (python-telegram-bot 20+)
|
||
result = await self._bot.get_forum_topics(chat_id=self._channel, limit=100)
|
||
topics = []
|
||
if hasattr(result, 'topics') and result.topics:
|
||
for topic in result.topics:
|
||
topics.append({
|
||
'id': topic.message_thread_id,
|
||
'name': getattr(topic, 'name', f'Thread {topic.message_thread_id}'),
|
||
'icon_color': getattr(topic, 'icon_color', None),
|
||
'icon_custom_emoji_id': getattr(topic, 'icon_custom_emoji_id', None),
|
||
})
|
||
return topics
|
||
except AttributeError:
|
||
# Si get_forum_topics no existe, usar método alternativo
|
||
return await self._get_topics_from_messages()
|
||
except Exception as e:
|
||
self.logger.error(f"Error obteniendo forum topics: {e}")
|
||
return await self._get_topics_from_messages()
|
||
|
||
async def _get_topics_from_messages(self):
|
||
"""Obtiene topics desde mensajes recientes (workaround)"""
|
||
try:
|
||
topics_dict = {}
|
||
# Intentar obtener algunos mensajes recientes
|
||
# Nota: Esto solo funciona si hay mensajes recientes con thread_id
|
||
self.logger.info("Obteniendo topics desde mensajes recientes...")
|
||
return []
|
||
except Exception as e:
|
||
self.logger.error(f"Error obteniendo topics desde mensajes: {e}")
|
||
return []
|
||
|
||
def get_forum_topics(self):
|
||
"""Versión síncrona para obtener forum topics"""
|
||
return self._loop.run_until_complete(self.get_forum_topics_async())
|
||
|
||
def send_telegram_article(self, search_name, article, thread_id=None):
|
||
self._loop.run_until_complete(self.send_telegram_article_async(search_name, article, thread_id))
|
||
|
||
async def send_telegram_article_async(self, search_name, article, thread_id=None):
|
||
message = ITEM_HTML.format(
|
||
search_name=self.escape_html(search_name),
|
||
title=self.escape_html(article.get_title()),
|
||
platform=self.escape_html(article.get_platform()),
|
||
description=self.escape_html(article.get_description()),
|
||
location=self.escape_html(article.get_location()),
|
||
price=self.escape_html(article.get_price()),
|
||
currency=self.escape_html(article.get_currency()),
|
||
shipping=self.escape_html(article.get_allows_shipping()),
|
||
modified_at=self.escape_html(article.get_modified_at()),
|
||
url=self.escape_html(article.get_url())
|
||
)
|
||
images_url = article.get_images()
|
||
|
||
# Envía solo la primera imagen usando sendPhoto en vez de un álbum/media group
|
||
first_image_url = images_url[0] if images_url else None
|
||
|
||
|
||
# Verificar si el artículo ya es favorito
|
||
is_favorite = False
|
||
if self._article_cache:
|
||
is_favorite = self._article_cache.is_favorite(article.get_platform(), article.get_id())
|
||
|
||
# Crear botones inline para el primer mensaje del grupo
|
||
if is_favorite:
|
||
keyboard = [
|
||
[
|
||
InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{article.get_platform()}_{article.get_id()}"),
|
||
InlineKeyboardButton("🗑️ Quitar", callback_data=f"unfav_{article.get_platform()}_{article.get_id()}")
|
||
],
|
||
[
|
||
InlineKeyboardButton("Ir al anuncio", url=f"{article.get_url()}")
|
||
]
|
||
]
|
||
else:
|
||
keyboard = [
|
||
[
|
||
InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{article.get_platform()}_{article.get_id()}_{search_name}"),
|
||
InlineKeyboardButton("Ir al anuncio", url=f"{article.get_url()}")
|
||
]
|
||
]
|
||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||
|
||
# Enviar un mensaje adicional con los botones (reply al primer mensaje del grupo)
|
||
await self._bot.send_photo(
|
||
chat_id=self._channel,
|
||
photo=first_image_url,
|
||
caption=message,
|
||
parse_mode="HTML",
|
||
reply_markup=reply_markup,
|
||
message_thread_id=thread_id
|
||
)
|
||
|
||
|
||
def _add_handlers(self):
|
||
"""Añade los handlers para comandos y callbacks"""
|
||
self._application.add_handler(CommandHandler("favs", self.handle_favs_command))
|
||
self._application.add_handler(CommandHandler("threads", self.handle_threads_command))
|
||
self._application.add_handler(CallbackQueryHandler(self.handle_favorite_callback, pattern="^fav_"))
|
||
self._application.add_handler(CallbackQueryHandler(self.handle_unfavorite_callback, pattern="^unfav_"))
|
||
self.logger.info("Handlers de comandos y callbacks añadidos")
|
||
|
||
def _start_polling(self):
|
||
"""Inicia el bot en modo polling en un thread separado"""
|
||
def run_polling():
|
||
try:
|
||
user_info = f" para usuario '{self._username}'" if self._username else ""
|
||
self.logger.info(f"Iniciando polling de Telegram bot{user_info}...")
|
||
# Crear un nuevo event loop para este thread
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
# Iniciar polling
|
||
loop.run_until_complete(self._application.initialize())
|
||
loop.create_task(self._application.start())
|
||
loop.create_task(self._application.updater.start_polling(allowed_updates=["message", "callback_query"]))
|
||
loop.run_forever()
|
||
except Exception as e:
|
||
user_info = f" (usuario: {self._username})" if self._username else ""
|
||
self.logger.error(f"Error en polling{user_info}: {e}")
|
||
|
||
polling_thread = threading.Thread(target=run_polling, daemon=True)
|
||
polling_thread.start()
|
||
user_info = f" para usuario '{self._username}'" if self._username else ""
|
||
self.logger.info(f"Thread de polling iniciado{user_info}")
|
||
|
||
async def handle_favorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el callback cuando se presiona el botón de favoritos"""
|
||
query = update.callback_query
|
||
await query.answer()
|
||
|
||
if not self._article_cache:
|
||
await query.edit_message_text("❌ MongoDB no está disponible para favoritos")
|
||
return
|
||
|
||
# Extraer plataforma, ID del artículo y nombre de búsqueda del callback_data
|
||
# Formato: fav_platform_id_search_name
|
||
callback_data = query.data
|
||
parts = callback_data.split("_", 3)
|
||
|
||
if len(parts) < 4:
|
||
await query.edit_message_text("❌ Error al procesar favorito")
|
||
return
|
||
|
||
platform = parts[1]
|
||
article_id = parts[2]
|
||
search_name = parts[3] if len(parts) > 3 else "Unknown"
|
||
|
||
# Verificar si ya es favorito
|
||
if self._article_cache.is_favorite(platform, article_id):
|
||
await query.message.reply_text("ℹ️ Este artículo ya está en favoritos")
|
||
return
|
||
|
||
# Obtener la URL del artículo desde MongoDB
|
||
url = ""
|
||
try:
|
||
article = self._article_cache._articles_collection.find_one({
|
||
'platform': platform,
|
||
'id': str(article_id)
|
||
})
|
||
if article:
|
||
url = article.get('url', '')
|
||
except Exception as e:
|
||
self.logger.debug(f"Error obteniendo URL: {e}")
|
||
|
||
# Marcar como favorito en MongoDB
|
||
success = self._article_cache.set_favorite(platform, article_id, is_favorite=True)
|
||
|
||
if not success:
|
||
await query.edit_message_text("❌ No se pudo encontrar el artículo en MongoDB")
|
||
return
|
||
|
||
# Actualizar el mensaje del botón
|
||
if url:
|
||
new_keyboard = [
|
||
[
|
||
InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{platform}_{article_id}"),
|
||
InlineKeyboardButton("🗑️ Quitar", callback_data=f"unfav_{platform}_{article_id}")
|
||
],
|
||
[
|
||
InlineKeyboardButton("Ir al anuncio", url=url)
|
||
]
|
||
]
|
||
else:
|
||
new_keyboard = [
|
||
[
|
||
InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{platform}_{article_id}"),
|
||
InlineKeyboardButton("🗑️ Quitar", callback_data=f"unfav_{platform}_{article_id}")
|
||
]
|
||
]
|
||
await query.edit_message_reply_markup(
|
||
reply_markup=InlineKeyboardMarkup(new_keyboard)
|
||
)
|
||
|
||
self.logger.info(f"Artículo {article_id} ({platform}) marcado como favorito")
|
||
|
||
async def handle_threads_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el comando /threads para mostrar los threads disponibles"""
|
||
try:
|
||
topics = await self.get_forum_topics_async()
|
||
|
||
if not topics:
|
||
await update.message.reply_text(
|
||
"📋 No se pudieron obtener los threads automáticamente.\n\n"
|
||
"💡 <b>Cómo obtener el Thread ID:</b>\n"
|
||
"1. Haz clic derecho en el tema/hilo\n"
|
||
"2. Selecciona 'Copiar enlace del tema'\n"
|
||
"3. El número al final de la URL es el Thread ID\n\n"
|
||
"Ejemplo: <code>t.me/c/1234567890/8</code> → Thread ID = <b>8</b>",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
message = f"📋 <b>Threads Disponibles ({len(topics)})</b>\n\n"
|
||
for topic in topics:
|
||
name = topic.get('name', f'Thread {topic["id"]}')
|
||
thread_id = topic['id']
|
||
message += f"• <b>{self.escape_html(name)}</b> → Thread ID: <code>{thread_id}</code>\n"
|
||
|
||
await update.message.reply_text(message, parse_mode="HTML")
|
||
except Exception as e:
|
||
self.logger.error(f"Error en comando /threads: {e}")
|
||
await update.message.reply_text(
|
||
"❌ Error obteniendo threads. Usa el método manual:\n\n"
|
||
"1. Haz clic derecho en el tema\n"
|
||
"2. Copia el enlace del tema\n"
|
||
"3. El número al final es el Thread ID"
|
||
)
|
||
|
||
async def handle_favs_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el comando /favs para mostrar los favoritos"""
|
||
if not self._article_cache:
|
||
await update.message.reply_text("❌ MongoDB no está disponible para favoritos")
|
||
return
|
||
|
||
favorites = self._article_cache.get_favorites()
|
||
|
||
if not favorites:
|
||
await update.message.reply_text("📭 No tienes favoritos guardados aún")
|
||
return
|
||
|
||
# Crear mensaje con lista de favoritos
|
||
message = f"⭐ <b>Tus Favoritos ({len(favorites)})</b>\n\n"
|
||
|
||
for idx, fav in enumerate(favorites, 1):
|
||
title = fav.get('title', 'Sin título')
|
||
platform = fav.get('platform', 'N/A').upper()
|
||
price = fav.get('price', 'N/A')
|
||
currency = fav.get('currency', '€')
|
||
url = fav.get('url', '#')
|
||
|
||
message += f"{idx}. <b>{self.escape_html(title)}</b>\n"
|
||
message += f" 🏷️ {platform} - {price} {currency}\n"
|
||
message += f" 🔗 <a href='{url}'>Ver anuncio</a>\n\n"
|
||
|
||
# Dividir el mensaje si es muy largo
|
||
if len(message) > 4000:
|
||
chunks = [message[i:i+4000] for i in range(0, len(message), 4000)]
|
||
for chunk in chunks:
|
||
await update.message.reply_text(chunk, parse_mode="HTML", disable_web_page_preview=True)
|
||
else:
|
||
await update.message.reply_text(message, parse_mode="HTML", disable_web_page_preview=True)
|
||
|
||
async def handle_unfavorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el callback cuando se presiona el botón de quitar de favoritos"""
|
||
query = update.callback_query
|
||
await query.answer()
|
||
|
||
if not self._article_cache:
|
||
await query.edit_message_text("❌ MongoDB no está disponible para favoritos")
|
||
return
|
||
|
||
# Extraer plataforma e ID del artículo del callback_data
|
||
# Formato: unfav_platform_id
|
||
callback_data = query.data
|
||
parts = callback_data.split("_", 2)
|
||
|
||
if len(parts) < 3:
|
||
await query.edit_message_text("❌ Error al procesar")
|
||
return
|
||
|
||
platform = parts[1]
|
||
article_id = parts[2]
|
||
|
||
# Obtener la URL del artículo desde MongoDB antes de desmarcar
|
||
url = ""
|
||
try:
|
||
article = self._article_cache._articles_collection.find_one({
|
||
'platform': platform,
|
||
'id': str(article_id)
|
||
})
|
||
if article:
|
||
url = article.get('url', '')
|
||
except Exception as e:
|
||
self.logger.debug(f"Error obteniendo URL: {e}")
|
||
|
||
# Desmarcar como favorito en MongoDB
|
||
success = self._article_cache.set_favorite(platform, article_id, is_favorite=False)
|
||
|
||
if not success:
|
||
await query.edit_message_text("ℹ️ Este artículo no estaba en favoritos")
|
||
return
|
||
|
||
# Restaurar el botón original
|
||
if url:
|
||
keyboard = [
|
||
[
|
||
InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{platform}_{article_id}_Unknown"),
|
||
InlineKeyboardButton("Ir al anuncio", url=url)
|
||
]
|
||
]
|
||
else:
|
||
keyboard = [
|
||
[
|
||
InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{platform}_{article_id}_Unknown")
|
||
]
|
||
]
|
||
await query.edit_message_reply_markup(
|
||
reply_markup=InlineKeyboardMarkup(keyboard)
|
||
)
|
||
|
||
self.logger.info(f"Artículo {article_id} ({platform}) eliminado de favoritos") |