289 lines
11 KiB
Python
289 lines
11 KiB
Python
import asyncio
|
||
import yaml
|
||
import telegram
|
||
import html
|
||
import telegram.ext
|
||
from telegram.ext import CommandHandler, CallbackQueryHandler
|
||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||
import os
|
||
import json
|
||
import logging
|
||
import threading
|
||
from datetime import datetime
|
||
|
||
ITEM_HTML = """
|
||
<b>Artículo:</b> {title}
|
||
<b>Plataforma:</b> {platform}
|
||
|
||
{description}
|
||
|
||
<b>Localidad:</b> {location}
|
||
<b>Acepta envíos:</b> {shipping}
|
||
<b>Modificado el:</b> {modified_at}
|
||
|
||
<b>{price} {currency}</b>
|
||
"""
|
||
|
||
FAVORITES_FILE = "favorites.json"
|
||
|
||
class TelegramManager:
|
||
def __init__(self):
|
||
self.logger = logging.getLogger(__name__)
|
||
token, channel = self.get_config()
|
||
self._channel = channel
|
||
# Use ApplicationBuilder to create the bot application with increased timeouts
|
||
self._application = telegram.ext.ApplicationBuilder() \
|
||
.token(token) \
|
||
.connect_timeout(60) \
|
||
.read_timeout(60) \
|
||
.write_timeout(60) \
|
||
.build()
|
||
self._bot = self._application.bot
|
||
|
||
self._loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self._loop)
|
||
|
||
# Inicializar archivo de favoritos
|
||
self._favorites_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), FAVORITES_FILE)
|
||
self._ensure_favorites_file()
|
||
|
||
# Añadir handlers para comandos y callbacks
|
||
self._add_handlers()
|
||
|
||
# Iniciar polling en un thread separado
|
||
self._start_polling()
|
||
|
||
def get_config(self):
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
config_file = os.path.join(os.path.dirname(base_dir), 'config.yaml')
|
||
with open(config_file, 'r') as file:
|
||
config = yaml.safe_load(file)
|
||
token = config['telegram_token']
|
||
telegram_channel = config['telegram_channel']
|
||
return token, telegram_channel
|
||
|
||
def escape_html(self, text):
|
||
return html.escape(str(text))
|
||
|
||
def send_telegram_article(self, search_name, article, thread_id=None):
|
||
self._loop.run_until_complete(self.send_telegram_article_async(search_name, article, thread_id))
|
||
|
||
async def send_telegram_article_async(self, search_name, article, thread_id=None):
|
||
message = ITEM_HTML.format(
|
||
search_name=self.escape_html(search_name),
|
||
title=self.escape_html(article.get_title()),
|
||
platform=self.escape_html(article.get_platform()),
|
||
description=self.escape_html(article.get_description()),
|
||
location=self.escape_html(article.get_location()),
|
||
price=self.escape_html(article.get_price()),
|
||
currency=self.escape_html(article.get_currency()),
|
||
shipping=self.escape_html(article.get_allows_shipping()),
|
||
modified_at=self.escape_html(article.get_modified_at()),
|
||
url=self.escape_html(article.get_url())
|
||
)
|
||
images_url = article.get_images()
|
||
|
||
# Envía solo la primera imagen usando sendPhoto en vez de un álbum/media group
|
||
first_image_url = images_url[0] if images_url else None
|
||
|
||
|
||
# Crear botones inline para el primer mensaje del grupo
|
||
keyboard = [
|
||
[
|
||
InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{article.get_id()}_{search_name}"),
|
||
InlineKeyboardButton("Ir al anuncio", url=f"{article.get_url()}")
|
||
]
|
||
]
|
||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||
|
||
# Enviar un mensaje adicional con los botones (reply al primer mensaje del grupo)
|
||
await self._bot.send_photo(
|
||
chat_id=self._channel,
|
||
photo=first_image_url,
|
||
caption=message,
|
||
parse_mode="HTML",
|
||
reply_markup=reply_markup,
|
||
message_thread_id=thread_id
|
||
)
|
||
|
||
def _ensure_favorites_file(self):
|
||
"""Crea el archivo de favoritos si no existe"""
|
||
if not os.path.exists(self._favorites_file):
|
||
with open(self._favorites_file, 'w') as f:
|
||
json.dump([], f)
|
||
self.logger.info(f"Archivo de favoritos creado: {self._favorites_file}")
|
||
|
||
def _load_favorites(self):
|
||
"""Carga los favoritos desde el archivo JSON"""
|
||
try:
|
||
with open(self._favorites_file, 'r') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
self.logger.error(f"Error al cargar favoritos: {e}")
|
||
return []
|
||
|
||
def _save_favorites(self, favorites):
|
||
"""Guarda los favoritos en el archivo JSON"""
|
||
try:
|
||
with open(self._favorites_file, 'w') as f:
|
||
json.dump(favorites, f, indent=2, ensure_ascii=False)
|
||
self.logger.info(f"Favoritos guardados: {len(favorites)} items")
|
||
except Exception as e:
|
||
self.logger.error(f"Error al guardar favoritos: {e}")
|
||
|
||
def _add_handlers(self):
|
||
"""Añade los handlers para comandos y callbacks"""
|
||
self._application.add_handler(CommandHandler("favs", self.handle_favs_command))
|
||
self._application.add_handler(CallbackQueryHandler(self.handle_favorite_callback, pattern="^fav_"))
|
||
self._application.add_handler(CallbackQueryHandler(self.handle_unfavorite_callback, pattern="^unfav_"))
|
||
self.logger.info("Handlers de comandos y callbacks añadidos")
|
||
|
||
def _start_polling(self):
|
||
"""Inicia el bot en modo polling en un thread separado"""
|
||
def run_polling():
|
||
try:
|
||
self.logger.info("Iniciando polling de Telegram bot...")
|
||
# Crear un nuevo event loop para este thread
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
# Iniciar polling
|
||
loop.run_until_complete(self._application.initialize())
|
||
loop.create_task(self._application.start())
|
||
loop.create_task(self._application.updater.start_polling(allowed_updates=["message", "callback_query"]))
|
||
loop.run_forever()
|
||
except Exception as e:
|
||
self.logger.error(f"Error en polling: {e}")
|
||
|
||
polling_thread = threading.Thread(target=run_polling, daemon=True)
|
||
polling_thread.start()
|
||
self.logger.info("Thread de polling iniciado")
|
||
|
||
async def handle_favorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el callback cuando se presiona el botón de favoritos"""
|
||
query = update.callback_query
|
||
await query.answer()
|
||
|
||
# Extraer el ID del artículo y el nombre de búsqueda del callback_data
|
||
callback_data = query.data
|
||
parts = callback_data.split("_", 2) # fav_ID_search_name
|
||
|
||
if len(parts) < 3:
|
||
await query.edit_message_text("❌ Error al procesar favorito")
|
||
return
|
||
|
||
article_id = parts[1]
|
||
search_name = parts[2]
|
||
|
||
# Ahora el mensaje original es el mismo mensaje del keyboard
|
||
original_message = query.message
|
||
|
||
if not original_message:
|
||
await query.edit_message_text("❌ No se pudo encontrar el mensaje original")
|
||
return
|
||
|
||
# Extraer información del caption
|
||
caption = original_message.caption or ""
|
||
|
||
# Crear el objeto favorito
|
||
favorite = {
|
||
"id": article_id,
|
||
"search_name": search_name,
|
||
"caption": caption,
|
||
"date_added": datetime.now().isoformat(),
|
||
"message_link": f"https://t.me/c/{str(self._channel).replace('-100', '')}/{original_message.message_id}"
|
||
}
|
||
|
||
# Cargar favoritos existentes
|
||
favorites = self._load_favorites()
|
||
|
||
# Verificar si ya existe
|
||
if any(fav["id"] == article_id for fav in favorites):
|
||
await query.message.reply_text("ℹ️ Este artículo ya está en favoritos")
|
||
return
|
||
|
||
# Añadir nuevo favorito
|
||
favorites.append(favorite)
|
||
self._save_favorites(favorites)
|
||
|
||
# Actualizar el mensaje del botón
|
||
new_keyboard = [
|
||
[InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{article_id}")],
|
||
[InlineKeyboardButton("🗑️ Quitar de favoritos", callback_data=f"unfav_{article_id}")]
|
||
]
|
||
await query.edit_message_text(
|
||
reply_markup=InlineKeyboardMarkup(new_keyboard)
|
||
)
|
||
|
||
self.logger.info(f"Artículo {article_id} añadido a favoritos")
|
||
|
||
async def handle_favs_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el comando /favs para mostrar los favoritos"""
|
||
favorites = self._load_favorites()
|
||
|
||
if not favorites:
|
||
await update.message.reply_text("📭 No tienes favoritos guardados aún")
|
||
return
|
||
|
||
# Crear mensaje con lista de favoritos
|
||
message = f"⭐ <b>Tus Favoritos ({len(favorites)})</b>\n\n"
|
||
|
||
for idx, fav in enumerate(favorites, 1):
|
||
# Extraer título del caption (está después de "Artículo:")
|
||
caption_lines = fav["caption"].split("\n")
|
||
title = "Sin título"
|
||
for line in caption_lines:
|
||
if line.startswith("<b>Artículo:</b>"):
|
||
title = line.replace("<b>Artículo:</b>", "").strip()
|
||
break
|
||
|
||
message += f"{idx}. {title}\n"
|
||
message += f" 📂 Búsqueda: <i>{fav['search_name']}</i>\n"
|
||
message += f" 🔗 <a href='{fav['message_link']}'>Ver mensaje</a>\n\n"
|
||
|
||
# Dividir el mensaje si es muy largo
|
||
if len(message) > 4000:
|
||
chunks = [message[i:i+4000] for i in range(0, len(message), 4000)]
|
||
for chunk in chunks:
|
||
await update.message.reply_text(chunk, parse_mode="HTML", disable_web_page_preview=True)
|
||
else:
|
||
await update.message.reply_text(message, parse_mode="HTML", disable_web_page_preview=True)
|
||
|
||
async def handle_unfavorite_callback(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE):
|
||
"""Maneja el callback cuando se presiona el botón de quitar de favoritos"""
|
||
query = update.callback_query
|
||
await query.answer()
|
||
|
||
# Extraer el ID del artículo del callback_data
|
||
callback_data = query.data
|
||
parts = callback_data.split("_", 1) # unfav_ID
|
||
|
||
if len(parts) < 2:
|
||
await query.edit_message_text("❌ Error al procesar")
|
||
return
|
||
|
||
article_id = parts[1]
|
||
|
||
# Cargar favoritos existentes
|
||
favorites = self._load_favorites()
|
||
|
||
# Buscar y eliminar el favorito
|
||
original_count = len(favorites)
|
||
favorites = [fav for fav in favorites if fav["id"] != article_id]
|
||
|
||
if len(favorites) == original_count:
|
||
await query.edit_message_text("ℹ️ Este artículo no estaba en favoritos")
|
||
return
|
||
|
||
# Guardar favoritos actualizados
|
||
self._save_favorites(favorites)
|
||
|
||
# Restaurar el botón original
|
||
keyboard = [
|
||
[InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{article_id}_unknown")]
|
||
]
|
||
await query.edit_message_text(
|
||
text="💾 Acciones",
|
||
reply_markup=InlineKeyboardMarkup(keyboard)
|
||
)
|
||
|
||
self.logger.info(f"Artículo {article_id} eliminado de favoritos") |