import os
from datetime import datetime, timezone

import google.oauth2.credentials
from dateutil.relativedelta import relativedelta
from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


class GoogleCalendar:
    """Read-only Google Calendar client used to detect forced entries and holidays.

    On construction the client loads stored OAuth credentials (if any),
    refreshes or re-acquires them when they are not valid, and builds the
    Calendar v3 service object.
    """

    # Calendars queried by getEvent().
    _AUTOFICHER_CALENDAR_ID = (
        'e51d3bfb6b352c48afb8bd7a5d494599cc3eaa686800a856796306f50c6d72fd'
        '@group.calendar.google.com'
    )
    _HOLIDAY_CALENDAR_ID = 'es.spain#holiday@group.v.calendar.google.com'

    # Substrings of an event description that mark it as a relevant holiday.
    _HOLIDAY_MARKERS = ("Cataluña", 'Día festivo', 'Celebración\n')

    def __init__(self):
        """Load credentials from disk, authenticate and build the service."""
        self.creds = None
        self.service = None
        self.SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
        base_dir = os.path.dirname(__file__)
        # Stored user token (written by saveAuth) and OAuth client secrets.
        self.authfile = os.path.join(base_dir, '../googleoauth.json')
        self.grantfile = os.path.join(base_dir, '../googleoauth_grant.json')

        if os.path.exists(self.authfile):
            try:
                self.creds = Credentials.from_authorized_user_file(
                    self.authfile, self.SCOPES)
            except ValueError as error:
                # Unusable token file: report it and fall through to
                # checkAuth(), which starts a fresh authorization.
                print(error)

        self.checkAuth()
        self.buildService()

    def serverAuth(self):
        """Run the interactive installed-app OAuth flow and store the result in self.creds."""
        flow = InstalledAppFlow.from_client_secrets_file(
            self.grantfile, self.SCOPES)
        self.creds = flow.run_local_server(port=0)

    def saveAuth(self):
        """Write the current credentials as JSON to self.authfile."""
        with open(self.authfile, 'w', encoding='utf-8') as token:
            token.write(self.creds.to_json())

    def refreshAuth(self):
        """Refresh the current credentials using their refresh token."""
        self.creds.refresh(Request())

    def checkAuth(self):
        """Ensure self.creds is valid, refreshing or re-authorizing as needed.

        Whenever the credentials had to be refreshed or re-acquired they are
        saved back to disk; already-valid credentials are left untouched.
        """
        if self.creds and self.creds.valid:
            return

        if self.creds and self.creds.expired and self.creds.refresh_token:
            try:
                self.refreshAuth()
            except RefreshError:
                # The refresh token was rejected: ask the user again.
                self.serverAuth()
        else:
            self.serverAuth()

        self.saveAuth()

    def buildService(self):
        """Build the Calendar v3 API service with the current credentials."""
        self.service = build('calendar', 'v3', credentials=self.creds)

    def getNow(self):
        """Return today's UTC date at midnight as an RFC 3339 string."""
        return datetime.now(timezone.utc).strftime('%Y-%m-%dT00:00:00Z')

    def getNowAfterOneDay(self):
        """Return tomorrow's UTC date at midnight as an RFC 3339 string."""
        tomorrow = datetime.now(timezone.utc) + relativedelta(days=1)
        return tomorrow.strftime('%Y-%m-%dT00:00:00Z')

    def _listEvents(self, calendar_id, time_min, time_max):
        """Return up to 10 single events of *calendar_id* in the given window."""
        result = self.service.events().list(
            calendarId=calendar_id,
            timeMin=time_min,
            timeMax=time_max,
            maxResults=10,
            singleEvents=True,
            orderBy='startTime').execute()
        return result.get('items', [])

    @staticmethod
    def _toTimestamp(moment):
        """Convert an event 'start'/'end' mapping to a POSIX timestamp.

        Uses 'dateTime' when present, otherwise the all-day 'date'. Returns
        None when neither is available.
        """
        if not moment:
            return None
        value = moment.get('dateTime', moment.get('date'))
        if not value:
            return None
        # datetime.fromisoformat() only accepts a trailing 'Z' from
        # Python 3.11 on; spell the UTC offset out for older interpreters.
        if value.endswith('Z'):
            value = value[:-1] + '+00:00'
        return datetime.fromisoformat(value).timestamp()

    def getEvent(self):
        """Return the event that is active right now, if any.

        Returns:
            * ``'Forzado desde autoficher - <summary>'`` when an active event
              is organized by 'autoficher';
            * the event summary when an active event's description marks it
              as a holiday and it is not a 'Cambio de horario' entry;
            * ``'Error de llama a Google Calendar'`` when the API call fails;
            * ``False`` otherwise.
        """
        # NOTE(review): the query window is bounded by UTC midnights while
        # "now" below is local time -- confirm this is intended.
        time_min = self.getNow()
        time_max = self.getNowAfterOneDay()

        try:
            events = (
                self._listEvents(self._AUTOFICHER_CALENDAR_ID, time_min, time_max)
                + self._listEvents(self._HOLIDAY_CALENDAR_ID, time_min, time_max)
            )
        except HttpError as error:
            print(f'An error occurred: {error}')
            return 'Error de llama a Google Calendar'

        if not events:
            return False

        current = datetime.now().timestamp()
        for event in events:
            begins = self._toTimestamp(event.get('start'))
            finishes = self._toTimestamp(event.get('end'))
            if begins is None or finishes is None:
                continue
            # Only events in progress at this very moment are relevant.
            if not begins <= current <= finishes:
                continue

            # 'organizer.displayName' and 'summary' are optional in the API
            # response, so never index them directly.
            organizer = event.get('organizer', {}).get('displayName')
            if organizer == 'autoficher':
                return 'Forzado desde autoficher - ' + event.get("summary", "Sin Título")

            description = event.get('description')
            if description is None:
                continue
            is_holiday = any(
                marker in description for marker in self._HOLIDAY_MARKERS)
            if is_holiday and 'Cambio de horario' not in event.get('summary', ''):
                return event.get('summary', 'Sin Título')

        return False