add abstraction of platform and article + vinted

"

Signed-off-by: Omar Sánchez Pizarro <omar.sanchez@pistacero.net>
This commit is contained in:
Omar Sánchez Pizarro
2025-10-10 14:58:27 +02:00
parent b5178f415b
commit 4111f57564
16 changed files with 1890 additions and 102 deletions

162
platforms/README.md Normal file
View File

@@ -0,0 +1,162 @@
# Plataformas de Monitoreo
Este directorio contiene las implementaciones de las diferentes plataformas de marketplace que el monitor puede usar.
## Estructura
- `base_platform.py`: Clase abstracta que define la interfaz común para todas las plataformas
- `platform_factory.py`: Factory para crear instancias de plataformas dinámicamente
- `wallapop_platform.py`: Implementación para Wallapop
- `vinted_platform_template.py`: Plantilla de ejemplo para añadir nuevas plataformas
## Cómo añadir una nueva plataforma
### 1. Crear la clase de la plataforma
Crea un nuevo archivo `<platform_name>_platform.py` que herede de `BasePlatform`:
```python
from platforms.base_platform import BasePlatform
from models.article import Article
import requests
import logging
class MiPlataformaPlatform(BasePlatform):
    """Example platform showing what a new marketplace backend must implement."""

    def __init__(self, item_monitor):
        """Keep the monitor configuration and create a module-level logger."""
        super().__init__(item_monitor)
        self.logger = logging.getLogger(__name__)

    def get_platform_name(self):
        """Return the identifier used for this platform."""
        return "mi_plataforma"

    def create_url(self):
        """Build the search URL from the settings held in self._item_monitor."""
        # URL-encode the query so spaces, '&' or '#' cannot corrupt the URL
        query = requests.utils.quote(self._item_monitor.get_search_query())
        url = f"https://api.miplataforma.com/search?q={query}"
        # Append optional parameters according to the configuration
        min_price = self._item_monitor.get_min_price()
        if min_price != 0:
            url += f"&min_price={min_price}"
        return url

    def fetch_articles(self):
        """Request the search URL and return the parsed Article objects."""
        url = self.create_url()
        headers = self.get_request_headers()
        # Always pass a timeout: without one a stalled server blocks forever
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        return self.parse_response(response.json())

    def parse_response(self, response_data):
        """Convert the decoded API payload into a list of Article objects."""
        articles = []
        for item in response_data['items']:
            article = Article(
                id=item['id'],
                title=item['title'],
                description=item['description'],
                price=item['price'],
                currency=item['currency'],
                location=item['location'],
                allows_shipping=item['shipping'],
                url=item['url'],
                images=item['images'][:3],
                modified_at=item['date'],
                platform=self.get_platform_name()
            )
            articles.append(article)
        return articles
```
### 2. Registrar la plataforma en el Factory
Edita `platform_factory.py` y añade tu plataforma al diccionario `_platforms`:
```python
from platforms.mi_plataforma_platform import MiPlataformaPlatform
class PlatformFactory:
_platforms = {
'wallapop': WallapopPlatform,
'mi_plataforma': MiPlataformaPlatform, # <-- Añade aquí
}
```
### 3. Configurar workers.json
Añade el campo `platform` a tus items en `workers.json`:
```json
{
"items": [
{
"name": "Mi búsqueda",
"platform": "mi_plataforma",
"search_query": "gameboy",
"min_price": 10,
"max_price": 100,
...
}
]
}
```
## Parámetros disponibles en ItemMonitor
Tu plataforma puede usar los siguientes parámetros de configuración:
- `get_name()`: Nombre del monitor
- `get_search_query()`: Términos de búsqueda
- `get_latitude()` / `get_longitude()`: Coordenadas geográficas
- `get_max_distance()`: Distancia máxima en km
- `get_condition()`: Estado del artículo
- `get_min_price()` / `get_max_price()`: Rango de precios
- `get_title_exclude()`: Palabras a excluir del título
- `get_description_exclude()`: Palabras a excluir de la descripción
- `get_title_must_include()`: Palabras requeridas en el título
- `get_description_must_include()`: Palabras requeridas en la descripción
- `get_title_first_word_exclude()`: Primera palabra del título a excluir
- `get_check_every()`: Intervalo de verificación en segundos
- `get_thread_id()`: ID del hilo de Telegram
**Nota:** El Worker aplica automáticamente los filtros de título y descripción,
por lo que no necesitas implementarlos en tu plataforma.
## Modelo Article
Todas las plataformas deben devolver objetos `Article` con los siguientes campos:
- `id`: Identificador único del artículo en la plataforma
- `title`: Título del artículo
- `description`: Descripción (se trunca a 500 caracteres)
- `price`: Precio numérico
- `currency`: Código de moneda (EUR, USD, etc.)
- `location`: Ubicación del vendedor
- `allows_shipping`: Boolean indicando si permite envíos
- `url`: URL del artículo
- `images`: Lista de URLs de imágenes (máximo 3)
- `modified_at`: Fecha de modificación (string "YYYY-MM-DD HH:MM:SS")
- `platform`: Nombre de la plataforma
## Lógica común del Worker
El Worker maneja automáticamente:
- ✅ Filtrado por palabras excluidas en título/descripción
- ✅ Filtrado por palabras requeridas en título/descripción
- ✅ Filtrado por primera palabra del título
- ✅ Deduplicación de artículos
- ✅ Envío de notificaciones a Telegram
- ✅ Gestión de errores y reintentos
- ✅ Logging y estadísticas
Tu plataforma solo debe encargarse de:
- 🔧 Construir la URL de búsqueda específica de la plataforma
- 🔧 Hacer la petición HTTP con los headers apropiados
- 🔧 Parsear la respuesta y convertirla a objetos Article

2
platforms/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# Platforms module

View File

@@ -0,0 +1,69 @@
from abc import ABC, abstractmethod
class BasePlatform(ABC):
    """Common interface that every marketplace platform must implement."""

    def __init__(self, item_monitor):
        """
        Store the item monitoring configuration on the instance.

        Args:
            item_monitor: ItemMonitor instance with search parameters
        """
        self._item_monitor = item_monitor

    @abstractmethod
    def get_platform_name(self):
        """
        Return the name of the platform.

        Returns:
            str: Platform name (e.g., 'wallapop', 'vinted', 'buyee')
        """

    @abstractmethod
    def create_url(self):
        """
        Build the search URL from the item_monitor parameters.

        Returns:
            str: Complete URL for API/search request
        """

    @abstractmethod
    def fetch_articles(self):
        """
        Retrieve articles from the platform.

        Returns:
            list: List of Article objects
        """

    @abstractmethod
    def parse_response(self, response_data):
        """
        Turn a platform-specific response into Article objects.

        Args:
            response_data: Raw response data from the platform

        Returns:
            list: List of Article objects
        """

    def get_request_headers(self):
        """
        Return the default HTTP request headers.

        Subclasses override this when the platform needs custom headers.
        A new dict is built on every call, so callers may mutate the result.

        Returns:
            dict: Headers for HTTP request
        """
        headers = {}
        headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
        return headers

View File

@@ -0,0 +1,62 @@
from platforms.wallapop_platform import WallapopPlatform
from platforms.vinted_platform import VintedPlatform
class PlatformFactory:
    """Factory that maps platform names to the classes implementing them."""

    # Registry of available platforms, keyed by lower-case platform name
    _platforms = {
        'wallapop': WallapopPlatform,
        'vinted': VintedPlatform,
        # Add more platforms here as they are implemented:
        # 'buyee': BuyeePlatform,
    }

    @classmethod
    def create_platform(cls, platform_name, item_monitor):
        """
        Create a platform instance based on the platform name.

        The lookup is case-insensitive.

        Args:
            platform_name: Name of the platform (e.g., 'wallapop', 'vinted')
            item_monitor: ItemMonitor instance with search parameters

        Returns:
            BasePlatform: Instance of the requested platform

        Raises:
            ValueError: If platform_name is not a string or the platform is
                not supported
        """
        available = ', '.join(cls._platforms)

        # A missing or mistyped name (e.g. None) must surface as the
        # documented ValueError rather than an AttributeError from .lower()
        if not isinstance(platform_name, str):
            raise ValueError(
                f"Platform name must be a string, got "
                f"{type(platform_name).__name__}. "
                f"Available platforms: {available}"
            )

        platform_name = platform_name.lower()
        if platform_name not in cls._platforms:
            raise ValueError(
                f"Platform '{platform_name}' is not supported. "
                f"Available platforms: {available}"
            )

        platform_class = cls._platforms[platform_name]
        return platform_class(item_monitor)

    @classmethod
    def get_available_platforms(cls):
        """
        Get the names of all registered platforms.

        Returns:
            list: List of supported platform names
        """
        return list(cls._platforms)

    @classmethod
    def register_platform(cls, platform_name, platform_class):
        """
        Register a new platform class under a lower-cased name.

        An existing entry with the same name is replaced.

        Args:
            platform_name: Name identifier for the platform
            platform_class: Class implementing BasePlatform
        """
        cls._platforms[platform_name.lower()] = platform_class

View File

@@ -0,0 +1,307 @@
"""
Vinted Platform Implementation
Uses Vinted's internal API for product search
"""
import requests
import logging
import time
from datetime import datetime
from platforms.base_platform import BasePlatform
from models.article import Article
REQUEST_RETRY_TIME = 5
class VintedPlatform(BasePlatform):
    """Vinted marketplace platform implementation."""

    # Vinted domain for each supported country code
    COUNTRY_DOMAINS = {
        'es': 'vinted.es',
        'fr': 'vinted.fr',
        'de': 'vinted.de',
        'it': 'vinted.it',
        'pl': 'vinted.pl',
        'cz': 'vinted.cz',
        'lt': 'vinted.lt',
        'uk': 'vinted.co.uk',
        'us': 'vinted.com',
        'nl': 'vinted.nl',
        'be': 'vinted.be',
        'at': 'vinted.at',
    }

    def __init__(self, item_monitor):
        """
        Set up the domain and an HTTP session for the configured country.

        Args:
            item_monitor: ItemMonitor instance with search parameters
        """
        super().__init__(item_monitor)
        self.logger = logging.getLogger(__name__)
        # Defaults to Spain; overridden by a '_country' attribute on item_monitor
        self.country = getattr(item_monitor, '_country', 'es')
        # Unknown country codes fall back to the Spanish domain
        self.domain = self.COUNTRY_DOMAINS.get(self.country, 'vinted.es')
        self.session = requests.Session()
        self._init_session()

    def _init_session(self):
        """
        Request the site home page so the session can pick up cookies.

        Failures are logged as warnings and otherwise ignored.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        try:
            # Only the cookies stored on the session matter; the body is unused
            self.session.get(f'https://www.{self.domain}', headers=headers, timeout=15)
            self.logger.info(f"Vinted session initialized for {self.domain}")
        except Exception as e:
            self.logger.warning(f"Could not initialize Vinted session: {e}")

    def get_platform_name(self):
        """Return the platform identifier, 'vinted'."""
        return "vinted"

    def create_url(self):
        """
        Build the Vinted catalog search URL from the item_monitor settings.

        Returns:
            str: Complete URL for the catalog items request
        """
        monitor = self._item_monitor
        base_url = f"https://www.{self.domain}/api/v2/catalog/items"
        params = []

        search_query = monitor.get_search_query()
        if search_query:
            params.append(f"search_text={requests.utils.quote(search_query)}")

        # Newest listings first
        params.append("order=newest_first")

        # NOTE(review): prices are sent multiplied by 100 (as cents), while
        # _parse_single_article reads the response amount without dividing.
        # Confirm which unit the API expects for price_from/price_to.
        min_price = monitor.get_min_price()
        if min_price != 0:
            params.append(f"price_from={int(min_price * 100)}")
        max_price = monitor.get_max_price()
        if max_price != 0:
            params.append(f"price_to={int(max_price * 100)}")

        # Always request the first page with 96 results per page
        params.append("per_page=96")
        params.append("page=1")

        # Conditions are configured with Wallapop names; translate them
        condition = monitor.get_condition()
        if condition != "all":
            vinted_status = self._map_condition_to_vinted(condition)
            if vinted_status:
                params.append(f"status_ids[]={vinted_status}")

        return base_url + "?" + "&".join(params)

    def _map_condition_to_vinted(self, wallapop_condition):
        """
        Map a Wallapop condition name to a Vinted status ID.

        Vinted status IDs: 1=Satisfactory, 2=Good, 3=Very Good,
        6=Brand new with tag, 7=Brand new without tag

        Returns:
            str | None: Status ID, or None for an unknown condition
        """
        mapping = {
            'new': '6',  # Brand new with tag
            'as_good_as_new': '7',  # Brand new without tag
            'good': '3',  # Very Good
            'fair': '2',  # Good
            'has_given_it_all': '1'  # Satisfactory
        }
        return mapping.get(wallapop_condition)

    def get_request_headers(self):
        """
        Return the headers sent with Vinted catalog requests.

        Returns:
            dict: Headers for HTTP request
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': f'https://www.{self.domain}/',
            'Origin': f'https://www.{self.domain}',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
        }
        return headers

    def _log_response_body(self, label, response):
        """Log the first 500 characters of a response body at debug level."""
        if response is None:
            return
        try:
            body = response.text[:500]
        except Exception as exc:
            # Debug output is best-effort and must not break the retry loop
            self.logger.debug(f"Could not read Vinted response body: {exc}")
            return
        self.logger.debug(f"{label}: {body}")

    def fetch_articles(self):
        """
        Fetch articles from Vinted, making up to three attempts.

        Returns:
            list: Article objects; empty when the response has no 'items'
                key, on HTTP 404, or when every attempt fails
        """
        url = self.create_url()
        max_retries = 3

        for attempt in range(max_retries):
            response = None
            try:
                headers = self.get_request_headers()
                response = self.session.get(url, headers=headers, timeout=30, allow_redirects=True)
                response.raise_for_status()
                json_response = response.json()

                if 'items' not in json_response:
                    self.logger.warning(f"Unexpected Vinted response structure. Keys: {list(json_response.keys())}")
                    if 'error' in json_response:
                        self.logger.error(f"Vinted API error: {json_response['error']}")
                    return []

                return self.parse_response(json_response['items'])

            except requests.exceptions.HTTPError as err:
                status_code = getattr(err.response, 'status_code', None)
                self.logger.error(f"Vinted HTTP Error {status_code}: {err}")

                if status_code == 401 or status_code == 403:
                    self.logger.warning("Vinted authentication issue, reinitializing session...")
                    self._init_session()
                elif status_code == 429:
                    self.logger.warning("Vinted rate limit hit, waiting longer...")
                    time.sleep(REQUEST_RETRY_TIME * 3)
                elif status_code == 404:
                    # Retrying cannot help when the endpoint does not exist
                    self.logger.error("Vinted API endpoint not found. Check URL.")
                    return []

                self._log_response_body("Response content", err.response)

            except requests.exceptions.RequestException as err:
                self.logger.error(f"Vinted Request Exception: {err}")

            except ValueError as e:
                self.logger.error(f"Error parsing JSON response from Vinted: {e}")
                self._log_response_body("Response text", response)

            except Exception as e:
                self.logger.error(f"Unexpected error fetching from Vinted: {e}")

            # Wait longer after each failed attempt; no wait after the last one
            if attempt < max_retries - 1:
                wait_time = REQUEST_RETRY_TIME * (attempt + 1)
                self.logger.info(f"Retrying in {wait_time} seconds... (attempt {attempt + 2}/{max_retries})")
                time.sleep(wait_time)

        self.logger.warning(f"Failed to fetch articles from Vinted after {max_retries} attempts")
        return []

    def parse_response(self, json_items):
        """
        Parse the Vinted item list into Article objects.

        Args:
            json_items: Iterable of item dicts from the response

        Returns:
            list: Article objects; items that fail to parse are skipped
        """
        articles = []
        for json_article in json_items:
            article = self._parse_single_article(json_article)
            if article:
                articles.append(article)
        return articles

    @staticmethod
    def _format_timestamp(raw_ts):
        """
        Format a Unix timestamp in seconds as 'YYYY-MM-DD HH:MM:SS'.

        Returns:
            str | None: Formatted local time, or None if raw_ts cannot be
                converted
        """
        try:
            return datetime.fromtimestamp(int(raw_ts)).strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def _parse_single_article(self, json_data):
        """
        Parse a single Vinted item dict into an Article.

        Returns:
            Article | None: The parsed article, or None when the item is
                malformed (the problem is logged)
        """
        try:
            article_id = str(json_data['id'])
            title = json_data.get('title', '')
            description = json_data.get('description', '')

            # 'or {}' also covers keys that are present with a null value;
            # chaining .get() on such a value would raise AttributeError
            price_data = json_data.get('price') or {}
            if isinstance(price_data, dict):
                price_amount = price_data.get('amount', 0)
                currency = price_data.get('currency_code', 'EUR')
            else:
                # NOTE(review): assumes a non-dict price is a bare amount with
                # the currency in a sibling 'currency' key - confirm against
                # real payloads
                price_amount = price_data
                currency = json_data.get('currency', 'EUR')
            # The amount is used as returned; no unit conversion is applied
            price = float(price_amount) if price_amount else 0.0

            user_data = json_data.get('user') or {}
            location = user_data.get('city', 'Unknown')

            # Relative URLs are made absolute against the configured domain
            article_url = json_data.get('url', '')
            if article_url and not article_url.startswith('http'):
                article_url = f"https://www.{self.domain}{article_url}"

            # Main photo first, then extra photos without duplicates, max 3
            images = []
            main_photo = json_data.get('photo') or {}
            if 'url' in main_photo:
                images.append(main_photo['url'])
            extra_photos = json_data.get('photos') or []
            for extra in extra_photos[:3]:
                if 'url' in extra:
                    image_url = extra['url']
                    if image_url not in images:
                        images.append(image_url)
            images = images[:3]

            # Prefer the main photo's timestamp, then created_at_ts; use the
            # current time when neither is present or convertible
            high_resolution = main_photo.get('high_resolution') or {}
            raw_ts = high_resolution.get('timestamp') or json_data.get('created_at_ts')
            modified_at = self._format_timestamp(raw_ts) if raw_ts else None
            if modified_at is None:
                modified_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Every Vinted article is reported as allowing shipping
            allows_shipping = True

            return Article(
                id=article_id,
                title=title,
                description=description,
                price=price,
                currency=currency,
                location=location,
                allows_shipping=allows_shipping,
                url=article_url,
                images=images,
                modified_at=modified_at,
                platform=self.get_platform_name()
            )
        except (KeyError, ValueError, TypeError, AttributeError) as e:
            # AttributeError covers fields of an unexpected type, so a single
            # malformed item is skipped instead of aborting the whole batch
            self.logger.info(f"Error parsing Vinted article: {e} {json_data}")
            return None

View File

@@ -0,0 +1,112 @@
import requests
import logging
import time
import datetime
from platforms.base_platform import BasePlatform
from models.article import Article
REQUEST_RETRY_TIME = 5
class WallapopPlatform(BasePlatform):
    """Wallapop marketplace platform implementation."""

    def __init__(self, item_monitor):
        """
        Keep the monitor configuration and create a module-level logger.

        Args:
            item_monitor: ItemMonitor instance with search parameters
        """
        super().__init__(item_monitor)
        self.logger = logging.getLogger(__name__)

    def get_platform_name(self):
        """Return the platform identifier, 'wallapop'."""
        return "wallapop"

    def create_url(self):
        """
        Build the Wallapop search URL from the item_monitor settings.

        Returns:
            str: Complete URL for the search request
        """
        monitor = self._item_monitor

        # URL-encode the query so spaces, '&' or '#' cannot corrupt the URL
        # or inject extra parameters
        keywords = requests.utils.quote(str(monitor.get_search_query()))

        # NOTE(review): the endpoint uses plain HTTP; confirm whether it
        # should be requested over HTTPS instead
        url = (
            f"http://api.wallapop.com/api/v3/search"
            f"?source=search_box"
            f"&keywords={keywords}"
            f"&order_by=newest"
            f"&language=es_ES"
        )

        # Only include latitude and longitude if both are not 0
        latitude = monitor.get_latitude()
        longitude = monitor.get_longitude()
        if latitude != 0 and longitude != 0:
            url += f"&latitude={latitude}&longitude={longitude}"

        # A value of 0 means the filter is not set
        min_price = monitor.get_min_price()
        if min_price != 0:
            url += f"&min_sale_price={min_price}"

        max_price = monitor.get_max_price()
        if max_price != 0:
            url += f"&max_sale_price={max_price}"

        max_distance = monitor.get_max_distance()
        if max_distance != 0:
            url += f"&distance_in_km={max_distance}"

        # Accepted values: new, as_good_as_new, good, fair, has_given_it_all
        condition = monitor.get_condition()
        if condition != "all":
            url += f"&condition={condition}"

        return url

    def get_request_headers(self):
        """
        Return the default headers plus the 'X-DeviceOS' header.

        Returns:
            dict: Headers for HTTP request
        """
        headers = super().get_request_headers()
        headers['X-DeviceOS'] = '0'
        return headers

    def fetch_articles(self):
        """
        Fetch articles from the Wallapop API, retrying until a request succeeds.

        Returns:
            list: Article objects parsed from the response

        Raises:
            ValueError: If the response body is not valid JSON
            KeyError: If the response lacks the expected nested keys
        """
        url = self.create_url()

        while True:
            try:
                headers = self.get_request_headers()
                # The timeout makes a stalled connection raise (and be
                # retried) instead of blocking this loop forever
                response = requests.get(url, headers=headers, timeout=30)
                response.raise_for_status()
                break
            except requests.exceptions.RequestException as err:
                self.logger.error(f"Request Exception: {err}")
                time.sleep(REQUEST_RETRY_TIME)

        json_response = response.json()
        json_items = json_response['data']['section']['payload']['items']
        return self.parse_response(json_items)

    def parse_response(self, json_items):
        """
        Parse the Wallapop item list into Article objects.

        Args:
            json_items: Iterable of item dicts from the response

        Returns:
            list: Article objects; items that fail to parse are skipped
        """
        articles = []
        for json_article in json_items:
            article = self._parse_single_article(json_article)
            if article:
                articles.append(article)
        return articles

    def _parse_single_article(self, json_data):
        """
        Parse a single Wallapop item dict into an Article.

        Returns:
            Article | None: The parsed article, or None when a required
                field is missing or has an unexpected type
        """
        try:
            # Keep the medium-size URL of at most the first three images
            images = [img['urls']['medium'] for img in json_data['images'][:3]]

            # modified_at is divided by 1000 before conversion, i.e. it is
            # treated as a millisecond timestamp
            ts = int(json_data['modified_at'])
            dt = datetime.datetime.fromtimestamp(ts / 1000)
            modified_at = dt.strftime("%Y-%m-%d %H:%M:%S")

            return Article(
                id=json_data['id'],
                title=json_data['title'],
                description=json_data['description'],
                price=json_data['price']['amount'],
                currency=json_data['price']['currency'],
                location=json_data['location']['city'],
                allows_shipping=json_data['shipping']['user_allows_shipping'],
                url="https://wallapop.com/item/" + json_data['web_slug'],
                images=images,
                modified_at=modified_at,
                platform=self.get_platform_name()
            )
        except (KeyError, ValueError, TypeError) as e:
            # TypeError covers null nested fields (e.g. 'price': None), so a
            # single malformed item is skipped instead of aborting the batch
            self.logger.error(f"Error parsing Wallapop article: {e}")
            return None