Major refactoring

This commit is contained in:
Kifixo
2024-02-12 23:59:16 +01:00
parent cefa2aec20
commit 685518bbcd
10 changed files with 343 additions and 261 deletions

196
worker.py
View File

@@ -1,127 +1,115 @@
import time
import requests
import telegram
from dotenv import load_dotenv
import os
load_dotenv()
import logging
from article import Article
from telegram_handler import TelegramHandler
import traceback
import asyncio
TELEGRAM_CHANNEL_ID = os.getenv("TELEGRAM_CHANNEL_ID")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
SLEEP_TIME = 5
REQUEST_SLEEP_TIME = 5
REQUEST_RETRY_TIME = 3
ERROR_SLEEP_TIME = 10
worker_logger = logging.getLogger(__name__)
worker_logger.setLevel(logging.INFO) # Set the level as needed
worker_logger.addHandler(logging.StreamHandler())
class Worker:
def __init__(self, item_to_monitor):
self.logger = logging.getLogger(__name__)
self._item_monitoring = item_to_monitor
self._notified_articles = self._request_articles()
self._telegram_handler = TelegramHandler()
def request(self, product_name, n_articles, latitude='40.4165', longitude='-3.70256', distance='0', condition='all', min_price=0, max_price=10000000):
url = (f"http://api.wallapop.com/api/v3/general/search?keywords={product_name}"
f"&order_by=newest&latitude={latitude}"
f"&longitude={longitude}"
f"&distance={distance}"
f"&min_sale_price={min_price}"
f"&max_sale_price={max_price}"
f"&filters_source=quick_filters&language=es_ES")
if condition != "all":
url = url + f"&condition={condition}" # new, as_good_as_new, good, fair, has_given_it_all
def _request_articles(self):
url = (
f"http://api.wallapop.com/api/v3/general/search?keywords={self._item_monitoring.get_search_query()}"
f"&order_by=newest&latitude={self._item_monitoring.get_latitude()}"
f"&longitude={self._item_monitoring.get_longitude()}"
f"&distance={self._item_monitoring.get_max_distance()}"
f"&min_sale_price={self._item_monitoring.get_min_price()}"
f"&max_sale_price={self._item_monitoring.get_max_price()}"
f"&filters_source=quick_filters&language=es_ES"
)
if self._item_monitoring.get_condition() != "all":
url += f"&condition={self._item_monitoring.get_condition()}" # new, as_good_as_new, good, fair, has_given_it_all
while True:
response = requests.get(url)
try:
if response.status_code == 200:
break
else:
print(f"\'{product_name}\' -> Wallapop returned status {response.status_code}. Illegal parameters or Wallapop service is down. Retrying...")
except Exception as e:
print("Exception: " + e)
time.sleep(3)
response = requests.get(url)
response.raise_for_status()
break
except requests.exceptions.RequestException as err:
self.logger.error(f"Request Exception: {err}")
time.sleep(REQUEST_RETRY_TIME)
json_data = response.json()
return json_data['search_objects']
json_response = response.json()
articles = self._parse_json_response(json_response['search_objects'])
return articles
def first_run(self, args):
list = []
articles = self.request(args['product_name'], 0, args['latitude'], args['longitude'], args['distance'], args['condition'], args['min_price'], args['max_price'])
for article in articles:
list.insert(0, article['id'])
return list
def _parse_json_response(self, json_response):
articles = []
for json_article in json_response:
articles.append(Article.load_from_json(json_article))
return articles
def work(self, args, list):
def _has_words(self, text, word_list):
return any(word in text for word in word_list)
def _title_has_excluded_words(self, article):
return self._has_words(article.get_title(),
self._item_monitoring.get_title_exclude())
def _description_has_excluded_words(self, article):
return self._has_words(article.get_description(),
self._item_monitoring.get_description_exclude())
def _title_has_required_words(self, article):
return not self._item_monitoring.get_title_must_include() \
or self._has_words(article.get_title(),
self._item_monitoring.get_title_must_include())
def _description_has_required_words(self, article):
return not self._item_monitoring.get_description_must_include() \
or self._has_words(article.get_description(),
self._item_monitoring.get_description_must_include())
def _meets_item_conditions(self, article):
return (
self._title_has_required_words(article) and
self._description_has_required_words(article) and
not self._title_has_excluded_words(article) and
not self._description_has_excluded_words(article) and
article not in self._notified_articles
)
def work(self):
exec_times = []
bot = telegram.Bot(token = TELEGRAM_TOKEN)
while True:
start_time = time.time()
articles = self.request(args['product_name'], 0, args['latitude'], args['longitude'], args['distance'], args['condition'], args['min_price'], args['max_price'])
for article in articles:
if not article['id'] in list:
articles = self._request_articles()
for article in articles[0:1]:
if self._meets_item_conditions(article):
try:
if not self.has_excluded_words(article['title'].lower(), article['description'].lower(), args['exclude']) and not self.is_title_key_word_excluded(article['title'].lower(), args['title_keyword_exclude']):
try:
bot.send_message(TELEGRAM_CHANNEL_ID, f"*Artículo*: {article['title']}\n"
f"*Descripción*: {article['description']}\n"
f"*Precio*: {article['price']} {article['currency']}\n"
f"[Ir al anuncio](https://es.wallapop.com/item/{article['web_slug']})"
, "MARKDOWN")
except:
bot.send_message(TELEGRAM_CHANNEL_ID, f"*Artículo*: {article['title']}\n"
f"*Descripción*: Descripción inválida\n"
f"*Precio*: {article['price']} {article['currency']}\n"
f"[Ir al anuncio](https://es.wallapop.com/item/{article['web_slug']})"
, "MARKDOWN")
time.sleep(1) # Avoid Telegram flood restriction
list.insert(0, article['id'])
self._telegram_handler.send_telegram_article(article)
self._notified_articles.insert(0, article)
except Exception as e:
print("---------- EXCEPTION -----------")
f = open("error_log.txt", "a")
f.write(f"{args['product_name']} worker crashed. {e}")
f.write(f"{args['product_name']}: Trying to parse {article['id']}: {article['title']} .\n")
f.close()
time.sleep(SLEEP_TIME)
self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed: {e}")
time.sleep(REQUEST_SLEEP_TIME)
exec_times.append(time.time() - start_time)
print(f"\'{args['product_name']}\' node-> last: {exec_times[-1]} max: {self.get_max_time(exec_times)} avg: {self.get_average_time(exec_times)}")
def has_excluded_words(self, title, description, excluded_words):
for word in excluded_words:
print("EXCLUDER: Checking '" + word + "' for title: '" + title)
if word in title or word in description:
print("EXCLUDE!")
return True
return False
self.logger.info(f"\'{self._item_monitoring.get_search_query()}\' node-> last: {exec_times[-1]}"
f" max: {max(exec_times)} avg: {sum(exec_times) / len(exec_times)}")
def is_title_key_word_excluded(self, title, excluded_words):
for word in excluded_words:
print("Checking '" + word + "' for title: '" + title)
if word in title:
return True
return False
def get_average_time(self, exec_times):
sum = 0
for i in exec_times:
sum = sum + i
return sum / len(exec_times)
def get_max_time(self, exec_times):
largest = 0
for i in exec_times:
if i > largest:
largest = i
return largest
def run(args):
worker = Worker()
list = worker.first_run(args)
def run(self):
while True:
try:
print(f"Wallapop monitor worker started. Checking for new items containing: \'{args['product_name']}\' with given parameters periodically")
worker.work(args, list)
self.logger.info(f"Wallapop monitor worker started. Checking for "
f"new items containing '{self._item_monitoring.get_search_query()}' "
f"with given parameters periodically")
self.work()
except Exception as e:
print(f"Exception: {e}")
print(f"{args['product_name']} worker crashed. Restarting worker...")
time.sleep(10)
self.logger.error(f"{''.join(traceback.format_exception(None, e, e.__traceback__))}")
self.logger.error(f"{self._item_monitoring.get_search_query()} worker crashed. Restarting worker...")
time.sleep(ERROR_SLEEP_TIME)