Skip to content

Commit

Permalink
Re-authenticate if the session is closed by a concurrent request
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Vasilyev <[email protected]>
  • Loading branch information
nolar committed Jun 11, 2023
1 parent c266479 commit fc76eaa
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
10 changes: 10 additions & 0 deletions kopf/_cogs/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ async def request(
)
await errors.check_response(response) # but do not parse it!

# aiohttp raises a generic error if the session/transport is closed, so we try to guess.
except RuntimeError as e:
if context.session.closed:
# NB: this will reset the retry counter and do the full cycle with the new creds.
# TODO: find a way to gracefully replace the active session in the existing context,
# so that all ongoing requests would switch to new session & credentials.
logger.error(f"Request attempt {idx} failed; will try re-authenticating: {what}")
raise errors.APISessionClosed("Session is closed.") from e
raise

except (aiohttp.ClientConnectionError, errors.APIServerError, asyncio.TimeoutError) as e:
if backoff is None: # i.e. the last or the only attempt.
logger.error(f"Request attempt {idx} failed; escalating: {what} -> {e!r}")
Expand Down
2 changes: 1 addition & 1 deletion kopf/_cogs/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
async for key, info, context in vault.extended(APIContext, 'contexts'):
try:
return await fn(*args, **kwargs, context=context)
except errors.APIUnauthorizedError as e:
except (errors.APIUnauthorizedError, errors.APISessionClosed) as e:
await vault.invalidate(key, exc=e)

# Normally, either `vault.extended()` or `vault.invalidate()` raise the login errors.
Expand Down
13 changes: 13 additions & 0 deletions kopf/_cogs/clients/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ class APIConflictError(APIClientError):
pass


class APISessionClosed(APIError):
"""
A helper to escalate from inside the requests to cause re-authentication.
This happens when credentials expire while multiple concurrent requests
are ongoing (including their retries, mostly their back-off timeouts):
one random request will raise HTTP 401 and cause the re-authentication,
while others will retry their requests with the old session (now closed!)
and get a generic RuntimeError from aiohttp, thus failing their whole task.
"""
pass


async def check_response(
response: aiohttp.ClientResponse,
) -> None:
Expand Down

0 comments on commit fc76eaa

Please sign in to comment.