diff --git a/config.sample.yaml b/config.sample.yaml index 2d84289..4302b70 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -1,20 +1,12 @@ telegram_token: "" telegram_channel: "@canal_o_grupo" -# Configuración del cache de artículos notificados -# cache_type: "memory" o "redis" -# - "memory": Almacena en memoria (no requiere Redis, limitado por el límite configurado) -# - "redis": Almacena en Redis (requiere servidor Redis, ilimitado con TTL de 7 días) +# Configuración del cache de artículos notificados (Redis requerido) +# Almacena en Redis con TTL de 7 días cache: - type: "memory" # "memory" o "redis" - - # Configuración para cache en memoria - memory: - limit: 300 # Límite de artículos a mantener en memoria - - # Configuración para cache en Redis (solo necesario si type: "redis") + type: "redis" redis: - host: "localhost" + host: "localhost" # En Docker usar: "redis" port: 6379 db: 0 password: null # null o string con la contraseña diff --git a/managers/article_cache.py b/managers/article_cache.py index 9fe64ef..a55c488 100644 --- a/managers/article_cache.py +++ b/managers/article_cache.py @@ -1,40 +1,8 @@ import logging import redis import json -from collections import deque - -NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 # TTL de 7 días en segundos para artículos notificados (solo Redis) -DEFAULT_MEMORY_LIMIT = 300 # Límite por defecto de artículos en memoria - -class MemoryArticleCache: - """Maneja el cache de artículos notificados usando memoria (lista con límite)""" - - def __init__(self, limit=DEFAULT_MEMORY_LIMIT): - self.logger = logging.getLogger(__name__) - self._notified_articles = deque(maxlen=limit) - self._limit = limit - self.logger.info(f"Cache de artículos en memoria inicializado (límite: {limit})") - - def is_article_notified(self, article): - """Verifica si un artículo ya ha sido notificado""" - return article in self._notified_articles - - def mark_article_as_notified(self, article): - """Marca un artículo como notificado en memoria""" - if article not in self._notified_articles: - self._notified_articles.append(article) - self.logger.debug(f"Artículo marcado como notificado (total en memoria: {len(self._notified_articles)})") - - def mark_articles_as_notified(self, articles): - """Añade múltiples artículos a la lista de artículos ya notificados en memoria""" - article_list = articles if isinstance(articles, list) else [articles] - added = 0 - for article in article_list: - if article not in self._notified_articles: - self._notified_articles.append(article) - added += 1 - self.logger.debug(f"{added} artículos marcados como notificados (total en memoria: {len(self._notified_articles)}/{self._limit})") +NOTIFIED_ARTICLE_TTL = 7 * 24 * 60 * 60 # TTL de 7 días en segundos para artículos notificados class RedisArticleCache: """Maneja el cache de artículos notificados usando Redis""" @@ -115,10 +83,30 @@ class RedisArticleCache: article_list = articles if isinstance(articles, list) else [articles] try: - # Usar pipeline para mejor rendimiento al añadir múltiples artículos + # Verificar qué artículos ya existen antes de añadirlos + # Usar pipeline para mejor rendimiento al verificar múltiples artículos pipe = self._redis_client.pipeline() + keys_to_check = [] for article in article_list: key = self._get_article_key(article) + keys_to_check.append((article, key)) + pipe.exists(key) + + # Ejecutar las verificaciones + exists_results = pipe.execute() + + # Ahora añadir solo los artículos que no existen + pipe = self._redis_client.pipeline() + added_count = 0 + skipped_count = 0 + + for (article, key), exists in zip(keys_to_check, exists_results): + if exists > 0: + # El artículo ya existe, no hacer nada + skipped_count += 1 + continue + + # El artículo no existe, añadirlo # Guardar toda la información del artículo como JSON article_data = { 'id': article.get_id(), @@ -135,8 +123,13 @@ class RedisArticleCache: 'is_favorite': False, # Por defecto no es favorito } pipe.setex(key, NOTIFIED_ARTICLE_TTL, json.dumps(article_data)) - pipe.execute() - self.logger.debug(f"{len(article_list)} artículos marcados como notificados en Redis") + added_count += 1 + + # Ejecutar solo si hay artículos para añadir + if added_count > 0: + pipe.execute() + + self.logger.debug(f"{added_count} artículos añadidos, {skipped_count} ya existían en Redis") except Exception as e: self.logger.error(f"Error añadiendo artículos a Redis: {e}") @@ -191,31 +184,53 @@ class RedisArticleCache: except Exception as e: self.logger.error(f"Error verificando favorito en Redis: {e}") return False + + def clear_cache(self): + """Elimina toda la caché de artículos notificados en Redis""" + try: + # Obtener todas las claves que empiezan con 'notified:' + keys = self._redis_client.keys('notified:*') + + if not keys: + self.logger.info("Cache de Redis ya está vacío") + return 0 + + # Eliminar todas las claves usando pipeline para mejor rendimiento + count = len(keys) + pipe = self._redis_client.pipeline() + for key in keys: + pipe.delete(key) + pipe.execute() + + self.logger.info(f"Cache de Redis limpiado: {count} artículos eliminados") + return count + except Exception as e: + self.logger.error(f"Error limpiando cache de Redis: {e}") + return 0 -def create_article_cache(cache_type='memory', **kwargs): +def create_article_cache(cache_type='redis', **kwargs): """ - Factory function para crear el cache de artículos apropiado. + Factory function para crear el cache de artículos usando Redis. Args: - cache_type: 'memory' o 'redis' - **kwargs: Argumentos adicionales según el tipo de cache: - - Para 'memory': limit (opcional, default=300) - - Para 'redis': redis_host, redis_port, redis_db, redis_password + cache_type: 'redis' (solo Redis está soportado) + **kwargs: Argumentos para Redis: + - redis_host: host de Redis (default: 'localhost') + - redis_port: puerto de Redis (default: 6379) + - redis_db: base de datos de Redis (default: 0) + - redis_password: contraseña de Redis (opcional) Returns: - MemoryArticleCache o RedisArticleCache + RedisArticleCache """ - if cache_type == 'memory': - limit = kwargs.get('limit', DEFAULT_MEMORY_LIMIT) - return MemoryArticleCache(limit=limit) - elif cache_type == 'redis': - return RedisArticleCache( - redis_host=kwargs.get('redis_host', 'localhost'), - redis_port=kwargs.get('redis_port', 6379), - redis_db=kwargs.get('redis_db', 0), - redis_password=kwargs.get('redis_password') - ) - else: - raise ValueError(f"Tipo de cache desconocido: {cache_type}. Debe ser 'memory' o 'redis'") + if cache_type != 'redis': + raise ValueError(f"Tipo de cache desconocido: {cache_type}. Solo se soporta 'redis'") + + return RedisArticleCache( + redis_host=kwargs.get('redis_host', 'localhost'), + redis_port=kwargs.get('redis_port', 6379), + redis_db=kwargs.get('redis_db', 0), + redis_password=kwargs.get('redis_password') + ) diff --git a/managers/telegram_manager.py b/managers/telegram_manager.py index d4d7c3f..de72ae0 100644 --- a/managers/telegram_manager.py +++ b/managers/telegram_manager.py @@ -87,6 +87,46 @@ class TelegramManager: def escape_html(self, text): return html.escape(str(text)) + + async def get_forum_topics_async(self): + """Obtiene los topics/hilos del canal/grupo""" + try: + # Intentar obtener forum topics usando la API + try: + # Usar get_forum_topics si está disponible (python-telegram-bot 20+) + result = await self._bot.get_forum_topics(chat_id=self._channel, limit=100) + topics = [] + if hasattr(result, 'topics') and result.topics: + for topic in result.topics: + topics.append({ + 'id': topic.message_thread_id, + 'name': getattr(topic, 'name', f'Thread {topic.message_thread_id}'), + 'icon_color': getattr(topic, 'icon_color', None), + 'icon_custom_emoji_id': getattr(topic, 'icon_custom_emoji_id', None), + }) + return topics + except AttributeError: + # Si get_forum_topics no existe, usar método alternativo + return await self._get_topics_from_messages() + except Exception as e: + self.logger.error(f"Error obteniendo forum topics: {e}") + return await self._get_topics_from_messages() + + async def _get_topics_from_messages(self): + """Obtiene topics desde mensajes recientes (workaround)""" + try: + topics_dict = {} + # Intentar obtener algunos mensajes recientes + # Nota: Esto solo funciona si hay mensajes recientes con thread_id + self.logger.info("Obteniendo topics desde mensajes recientes...") + return [] + except Exception as e: + self.logger.error(f"Error obteniendo topics desde mensajes: {e}") + return [] + + def get_forum_topics(self): + """Versión síncrona para obtener forum topics""" + return self._loop.run_until_complete(self.get_forum_topics_async()) def send_telegram_article(self, search_name, article, thread_id=None): self._loop.run_until_complete(self.send_telegram_article_async(search_name, article, thread_id)) @@ -117,8 +157,8 @@ class TelegramManager: # Crear botones inline para el primer mensaje del grupo if is_favorite: - keyboard = [ - [ + keyboard = [ + [ InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{article.get_platform()}_{article.get_id()}"), InlineKeyboardButton("🗑️ Quitar", callback_data=f"unfav_{article.get_platform()}_{article.get_id()}") ], @@ -130,9 +170,9 @@ class TelegramManager: keyboard = [ [ InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{article.get_platform()}_{article.get_id()}_{search_name}"), - InlineKeyboardButton("Ir al anuncio", url=f"{article.get_url()}") + InlineKeyboardButton("Ir al anuncio", url=f"{article.get_url()}") + ] ] - ] reply_markup = InlineKeyboardMarkup(keyboard) # Enviar un mensaje adicional con los botones (reply al primer mensaje del grupo) @@ -149,6 +189,7 @@ class TelegramManager: def _add_handlers(self): """Añade los handlers para comandos y callbacks""" self._application.add_handler(CommandHandler("favs", self.handle_favs_command)) + self._application.add_handler(CommandHandler("threads", self.handle_threads_command)) self._application.add_handler(CallbackQueryHandler(self.handle_favorite_callback, pattern="^fav_")) self._application.add_handler(CallbackQueryHandler(self.handle_unfavorite_callback, pattern="^unfav_")) self.logger.info("Handlers de comandos y callbacks añadidos") @@ -231,7 +272,7 @@ class TelegramManager: ] ] else: - new_keyboard = [ + new_keyboard = [ [ InlineKeyboardButton("✅ En favoritos", callback_data=f"already_fav_{platform}_{article_id}"), InlineKeyboardButton("🗑️ Quitar", callback_data=f"unfav_{platform}_{article_id}") @@ -243,6 +284,39 @@ class TelegramManager: self.logger.info(f"Artículo {article_id} ({platform}) marcado como favorito") + async def handle_threads_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE): + """Maneja el comando /threads para mostrar los threads disponibles""" + try: + topics = await self.get_forum_topics_async() + + if not topics: + await update.message.reply_text( + "📋 No se pudieron obtener los threads automáticamente.\n\n" + "💡 Cómo obtener el Thread ID:\n" + "1. Haz clic derecho en el tema/hilo\n" + "2. Selecciona 'Copiar enlace del tema'\n" + "3. El número al final de la URL es el Thread ID\n\n" + "Ejemplo: t.me/c/1234567890/8 → Thread ID = 8", + parse_mode="HTML" + ) + return + + message = f"📋 Threads Disponibles ({len(topics)})\n\n" + for topic in topics: + name = topic.get('name', f'Thread {topic["id"]}') + thread_id = topic['id'] + message += f"• {self.escape_html(name)} → Thread ID: {thread_id}\n" + + await update.message.reply_text(message, parse_mode="HTML") + except Exception as e: + self.logger.error(f"Error en comando /threads: {e}") + await update.message.reply_text( + "❌ Error obteniendo threads. Usa el método manual:\n\n" + "1. Haz clic derecho en el tema\n" + "2. Copia el enlace del tema\n" + "3. El número al final es el Thread ID" + ) + async def handle_favs_command(self, update: telegram.Update, context: telegram.ext.ContextTypes.DEFAULT_TYPE): """Maneja el comando /favs para mostrar los favoritos""" if not self._article_cache: @@ -326,7 +400,7 @@ class TelegramManager: ] ] else: - keyboard = [ + keyboard = [ [ InlineKeyboardButton("⭐ Añadir a favoritos", callback_data=f"fav_{platform}_{article_id}_Unknown") ] diff --git a/wallamonitor.py b/wallamonitor.py index fbc1025..be9c175 100644 --- a/wallamonitor.py +++ b/wallamonitor.py @@ -91,15 +91,9 @@ def load_cache_config(): with open(config_path, 'r') as f: config = yaml.safe_load(f) cache_config = config.get('cache', {}) - cache_type = cache_config.get('type', 'memory') + cache_type = cache_config.get('type', 'redis') - if cache_type == 'memory': - memory_config = cache_config.get('memory', {}) - return { - 'cache_type': 'memory', - 'limit': memory_config.get('limit', 300) - } - elif cache_type == 'redis': + if cache_type == 'redis': redis_config = cache_config.get('redis', {}) return { 'cache_type': 'redis', @@ -109,16 +103,22 @@ def load_cache_config(): 'redis_password': redis_config.get('password') } else: - logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'memory'") + logger.warning(f"Tipo de cache desconocido: {cache_type}, usando 'redis' por defecto") return { - 'cache_type': 'memory', - 'limit': 300 + 'cache_type': 'redis', + 'redis_host': 'localhost', + 'redis_port': 6379, + 'redis_db': 0, + 'redis_password': None } except Exception as e: - logger.warning(f"Error cargando configuración de cache, usando valores por defecto (memory): {e}") + logger.warning(f"Error cargando configuración de cache, usando valores por defecto (redis): {e}") return { - 'cache_type': 'memory', - 'limit': 300 + 'cache_type': 'redis', + 'redis_host': 'localhost', + 'redis_port': 6379, + 'redis_db': 0, + 'redis_password': None } class WorkerManager: diff --git a/web/backend/server.js b/web/backend/server.js index 9594fd2..34549e7 100644 --- a/web/backend/server.js +++ b/web/backend/server.js @@ -327,6 +327,51 @@ app.delete('/api/favorites/:platform/:id', async (req, res) => { } }); +// Limpiar toda la caché de Redis +app.delete('/api/cache', async (req, res) => { + try { + if (!redisClient) { + return res.status(500).json({ error: 'Redis no está disponible' }); + } + + // Obtener todas las claves que empiezan con 'notified:' + const keys = await redisClient.keys('notified:*'); + + if (!keys || keys.length === 0) { + return res.json({ + success: true, + message: 'Cache ya está vacío', + count: 0 + }); + } + + // Eliminar todas las claves + const count = keys.length; + for (const key of keys) { + await redisClient.del(key); + } + + // Notificar a los clientes WebSocket + broadcast({ + type: 'cache_cleared', + data: { count, timestamp: Date.now() } + }); + + // También actualizar favoritos (debería estar vacío ahora) + const favorites = await getFavorites(); + broadcast({ type: 'favorites_updated', data: favorites }); + + res.json({ + success: true, + message: `Cache limpiado: ${count} artículos eliminados`, + count + }); + } catch (error) { + console.error('Error limpiando cache de Redis:', error); + res.status(500).json({ error: error.message }); + } +}); + // Obtener artículos notificados app.get('/api/articles', async (req, res) => { try { @@ -454,6 +499,64 @@ app.get('/api/config', (req, res) => { } }); +// Obtener threads/topics de Telegram +app.get('/api/telegram/threads', async (req, res) => { + try { + if (!config) { + config = yaml.parse(readFileSync(CONFIG_PATH, 'utf8')); + } + + const token = config?.telegram_token; + const channel = config?.telegram_channel; + + if (!token || !channel) { + return res.status(400).json({ error: 'Token o canal de Telegram no configurados' }); + } + + // Convertir el canal a chat_id si es necesario + let chatId = channel; + if (channel.startsWith('@')) { + // Para canales con @, necesitamos obtener el chat_id primero + const getChatUrl = `https://api.telegram.org/bot${token}/getChat?chat_id=${encodeURIComponent(channel)}`; + const chatResponse = await fetch(getChatUrl); + const chatData = await chatResponse.json(); + + if (!chatData.ok) { + return res.status(400).json({ error: `Error obteniendo chat: ${chatData.description || 'Chat no encontrado'}` }); + } + + chatId = chatData.result.id; + } + + // Intentar obtener forum topics + const forumTopicsUrl = `https://api.telegram.org/bot${token}/getForumTopics?chat_id=${chatId}&limit=100`; + const topicsResponse = await fetch(forumTopicsUrl); + const topicsData = await topicsResponse.json(); + + if (topicsData.ok && topicsData.result?.topics) { + const threads = topicsData.result.topics.map(topic => ({ + id: topic.message_thread_id, + name: topic.name || `Thread ${topic.message_thread_id}`, + icon_color: topic.icon_color, + icon_custom_emoji_id: topic.icon_custom_emoji_id, + })); + + return res.json({ threads, success: true }); + } else { + // Si no hay forum topics, devolver un mensaje informativo + return res.json({ + threads: [], + success: false, + message: 'El chat no tiene forum topics habilitados o no se pudieron obtener. Puedes obtener el Thread ID manualmente copiando el enlace del tema.', + info: 'Para obtener el Thread ID manualmente: 1. Haz clic derecho en el tema/hilo en Telegram 2. Selecciona "Copiar enlace del tema" 3. El número al final de la URL es el Thread ID (ej: t.me/c/1234567890/8 → Thread ID = 8)' + }); + } + } catch (error) { + console.error('Error obteniendo threads de Telegram:', error.message); + res.status(500).json({ error: error.message }); + } +}); + // WebSocket connection wss.on('connection', (ws) => { console.log('Cliente WebSocket conectado'); diff --git a/web/frontend/src/services/api.js b/web/frontend/src/services/api.js index f04139f..1853173 100644 --- a/web/frontend/src/services/api.js +++ b/web/frontend/src/services/api.js @@ -69,5 +69,17 @@ export default { const response = await api.get('/config'); return response.data; }, + + // Telegram + async getTelegramThreads() { + const response = await api.get('/telegram/threads'); + return response.data; + }, + + // Cache + async clearCache() { + const response = await api.delete('/cache'); + return response.data; + }, }; diff --git a/web/frontend/src/views/Workers.vue b/web/frontend/src/views/Workers.vue index c4f038a..cc0230e 100644 --- a/web/frontend/src/views/Workers.vue +++ b/web/frontend/src/views/Workers.vue @@ -6,6 +6,9 @@ + @@ -226,8 +229,47 @@
- +
+ + +

Opcional: ID del hilo donde enviar notificaciones

+ + +
+

Threads disponibles:

+
+
+ {{ thread.name }} + ID: {{ thread.id }} +
+ +
+
+ + +
+

{{ threadsMessage }}

+

{{ threadsInfo }}

+
@@ -458,6 +500,11 @@ const generalForm = ref({ description_exclude_text: '', }); +const availableThreads = ref([]); +const loadingThreads = ref(false); +const threadsMessage = ref(''); +const threadsInfo = ref(''); + async function loadWorkers() { loading.value = true; try { @@ -474,6 +521,40 @@ async function loadWorkers() { } } +async function loadTelegramThreads() { + loadingThreads.value = true; + availableThreads.value = []; + threadsMessage.value = ''; + threadsInfo.value = ''; + + try { + const result = await api.getTelegramThreads(); + + if (result.success && result.threads && result.threads.length > 0) { + availableThreads.value = result.threads; + threadsMessage.value = ''; + threadsInfo.value = ''; + } else { + availableThreads.value = []; + threadsMessage.value = result.message || 'No se pudieron obtener los threads automáticamente'; + threadsInfo.value = result.info || ''; + } + } catch (error) { + console.error('Error cargando threads de Telegram:', error); + availableThreads.value = []; + threadsMessage.value = 'Error al obtener threads de Telegram. Verifica que el bot y el canal estén configurados correctamente.'; + threadsInfo.value = 'Para obtener el Thread ID manualmente: 1. Haz clic derecho en el tema/hilo en Telegram 2. Selecciona "Copiar enlace del tema" 3. El número al final de la URL es el Thread ID (ej: t.me/c/1234567890/8 → Thread ID = 8)'; + } finally { + loadingThreads.value = false; + } +} + +function selectThread(threadId) { + workerForm.value.thread_id = threadId; + // Opcional: limpiar la lista después de seleccionar + // availableThreads.value = []; +} + function editWorker(worker, index) { editingWorker.value = { worker, index }; workerForm.value = { @@ -649,6 +730,23 @@ async function deleteWorker(name) { } } +async function handleClearCache() { + if (!confirm('¿Estás seguro de que quieres limpiar toda la caché de Redis?\n\nEsto eliminará todos los artículos notificados de todas las instancias. Esta acción no se puede deshacer.')) { + return; + } + + try { + const result = await api.clearCache(); + const message = result.count > 0 + ? `✓ Caché limpiada exitosamente: ${result.count} artículos eliminados` + : 'La caché ya estaba vacía'; + alert(message); + } catch (error) { + console.error('Error limpiando caché:', error); + alert('Error al limpiar la caché: ' + (error.response?.data?.error || error.message)); + } +} + function handleWSMessage(event) { const data = event.detail; if (data.type === 'workers_updated') {