# NOTE: reconstructed from a whitespace-collapsed git patch. The patch header
# that shared these lines is kept verbatim below for provenance.
#
#   From 52dd2d80fbc8bb3f3cdba2b144581e701b49fa11 Mon Sep 17 00:00:00 2001
#   From: =?UTF-8?q?Krzysztof=20D=C4=85browski?=
#   Date: Sun, 15 Dec 2024 21:21:46 +0100
#   Subject: [PATCH 1/3] Refactor web authentication
#
#   - Replace regex with HTML parsers
#   - Move doWebAuth to VWWebSession
#   - Split doWebAuth into multiple methods
#   - Add Terms and Conditions form handling
#   ---
#    weconnect/auth/auth_util.py          | 112 ++++++++++++++++
#    weconnect/auth/my_cupra_session.py   | 177 +------------------------
#    weconnect/auth/vw_web_session.py     | 189 +++++++++++++++++++++++++++
#    weconnect/auth/we_charge_session.py  | 182 +-------------------------
#    weconnect/auth/we_connect_session.py | 184 +-------------------------
#    5 files changed, 306 insertions(+), 538 deletions(-)
#
#   diff --git a/weconnect/auth/auth_util.py b/weconnect/auth/auth_util.py
#   index e87f25d..03bc0df 100644
#   --- a/weconnect/auth/auth_util.py
#   +++ b/weconnect/auth/auth_util.py
#   @@ -1,4 +1,116 @@
#
# Resulting content of weconnect/auth/auth_util.py:
import json
import re
from html.parser import HTMLParser


def addBearerAuthHeader(token, headers=None):
    """Set ``Authorization: Bearer <token>`` on *headers* and return the dict.

    A non-empty *headers* dict is modified in place; when *headers* is ``None``
    or empty a new dict is created and returned.
    """
    headers = headers or {}
    headers['Authorization'] = f'Bearer {token}'
    return headers


class HTMLFormParser(HTMLParser):
    """Collect the ``action`` and the ``<input>`` fields of one HTML form.

    After ``feed()``:

    * ``target`` holds the ``action`` attribute of the ``<form>`` whose ``id``
      equals *form_id* (``None`` when no such form was seen),
    * ``data`` maps every named ``<input>`` inside that form to its ``value``
      attribute (``None`` when the input has no ``value``).
    """

    def __init__(self, form_id):
        super().__init__()
        self._form_id = form_id
        self._inside_form = False
        self.target = None
        self.data = {}

    def _get_attr(self, attrs, name):
        """Return the value of attribute *name* from an ``attrs`` pair list, or ``None``."""
        return next((value for key, value in attrs if key == name), None)

    def handle_starttag(self, tag, attrs):
        """Enter the wanted form, or record an ``<input>`` while inside it."""
        if self._inside_form and tag == 'input':
            self.handle_input(attrs)
            return

        if tag == 'form' and self._get_attr(attrs, 'id') == self._form_id:
            self._inside_form = True
            self.target = self._get_attr(attrs, 'action')

    def handle_endtag(self, tag):
        """Leave the form on ``</form>`` so later inputs are ignored."""
        if tag == 'form' and self._inside_form:
            self._inside_form = False

    def handle_input(self, attrs):
        """Store ``name -> value`` for an input that belongs to the wanted form."""
        if not self._inside_form:
            return

        name = self._get_attr(attrs, 'name')
        if name:
            self.data[name] = self._get_attr(attrs, 'value')


class ScriptFormParser(HTMLParser):
    """Extract form data from a ``templateModel: {...},`` line inside a ``<script>``.

    Subclasses set ``fields`` (keys copied from the template model into
    ``data``) and ``targetField`` (key whose value becomes ``target``).  A
    ``csrf_token: '...'`` entry found in the same script text is stored as
    ``data['_csrf']``.
    """

    fields = []
    targetField = ''

    # The JSON object is expected on one line, terminated by a comma at the end
    # of that line.  Trailing blanks and CRLF line endings are tolerated.
    _TEMPLATE_MODEL_RE = re.compile(r'templateModel: (.*?),[ \t\r]*\n')
    _CSRF_TOKEN_RE = re.compile(r'csrf_token: \'(.*?)\'')

    def __init__(self):
        super().__init__()
        self._inside_script = False
        self.data = {}
        self.target = None

    def handle_starttag(self, tag, attrs):
        if not self._inside_script and tag == 'script':
            self._inside_script = True

    def handle_endtag(self, tag):
        if self._inside_script and tag == 'script':
            self._inside_script = False

    def handle_data(self, data):
        """Parse the template model out of script text; ignore everything else.

        Text that carries no template model, or whose model is not a valid JSON
        object, leaves ``target`` and ``data`` untouched so that callers can
        detect the missing form instead of getting a raw JSON error.
        """
        if not self._inside_script:
            return

        model_match = self._TEMPLATE_MODEL_RE.search(data)
        if not model_match:
            return

        try:
            model = json.loads(model_match.group(1))
        except json.JSONDecodeError:
            return
        if not isinstance(model, dict):
            return

        self.target = model.get(self.targetField, None)
        self.data = {k: v for k, v in model.items() if k in self.fields}

        token_match = self._CSRF_TOKEN_RE.search(data)
        if token_match:
            self.data['_csrf'] = token_match.group(1)


class CredentialsFormParser(ScriptFormParser):
    """Template-model parser for the credentials (password) page."""

    fields = ['relayState', 'hmac', 'registerCredentialsPath', 'error', 'errorCode']
    targetField = 'postAction'


class TermsAndConditionsFormParser(ScriptFormParser):
    """Template-model parser for the terms-and-conditions page.

    In addition to the base behaviour, ``countryOfResidence`` is upper-cased and
    the ``legalDocuments`` list is flattened into ``legalDocuments[<i>].<key>``
    entries.
    """

    fields = ['relayState', 'hmac', 'countryOfResidence', 'legalDocuments']
    targetField = 'loginUrl'

    # Keys of a legal document that are not copied into the form data.
    _SKIPPED_DOCUMENT_KEYS = frozenset(
        ('skipLink', 'declineLink', 'majorVersion', 'minorVersion', 'changeSummary')
    )

    def handle_data(self, data):
        if not self._inside_script:
            return

        super().handle_data(data)

        # The value may be JSON null; only strings can be upper-cased.
        country = self.data.get('countryOfResidence')
        if isinstance(country, str):
            self.data['countryOfResidence'] = country.upper()

        # Remove the nested object; only its flattened entries are kept.
        documents = self.data.pop('legalDocuments', None)
        if not documents:
            return

        # NOTE(review): every document is flattened with its own index; with a
        # single document this yields exactly the ``legalDocuments[0].*`` keys.
        for index, document in enumerate(documents):
            if not isinstance(document, dict):
                continue
            for key, value in document.items():
                if key in self._SKIPPED_DOCUMENT_KEYS:
                    continue
                # Boolean values are converted to 'yes' / 'no'.
                if isinstance(value, bool):
                    value = 'yes' if value else 'no'
                self.data[f'legalDocuments[{index}].{key}'] = value


# Patch residue kept verbatim (header of the next file section of the patch):
#   diff --git a/weconnect/auth/my_cupra_session.py b/weconnect/auth/my_cupra_session.py
#   index c52f7dd..8d69dd6 100644
--- a/weconnect/auth/my_cupra_session.py +++ b/weconnect/auth/my_cupra_session.py @@ -1,15 +1,9 @@ -from typing import Dict, Optional, Match +from typing import Dict -import re import json import logging import requests -from urllib.parse import parse_qsl, urlsplit - -from urllib3.util.retry import Retry -from requests.adapters import HTTPAdapter - from oauthlib.common import to_unicode from oauthlib.oauth2 import InsecureTransportError from oauthlib.oauth2 import is_secure_transport @@ -17,9 +11,8 @@ from requests.models import CaseInsensitiveDict from weconnect.auth.openid_session import AccessType - from weconnect.auth.vw_web_session import VWWebSession -from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError +from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError LOG = logging.getLogger("weconnect") @@ -55,172 +48,6 @@ def refresh(self): 'https://identity.vwgroup.io/oidc/v1/token', ) - def doWebAuth(self, authorizationUrl): # noqa: C901 - websession: requests.Session = requests.Session() - retries = Retry(total=self.retries, - backoff_factor=0.1, - status_forcelist=[500], - raise_on_status=False) - websession.proxies.update(self.proxies) - websession.mount('https://', HTTPAdapter(max_retries=retries)) - websession.headers = CaseInsensitiveDict({ - 'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', - 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', - 'accept-language': 'en-US,en;q=0.9', - 'accept-encoding': 'gzip, deflate, br' - }) - while True: - loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False) - if loginFormResponse.status_code == requests.codes['ok']: - break - elif loginFormResponse.status_code == requests.codes['found']: - if 'Location' in loginFormResponse.headers: - authorizationUrl = 
loginFormResponse.headers['Location'] - else: - raise APICompatibilityError('Forwarding without Location in Header') - elif loginFormResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - else: - raise APICompatibilityError('Retrieving credentials page was not successfull,' - f' status code: {loginFormResponse.status_code}') - - # Find login form on page to obtain inputs - emailFormRegex = r'[^\"]+)\"[^>]*>' \ - r'(?P.+?(?=))' - match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL) - if match is None: - raise APICompatibilityError('No login email form found') - # retrieve target url from form - target: str = match.groupdict()['formAction'] - - # Find all inputs and put those in formData dictionary - inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>' - formData: Dict[str, str] = {} - for match in re.finditer(inputRegex, match.groupdict()['formContent']): - if match.groupdict()['name']: - formData[match.groupdict()['name']] = match.groupdict()['value'] - if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData): - raise APICompatibilityError('Could not find all required input fields in login page') - - # Set email to the provided username - formData['email'] = self.sessionuser.username - - # build url from form action - login2Url: str = 'https://identity.vwgroup.io' + target - - loginHeadersForm: CaseInsensitiveDict = websession.headers.copy() - loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded' - - # Post form content and retrieve credentials page - login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True) - - if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101 - if login2Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise 
APICompatibilityError('Retrieving credentials page was not successfull,' - f' status code: {login2Response.status_code}') - - credentialsTemplateRegex = r'))\s+\};?\s+' - match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL) - if match is None: - raise APICompatibilityError('No credentials form found') - if match.groupdict()['templateModel']: - lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?' - form2Data: Dict[str, str] = {} - for match in re.finditer(lineRegex, match.groupdict()['templateModel']): - if match.groupdict()['name'] == 'templateModel': - templateModelString = '{' + match.groupdict()['value'] + '}' - if templateModelString.endswith(','): - templateModelString = templateModelString[:-len(',')] - templateModel = json.loads(templateModelString) - if 'relayState' in templateModel: - form2Data['relayState'] = templateModel['relayState'] - if 'hmac' in templateModel: - form2Data['hmac'] = templateModel['hmac'] - if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']: - form2Data['email'] = templateModel['emailPasswordForm']['email'] - if 'error' in templateModel and templateModel['error'] is not None: - if templateModel['error'] == 'validator.email.invalid': - raise AuthentificationError('Error during login, email invalid') - raise AuthentificationError(f'Error during login: {templateModel["error"]}') - if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register': - raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist') - if 'errorCode' in templateModel: - raise AuthentificationError('Error during login, is the username correct?') - if 'postAction' in templateModel: - target = templateModel['postAction'] - else: - raise APICompatibilityError('Form does not contain postAction') - elif match.groupdict()['name'] == 'csrf_token': - form2Data['_csrf'] = match.groupdict()['value'] - form2Data['password'] 
= self.sessionuser.password - if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data): - raise APICompatibilityError('Could not find all required input fields in login page') - - login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}' - - # Post form content and retrieve userId in forwarding Location - login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False) - if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']): - if login3Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise APICompatibilityError('Forwarding expected (status code 302),' - f' but got status code {login3Response.status_code}') - if 'Location' not in login3Response.headers: - raise APICompatibilityError('No url for forwarding in response headers') - - # Parse parametes from forwarding url - params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query)) - - # Check if error - if 'error' in params and params['error']: - errorMessages: Dict[str, str] = { - 'login.errors.password_invalid': 'Password is invalid', - 'login.error.throttled': 'Login throttled, probably too many wrong logins. 
You have to wait some' - ' minutes until a new login attempt is possible' - } - if params['error'] in errorMessages: - error = errorMessages[params['error']] - else: - error = params['error'] - raise AuthentificationError(error) - - # Check for user id - if 'userId' not in params or not params['userId']: - if 'updated' in params and params['updated'] == 'dataprivacy': - raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions') - raise APICompatibilityError('No user id provided') - self.userId = params['userId'] # pylint: disable=unused-private-member - - # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#' - afterLoginUrl: str = login3Response.headers['Location'] - - consentURL = None - while True: - if 'consent' in afterLoginUrl: - consentURL = afterLoginUrl - afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE) - if afterLoginResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - - if 'Location' not in afterLoginResponse.headers: - if consentURL is not None: - raise AuthentificationError('It seems like you need to accept the terms and conditions for the MyCupra service.' 
# NOTE: reconstructed from a whitespace-collapsed git patch. Patch text that
# shared these lines with the VWWebSession code is kept verbatim in comments.
#
# Residue: end of the my_cupra_session.py hunk that removes doWebAuth
#   - f' Try to visit the URL "{consentURL}" or log into the MyCupra smartphone app')
#   - raise APICompatibilityError('No Location for forwarding in response headers')
#   -
#   - afterLoginUrl = afterLoginResponse.headers['Location']
#   -
#   - if afterLoginUrl.startswith(self.redirect_uri):
#   - break
#   -
#   - if afterLoginUrl.startswith(self.redirect_uri + '#'):
#   - queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
#   - else:
#   - queryurl = afterLoginUrl
#   - return queryurl
#   -
#     def fetchTokens( self, token_url,
#
#   diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py
#   index 2cf1658..1c2cf98 100644
#   --- a/weconnect/auth/vw_web_session.py
#   +++ b/weconnect/auth/vw_web_session.py
#   @@ -1,7 +1,196 @@
#
# Resulting content of weconnect/auth/vw_web_session.py:
from typing import Any, Dict
from urllib3.util.retry import Retry
from urllib.parse import parse_qsl, urljoin, urlparse, urlsplit


import requests
from requests.adapters import HTTPAdapter
from requests.models import CaseInsensitiveDict

from weconnect.auth.auth_util import CredentialsFormParser, HTMLFormParser, TermsAndConditionsFormParser
from weconnect.auth.openid_session import OpenIDSession
from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError


class VWWebSession(OpenIDSession):
    """OpenID session that logs in through the identity.vwgroup.io web forms.

    The login is a sequence of HTML pages: e-mail form, password form, the
    login POST and a chain of redirects (optionally passing a
    terms-and-conditions form) that ends at ``self.redirect_uri``.
    """

    # Host that serves the login pages; relative URLs are resolved against it.
    IDENTITY_BASE_URL = 'https://identity.vwgroup.io'

    def __init__(self, sessionuser, **kwargs):
        super(VWWebSession, self).__init__(**kwargs)
        self.sessionuser = sessionuser

        # Set up the web session
        retries = Retry(
            total=self.retries,
            backoff_factor=0.1,
            status_forcelist=[500],
            raise_on_status=False
        )

        self.websession: requests.Session = requests.Session()
        self.websession.proxies.update(self.proxies)
        self.websession.mount('https://', HTTPAdapter(max_retries=retries))
        self.websession.headers = CaseInsensitiveDict({
            'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
                          'Chrome/74.0.3729.185 Mobile Safari/537.36',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
                      'application/signed-exchange;v=b3',
            'accept-language': 'en-US,en;q=0.9',
            'accept-encoding': 'gzip, deflate',
            'x-requested-with': 'de.volkswagen.carnet.eu.eremote',
            'upgrade-insecure-requests': '1',
        })

    @staticmethod
    def _raise_on_server_error(response: requests.Response) -> None:
        """Raise ``RetrievalError`` when *response* is an HTTP 500."""
        if response.status_code == requests.codes['internal_server_error']:
            raise RetrievalError('Temporary server error during login')

    @staticmethod
    def _redirect_location(response: requests.Response) -> str:
        """Return the ``Location`` of a 302/303 response, raising if it is not a redirect."""
        if response.status_code not in (requests.codes['found'], requests.codes['see_other']):
            raise APICompatibilityError('Forwarding expected (status code 302), '
                                        f'but got status code {response.status_code}')

        if 'Location' not in response.headers:
            raise APICompatibilityError('Forwarding without Location in headers')

        return response.headers['Location']

    def doWebAuth(self, url: str) -> str:
        """Run the web login starting at *url* and return the final redirect URL.

        The returned URL is the one starting with ``self.redirect_uri``, with a
        leading ``<redirect_uri>#`` replaced by ``https://egal?`` so that the
        fragment can be parsed like a query string.

        Raises:
            AuthentificationError: credentials rejected or account problem.
            RetrievalError: the server answered with HTTP 500.
            APICompatibilityError: a page or redirect did not look as expected.
        """
        # Get the login form
        emailForm = self._get_login_form(url)

        # Set email to the provided username
        emailForm.data['email'] = self.sessionuser.username

        # Get password form (urljoin keeps an absolute form action intact)
        passwordForm = self._get_password_form(
            urljoin(self.IDENTITY_BASE_URL, emailForm.target),
            emailForm.data
        )

        # Set credentials
        passwordForm.data['email'] = self.sessionuser.username
        passwordForm.data['password'] = self.sessionuser.password

        # Log in and get the redirect URL
        url = self._handle_login(
            f'{self.IDENTITY_BASE_URL}/signin-service/v1/{self.client_id}/{passwordForm.target}',
            passwordForm.data
        )

        # Follow the redirects until the redirect URI is reached.  The loop is
        # bounded so that a redirect cycle cannot hang the login.
        for _ in range(self.websession.max_redirects):
            if url.startswith(self.redirect_uri):
                break

            # Resolve relative locations against the identity host; absolute
            # URLs (also on other hosts) are left unchanged.
            url = urljoin(self.IDENTITY_BASE_URL, url)

            if 'terms-and-conditions' in url:
                # The consent form returns the next location, which may already
                # be the redirect URI, so it is re-checked at the loop head.
                url = self._handle_consent_form(url)
                continue

            response = self.websession.get(url, allow_redirects=False)
            self._raise_on_server_error(response)

            if 'Location' not in response.headers:
                raise APICompatibilityError('Forwarding without Location in headers')

            url = response.headers['Location']
        else:
            raise APICompatibilityError('Too many redirects during login')

        return url.replace(self.redirect_uri + '#', 'https://egal?')

    def _get_login_form(self, url: str) -> HTMLFormParser:
        """Follow redirects from *url* to the login page and parse its e-mail form."""
        for _ in range(self.websession.max_redirects):
            response = self.websession.get(url, allow_redirects=False)
            if response.status_code == requests.codes['ok']:
                break

            # A 500 that survived the retries is a temporary server problem,
            # not an incompatibility of the page layout.
            self._raise_on_server_error(response)

            if response.status_code not in (requests.codes['found'], requests.codes['see_other']):
                raise APICompatibilityError(f'Retrieving credentials page was not successful, '
                                            f'status code: {response.status_code}')

            if 'Location' not in response.headers:
                raise APICompatibilityError('Forwarding without Location in headers')

            # Location may be relative to the current URL
            url = urljoin(url, response.headers['Location'])
        else:
            raise APICompatibilityError('Too many redirects while retrieving the login page')

        # Find login form on page to obtain inputs
        emailForm = HTMLFormParser(form_id='emailPasswordForm')
        emailForm.feed(response.text)

        if not emailForm.target or not all(x in emailForm.data for x in ['_csrf', 'relayState', 'hmac', 'email']):
            raise APICompatibilityError('Could not find all required input fields in login page')

        return emailForm

    def _get_password_form(self, url: str, data: Dict[str, Any]) -> CredentialsFormParser:
        """Post the e-mail form to *url* and parse the credentials page that follows."""
        response = self.websession.post(url, data=data, allow_redirects=True)
        self._raise_on_server_error(response)
        if response.status_code != requests.codes['ok']:
            raise APICompatibilityError(f'Retrieving credentials page was not successful, '
                                        f'status code: {response.status_code}')

        # Find login form on page to obtain inputs
        credentialsForm = CredentialsFormParser()
        credentialsForm.feed(response.text)
        formData = credentialsForm.data

        # Report errors announced by the page first: such a page does not
        # necessarily carry all regular form fields.
        error = formData.get('error', None)
        if error is not None:
            if error == 'validator.email.invalid':
                raise AuthentificationError('Error during login, email invalid')
            raise AuthentificationError(f'Error during login: {error}')

        if formData.get('registerCredentialsPath', None) == 'register':
            raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')

        if 'errorCode' in formData:
            raise AuthentificationError('Error during login, is the username correct?')

        if not credentialsForm.target or not all(x in formData for x in ['relayState', 'hmac', '_csrf']):
            raise APICompatibilityError('Could not find all required input fields in login page')

        return credentialsForm

    def _handle_login(self, url: str, data: Dict[str, Any]) -> str:
        """Post the credentials to *url*, store the user id and return the redirect location."""
        response: requests.Response = self.websession.post(url, data=data, allow_redirects=False)

        self._raise_on_server_error(response)
        location = self._redirect_location(response)

        # Parse parameters from forwarding url
        params: Dict[str, str] = dict(parse_qsl(urlsplit(location).query))

        # Check for login error
        if params.get('error'):
            errorMessages: Dict[str, str] = {
                'login.errors.password_invalid': 'Password is invalid',
                'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait '
                                         'a few minutes until a new login attempt is possible'
            }

            raise AuthentificationError(errorMessages.get(params['error'], params['error']))

        # Check for user ID
        if not params.get('userId'):
            if params.get('updated') == 'dataprivacy':
                raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
            raise APICompatibilityError('No user ID provided')

        self.userId = params['userId']  # pylint: disable=unused-private-member
        return location

    def _handle_consent_form(self, url: str) -> str:
        """Fetch the terms-and-conditions page at *url*, post its form and return the redirect location."""
        response = self.websession.get(url, allow_redirects=False)
        self._raise_on_server_error(response)

        # Find form on page to obtain inputs
        tcForm = TermsAndConditionsFormParser()
        tcForm.feed(response.text)

        # Remove query from URL
        url = urlparse(response.url)._replace(query='').geturl()

        response = self.websession.post(url, data=tcForm.data, allow_redirects=False)
        self._raise_on_server_error(response)

        return self._redirect_location(response)


# Residue: start of the we_charge_session.py section of the patch, kept verbatim
#   diff --git a/weconnect/auth/we_charge_session.py b/weconnect/auth/we_charge_session.py
#   index 9fd4477..a016010 100644
#   --- a/weconnect/auth/we_charge_session.py
#   +++ b/weconnect/auth/we_charge_session.py
#   @@ -1,25 +1,14 @@
#   -from typing import Dict, Optional, Match
#   -
#   -import re
#   -import json
#    import logging
#    import requests
#   -from urllib.parse import parse_qsl, urlsplit
#   -
#   -from urllib3.util.retry import Retry
#   -from requests.adapters import HTTPAdapter
#   -
#    from oauthlib.common import add_params_to_uri
#    from oauthlib.oauth2 import InsecureTransportError, is_secure_transport
#   -
#    from requests.models import CaseInsensitiveDict
#   -
#    from weconnect.auth.openid_session import AccessType
#    from weconnect.auth.vw_web_session import VWWebSession
#   -from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
#   +from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError
#    LOG = logging.getLogger("weconnect")
#   @@ -61,175 +50,6 @@ def refresh(self):
#    'https://wecharge.apps.emea.vwapps.io/user-identity/v1/identity/login', )
#   - def doWebAuth(self, authorizationUrl): # noqa: C901
#   - websession: requests.Session = requests.Session()
#   - retries = Retry(total=self.retries,
#   - backoff_factor=0.1,
#   - status_forcelist=[500],
#   - raise_on_status=False)
#   - websession.proxies.update(self.proxies)
#   - websession.mount('https://', HTTPAdapter(max_retries=retries))
#   - websession.headers = CaseInsensitiveDict({
#   - 'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
#   -
'Chrome/74.0.3729.185 Mobile Safari/537.36', - 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,' - 'application/signed-exchange;v=b3', - 'accept-language': 'en-US,en;q=0.9', - 'accept-encoding': 'gzip, deflate', - 'x-requested-with': 'de.volkswagen.carnet.eu.eremote', - 'upgrade-insecure-requests': '1', - }) - - while True: - loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False) - if loginFormResponse.status_code == requests.codes['ok']: - break - elif loginFormResponse.status_code == requests.codes['found']: - if 'Location' in loginFormResponse.headers: - authorizationUrl = loginFormResponse.headers['Location'] - else: - raise APICompatibilityError('Forwarding without Location in Header') - elif loginFormResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - else: - raise APICompatibilityError('Retrieving credentials page was not successfull,' - f' status code: {loginFormResponse.status_code}') - # Find login form on page to obtain inputs - emailFormRegex = r'[^\"]+)\"[^>]*>' \ - r'(?P.+?(?=))' - match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL) - if match is None: - raise AuthentificationError('No login email form found') - # retrieve target url from form - target: str = match.groupdict()['formAction'] - - # Find all inputs and put those in formData dictionary - inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>' - formData: Dict[str, str] = {} - for match in re.finditer(inputRegex, match.groupdict()['formContent']): - if match.groupdict()['name']: - formData[match.groupdict()['name']] = match.groupdict()['value'] - if not all(x in ['_csrf', 'registerFlow', 'relayState', 'hmac', 'identifier'] for x in formData): - raise AuthentificationError('Could not find all required input fields in login page') - - # Set email to the provided username - 
formData['identifier'] = self.sessionuser.username - - # build url from form action - login2Url: str = 'https://identity.vwgroup.io' + target - - loginHeadersForm: CaseInsensitiveDict = websession.headers.copy() - loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded' - - # Post form content and retrieve credentials page - login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True) - - if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101 - if login2Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise AuthentificationError('Retrieving credentials page was not successfull,' - f' status code: {login2Response.status_code}') - - credentialsTemplateRegex = r'))\s+\};?\s+' - match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL) - if match is None: - raise AuthentificationError('No credentials form found') - if match.groupdict()['templateModel']: - lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?' 
- form2Data: Dict[str, str] = {} - for match in re.finditer(lineRegex, match.groupdict()['templateModel']): - if match.groupdict()['name'] == 'templateModel': - templateModelString = '{' + match.groupdict()['value'] + '}' - if templateModelString.endswith(','): - templateModelString = templateModelString[:-len(',')] - templateModel = json.loads(templateModelString) - if 'relayState' in templateModel: - form2Data['relayState'] = templateModel['relayState'] - if 'hmac' in templateModel: - form2Data['hmac'] = templateModel['hmac'] - if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']: - form2Data['email'] = templateModel['emailPasswordForm']['email'] - if 'error' in templateModel and templateModel['error'] is not None: - if templateModel['error'] == 'validator.email.invalid': - raise AuthentificationError('Error during login, email invalid') - raise AuthentificationError(f'Error during login: {templateModel["error"]}') - if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register': - raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist') - if 'errorCode' in templateModel: - raise AuthentificationError('Error during login, is the username correct?') - if 'postAction' in templateModel: - target = templateModel['postAction'] - else: - raise APICompatibilityError('Form does not contain postAction') - elif match.groupdict()['name'] == 'csrf_token': - form2Data['_csrf'] = match.groupdict()['value'] - form2Data['password'] = self.sessionuser.password - if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data): - raise AuthentificationError('Could not find all required input fields in credentials page') - - login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}' - - # Post form content and retrieve userId in forwarding Location - login3Response: requests.Response = 
websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False) - if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']): - if login3Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise AuthentificationError('Forwarding expected (status code 302),' - f' but got status code {login3Response.status_code}') - if 'Location' not in login3Response.headers: - raise AuthentificationError('No url for forwarding in response headers') - - # Parse parametes from forwarding url - params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query)) - - # Check if error - if 'error' in params and params['error']: - errorMessages: Dict[str, str] = { - 'login.errors.password_invalid': 'Password is invalid', - 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some' - ' minutes until a new login attempt is possible' - } - if params['error'] in errorMessages: - error = errorMessages[params['error']] - else: - error = params['error'] - raise AuthentificationError(error) - - # Check for user id - if 'userId' not in params or not params['userId']: - if 'updated' in params and params['updated'] == 'dataprivacy': - raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions') - raise AuthentificationError('No user id provided') - self.userId = params['userId'] # pylint: disable=unused-private-member - - # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#' - afterLoginUrl: str = login3Response.headers['Location'] - - while True: - if 'consent' in afterLoginUrl: - consentURL = afterLoginUrl - afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE) - if afterLoginResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server 
error during login') - - if 'Location' not in afterLoginResponse.headers: - if consentURL is not None: - raise AuthentificationError('It seems like you need to accept the terms and conditions for the WeConnect ID service.' - f' Try to visit the URL "{consentURL}" or log into the WeConnect ID smartphone app') - raise AuthentificationError('No Location for forwarding in response headers') - - afterLoginUrl = afterLoginResponse.headers['Location'] - - if afterLoginUrl.startswith(self.redirect_uri): - break - - if afterLoginUrl.startswith(self.redirect_uri + '#'): - queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?') - else: - queryurl = afterLoginUrl - return queryurl - def fetchTokens( self, token_url, diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py index b18e932..2b4f675 100644 --- a/weconnect/auth/we_connect_session.py +++ b/weconnect/auth/we_connect_session.py @@ -1,14 +1,8 @@ -from typing import Dict, Optional, Match - -import re import json import logging import requests -from urllib.parse import parse_qsl, urlparse, urlsplit - -from urllib3.util.retry import Retry -from requests.adapters import HTTPAdapter +from urllib.parse import parse_qsl, urlparse from oauthlib.common import add_params_to_uri, generate_nonce, to_unicode from oauthlib.oauth2 import InsecureTransportError @@ -17,9 +11,8 @@ from requests.models import CaseInsensitiveDict from weconnect.auth.openid_session import AccessType - from weconnect.auth.vw_web_session import VWWebSession -from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError +from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError LOG = logging.getLogger("weconnect") @@ -101,179 +94,6 @@ def authorizationUrl(self, url, state=None, **kwargs): return redirect - def doWebAuth(self, authorizationUrl): # noqa: C901 - websession: requests.Session = requests.Session() - 
retries = Retry(total=self.retries, - backoff_factor=0.1, - status_forcelist=[500], - raise_on_status=False) - websession.proxies.update(self.proxies) - websession.mount('https://', HTTPAdapter(max_retries=retries)) - websession.headers = CaseInsensitiveDict({ - 'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 ' - 'Chrome/74.0.3729.185 Mobile Safari/537.36', - 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,' - 'application/signed-exchange;v=b3', - 'accept-language': 'en-US,en;q=0.9', - 'accept-encoding': 'gzip, deflate', - 'x-requested-with': 'de.volkswagen.carnet.eu.eremote', - 'upgrade-insecure-requests': '1', - }) - while True: - loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False) - if loginFormResponse.status_code == requests.codes['ok']: - break - elif loginFormResponse.status_code in (requests.codes['found'], requests.codes['see_other']): - if 'Location' in loginFormResponse.headers: - authorizationUrl = loginFormResponse.headers['Location'] - else: - raise APICompatibilityError('Forwarding without Location in Header') - elif loginFormResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - else: - raise APICompatibilityError('Retrieving credentials page was not successfull,' - f' status code: {loginFormResponse.status_code}') - - # Find login form on page to obtain inputs - emailFormRegex = r'[^\"]+)\"[^>]*>' \ - r'(?P.+?(?=))' - match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL) - if match is None: - raise APICompatibilityError('No login email form found') - # retrieve target url from form - target: str = match.groupdict()['formAction'] - - # Find all inputs and put those in formData dictionary - inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>' - formData: Dict[str, str] = {} - for 
match in re.finditer(inputRegex, match.groupdict()['formContent']): - if match.groupdict()['name']: - formData[match.groupdict()['name']] = match.groupdict()['value'] - if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData): - raise APICompatibilityError('Could not find all required input fields in login page') - - # Set email to the provided username - formData['email'] = self.sessionuser.username - - # build url from form action - login2Url: str = 'https://identity.vwgroup.io' + target - - loginHeadersForm: CaseInsensitiveDict = websession.headers.copy() - loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded' - - # Post form content and retrieve credentials page - login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True) - - if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101 - if login2Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise APICompatibilityError('Retrieving credentials page was not successfull,' - f' status code: {login2Response.status_code}') - - credentialsTemplateRegex = r'))\s+\};?\s+' - match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL) - if match is None: - raise APICompatibilityError('No credentials form found') - if match.groupdict()['templateModel']: - lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?' 
- form2Data: Dict[str, str] = {} - for match in re.finditer(lineRegex, match.groupdict()['templateModel']): - if match.groupdict()['name'] == 'templateModel': - templateModelString = '{' + match.groupdict()['value'] + '}' - if templateModelString.endswith(','): - templateModelString = templateModelString[:-len(',')] - templateModel = json.loads(templateModelString) - if 'relayState' in templateModel: - form2Data['relayState'] = templateModel['relayState'] - if 'hmac' in templateModel: - form2Data['hmac'] = templateModel['hmac'] - if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']: - form2Data['email'] = templateModel['emailPasswordForm']['email'] - if 'error' in templateModel and templateModel['error'] is not None: - if templateModel['error'] == 'validator.email.invalid': - raise AuthentificationError('Error during login, email invalid') - raise AuthentificationError(f'Error during login: {templateModel["error"]}') - if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register': - raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist') - if 'errorCode' in templateModel: - raise AuthentificationError('Error during login, is the username correct?') - if 'postAction' in templateModel: - target = templateModel['postAction'] - else: - raise APICompatibilityError('Form does not contain postAction') - elif match.groupdict()['name'] == 'csrf_token': - form2Data['_csrf'] = match.groupdict()['value'] - form2Data['password'] = self.sessionuser.password - if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data): - raise APICompatibilityError('Could not find all required input fields in login page') - - login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}' - - # Post form content and retrieve userId in forwarding Location - login3Response: requests.Response = websession.post(login3Url, 
headers=loginHeadersForm, data=form2Data, allow_redirects=False) - if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']): - if login3Response.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - raise APICompatibilityError('Forwarding expected (status code 302),' - f' but got status code {login3Response.status_code}') - if 'Location' not in login3Response.headers: - raise APICompatibilityError('No url for forwarding in response headers') - - # Parse parametes from forwarding url - params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query)) - - # Check if error - if 'error' in params and params['error']: - errorMessages: Dict[str, str] = { - 'login.errors.password_invalid': 'Password is invalid', - 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some' - ' minutes until a new login attempt is possible' - } - if params['error'] in errorMessages: - error = errorMessages[params['error']] - else: - error = params['error'] - raise AuthentificationError(error) - - # Check for user id - if 'userId' not in params or not params['userId']: - if 'updated' in params and params['updated'] == 'dataprivacy': - raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions') - raise APICompatibilityError('No user id provided. A possible reason is that you have to reconfirm the terms and conditions.') - self.userId = params['userId'] - - # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#' - afterLoginUrl: str = login3Response.headers['Location'] - - consentURL = None - while True: - # if 'consent' in afterLoginUrl: - # consentURL = afterLoginUrl - if 'terms-and-conditions' in afterLoginUrl: - raise AuthentificationError('It seems like you need to accept the terms and conditions for the Volkswagen service.' 
- f' Try to visit the URL "https://identity.vwgroup.io/{afterLoginUrl}" or log into the Volkswagen smartphone app') - afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE) - if afterLoginResponse.status_code == requests.codes['internal_server_error']: - raise RetrievalError('Temporary server error during login') - - if 'Location' not in afterLoginResponse.headers: - if consentURL is not None: - raise AuthentificationError('It seems like you need to accept the terms and conditions for the Volkswagen service.' - f' Try to visit the URL "{consentURL}" or log into the Volkswagen smartphone app') - raise APICompatibilityError('No Location for forwarding in response headers') - - afterLoginUrl = afterLoginResponse.headers['Location'] - - if afterLoginUrl.startswith(self.redirect_uri): - break - - if afterLoginUrl.startswith(self.redirect_uri + '#'): - queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?') - else: - queryurl = afterLoginUrl - return queryurl - def fetchTokens( self, token_url, From 8553c9cabc43c07b5db20dbee48e7ef595e0ae7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20D=C4=85browski?= Date: Sun, 15 Dec 2024 22:44:41 +0100 Subject: [PATCH 2/3] Fix security issue --- weconnect/auth/vw_web_session.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py index 1c2cf98..4cbff9c 100644 --- a/weconnect/auth/vw_web_session.py +++ b/weconnect/auth/vw_web_session.py @@ -48,7 +48,7 @@ def doWebAuth(self, url: str) -> str: # Get password form passwordForm = self._get_password_form( - f'https://identity.vwgroup.io{emailForm.target}', + f'https://identity.vwgroup.io/{emailForm.target}', emailForm.data ) @@ -67,8 +67,8 @@ def doWebAuth(self, url: str) -> str: if url.startswith(self.redirect_uri): break - if not url.startswith('https://identity.vwgroup.io'): - url = f'https://identity.vwgroup.io{url}' + if not 
url.startswith('https://identity.vwgroup.io/'): + url = f'https://identity.vwgroup.io/{url}' if 'terms-and-conditions' in url: url = self._handle_consent_form(url) From fb1fe610ce1cf9074f9156b102c865ffb558950c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20D=C4=85browski?= Date: Fri, 20 Dec 2024 09:18:33 +0100 Subject: [PATCH 3/3] Fix URL issue and add `acceptTermsOnLogin` switch --- weconnect/auth/vw_web_session.py | 22 +++++++++++++--------- weconnect/weconnect.py | 4 +++- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py index 4cbff9c..079859a 100644 --- a/weconnect/auth/vw_web_session.py +++ b/weconnect/auth/vw_web_session.py @@ -1,6 +1,6 @@ from typing import Any, Dict from urllib3.util.retry import Retry -from urllib.parse import parse_qsl, urlparse, urlsplit +from urllib.parse import parse_qsl, urlparse, urlsplit, urljoin import requests @@ -13,9 +13,10 @@ class VWWebSession(OpenIDSession): - def __init__(self, sessionuser, **kwargs): + def __init__(self, sessionuser, acceptTermsOnLogin=False, **kwargs): super(VWWebSession, self).__init__(**kwargs) self.sessionuser = sessionuser + self.acceptTermsOnLogin = acceptTermsOnLogin # Set up the web session retries = Retry( @@ -48,7 +49,7 @@ def doWebAuth(self, url: str) -> str: # Get password form passwordForm = self._get_password_form( - f'https://identity.vwgroup.io/{emailForm.target}', + urljoin('https://identity.vwgroup.io', emailForm.target), emailForm.data ) @@ -67,11 +68,14 @@ def doWebAuth(self, url: str) -> str: if url.startswith(self.redirect_uri): break - if not url.startswith('https://identity.vwgroup.io/'): - url = f'https://identity.vwgroup.io/{url}' + url = urljoin('https://identity.vwgroup.io', url) if 'terms-and-conditions' in url: - url = self._handle_consent_form(url) + if self.acceptTermsOnLogin: + url = self._handle_consent_form(url) + else: + raise AuthentificationError(f'It seems like you need to accept 
the terms and conditions. ' + f'Try to visit the URL "{url}" or log into smartphone app.') response = self.websession.get(url, allow_redirects=False) if response.status_code == requests.codes['internal_server_error']: @@ -97,7 +101,7 @@ def _get_login_form(self, url: str) -> HTMLFormParser: url = response.headers['Location'] continue - raise APICompatibilityError(f'Retrieving credentials page was not successful, ' + raise APICompatibilityError(f'Retrieving login page was not successful, ' f'status code: {response.status_code}') # Find login form on page to obtain inputs @@ -105,7 +109,7 @@ def _get_login_form(self, url: str) -> HTMLFormParser: emailForm.feed(response.text) if not emailForm.target or not all(x in emailForm.data for x in ['_csrf', 'relayState', 'hmac', 'email']): - raise APICompatibilityError('Could not find all required input fields in login page') + raise APICompatibilityError('Could not find all required input fields on login page') return emailForm @@ -120,7 +124,7 @@ def _get_password_form(self, url: str, data: Dict[str, Any]) -> CredentialsFormP credentialsForm.feed(response.text) if not credentialsForm.target or not all(x in credentialsForm.data for x in ['relayState', 'hmac', '_csrf']): - raise APICompatibilityError('Could not find all required input fields in login page') + raise APICompatibilityError('Could not find all required input fields on credentials page') if credentialsForm.data.get('error', None) is not None: if credentialsForm.data['error'] == 'validator.email.invalid': diff --git a/weconnect/weconnect.py b/weconnect/weconnect.py index be78fab..4ab68b1 100644 --- a/weconnect/weconnect.py +++ b/weconnect/weconnect.py @@ -44,7 +44,8 @@ def __init__( # noqa: C901 # pylint: disable=too-many-arguments numRetries: int = 3, timeout: bool = None, selective: Optional[list[Domain]] = None, - forceReloginAfter: Optional[int] = None + forceReloginAfter: Optional[int] = None, + acceptTermsOnLogin: Optional[bool] = False, ) -> None: 
"""Initialize WeConnect interface. If loginOnInit is true the user will be tried to login. If loginOnInit is true also an initial fetch of data is performed. @@ -109,6 +110,7 @@ def __init__( # noqa: C901 # pylint: disable=too-many-arguments self.__session.timeout = timeout self.__session.retries = numRetries self.__session.forceReloginAfter = forceReloginAfter + self.__session.acceptTermsOnLogin = acceptTermsOnLogin if loginOnInit: self.__session.login()