Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions weconnect/auth/auth_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,116 @@
import json
import re
from html.parser import HTMLParser


def addBearerAuthHeader(token, headers=None):
headers = headers or {}
headers['Authorization'] = f'Bearer {token}'
return headers


class HTMLFormParser(HTMLParser):
def __init__(self, form_id):
super().__init__()
self._form_id = form_id
self._inside_form = False
self.target = None
self.data = {}

def _get_attr(self, attrs, name):
for attr in attrs:
if attr[0] == name:
return attr[1]
return None

def handle_starttag(self, tag, attrs):
if self._inside_form and tag == 'input':
self.handle_input(attrs)
return

if tag == 'form' and self._get_attr(attrs, 'id') == self._form_id:
self._inside_form = True
self.target = self._get_attr(attrs, 'action')

def handle_endtag(self, tag):
if tag == 'form' and self._inside_form:
self._inside_form = False

def handle_input(self, attrs):
if not self._inside_form:
return

name = self._get_attr(attrs, 'name')
value = self._get_attr(attrs, 'value')

if name:
self.data[name] = value


class ScriptFormParser(HTMLParser):
fields = []
targetField = ''

def __init__(self):
super().__init__()
self._inside_script = False
self.data = {}
self.target = None

def handle_starttag(self, tag, attrs):
if not self._inside_script and tag == 'script':
self._inside_script = True

def handle_endtag(self, tag):
if self._inside_script and tag == 'script':
self._inside_script = False

def handle_data(self, data):
if not self._inside_script:
return

match = re.search(r'templateModel: (.*?),\n', data)
if not match:
return

result = json.loads(match.group(1))
self.target = result.get(self.targetField, None)
self.data = {k: v for k, v in result.items() if k in self.fields}

match2 = re.search(r'csrf_token: \'(.*?)\'', data)
if match2:
self.data['_csrf'] = match2.group(1)


class CredentialsFormParser(ScriptFormParser):
fields = ['relayState', 'hmac', 'registerCredentialsPath', 'error', 'errorCode']
targetField = 'postAction'


class TermsAndConditionsFormParser(ScriptFormParser):
fields = ['relayState', 'hmac', 'countryOfResidence', 'legalDocuments']
targetField = 'loginUrl'

def handle_data(self, data):
if not self._inside_script:
return

super().handle_data(data)

if 'countryOfResidence' in self.data:
self.data['countryOfResidence'] = self.data['countryOfResidence'].upper()

if 'legalDocuments' not in self.data:
return

for key in self.data['legalDocuments'][0]:
# Skip unnecessary keys
if key in ('skipLink', 'declineLink', 'majorVersion', 'minorVersion', 'changeSummary'):
continue

# Move values under a new key while converting boolean values to 'yes' or 'no'
v = self.data['legalDocuments'][0][key]
self.data[f'legalDocuments[0].{key}'] = ('yes' if v else 'no') if isinstance(v, bool) else v

# Remove the original object
del self.data['legalDocuments']
177 changes: 2 additions & 175 deletions weconnect/auth/my_cupra_session.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
from typing import Dict, Optional, Match
from typing import Dict

import re
import json
import logging
import requests

from urllib.parse import parse_qsl, urlsplit

from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

from oauthlib.common import to_unicode
from oauthlib.oauth2 import InsecureTransportError
from oauthlib.oauth2 import is_secure_transport

from requests.models import CaseInsensitiveDict
from weconnect.auth.openid_session import AccessType


from weconnect.auth.vw_web_session import VWWebSession
from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError


LOG = logging.getLogger("weconnect")
Expand Down Expand Up @@ -55,172 +48,6 @@ def refresh(self):
'https://identity.vwgroup.io/oidc/v1/token',
)

def doWebAuth(self, authorizationUrl): # noqa: C901
websession: requests.Session = requests.Session()
retries = Retry(total=self.retries,
backoff_factor=0.1,
status_forcelist=[500],
raise_on_status=False)
websession.proxies.update(self.proxies)
websession.mount('https://', HTTPAdapter(max_retries=retries))
websession.headers = CaseInsensitiveDict({
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'accept-language': 'en-US,en;q=0.9',
'accept-encoding': 'gzip, deflate, br'
})
while True:
loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False)
if loginFormResponse.status_code == requests.codes['ok']:
break
elif loginFormResponse.status_code == requests.codes['found']:
if 'Location' in loginFormResponse.headers:
authorizationUrl = loginFormResponse.headers['Location']
else:
raise APICompatibilityError('Forwarding without Location in Header')
elif loginFormResponse.status_code == requests.codes['internal_server_error']:
raise RetrievalError('Temporary server error during login')
else:
raise APICompatibilityError('Retrieving credentials page was not successfull,'
f' status code: {loginFormResponse.status_code}')

# Find login form on page to obtain inputs
emailFormRegex = r'<form.+id=\"emailPasswordForm\".*action=\"(?P<formAction>[^\"]+)\"[^>]*>' \
r'(?P<formContent>.+?(?=</form>))</form>'
match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL)
if match is None:
raise APICompatibilityError('No login email form found')
# retrieve target url from form
target: str = match.groupdict()['formAction']

# Find all inputs and put those in formData dictionary
inputRegex = r'<input[\\n\\r\s][^/]*name=\"(?P<name>[^\"]+)\"([\\n\\r\s]value=\"(?P<value>[^\"]+)\")?[^/]*/>'
formData: Dict[str, str] = {}
for match in re.finditer(inputRegex, match.groupdict()['formContent']):
if match.groupdict()['name']:
formData[match.groupdict()['name']] = match.groupdict()['value']
if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData):
raise APICompatibilityError('Could not find all required input fields in login page')

# Set email to the provided username
formData['email'] = self.sessionuser.username

# build url from form action
login2Url: str = 'https://identity.vwgroup.io' + target

loginHeadersForm: CaseInsensitiveDict = websession.headers.copy()
loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded'

# Post form content and retrieve credentials page
login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True)

if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101
if login2Response.status_code == requests.codes['internal_server_error']:
raise RetrievalError('Temporary server error during login')
raise APICompatibilityError('Retrieving credentials page was not successfull,'
f' status code: {login2Response.status_code}')

credentialsTemplateRegex = r'<script>\s+window\._IDK\s+=\s+\{\s' \
r'(?P<templateModel>.+?(?=\s+\};?\s+</script>))\s+\};?\s+</script>'
match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL)
if match is None:
raise APICompatibilityError('No credentials form found')
if match.groupdict()['templateModel']:
lineRegex = r'\s*(?P<name>[^\:]+)\:\s+[\'\{]?(?P<value>.+)[\'\}][,]?'
form2Data: Dict[str, str] = {}
for match in re.finditer(lineRegex, match.groupdict()['templateModel']):
if match.groupdict()['name'] == 'templateModel':
templateModelString = '{' + match.groupdict()['value'] + '}'
if templateModelString.endswith(','):
templateModelString = templateModelString[:-len(',')]
templateModel = json.loads(templateModelString)
if 'relayState' in templateModel:
form2Data['relayState'] = templateModel['relayState']
if 'hmac' in templateModel:
form2Data['hmac'] = templateModel['hmac']
if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']:
form2Data['email'] = templateModel['emailPasswordForm']['email']
if 'error' in templateModel and templateModel['error'] is not None:
if templateModel['error'] == 'validator.email.invalid':
raise AuthentificationError('Error during login, email invalid')
raise AuthentificationError(f'Error during login: {templateModel["error"]}')
if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register':
raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
if 'errorCode' in templateModel:
raise AuthentificationError('Error during login, is the username correct?')
if 'postAction' in templateModel:
target = templateModel['postAction']
else:
raise APICompatibilityError('Form does not contain postAction')
elif match.groupdict()['name'] == 'csrf_token':
form2Data['_csrf'] = match.groupdict()['value']
form2Data['password'] = self.sessionuser.password
if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data):
raise APICompatibilityError('Could not find all required input fields in login page')

login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}'

# Post form content and retrieve userId in forwarding Location
login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False)
if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']):
if login3Response.status_code == requests.codes['internal_server_error']:
raise RetrievalError('Temporary server error during login')
raise APICompatibilityError('Forwarding expected (status code 302),'
f' but got status code {login3Response.status_code}')
if 'Location' not in login3Response.headers:
raise APICompatibilityError('No url for forwarding in response headers')

# Parse parametes from forwarding url
params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query))

# Check if error
if 'error' in params and params['error']:
errorMessages: Dict[str, str] = {
'login.errors.password_invalid': 'Password is invalid',
'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some'
' minutes until a new login attempt is possible'
}
if params['error'] in errorMessages:
error = errorMessages[params['error']]
else:
error = params['error']
raise AuthentificationError(error)

# Check for user id
if 'userId' not in params or not params['userId']:
if 'updated' in params and params['updated'] == 'dataprivacy':
raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
raise APICompatibilityError('No user id provided')
self.userId = params['userId'] # pylint: disable=unused-private-member

# Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#'
afterLoginUrl: str = login3Response.headers['Location']

consentURL = None
while True:
if 'consent' in afterLoginUrl:
consentURL = afterLoginUrl
afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE)
if afterLoginResponse.status_code == requests.codes['internal_server_error']:
raise RetrievalError('Temporary server error during login')

if 'Location' not in afterLoginResponse.headers:
if consentURL is not None:
raise AuthentificationError('It seems like you need to accept the terms and conditions for the MyCupra service.'
f' Try to visit the URL "{consentURL}" or log into the MyCupra smartphone app')
raise APICompatibilityError('No Location for forwarding in response headers')

afterLoginUrl = afterLoginResponse.headers['Location']

if afterLoginUrl.startswith(self.redirect_uri):
break

if afterLoginUrl.startswith(self.redirect_uri + '#'):
queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
else:
queryurl = afterLoginUrl
return queryurl

def fetchTokens(
self,
token_url,
Expand Down
Loading
Loading