142 lines
4.8 KiB
Python
142 lines
4.8 KiB
Python
import asyncio
|
|
|
|
import requests, json, pytz
|
|
from datetime import datetime
|
|
from time import sleep
|
|
import random
|
|
from app import arg_parser, config_parser, telegram_bot, google_calendar
|
|
|
|
# Result of arg_parser.ArgParser().parse() (presumably the parsed command-line
# options). This file reads args.pin, args.user, args.type, args.basedtime and
# args.debug from it, and assigns args.pin when it has to be prompted for.
args = arg_parser.ArgParser().parse()
|
|
# Result of config_parser.ConfigParser().loadConfig(). Keys read in this file:
# 'api_url', 'user', 'in_hours', 'out_hours' and 'geo' (with 'latitude',
# 'longitude' and 'accuracy' inside it).
config = config_parser.ConfigParser().loadConfig()
|
|
|
|
|
|
class timenetManager:
    """Client for the clock-in/clock-out ("check") API at config['api_url'].

    Messages describing what happened are accumulated with addMessage() and
    delivered through the Telegram bot by sendReport().
    """

    # Telegram bot wrapper; replaced by a real instance in __init__.
    telegram = None

    # Report text, built up one line at a time by addMessage().
    message_acumulator = ""

    # Default request headers. updateTime() adds "tn-d" and updateVersion()
    # overwrites "tn-v"; __init__ copies this dict so those writes stay
    # per-instance.
    headers = {
        "Content-type": "application/x-www-form-urlencoded; charset=UTF-8",
        "tn-v": "3.0.9",
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }

    def __init__(self):
        """Create the Telegram bot wrapper and a private copy of the headers."""
        self.telegram = telegram_bot.telegramBot()
        # Without this copy, updateTime()/updateVersion() would write into the
        # class-level dict that every instance shares.
        self.headers = dict(type(self).headers)

    @staticmethod
    def _buildUrl(base, url):
        """Return the full endpoint URL for *url* under *base*.

        "version" is served from the API root; every other endpoint lives
        under the "v1/" prefix.
        """
        if url == "version":
            return base + url
        return base + "v1/" + url

    @staticmethod
    def _typeForHour(hour, in_hours, out_hours):
        """Map *hour* to a check type using the configured schedules.

        Returns 0 when *hour* is in *in_hours*, 1 when it is in *out_hours*
        (the entry schedule wins if it is in both), and None when it is in
        neither.
        """
        if hour in in_hours:
            return 0
        if hour in out_hours:
            return 1
        return None

    def get(self, url, data=None, headers=None):
        """Send a GET request to endpoint *url* and return the response.

        *data* is forwarded as the request body and *headers* as the request
        headers; both default to empty.
        """
        return requests.get(
            self._buildUrl(config['api_url'], url),
            data={} if data is None else data,
            headers={} if headers is None else headers,
        )

    def post(self, url, data=None, headers=None):
        """Send a POST request to endpoint *url* and return the response.

        *data* is forwarded as the form body and *headers* as the request
        headers; both default to empty.
        """
        return requests.post(
            self._buildUrl(config['api_url'], url),
            data={} if data is None else data,
            headers={} if headers is None else headers,
        )

    def addMessage(self, message):
        """Append *message* plus a newline to the pending report."""
        self.message_acumulator += message + "\n"

    def sendReport(self):
        """Send the accumulated report through the Telegram bot."""
        # telegram.sendMessage is awaited here, so it is driven to completion
        # with asyncio.run from this synchronous method.
        asyncio.run(self.telegram.sendMessage(self.message_acumulator))

    def updateTime(self):
        """Store the current UTC time, wrapped in double quotes, as "tn-d"."""
        now_utc = datetime.now(pytz.UTC)
        self.headers["tn-d"] = '"' + now_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + '"'

    def updateVersion(self):
        """Store the result of getVersion() as "tn-v", print headers and exit.

        NOTE(review): getVersion() returns the response object rather than a
        version string, and the process exits with status 20 right after
        printing. This looks like leftover debugging code; confirm before
        relying on it.
        """
        self.headers["tn-v"] = self.getVersion()
        print(self.headers)
        raise SystemExit(20)

    def getVersion(self):
        """Refresh the "tn-d" header and request the "version" endpoint.

        Returns the response object of the GET request.
        """
        self.updateTime()
        # Pass the headers by keyword: positionally they would land in the
        # `data` parameter and be sent as the request body instead.
        return self.get('version', headers=self.headers)

    def getInfo(self):
        """Request "check/info" for args.user and return the response."""
        return self.get('check/info', {"guid": args.user})

    def sendUpdate(self):
        """Perform a clock-in/clock-out check and report the outcome.

        Steps: prompt for the PIN if it was not supplied, skip the check when
        the calendar returns an event, decide the check type (0 = entry,
        1 = exit), post the check after a random delay and send the report.

        Exits the process with status 20 in debug mode, when the worker is
        not recognised, or when the response is not valid JSON.

        NOTE(review): on the early returns and exits below the message is
        accumulated but this method does not send it; confirm whether a
        caller is expected to call sendReport().
        """
        if args.pin is None:
            # The PIN lives on the module-level `args`; the instance has no
            # `args` attribute.
            args.pin = int(input("Introduzca su pin: "))

        event = google_calendar.GoogleCalendar().getEvent()
        if event:
            self.addMessage("🟥🕐 Comprobación de calendario: " + event)
            return

        typ = 0
        # NOTE(review): `not args.type` is also true for an explicit 0 if the
        # parser yields an int; confirm against arg_parser.
        if args.basedtime and not args.type:
            current_hour = datetime.now().strftime("%H")
            typ = self._typeForHour(current_hour, config['in_hours'], config['out_hours'])
            if typ is None:
                self.addMessage("No se ha fichado ni desfichado ya que estamos en horario laboral, fuerze el estado con el parametro --type <0 = Entrada, 1 = Salida>")
                # Stop here: falling through would post a check of type 0
                # right after saying that nothing was done.
                return
        else:
            typ = args.type

        if args.debug:
            self.addMessage('Corriendo en modo debug. No se realizará ninguna acción')
            raise SystemExit(20)

        self.addMessage("####HACIENDO CHECK####")
        # Random delay of up to 230 seconds before posting the check.
        sleep(random.randint(0, 230))
        self.updateTime()
        data = {
            "cp": config['user'],
            "pin": args.pin,
            "typ": typ,
            "date": self.headers['tn-d'],
            "geoLatitude": config['geo']['latitude'],
            "geoLongitude": config['geo']['longitude'],
            "geoError": "",
            "c": 1,
            "geoAccuracy": config['geo']['accuracy']
        }
        response = self.post("check", data, self.headers)

        if response.text == "no valid worker":
            self.addMessage("El trabajador no ha podido ser identificado")
            raise SystemExit(20)

        try:
            rj = json.loads(response.text)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; a bare except
            # would also swallow SystemExit and KeyboardInterrupt.
            self.addMessage("La respuesta al hacer check no es correcta... algo ha pasado :/")
            raise SystemExit(20)

        madrid = pytz.timezone("Europe/Madrid")
        if response.status_code != 200:
            self.addMessage("🟥🕐 No se ha podido fichar, la web no ha devuelto 200 OK")
        elif rj['Repeated']:
            # RepeatedTime is parsed as UTC and shown in Madrid local time.
            repeated_at = datetime.strptime(rj['RepeatedTime'], "%Y-%m-%dT%H:%M:%SZ")
            stamp = repeated_at.replace(tzinfo=pytz.utc).astimezone(madrid).strftime("%H:%M")
            self.addMessage("🕐 Ya se ha realizado este marcaje antes a las %s" % stamp)
        else:
            stamp = datetime.now().astimezone(madrid).strftime("%H:%M")
            # Indexed by check type: 0 = entry, 1 = exit.
            conditional_response = [{
                "text": "fichado",
                "emoji": "🕐🏢🏃♂️🏡"
            }, {
                "text": "desfichado",
                "emoji": "🏡🏃♂️🏢🕐"
            }]
            outcome = conditional_response[int(typ)]
            self.addMessage('%s Has %s correctamente a las %s' % (outcome['emoji'], outcome['text'], stamp))

        self.sendReport()