import express from 'express'; import cors from 'cors'; import { WebSocketServer } from 'ws'; import { createServer } from 'http'; import { fileURLToPath } from 'url'; import { dirname, join } from 'path'; import { readFileSync, writeFileSync, existsSync, statSync } from 'fs'; import { watch } from 'chokidar'; import yaml from 'yaml'; import redis from 'redis'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); // En Docker, usar PROJECT_ROOT de env, sino usar ruta relativa const PROJECT_ROOT = process.env.PROJECT_ROOT || join(__dirname, '../..'); const app = express(); const server = createServer(app); const wss = new WebSocketServer({ server, path: '/ws' }); app.use(cors()); app.use(express.json()); // Configuración const CONFIG_PATH = join(PROJECT_ROOT, 'config.yaml'); const WORKERS_PATH = join(PROJECT_ROOT, 'workers.json'); // Función para obtener la ruta del log (en Docker puede estar en /data/logs) function getLogPath() { const logsDirPath = join(PROJECT_ROOT, 'logs', 'monitor.log'); const rootLogPath = join(PROJECT_ROOT, 'monitor.log'); if (existsSync(logsDirPath)) { return logsDirPath; } return rootLogPath; } const LOG_PATH = getLogPath(); let redisClient = null; let config = null; // Inicializar Redis si está configurado async function initRedis() { try { config = yaml.parse(readFileSync(CONFIG_PATH, 'utf8')); const cacheConfig = config?.cache; if (cacheConfig?.type === 'redis') { const redisConfig = cacheConfig.redis; // En Docker, usar el nombre del servicio si no se especifica host const redisHost = process.env.REDIS_HOST || redisConfig.host || 'localhost'; redisClient = redis.createClient({ socket: { host: redisHost, port: redisConfig.port || 6379, }, password: redisConfig.password || undefined, database: redisConfig.db || 0, }); redisClient.on('error', (err) => console.error('Redis Client Error', err)); await redisClient.connect(); console.log('✅ Conectado a Redis'); } else { console.log('ℹ️ Redis no configurado, usando modo memoria'); } } catch (error) { console.error('Error inicializando Redis:', error.message); } } // Broadcast a todos los clientes WebSocket function broadcast(data) { wss.clients.forEach((client) => { if (client.readyState === 1) { // WebSocket.OPEN client.send(JSON.stringify(data)); } }); } // Leer archivo JSON de forma segura function readJSON(path, defaultValue = {}) { try { if (existsSync(path)) { return JSON.parse(readFileSync(path, 'utf8')); } return defaultValue; } catch (error) { console.error(`Error leyendo ${path}:`, error.message); return defaultValue; } } // Escribir archivo JSON function writeJSON(path, data) { try { writeFileSync(path, JSON.stringify(data, null, 2), 'utf8'); return true; } catch (error) { console.error(`Error escribiendo ${path}:`, error.message); return false; } } // Obtener artículos notificados desde Redis async function getNotifiedArticles() { if (!redisClient) { return []; } try { const keys = await redisClient.keys('notified:*'); const articles = []; for (const key of keys) { const parts = key.split(':'); if (parts.length >= 3) { const platform = parts[1]; const id = parts.slice(2).join(':'); const ttl = await redisClient.ttl(key); const value = await redisClient.get(key); // Intentar parsear como JSON (nuevo formato con toda la info) let articleData = {}; try { if (value && value !== '1') { articleData = JSON.parse(value); } } catch (e) { // Si no es JSON válido, usar valor por defecto } articles.push({ platform: articleData.platform || platform, id: articleData.id || id, title: articleData.title || null, description: articleData.description || null, price: articleData.price || null, currency: articleData.currency || null, location: articleData.location || null, allows_shipping: articleData.allows_shipping !== undefined ? articleData.allows_shipping : null, url: articleData.url || null, images: articleData.images || [], modified_at: articleData.modified_at || null, notifiedAt: Date.now() - (7 * 24 * 60 * 60 - ttl) * 1000, expiresAt: Date.now() + ttl * 1000, }); } } return articles; } catch (error) { console.error('Error obteniendo artículos de Redis:', error.message); return []; } } // API Routes // Obtener favoritos desde Redis async function getFavorites() { if (!redisClient) { return []; } try { const keys = await redisClient.keys('notified:*'); const favorites = []; for (const key of keys) { const value = await redisClient.get(key); if (value) { try { const articleData = JSON.parse(value); if (articleData.is_favorite === true) { favorites.push(articleData); } } catch (e) { // Si no es JSON válido, ignorar } } } return favorites; } catch (error) { console.error('Error obteniendo favoritos de Redis:', error.message); return []; } } // Obtener estadísticas app.get('/api/stats', async (req, res) => { try { const workers = readJSON(WORKERS_PATH, { items: [] }); const favorites = await getFavorites(); const notifiedArticles = await getNotifiedArticles(); const stats = { totalWorkers: workers.items?.length || 0, activeWorkers: (workers.items || []).filter(w => !workers.disabled?.includes(w.name)).length, totalFavorites: favorites.length, totalNotified: notifiedArticles.length, platforms: { wallapop: notifiedArticles.filter(a => a.platform === 'wallapop').length, vinted: notifiedArticles.filter(a => a.platform === 'vinted').length, }, }; res.json(stats); } catch (error) { res.status(500).json({ error: error.message }); } }); // Obtener workers app.get('/api/workers', (req, res) => { try { const workers = readJSON(WORKERS_PATH, { items: [], general: {}, disabled: [] }); res.json(workers); } catch (error) { res.status(500).json({ error: error.message }); } }); // Actualizar workers app.put('/api/workers', (req, res) => { try { const workers = req.body; if (writeJSON(WORKERS_PATH, workers)) { broadcast({ type: 'workers_updated', data: workers }); res.json({ success: true }); } else { res.status(500).json({ error: 'Error guardando workers' }); } } catch (error) { res.status(500).json({ error: error.message }); } }); // Obtener favoritos app.get('/api/favorites', async (req, res) => { try { const favorites = await getFavorites(); res.json(favorites); } catch (error) { res.status(500).json({ error: error.message }); } }); // Añadir favorito app.post('/api/favorites', async (req, res) => { try { if (!redisClient) { return res.status(500).json({ error: 'Redis no está disponible' }); } const { platform, id } = req.body; if (!platform || !id) { return res.status(400).json({ error: 'platform e id son requeridos' }); } const key = `notified:${platform}:${id}`; const value = await redisClient.get(key); if (!value) { return res.status(404).json({ error: 'Artículo no encontrado' }); } try { const articleData = JSON.parse(value); articleData.is_favorite = true; // Mantener el TTL existente const ttl = await redisClient.ttl(key); if (ttl > 0) { await redisClient.setex(key, ttl, JSON.stringify(articleData)); } else { await redisClient.set(key, JSON.stringify(articleData)); } const favorites = await getFavorites(); broadcast({ type: 'favorites_updated', data: favorites }); res.json({ success: true, favorites }); } catch (e) { res.status(500).json({ error: 'Error procesando artículo' }); } } catch (error) { res.status(500).json({ error: error.message }); } }); // Eliminar favorito app.delete('/api/favorites/:platform/:id', async (req, res) => { try { if (!redisClient) { return res.status(500).json({ error: 'Redis no está disponible' }); } const { platform, id } = req.params; const key = `notified:${platform}:${id}`; const value = await redisClient.get(key); if (!value) { return res.status(404).json({ error: 'Artículo no encontrado' }); } try { const articleData = JSON.parse(value); articleData.is_favorite = false; // Mantener el TTL existente const ttl = await redisClient.ttl(key); if (ttl > 0) { await redisClient.setex(key, ttl, JSON.stringify(articleData)); } else { await redisClient.set(key, JSON.stringify(articleData)); } const favorites = await getFavorites(); broadcast({ type: 'favorites_updated', data: favorites }); res.json({ success: true, favorites }); } catch (e) { res.status(500).json({ error: 'Error procesando artículo' }); } } catch (error) { res.status(500).json({ error: error.message }); } }); // Limpiar toda la caché de Redis app.delete('/api/cache', async (req, res) => { try { if (!redisClient) { return res.status(500).json({ error: 'Redis no está disponible' }); } // Obtener todas las claves que empiezan con 'notified:' const keys = await redisClient.keys('notified:*'); if (!keys || keys.length === 0) { return res.json({ success: true, message: 'Cache ya está vacío', count: 0 }); } // Eliminar todas las claves const count = keys.length; for (const key of keys) { await redisClient.del(key); } // Notificar a los clientes WebSocket broadcast({ type: 'cache_cleared', data: { count, timestamp: Date.now() } }); // También actualizar favoritos (debería estar vacío ahora) const favorites = await getFavorites(); broadcast({ type: 'favorites_updated', data: favorites }); res.json({ success: true, message: `Cache limpiado: ${count} artículos eliminados`, count }); } catch (error) { console.error('Error limpiando cache de Redis:', error); res.status(500).json({ error: error.message }); } }); // Obtener artículos notificados app.get('/api/articles', async (req, res) => { try { const articles = await getNotifiedArticles(); const limit = parseInt(req.query.limit) || 100; const offset = parseInt(req.query.offset) || 0; const sorted = articles.sort((a, b) => b.notifiedAt - a.notifiedAt); const paginated = sorted.slice(offset, offset + limit); res.json({ articles: paginated, total: articles.length, limit, offset, }); } catch (error) { res.status(500).json({ error: error.message }); } }); // Buscar artículos en Redis app.get('/api/articles/search', async (req, res) => { try { const query = req.query.q || ''; if (!query.trim()) { return res.json({ articles: [], total: 0 }); } const searchTerm = query.toLowerCase().trim(); const allArticles = await getNotifiedArticles(); // Filtrar artículos que coincidan con la búsqueda const filtered = allArticles.filter(article => { // Buscar en título const title = (article.title || '').toLowerCase(); if (title.includes(searchTerm)) return true; // Buscar en descripción const description = (article.description || '').toLowerCase(); if (description.includes(searchTerm)) return true; // Buscar en localidad const location = (article.location || '').toLowerCase(); if (location.includes(searchTerm)) return true; // Buscar en precio (como número o texto) const price = String(article.price || '').toLowerCase(); if (price.includes(searchTerm)) return true; // Buscar en plataforma const platform = (article.platform || '').toLowerCase(); if (platform.includes(searchTerm)) return true; // Buscar en ID const id = String(article.id || '').toLowerCase(); if (id.includes(searchTerm)) return true; return false; }); // Ordenar por fecha de notificación (más recientes primero) const sorted = filtered.sort((a, b) => b.notifiedAt - a.notifiedAt); res.json({ articles: sorted, total: sorted.length, query: query, }); } catch (error) { res.status(500).json({ error: error.message }); } }); // Obtener logs (últimas líneas) app.get('/api/logs', (req, res) => { try { // Intentar múltiples ubicaciones posibles let logFile = LOG_PATH; if (!existsSync(logFile)) { // Intentar en el directorio de logs const altPath = join(PROJECT_ROOT, 'logs', 'monitor.log'); if (existsSync(altPath)) { logFile = altPath; } else { return res.json({ logs: [] }); } } // Verificar que no sea un directorio try { const stats = statSync(logFile); if (stats.isDirectory()) { return res.json({ logs: ['Error: monitor.log es un directorio. Por favor, elimínalo y reinicia.'] }); } } catch (e) { return res.json({ logs: [] }); } const logs = readFileSync(logFile, 'utf8'); const lines = logs.split('\n').filter(l => l.trim()); const limit = parseInt(req.query.limit) || 100; const lastLines = lines.slice(-limit); // Mantener orden natural: más antiguo a más reciente res.json({ logs: lastLines }); } catch (error) { res.status(500).json({ error: error.message }); } }); // Obtener configuración app.get('/api/config', (req, res) => { try { if (!config) { config = yaml.parse(readFileSync(CONFIG_PATH, 'utf8')); } // No enviar token por seguridad const safeConfig = { ...config }; if (safeConfig.telegram_token) { safeConfig.telegram_token = '***'; } res.json(safeConfig); } catch (error) { res.status(500).json({ error: error.message }); } }); // Obtener threads/topics de Telegram app.get('/api/telegram/threads', async (req, res) => { try { if (!config) { config = yaml.parse(readFileSync(CONFIG_PATH, 'utf8')); } const token = config?.telegram_token; const channel = config?.telegram_channel; if (!token || !channel) { return res.status(400).json({ error: 'Token o canal de Telegram no configurados' }); } // Convertir el canal a chat_id si es necesario let chatId = channel; if (channel.startsWith('@')) { // Para canales con @, necesitamos obtener el chat_id primero const getChatUrl = `https://api.telegram.org/bot${token}/getChat?chat_id=${encodeURIComponent(channel)}`; const chatResponse = await fetch(getChatUrl); const chatData = await chatResponse.json(); if (!chatData.ok) { return res.status(400).json({ error: `Error obteniendo chat: ${chatData.description || 'Chat no encontrado'}` }); } chatId = chatData.result.id; } // Intentar obtener forum topics const forumTopicsUrl = `https://api.telegram.org/bot${token}/getForumTopics?chat_id=${chatId}&limit=100`; const topicsResponse = await fetch(forumTopicsUrl); const topicsData = await topicsResponse.json(); if (topicsData.ok && topicsData.result?.topics) { const threads = topicsData.result.topics.map(topic => ({ id: topic.message_thread_id, name: topic.name || `Thread ${topic.message_thread_id}`, icon_color: topic.icon_color, icon_custom_emoji_id: topic.icon_custom_emoji_id, })); return res.json({ threads, success: true }); } else { // Si no hay forum topics, devolver un mensaje informativo return res.json({ threads: [], success: false, message: 'El chat no tiene forum topics habilitados o no se pudieron obtener. Puedes obtener el Thread ID manualmente copiando el enlace del tema.', info: 'Para obtener el Thread ID manualmente: 1. Haz clic derecho en el tema/hilo en Telegram 2. Selecciona "Copiar enlace del tema" 3. El número al final de la URL es el Thread ID (ej: t.me/c/1234567890/8 → Thread ID = 8)' }); } } catch (error) { console.error('Error obteniendo threads de Telegram:', error.message); res.status(500).json({ error: error.message }); } }); // WebSocket connection wss.on('connection', (ws) => { console.log('Cliente WebSocket conectado'); ws.on('close', () => { console.log('Cliente WebSocket desconectado'); }); ws.on('error', (error) => { console.error('Error WebSocket:', error); }); }); // Determinar la ruta del log para el watcher let watchLogPath = LOG_PATH; if (!existsSync(watchLogPath)) { const altPath = join(PROJECT_ROOT, 'logs', 'monitor.log'); if (existsSync(altPath)) { watchLogPath = altPath; } } // Watch files for changes (ya no vigilamos FAVORITES_PATH porque usa Redis) const watcher = watch([WORKERS_PATH, watchLogPath].filter(p => existsSync(p)), { persistent: true, ignoreInitial: true, }); watcher.on('change', async (path) => { console.log(`Archivo cambiado: ${path}`); if (path === WORKERS_PATH) { const workers = readJSON(WORKERS_PATH); broadcast({ type: 'workers_updated', data: workers }); } else if (path === LOG_PATH) { broadcast({ type: 'logs_updated' }); } }); // Inicializar servidor const PORT = process.env.PORT || 3001; async function startServer() { await initRedis(); server.listen(PORT, () => { console.log(`🚀 Servidor backend ejecutándose en http://localhost:${PORT}`); console.log(`📡 WebSocket disponible en ws://localhost:${PORT}`); }); } startServer().catch(console.error);