diff --git a/weconnect/auth/auth_util.py b/weconnect/auth/auth_util.py
index e87f25d..03bc0df 100644
--- a/weconnect/auth/auth_util.py
+++ b/weconnect/auth/auth_util.py
@@ -1,4 +1,116 @@
+import json
+import re
+from html.parser import HTMLParser
+
+
def addBearerAuthHeader(token, headers=None):
headers = headers or {}
headers['Authorization'] = f'Bearer {token}'
return headers
+
+
+class HTMLFormParser(HTMLParser):
+ def __init__(self, form_id):
+ super().__init__()
+ self._form_id = form_id
+ self._inside_form = False
+ self.target = None
+ self.data = {}
+
+ def _get_attr(self, attrs, name):
+ for attr in attrs:
+ if attr[0] == name:
+ return attr[1]
+ return None
+
+ def handle_starttag(self, tag, attrs):
+ if self._inside_form and tag == 'input':
+ self.handle_input(attrs)
+ return
+
+ if tag == 'form' and self._get_attr(attrs, 'id') == self._form_id:
+ self._inside_form = True
+ self.target = self._get_attr(attrs, 'action')
+
+ def handle_endtag(self, tag):
+ if tag == 'form' and self._inside_form:
+ self._inside_form = False
+
+ def handle_input(self, attrs):
+ if not self._inside_form:
+ return
+
+ name = self._get_attr(attrs, 'name')
+ value = self._get_attr(attrs, 'value')
+
+ if name:
+ self.data[name] = value
+
+
+class ScriptFormParser(HTMLParser):
+ fields = []
+ targetField = ''
+
+ def __init__(self):
+ super().__init__()
+ self._inside_script = False
+ self.data = {}
+ self.target = None
+
+ def handle_starttag(self, tag, attrs):
+ if not self._inside_script and tag == 'script':
+ self._inside_script = True
+
+ def handle_endtag(self, tag):
+ if self._inside_script and tag == 'script':
+ self._inside_script = False
+
+ def handle_data(self, data):
+ if not self._inside_script:
+ return
+
+ match = re.search(r'templateModel: (.*?),\n', data)
+ if not match:
+ return
+
+ result = json.loads(match.group(1))
+ self.target = result.get(self.targetField, None)
+ self.data = {k: v for k, v in result.items() if k in self.fields}
+
+ match2 = re.search(r'csrf_token: \'(.*?)\'', data)
+ if match2:
+ self.data['_csrf'] = match2.group(1)
+
+
+class CredentialsFormParser(ScriptFormParser):
+ fields = ['relayState', 'hmac', 'registerCredentialsPath', 'error', 'errorCode']
+ targetField = 'postAction'
+
+
+class TermsAndConditionsFormParser(ScriptFormParser):
+ fields = ['relayState', 'hmac', 'countryOfResidence', 'legalDocuments']
+ targetField = 'loginUrl'
+
+ def handle_data(self, data):
+ if not self._inside_script:
+ return
+
+ super().handle_data(data)
+
+ if 'countryOfResidence' in self.data:
+ self.data['countryOfResidence'] = self.data['countryOfResidence'].upper()
+
+ if 'legalDocuments' not in self.data:
+ return
+
+ for key in self.data['legalDocuments'][0]:
+ # Skip unnecessary keys
+ if key in ('skipLink', 'declineLink', 'majorVersion', 'minorVersion', 'changeSummary'):
+ continue
+
+ # Move values under a new key while converting boolean values to 'yes' or 'no'
+ v = self.data['legalDocuments'][0][key]
+ self.data[f'legalDocuments[0].{key}'] = ('yes' if v else 'no') if isinstance(v, bool) else v
+
+ # Remove the original object
+ del self.data['legalDocuments']
diff --git a/weconnect/auth/my_cupra_session.py b/weconnect/auth/my_cupra_session.py
index c52f7dd..8d69dd6 100644
--- a/weconnect/auth/my_cupra_session.py
+++ b/weconnect/auth/my_cupra_session.py
@@ -1,15 +1,9 @@
-from typing import Dict, Optional, Match
+from typing import Dict
-import re
import json
import logging
import requests
-from urllib.parse import parse_qsl, urlsplit
-
-from urllib3.util.retry import Retry
-from requests.adapters import HTTPAdapter
-
from oauthlib.common import to_unicode
from oauthlib.oauth2 import InsecureTransportError
from oauthlib.oauth2 import is_secure_transport
@@ -17,9 +11,8 @@
from requests.models import CaseInsensitiveDict
from weconnect.auth.openid_session import AccessType
-
from weconnect.auth.vw_web_session import VWWebSession
-from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
+from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError
LOG = logging.getLogger("weconnect")
@@ -55,172 +48,6 @@ def refresh(self):
'https://identity.vwgroup.io/oidc/v1/token',
)
- def doWebAuth(self, authorizationUrl): # noqa: C901
- websession: requests.Session = requests.Session()
- retries = Retry(total=self.retries,
- backoff_factor=0.1,
- status_forcelist=[500],
- raise_on_status=False)
- websession.proxies.update(self.proxies)
- websession.mount('https://', HTTPAdapter(max_retries=retries))
- websession.headers = CaseInsensitiveDict({
- 'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
- 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'accept-language': 'en-US,en;q=0.9',
- 'accept-encoding': 'gzip, deflate, br'
- })
- while True:
- loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False)
- if loginFormResponse.status_code == requests.codes['ok']:
- break
- elif loginFormResponse.status_code == requests.codes['found']:
- if 'Location' in loginFormResponse.headers:
- authorizationUrl = loginFormResponse.headers['Location']
- else:
- raise APICompatibilityError('Forwarding without Location in Header')
- elif loginFormResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- else:
- raise APICompatibilityError('Retrieving credentials page was not successfull,'
- f' status code: {loginFormResponse.status_code}')
-
- # Find login form on page to obtain inputs
- emailFormRegex = r'
[^\"]+)\"[^>]*>' \
- r'(?P.+?(?=))'
- match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL)
- if match is None:
- raise APICompatibilityError('No login email form found')
- # retrieve target url from form
- target: str = match.groupdict()['formAction']
-
- # Find all inputs and put those in formData dictionary
- inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>'
- formData: Dict[str, str] = {}
- for match in re.finditer(inputRegex, match.groupdict()['formContent']):
- if match.groupdict()['name']:
- formData[match.groupdict()['name']] = match.groupdict()['value']
- if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData):
- raise APICompatibilityError('Could not find all required input fields in login page')
-
- # Set email to the provided username
- formData['email'] = self.sessionuser.username
-
- # build url from form action
- login2Url: str = 'https://identity.vwgroup.io' + target
-
- loginHeadersForm: CaseInsensitiveDict = websession.headers.copy()
- loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded'
-
- # Post form content and retrieve credentials page
- login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True)
-
- if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101
- if login2Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise APICompatibilityError('Retrieving credentials page was not successfull,'
- f' status code: {login2Response.status_code}')
-
- credentialsTemplateRegex = r'))\s+\};?\s+'
- match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL)
- if match is None:
- raise APICompatibilityError('No credentials form found')
- if match.groupdict()['templateModel']:
- lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?'
- form2Data: Dict[str, str] = {}
- for match in re.finditer(lineRegex, match.groupdict()['templateModel']):
- if match.groupdict()['name'] == 'templateModel':
- templateModelString = '{' + match.groupdict()['value'] + '}'
- if templateModelString.endswith(','):
- templateModelString = templateModelString[:-len(',')]
- templateModel = json.loads(templateModelString)
- if 'relayState' in templateModel:
- form2Data['relayState'] = templateModel['relayState']
- if 'hmac' in templateModel:
- form2Data['hmac'] = templateModel['hmac']
- if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']:
- form2Data['email'] = templateModel['emailPasswordForm']['email']
- if 'error' in templateModel and templateModel['error'] is not None:
- if templateModel['error'] == 'validator.email.invalid':
- raise AuthentificationError('Error during login, email invalid')
- raise AuthentificationError(f'Error during login: {templateModel["error"]}')
- if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register':
- raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
- if 'errorCode' in templateModel:
- raise AuthentificationError('Error during login, is the username correct?')
- if 'postAction' in templateModel:
- target = templateModel['postAction']
- else:
- raise APICompatibilityError('Form does not contain postAction')
- elif match.groupdict()['name'] == 'csrf_token':
- form2Data['_csrf'] = match.groupdict()['value']
- form2Data['password'] = self.sessionuser.password
- if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data):
- raise APICompatibilityError('Could not find all required input fields in login page')
-
- login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}'
-
- # Post form content and retrieve userId in forwarding Location
- login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False)
- if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']):
- if login3Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise APICompatibilityError('Forwarding expected (status code 302),'
- f' but got status code {login3Response.status_code}')
- if 'Location' not in login3Response.headers:
- raise APICompatibilityError('No url for forwarding in response headers')
-
- # Parse parametes from forwarding url
- params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query))
-
- # Check if error
- if 'error' in params and params['error']:
- errorMessages: Dict[str, str] = {
- 'login.errors.password_invalid': 'Password is invalid',
- 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some'
- ' minutes until a new login attempt is possible'
- }
- if params['error'] in errorMessages:
- error = errorMessages[params['error']]
- else:
- error = params['error']
- raise AuthentificationError(error)
-
- # Check for user id
- if 'userId' not in params or not params['userId']:
- if 'updated' in params and params['updated'] == 'dataprivacy':
- raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
- raise APICompatibilityError('No user id provided')
- self.userId = params['userId'] # pylint: disable=unused-private-member
-
- # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#'
- afterLoginUrl: str = login3Response.headers['Location']
-
- consentURL = None
- while True:
- if 'consent' in afterLoginUrl:
- consentURL = afterLoginUrl
- afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE)
- if afterLoginResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
-
- if 'Location' not in afterLoginResponse.headers:
- if consentURL is not None:
- raise AuthentificationError('It seems like you need to accept the terms and conditions for the MyCupra service.'
- f' Try to visit the URL "{consentURL}" or log into the MyCupra smartphone app')
- raise APICompatibilityError('No Location for forwarding in response headers')
-
- afterLoginUrl = afterLoginResponse.headers['Location']
-
- if afterLoginUrl.startswith(self.redirect_uri):
- break
-
- if afterLoginUrl.startswith(self.redirect_uri + '#'):
- queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
- else:
- queryurl = afterLoginUrl
- return queryurl
-
def fetchTokens(
self,
token_url,
diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py
index 2cf1658..079859a 100644
--- a/weconnect/auth/vw_web_session.py
+++ b/weconnect/auth/vw_web_session.py
@@ -1,7 +1,200 @@
+from typing import Any, Dict
+from urllib3.util.retry import Retry
+from urllib.parse import parse_qsl, urlparse, urlsplit, urljoin
+
+
+import requests
+from requests.adapters import HTTPAdapter
+from requests.models import CaseInsensitiveDict
+
+from weconnect.auth.auth_util import CredentialsFormParser, HTMLFormParser, TermsAndConditionsFormParser
from weconnect.auth.openid_session import OpenIDSession
+from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError
class VWWebSession(OpenIDSession):
- def __init__(self, sessionuser, **kwargs):
+ def __init__(self, sessionuser, acceptTermsOnLogin=False, **kwargs):
super(VWWebSession, self).__init__(**kwargs)
self.sessionuser = sessionuser
+ self.acceptTermsOnLogin = acceptTermsOnLogin
+
+ # Set up the web session
+ retries = Retry(
+ total=self.retries,
+ backoff_factor=0.1,
+ status_forcelist=[500],
+ raise_on_status=False
+ )
+
+ self.websession: requests.Session = requests.Session()
+ self.websession.proxies.update(self.proxies)
+ self.websession.mount('https://', HTTPAdapter(max_retries=retries))
+ self.websession.headers = CaseInsensitiveDict({
+ 'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
+ 'Chrome/74.0.3729.185 Mobile Safari/537.36',
+ 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
+ 'application/signed-exchange;v=b3',
+ 'accept-language': 'en-US,en;q=0.9',
+ 'accept-encoding': 'gzip, deflate',
+ 'x-requested-with': 'de.volkswagen.carnet.eu.eremote',
+ 'upgrade-insecure-requests': '1',
+ })
+
+ def doWebAuth(self, url: str) -> str:
+ # Get the login form
+ emailForm = self._get_login_form(url)
+
+ # Set email to the provided username
+ emailForm.data['email'] = self.sessionuser.username
+
+ # Get password form
+ passwordForm = self._get_password_form(
+ urljoin('https://identity.vwgroup.io', emailForm.target),
+ emailForm.data
+ )
+
+ # Set credentials
+ passwordForm.data['email'] = self.sessionuser.username
+ passwordForm.data['password'] = self.sessionuser.password
+
+ # Log in and get the redirect URL
+ url = self._handle_login(
+ f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{passwordForm.target}',
+ passwordForm.data
+ )
+
+ # Check URL for terms and conditions
+ while True:
+ if url.startswith(self.redirect_uri):
+ break
+
+ url = urljoin('https://identity.vwgroup.io', url)
+
+ if 'terms-and-conditions' in url:
+ if self.acceptTermsOnLogin:
+ url = self._handle_consent_form(url)
+ else:
+ raise AuthentificationError(f'It seems like you need to accept the terms and conditions. '
+ f'Try to visit the URL "{url}" or log into smartphone app.')
+
+ response = self.websession.get(url, allow_redirects=False)
+ if response.status_code == requests.codes['internal_server_error']:
+ raise RetrievalError('Temporary server error during login')
+
+ if 'Location' not in response.headers:
+ raise APICompatibilityError('Forwarding without Location in headers')
+
+ url = response.headers['Location']
+
+ return url.replace(self.redirect_uri + '#', 'https://egal?')
+
+ def _get_login_form(self, url: str) -> HTMLFormParser:
+ while True:
+ response = self.websession.get(url, allow_redirects=False)
+ if response.status_code == requests.codes['ok']:
+ break
+
+ if response.status_code in (requests.codes['found'], requests.codes['see_other']):
+ if 'Location' not in response.headers:
+ raise APICompatibilityError('Forwarding without Location in headers')
+
+ url = response.headers['Location']
+ continue
+
+ raise APICompatibilityError(f'Retrieving login page was not successful, '
+ f'status code: {response.status_code}')
+
+ # Find login form on page to obtain inputs
+ emailForm = HTMLFormParser(form_id='emailPasswordForm')
+ emailForm.feed(response.text)
+
+ if not emailForm.target or not all(x in emailForm.data for x in ['_csrf', 'relayState', 'hmac', 'email']):
+ raise APICompatibilityError('Could not find all required input fields on login page')
+
+ return emailForm
+
+ def _get_password_form(self, url: str, data: Dict[str, Any]) -> CredentialsFormParser:
+ response = self.websession.post(url, data=data, allow_redirects=True)
+ if response.status_code != requests.codes['ok']:
+ raise APICompatibilityError(f'Retrieving credentials page was not successful, '
+ f'status code: {response.status_code}')
+
+ # Find login form on page to obtain inputs
+ credentialsForm = CredentialsFormParser()
+ credentialsForm.feed(response.text)
+
+ if not credentialsForm.target or not all(x in credentialsForm.data for x in ['relayState', 'hmac', '_csrf']):
+ raise APICompatibilityError('Could not find all required input fields on credentials page')
+
+ if credentialsForm.data.get('error', None) is not None:
+ if credentialsForm.data['error'] == 'validator.email.invalid':
+ raise AuthentificationError('Error during login, email invalid')
+ raise AuthentificationError(f'Error during login: {credentialsForm.data["error"]}')
+
+ if 'errorCode' in credentialsForm.data:
+ raise AuthentificationError('Error during login, is the username correct?')
+
+ if credentialsForm.data.get('registerCredentialsPath', None) == 'register':
+ raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
+
+ return credentialsForm
+
+ def _handle_login(self, url: str, data: Dict[str, Any]) -> str:
+ response: requests.Response = self.websession.post(url, data=data, allow_redirects=False)
+
+ if response.status_code == requests.codes['internal_server_error']:
+ raise RetrievalError('Temporary server error during login')
+
+ if response.status_code not in (requests.codes['found'], requests.codes['see_other']):
+ raise APICompatibilityError(f'Forwarding expected (status code 302), '
+ f'but got status code {response.status_code}')
+
+ if 'Location' not in response.headers:
+ raise APICompatibilityError('Forwarding without Location in headers')
+
+ # Parse parameters from forwarding url
+ params: Dict[str, str] = dict(parse_qsl(urlsplit(response.headers['Location']).query))
+
+ # Check for login error
+ if 'error' in params and params['error']:
+ errorMessages: Dict[str, str] = {
+ 'login.errors.password_invalid': 'Password is invalid',
+ 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait '
+ 'a few minutes until a new login attempt is possible'
+ }
+
+ raise AuthentificationError(errorMessages.get(params['error'], params['error']))
+
+ # Check for user ID
+ if 'userId' not in params or not params['userId']:
+ if 'updated' in params and params['updated'] == 'dataprivacy':
+ raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
+ raise APICompatibilityError('No user ID provided')
+
+ self.userId = params['userId'] # pylint: disable=unused-private-member
+ return response.headers['Location']
+
+ def _handle_consent_form(self, url: str) -> str:
+ response = self.websession.get(url, allow_redirects=False)
+ if response.status_code == requests.codes['internal_server_error']:
+ raise RetrievalError('Temporary server error during login')
+
+ # Find form on page to obtain inputs
+ tcForm = TermsAndConditionsFormParser()
+ tcForm.feed(response.text)
+
+ # Remove query from URL
+ url = urlparse(response.url)._replace(query='').geturl()
+
+ response = self.websession.post(url, data=tcForm.data, allow_redirects=False)
+ if response.status_code == requests.codes['internal_server_error']:
+ raise RetrievalError('Temporary server error during login')
+
+ if response.status_code not in (requests.codes['found'], requests.codes['see_other']):
+ raise APICompatibilityError('Forwarding expected (status code 302), '
+ f'but got status code {response.status_code}')
+
+ if 'Location' not in response.headers:
+ raise APICompatibilityError('Forwarding without Location in headers')
+
+ return response.headers['Location']
diff --git a/weconnect/auth/we_charge_session.py b/weconnect/auth/we_charge_session.py
index 9fd4477..a016010 100644
--- a/weconnect/auth/we_charge_session.py
+++ b/weconnect/auth/we_charge_session.py
@@ -1,25 +1,14 @@
-from typing import Dict, Optional, Match
-
-import re
-import json
import logging
import requests
-from urllib.parse import parse_qsl, urlsplit
-
-from urllib3.util.retry import Retry
-from requests.adapters import HTTPAdapter
-
from oauthlib.common import add_params_to_uri
from oauthlib.oauth2 import InsecureTransportError, is_secure_transport
-
from requests.models import CaseInsensitiveDict
-
from weconnect.auth.openid_session import AccessType
from weconnect.auth.vw_web_session import VWWebSession
-from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
+from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError
LOG = logging.getLogger("weconnect")
@@ -61,175 +50,6 @@ def refresh(self):
'https://wecharge.apps.emea.vwapps.io/user-identity/v1/identity/login',
)
- def doWebAuth(self, authorizationUrl): # noqa: C901
- websession: requests.Session = requests.Session()
- retries = Retry(total=self.retries,
- backoff_factor=0.1,
- status_forcelist=[500],
- raise_on_status=False)
- websession.proxies.update(self.proxies)
- websession.mount('https://', HTTPAdapter(max_retries=retries))
- websession.headers = CaseInsensitiveDict({
- 'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
- 'Chrome/74.0.3729.185 Mobile Safari/537.36',
- 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
- 'application/signed-exchange;v=b3',
- 'accept-language': 'en-US,en;q=0.9',
- 'accept-encoding': 'gzip, deflate',
- 'x-requested-with': 'de.volkswagen.carnet.eu.eremote',
- 'upgrade-insecure-requests': '1',
- })
-
- while True:
- loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False)
- if loginFormResponse.status_code == requests.codes['ok']:
- break
- elif loginFormResponse.status_code == requests.codes['found']:
- if 'Location' in loginFormResponse.headers:
- authorizationUrl = loginFormResponse.headers['Location']
- else:
- raise APICompatibilityError('Forwarding without Location in Header')
- elif loginFormResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- else:
- raise APICompatibilityError('Retrieving credentials page was not successfull,'
- f' status code: {loginFormResponse.status_code}')
- # Find login form on page to obtain inputs
- emailFormRegex = r'[^\"]+)\"[^>]*>' \
- r'(?P.+?(?=))'
- match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL)
- if match is None:
- raise AuthentificationError('No login email form found')
- # retrieve target url from form
- target: str = match.groupdict()['formAction']
-
- # Find all inputs and put those in formData dictionary
- inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>'
- formData: Dict[str, str] = {}
- for match in re.finditer(inputRegex, match.groupdict()['formContent']):
- if match.groupdict()['name']:
- formData[match.groupdict()['name']] = match.groupdict()['value']
- if not all(x in ['_csrf', 'registerFlow', 'relayState', 'hmac', 'identifier'] for x in formData):
- raise AuthentificationError('Could not find all required input fields in login page')
-
- # Set email to the provided username
- formData['identifier'] = self.sessionuser.username
-
- # build url from form action
- login2Url: str = 'https://identity.vwgroup.io' + target
-
- loginHeadersForm: CaseInsensitiveDict = websession.headers.copy()
- loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded'
-
- # Post form content and retrieve credentials page
- login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True)
-
- if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101
- if login2Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise AuthentificationError('Retrieving credentials page was not successfull,'
- f' status code: {login2Response.status_code}')
-
- credentialsTemplateRegex = r'))\s+\};?\s+'
- match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL)
- if match is None:
- raise AuthentificationError('No credentials form found')
- if match.groupdict()['templateModel']:
- lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?'
- form2Data: Dict[str, str] = {}
- for match in re.finditer(lineRegex, match.groupdict()['templateModel']):
- if match.groupdict()['name'] == 'templateModel':
- templateModelString = '{' + match.groupdict()['value'] + '}'
- if templateModelString.endswith(','):
- templateModelString = templateModelString[:-len(',')]
- templateModel = json.loads(templateModelString)
- if 'relayState' in templateModel:
- form2Data['relayState'] = templateModel['relayState']
- if 'hmac' in templateModel:
- form2Data['hmac'] = templateModel['hmac']
- if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']:
- form2Data['email'] = templateModel['emailPasswordForm']['email']
- if 'error' in templateModel and templateModel['error'] is not None:
- if templateModel['error'] == 'validator.email.invalid':
- raise AuthentificationError('Error during login, email invalid')
- raise AuthentificationError(f'Error during login: {templateModel["error"]}')
- if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register':
- raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
- if 'errorCode' in templateModel:
- raise AuthentificationError('Error during login, is the username correct?')
- if 'postAction' in templateModel:
- target = templateModel['postAction']
- else:
- raise APICompatibilityError('Form does not contain postAction')
- elif match.groupdict()['name'] == 'csrf_token':
- form2Data['_csrf'] = match.groupdict()['value']
- form2Data['password'] = self.sessionuser.password
- if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data):
- raise AuthentificationError('Could not find all required input fields in credentials page')
-
- login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}'
-
- # Post form content and retrieve userId in forwarding Location
- login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False)
- if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']):
- if login3Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise AuthentificationError('Forwarding expected (status code 302),'
- f' but got status code {login3Response.status_code}')
- if 'Location' not in login3Response.headers:
- raise AuthentificationError('No url for forwarding in response headers')
-
- # Parse parametes from forwarding url
- params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query))
-
- # Check if error
- if 'error' in params and params['error']:
- errorMessages: Dict[str, str] = {
- 'login.errors.password_invalid': 'Password is invalid',
- 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some'
- ' minutes until a new login attempt is possible'
- }
- if params['error'] in errorMessages:
- error = errorMessages[params['error']]
- else:
- error = params['error']
- raise AuthentificationError(error)
-
- # Check for user id
- if 'userId' not in params or not params['userId']:
- if 'updated' in params and params['updated'] == 'dataprivacy':
- raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
- raise AuthentificationError('No user id provided')
- self.userId = params['userId'] # pylint: disable=unused-private-member
-
- # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#'
- afterLoginUrl: str = login3Response.headers['Location']
-
- while True:
- if 'consent' in afterLoginUrl:
- consentURL = afterLoginUrl
- afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE)
- if afterLoginResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
-
- if 'Location' not in afterLoginResponse.headers:
- if consentURL is not None:
- raise AuthentificationError('It seems like you need to accept the terms and conditions for the WeConnect ID service.'
- f' Try to visit the URL "{consentURL}" or log into the WeConnect ID smartphone app')
- raise AuthentificationError('No Location for forwarding in response headers')
-
- afterLoginUrl = afterLoginResponse.headers['Location']
-
- if afterLoginUrl.startswith(self.redirect_uri):
- break
-
- if afterLoginUrl.startswith(self.redirect_uri + '#'):
- queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
- else:
- queryurl = afterLoginUrl
- return queryurl
-
def fetchTokens(
self,
token_url,
diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py
index b18e932..2b4f675 100644
--- a/weconnect/auth/we_connect_session.py
+++ b/weconnect/auth/we_connect_session.py
@@ -1,14 +1,8 @@
-from typing import Dict, Optional, Match
-
-import re
import json
import logging
import requests
-from urllib.parse import parse_qsl, urlparse, urlsplit
-
-from urllib3.util.retry import Retry
-from requests.adapters import HTTPAdapter
+from urllib.parse import parse_qsl, urlparse
from oauthlib.common import add_params_to_uri, generate_nonce, to_unicode
from oauthlib.oauth2 import InsecureTransportError
@@ -17,9 +11,8 @@
from requests.models import CaseInsensitiveDict
from weconnect.auth.openid_session import AccessType
-
from weconnect.auth.vw_web_session import VWWebSession
-from weconnect.errors import APICompatibilityError, AuthentificationError, RetrievalError, TemporaryAuthentificationError
+from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError
LOG = logging.getLogger("weconnect")
@@ -101,179 +94,6 @@ def authorizationUrl(self, url, state=None, **kwargs):
return redirect
- def doWebAuth(self, authorizationUrl): # noqa: C901
- websession: requests.Session = requests.Session()
- retries = Retry(total=self.retries,
- backoff_factor=0.1,
- status_forcelist=[500],
- raise_on_status=False)
- websession.proxies.update(self.proxies)
- websession.mount('https://', HTTPAdapter(max_retries=retries))
- websession.headers = CaseInsensitiveDict({
- 'user-agent': 'Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
- 'Chrome/74.0.3729.185 Mobile Safari/537.36',
- 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
- 'application/signed-exchange;v=b3',
- 'accept-language': 'en-US,en;q=0.9',
- 'accept-encoding': 'gzip, deflate',
- 'x-requested-with': 'de.volkswagen.carnet.eu.eremote',
- 'upgrade-insecure-requests': '1',
- })
- while True:
- loginFormResponse: requests.Response = websession.get(authorizationUrl, allow_redirects=False)
- if loginFormResponse.status_code == requests.codes['ok']:
- break
- elif loginFormResponse.status_code in (requests.codes['found'], requests.codes['see_other']):
- if 'Location' in loginFormResponse.headers:
- authorizationUrl = loginFormResponse.headers['Location']
- else:
- raise APICompatibilityError('Forwarding without Location in Header')
- elif loginFormResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- else:
- raise APICompatibilityError('Retrieving credentials page was not successfull,'
- f' status code: {loginFormResponse.status_code}')
-
- # Find login form on page to obtain inputs
- emailFormRegex = r'[^\"]+)\"[^>]*>' \
- r'(?P.+?(?=))'
- match: Optional[Match[str]] = re.search(emailFormRegex, loginFormResponse.text, flags=re.DOTALL)
- if match is None:
- raise APICompatibilityError('No login email form found')
- # retrieve target url from form
- target: str = match.groupdict()['formAction']
-
- # Find all inputs and put those in formData dictionary
- inputRegex = r'[^\"]+)\"([\\n\\r\s]value=\"(?P[^\"]+)\")?[^/]*/>'
- formData: Dict[str, str] = {}
- for match in re.finditer(inputRegex, match.groupdict()['formContent']):
- if match.groupdict()['name']:
- formData[match.groupdict()['name']] = match.groupdict()['value']
- if not all(x in ['_csrf', 'relayState', 'hmac', 'email'] for x in formData):
- raise APICompatibilityError('Could not find all required input fields in login page')
-
- # Set email to the provided username
- formData['email'] = self.sessionuser.username
-
- # build url from form action
- login2Url: str = 'https://identity.vwgroup.io' + target
-
- loginHeadersForm: CaseInsensitiveDict = websession.headers.copy()
- loginHeadersForm['Content-Type'] = 'application/x-www-form-urlencoded'
-
- # Post form content and retrieve credentials page
- login2Response: requests.Response = websession.post(login2Url, headers=loginHeadersForm, data=formData, allow_redirects=True)
-
- if login2Response.status_code != requests.codes['ok']: # pylint: disable=E1101
- if login2Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise APICompatibilityError('Retrieving credentials page was not successfull,'
- f' status code: {login2Response.status_code}')
-
- credentialsTemplateRegex = r'))\s+\};?\s+'
- match = re.search(credentialsTemplateRegex, login2Response.text, flags=re.DOTALL)
- if match is None:
- raise APICompatibilityError('No credentials form found')
- if match.groupdict()['templateModel']:
- lineRegex = r'\s*(?P[^\:]+)\:\s+[\'\{]?(?P.+)[\'\}][,]?'
- form2Data: Dict[str, str] = {}
- for match in re.finditer(lineRegex, match.groupdict()['templateModel']):
- if match.groupdict()['name'] == 'templateModel':
- templateModelString = '{' + match.groupdict()['value'] + '}'
- if templateModelString.endswith(','):
- templateModelString = templateModelString[:-len(',')]
- templateModel = json.loads(templateModelString)
- if 'relayState' in templateModel:
- form2Data['relayState'] = templateModel['relayState']
- if 'hmac' in templateModel:
- form2Data['hmac'] = templateModel['hmac']
- if 'emailPasswordForm' in templateModel and 'email' in templateModel['emailPasswordForm']:
- form2Data['email'] = templateModel['emailPasswordForm']['email']
- if 'error' in templateModel and templateModel['error'] is not None:
- if templateModel['error'] == 'validator.email.invalid':
- raise AuthentificationError('Error during login, email invalid')
- raise AuthentificationError(f'Error during login: {templateModel["error"]}')
- if 'registerCredentialsPath' in templateModel and templateModel['registerCredentialsPath'] == 'register':
- raise AuthentificationError(f'Error during login, account {self.sessionuser.username} does not exist')
- if 'errorCode' in templateModel:
- raise AuthentificationError('Error during login, is the username correct?')
- if 'postAction' in templateModel:
- target = templateModel['postAction']
- else:
- raise APICompatibilityError('Form does not contain postAction')
- elif match.groupdict()['name'] == 'csrf_token':
- form2Data['_csrf'] = match.groupdict()['value']
- form2Data['password'] = self.sessionuser.password
- if not all(x in ['_csrf', 'relayState', 'hmac', 'email', 'password'] for x in form2Data):
- raise APICompatibilityError('Could not find all required input fields in login page')
-
- login3Url = f'https://identity.vwgroup.io/signin-service/v1/{self.client_id}/{target}'
-
- # Post form content and retrieve userId in forwarding Location
- login3Response: requests.Response = websession.post(login3Url, headers=loginHeadersForm, data=form2Data, allow_redirects=False)
- if login3Response.status_code not in (requests.codes['found'], requests.codes['see_other']):
- if login3Response.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
- raise APICompatibilityError('Forwarding expected (status code 302),'
- f' but got status code {login3Response.status_code}')
- if 'Location' not in login3Response.headers:
- raise APICompatibilityError('No url for forwarding in response headers')
-
- # Parse parametes from forwarding url
- params: Dict[str, str] = dict(parse_qsl(urlsplit(login3Response.headers['Location']).query))
-
- # Check if error
- if 'error' in params and params['error']:
- errorMessages: Dict[str, str] = {
- 'login.errors.password_invalid': 'Password is invalid',
- 'login.error.throttled': 'Login throttled, probably too many wrong logins. You have to wait some'
- ' minutes until a new login attempt is possible'
- }
- if params['error'] in errorMessages:
- error = errorMessages[params['error']]
- else:
- error = params['error']
- raise AuthentificationError(error)
-
- # Check for user id
- if 'userId' not in params or not params['userId']:
- if 'updated' in params and params['updated'] == 'dataprivacy':
- raise AuthentificationError('You have to login at myvolkswagen.de and accept the terms and conditions')
- raise APICompatibilityError('No user id provided. A possible reason is that you have to reconfirm the terms and conditions.')
- self.userId = params['userId']
-
- # Now follow the forwarding until forwarding URL starts with 'weconnect://authenticated#'
- afterLoginUrl: str = login3Response.headers['Location']
-
- consentURL = None
- while True:
- # if 'consent' in afterLoginUrl:
- # consentURL = afterLoginUrl
- if 'terms-and-conditions' in afterLoginUrl:
- raise AuthentificationError('It seems like you need to accept the terms and conditions for the Volkswagen service.'
- f' Try to visit the URL "https://identity.vwgroup.io/{afterLoginUrl}" or log into the Volkswagen smartphone app')
- afterLoginResponse = self.get(afterLoginUrl, allow_redirects=False, access_type=AccessType.NONE)
- if afterLoginResponse.status_code == requests.codes['internal_server_error']:
- raise RetrievalError('Temporary server error during login')
-
- if 'Location' not in afterLoginResponse.headers:
- if consentURL is not None:
- raise AuthentificationError('It seems like you need to accept the terms and conditions for the Volkswagen service.'
- f' Try to visit the URL "{consentURL}" or log into the Volkswagen smartphone app')
- raise APICompatibilityError('No Location for forwarding in response headers')
-
- afterLoginUrl = afterLoginResponse.headers['Location']
-
- if afterLoginUrl.startswith(self.redirect_uri):
- break
-
- if afterLoginUrl.startswith(self.redirect_uri + '#'):
- queryurl = afterLoginUrl.replace(self.redirect_uri + '#', 'https://egal?')
- else:
- queryurl = afterLoginUrl
- return queryurl
-
def fetchTokens(
self,
token_url,
diff --git a/weconnect/weconnect.py b/weconnect/weconnect.py
index be78fab..4ab68b1 100644
--- a/weconnect/weconnect.py
+++ b/weconnect/weconnect.py
@@ -44,7 +44,8 @@ def __init__( # noqa: C901 # pylint: disable=too-many-arguments
numRetries: int = 3,
timeout: bool = None,
selective: Optional[list[Domain]] = None,
- forceReloginAfter: Optional[int] = None
+ forceReloginAfter: Optional[int] = None,
+ acceptTermsOnLogin: Optional[bool] = False,
) -> None:
"""Initialize WeConnect interface. If loginOnInit is true the user will be tried to login.
If loginOnInit is true also an initial fetch of data is performed.
@@ -109,6 +110,7 @@ def __init__( # noqa: C901 # pylint: disable=too-many-arguments
self.__session.timeout = timeout
self.__session.retries = numRetries
self.__session.forceReloginAfter = forceReloginAfter
+ self.__session.acceptTermsOnLogin = acceptTermsOnLogin
if loginOnInit:
self.__session.login()