Skip to content

Commit

Permalink
some clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverrahner committed Dec 4, 2023
1 parent 405715b commit 3c753a3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 126 deletions.
119 changes: 10 additions & 109 deletions volkswagencarnet/vw_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ def base64URLEncode(s):
allow_redirects=False,
params={
"redirect_uri": APP_URI,
# "prompt": "login",
# "nonce": getNonce(),
# "state": getNonce(),
# "code_challenge_method": "s256",
# "code_challenge": challenge.decode(),
"response_type": CLIENT[client].get("TOKEN_TYPES"),
"client_id": CLIENT[client].get("CLIENT_ID"),
"scope": CLIENT[client].get("SCOPE"),
Expand Down Expand Up @@ -340,7 +335,6 @@ def base64URLEncode(s):
)
if req.status != 200:
raise Exception(f"Token exchange failed. Received message: {await req.content.read()}")
# Save tokens as "identity", these are tokens representing the user
self._session_tokens[client] = await req.json()
if "error" in self._session_tokens[client]:
error_msg = self._session_tokens[client].get("error", "")
Expand All @@ -364,92 +358,28 @@ def base64URLEncode(s):
self._session_headers["Authorization"] = "Bearer " + self._session_tokens[client]["access_token"]
return True

async def _getAPITokens(self):
try:
# Get VW Group API tokens
# https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/mobile/oauth2/v1/token
tokenBody2 = {
"grant_type": "id_token",
"token": self._session_tokens["identity"]["id_token"],
"scope": "sc2:fal",
}
_LOGGER.debug("Trying to fetch api tokens.")
req = await self._session.post(
url="https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/mobile/oauth2/v1/token",
headers={
"User-Agent": USER_AGENT,
"X-App-Version": XAPPVERSION,
"X-App-Name": XAPPNAME,
"X-Client-Id": XCLIENT_ID,
},
data=tokenBody2,
allow_redirects=False,
)
if req.status > 400:
_LOGGER.debug("API token request failed.")
raise Exception(f"API token request returned with status code {req.status}")
else:
# Save tokens as "vwg", use these for get/posts to VW Group API
self._session_tokens["vwg"] = await req.json()
if "error" in self._session_tokens["vwg"]:
error = self._session_tokens["vwg"].get("error", "")
if "error_description" in self._session_tokens["vwg"]:
error_description = self._session_tokens["vwg"].get("error_description", "")
raise Exception(f"{error} - {error_description}")
else:
raise Exception(error)
if self._session_fulldebug:
for token in self._session_tokens.get("vwg", {}):
_LOGGER.debug(f"Got token {token}")
if not await self.verify_tokens(self._session_tokens["vwg"].get("access_token", ""), "vwg"):
_LOGGER.warning("VW-Group API token could not be verified!")
else:
_LOGGER.debug("VW-Group API token verified OK.")

# Update headers for requests, defaults to using VWG token
self._session_headers["Authorization"] = "Bearer " + self._session_tokens["vwg"]["access_token"]
except Exception as error:
_LOGGER.error(f"Failed to fetch VW-Group API tokens, {error}")
self._session_logged_in = False
return False
return True

async def terminate(self):
"""Log out from connect services."""
_LOGGER.info("Initiating logout")
await self.logout()

async def logout(self):
"""Logout, revoke tokens."""
# TODO: not tested yet
self._session_headers.pop("Authorization", None)

if self._session_logged_in:
if self._session_headers.get("vwg", {}).get("access_token"):
_LOGGER.info("Revoking API Access Token...")
self._session_headers["token_type_hint"] = "access_token"
params = {"token": self._session_tokens["vwg"]["access_token"]}
await self.post(
"https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/mobile/oauth2/v1/revoke", data=params
)
if self._session_headers.get("vwg", {}).get("refresh_token"):
_LOGGER.info("Revoking API Refresh Token...")
self._session_headers["token_type_hint"] = "refresh_token"
params = {"token": self._session_tokens["vwg"]["refresh_token"]}
await self.post(
"https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/mobile/oauth2/v1/revoke", data=params
)
self._session_headers.pop("token_type_hint", None)
if self._session_headers.get("identity", {}).get("identity_token"):
_LOGGER.info("Revoking Identity Access Token...")
# params = {
# "token": self._session_tokens['identity']['access_token'],
# "brand": BRAND
# }
# revoke_at = await self.post('https://tokenrefreshservice.apps.emea.vwapps.io/revokeToken', data = params)
# revoke_at = await self.post('https://emea.bff.cariad.digital/login/v1/idk/revoke', data = params)
if self._session_headers.get("identity", {}).get("refresh_token"):
_LOGGER.info("Revoking Identity Refresh Token...")
params = {"token": self._session_tokens["identity"]["refresh_token"], "brand": BRAND}
await self.post("https://tokenrefreshservice.apps.emea.vwapps.io/revokeToken", data=params)
params = {"token": self._session_tokens["identity"]["refresh_token"]}
await self.post("https://emea.bff.cariad.digital/login/v1/idk/revoke", data=params)

# HTTP methods to API
async def _request(self, method, url, **kwargs):
Expand All @@ -474,21 +404,21 @@ async def _request(self, method, url, **kwargs):

try:
if response.status == 204:
res = {"status_code": response.status}
res = {"status_code": response.status, "body": response.text}
elif response.status >= 200 or response.status <= 300:
res = await response.json(loads=json_loads)
else:
res = {}
_LOGGER.debug(f"Not success status code [{response.status}] response: {response}")
_LOGGER.debug(f"Not success status code [{response.status}] response: {response.text}")
if "X-RateLimit-Remaining" in response.headers:
res["rate_limit_remaining"] = response.headers.get("X-RateLimit-Remaining", "")
except Exception:
res = {}
_LOGGER.debug(f"Something went wrong [{response.status}] response: {response}")
_LOGGER.debug(f"Something went wrong [{response.status}] response: {response.text}")
return res

if self._session_fulldebug:
_LOGGER.debug(f'Request for "{url}" returned with status code [{response.status}], response: {res}')
_LOGGER.debug(f'Request for "{url}" returned with status code [{response.status}], response: {res.text}')
else:
_LOGGER.debug(f'Request for "{url}" returned with status code [{response.status}]')
return res
Expand Down Expand Up @@ -1246,10 +1176,6 @@ async def verify_tokens(self, token, type, client="Legacy"):
"https://api.vas.eu.dp15.vwg-connect.com",
"https://api.vas.eu.wcardp.io",
]
elif type == "vwg":
req = await self._session.get(url="https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/public/jwk/v1")
keys = await req.json()
audience = "mal.prd.ece.vwg-connect.com"
else:
_LOGGER.debug("Not implemented")
return False
Expand All @@ -1261,8 +1187,6 @@ async def verify_tokens(self, token, type, client="Legacy"):
pubkeys[kid] = jwt.algorithms.RSAAlgorithm.from_jwk(to_json(jwk))

token_kid = jwt.get_unverified_header(token)["kid"]
if type == "vwg":
token_kid = "VWGMBB01DELIV1." + token_kid

pubkey = pubkeys[token_kid]
jwt.decode(token, key=pubkey, algorithms=JWT_ALGORITHMS, audience=audience)
Expand All @@ -1286,11 +1210,11 @@ async def refresh_tokens(self):

body = {
"grant_type": "refresh_token",
# "brand": BRAND,
"refresh_token": self._session_tokens["identity"]["refresh_token"],
"client_id": XCLIENT_ID
}
response = await self._session.post(
url="https://tokenrefreshservice.apps.emea.vwapps.io/refreshTokens", headers=tHeaders, data=body
url="https://emea.bff.cariad.digital/login/v1/idk/token", headers=tHeaders, data=body
)
if response.status == 200:
tokens = await response.json()
Expand All @@ -1303,34 +1227,11 @@ async def refresh_tokens(self):
_LOGGER.warning(f"Something went wrong when refreshing {BRAND} account tokens.")
return False

body = {"grant_type": "id_token", "scope": "sc2:fal", "token": self._session_tokens["identity"]["id_token"]}

response = await self._session.post(
url="https://mbboauth-1d.prd.ece.vwg-connect.com/mbbcoauth/mobile/oauth2/v1/token",
headers=tHeaders,
data=body,
allow_redirects=True,
)
if response.status == 200:
tokens = await response.json()
if not await self.verify_tokens(tokens["access_token"], "vwg"):
_LOGGER.warning("Token could not be verified!")
for token in tokens:
self._session_tokens["vwg"][token] = tokens[token]
else:
resp = await response.text()
_LOGGER.warning("Something went wrong when refreshing API tokens. %s" % resp)
return False
return True
except Exception as error:
_LOGGER.warning(f"Could not refresh tokens: {error}")
return False

async def set_token(self, type):
"""Switch between tokens."""
self._session_headers["Authorization"] = "Bearer " + self._session_tokens[type]["access_token"]
return

# Class helpers #
@property
def vehicles(self):
Expand Down
17 changes: 2 additions & 15 deletions volkswagencarnet/vw_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,9 @@
CLIENT = {
"Legacy": {
"CLIENT_ID": "a24fba63-34b3-4d43-b181-942111e6bda8@apps_vw-dilab_com",
# client id for VWG API, legacy Skoda Connect/MySkoda
"SCOPE": "openid profile badge cars dealers vin",
# 'SCOPE': 'openid mbb profile cars address email birthdate badge phone driversLicense dealers profession vin',
"TOKEN_TYPES": "code", # id_token token",
},
"New": {
"CLIENT_ID": "f9a2359a-b776-46d9-bd0c-db1904343117@apps_vw-dilab_com",
# Provides access to new API? tokentype=IDK_TECHNICAL..
"SCOPE": "openid mbb profile",
"TOKEN_TYPES": "code id_token",
},
"Unknown": {
"CLIENT_ID": "72f9d29d-aa2b-40c1-bebe-4c7683681d4c@apps_vw-dilab_com", # gives tokentype=IDK_SMARTLINK ?
"SCOPE": "openid dealers profile email cars address",
"TOKEN_TYPES": "code id_token",
},
"TOKEN_TYPES": "code"
}
}


Expand Down
4 changes: 2 additions & 2 deletions volkswagencarnet/vw_vehicle.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ async def set_charge_min_level(self, level: int):

async def set_charger(self, action) -> bool:
"""Charging actions."""
if not self._services.get("rbatterycharge_v1", False):
if not self._services.get("charging", False):
_LOGGER.info("Remote start/stop of charger is not supported.")
raise Exception("Remote start/stop of charger is not supported.")
if self._in_progress("batterycharge"):
Expand Down Expand Up @@ -480,7 +480,7 @@ async def set_climatisation(self, mode="off", spin=False):

async def set_climater(self, data, spin=False):
"""Climater actions."""
if not self._services.get("rclima_v1", False):
if not self._services.get("climatisation", False):
_LOGGER.info("Remote control of climatisation functions is not supported.")
raise Exception("Remote control of climatisation functions is not supported.")
if self._in_progress("climatisation"):
Expand Down

0 comments on commit 3c753a3

Please sign in to comment.