Skip to content

Commit

Permalink
Replace withBackoff with more explicit alternatives (#638)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomashoneyman authored Jul 27, 2023
1 parent 23cbb4b commit 38fc5d5
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 147 deletions.
29 changes: 18 additions & 11 deletions app/src/App/Effect/GitHub.purs
Original file line number Diff line number Diff line change
Expand Up @@ -258,18 +258,25 @@ request octokit githubRequest@{ route: route@(GitHubRoute method _ _), codec } =
requestWithBackoff :: forall a r. Octokit -> Request a -> Run (LOG + AFF + r) (Either Octokit.GitHubError a)
requestWithBackoff octokit githubRequest = do
Log.debug $ "Making request to " <> Octokit.printGitHubRoute githubRequest.route
let action = Octokit.request octokit githubRequest
result <- Run.liftAff $ withBackoff
{ delay: Duration.Milliseconds 5_000.0
, action
, shouldCancel: \_ -> Octokit.request octokit Octokit.rateLimitRequest >>= case _ of
Right { remaining } | remaining == 0 -> pure false
_ -> pure true
, shouldRetry: \attempt -> if attempt <= 3 then pure (Just action) else pure Nothing
}
result <- Run.liftAff do
let
retryOptions =
{ timeout: defaultRetry.timeout
, retryOnCancel: defaultRetry.retryOnCancel
, retryOnFailure: \attempt err -> case err of
UnexpectedError _ -> false
DecodeError _ -> false
-- https://docs.github.com/en/rest/overview/resources-in-the-rest-api?apiVersion=2022-11-28#exceeding-the-rate-limit
APIError { statusCode } | statusCode >= 400 && statusCode <= 500 -> false
APIError _ -> attempt <= 3
}
withRetry retryOptions (Octokit.request octokit githubRequest)
case result of
Nothing -> pure $ Left $ APIError { statusCode: 400, message: "Unable to reach GitHub servers." }
Just accepted -> pure accepted
Cancelled -> pure $ Left $ APIError { statusCode: 400, message: "Unable to reach GitHub servers." }
Failed err -> do
Log.debug $ "Request failed with error: " <> Octokit.printGitHubError err
pure $ Left err
Succeeded success -> pure $ Right success

type RequestResult =
{ modified :: DateTime
Expand Down
78 changes: 36 additions & 42 deletions app/src/App/Effect/Pursuit.purs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import Data.HTTP.Method as Method
import Data.Map as Map
import Data.MediaType.Common as MediaType
import Data.Profunctor as Profunctor
import Effect.Aff (Milliseconds(..))
import Effect.Aff as Aff
import Registry.App.Effect.Log (LOG)
import Registry.App.Effect.Log as Log
import Registry.App.Legacy.LenientVersion (LenientVersion(..))
Expand Down Expand Up @@ -64,50 +62,46 @@ handleAff (GitHubToken token) = case _ of
Publish payload reply -> do
Log.debug "Pushing to Pursuit..."

let
loop n = do
result <- Run.liftAff $ withBackoff' $ Affjax.Node.request
{ content: Just $ RequestBody.json payload
, headers:
[ RequestHeader.Accept MediaType.applicationJSON
, RequestHeader.RequestHeader "Authorization" ("token " <> token)
]
, method: Left Method.POST
, username: Nothing
, withCredentials: false
, password: Nothing
, responseFormat: ResponseFormat.string
, timeout: Nothing
, url: "https://pursuit.purescript.org/packages"
}

case result of
Nothing -> do
Log.error $ "Pursuit failed to connect after several retries."
pure $ Left $ "Expected to receive a 201 status from Pursuit, but failed to connect after several retries."
Just (Right { status: StatusCode status })
| status == 201 -> do
Log.debug "Received 201 status, which indicates the upload was successful."
pure $ Right unit
| n > 0, status == 400 || status == 502 -> do
Log.debug $ "Received " <> show status <> ", retrying..."
Run.liftAff $ Aff.delay $ Milliseconds 1000.0
loop (n - 1)
Just (Right { body, status: StatusCode status }) -> do
result <- Run.liftAff $ withRetryRequest'
{ content: Just $ RequestBody.json payload
, headers:
[ RequestHeader.Accept MediaType.applicationJSON
, RequestHeader.RequestHeader "Authorization" ("token " <> token)
]
, method: Left Method.POST
, username: Nothing
, withCredentials: false
, password: Nothing
, responseFormat: ResponseFormat.string
, timeout: Nothing
, url: "https://pursuit.purescript.org/packages"
}

result' <- case result of
Cancelled -> do
Log.error $ "Pursuit failed to connect after several retries."
pure $ Left $ "Expected to receive a 201 status from Pursuit, but failed to connect after several retries."
Failed reqError -> case reqError of
AffjaxError err -> do
pure $ Left $ "Pursuit publishing failed with an HTTP error: " <> Affjax.Node.printError err
StatusError { body, status: StatusCode status } -> do
Log.error $ "Pursuit publishing failed with status " <> show status <> " and body\n" <> body
pure $ Left $ "Expected to receive a 201 status from Pursuit, but received " <> show status <> " instead."
Succeeded { body, status: StatusCode status }
| status == 201 -> do
Log.debug "Received 201 status, which indicates the upload was successful."
pure $ Right unit
| otherwise -> do
Log.error $ "Pursuit publishing failed with status " <> show status <> " and body\n" <> body
pure $ Left $ "Expected to receive a 201 status from Pursuit, but received " <> show status <> " instead."
Just (Left httpError) -> do
let printedError = Affjax.Node.printError httpError
Log.error $ "Pursuit publishing failed because of an HTTP error: " <> printedError
pure $ Left "Could not reach Pursuit due to an HTTP error."

reply <$> loop 2
pure $ reply result'

GetPublishedVersions pname reply -> do
let name = PackageName.print pname
let url = "https://pursuit.purescript.org/packages/purescript-" <> name <> "/available-versions"
Log.debug $ "Checking if package docs for " <> name <> " are published on Pursuit using endpoint " <> url
result <- Run.liftAff $ withBackoff' $ Affjax.Node.request
result <- Run.liftAff $ withRetryRequest'
{ content: Nothing
, headers: [ RequestHeader.Accept MediaType.applicationJSON ]
, method: Left Method.GET
Expand All @@ -120,17 +114,17 @@ handleAff (GitHubToken token) = case _ of
}

case result of
Nothing -> do
Cancelled -> do
Log.error $ "Could not reach Pursuit after multiple retries at URL " <> url
pure $ reply $ Left $ "Could not reach Pursuit to determine published versions for " <> name
Just (Left httpError) -> do
Failed (AffjaxError httpError) -> do
let printedError = Affjax.Node.printError httpError
Log.error $ "Pursuit publishing failed because of an HTTP error: " <> printedError
pure $ reply $ Left "Could not reach Pursuit due to an HTTP error."
Just (Right { body, status: StatusCode status }) | status /= 200 -> do
Failed (StatusError { body, status: StatusCode status }) -> do
Log.error $ "Could not fetch published versions from Pursuit (received non-200 response) " <> show status <> " and body\n" <> Argonaut.stringify body
pure $ reply $ Left $ "Received non-200 response from Pursuit: " <> show status
Just (Right { body }) -> case CA.decode availableVersionsCodec body of
Succeeded { body } -> case CA.decode availableVersionsCodec body of
Left error -> do
let printed = CA.printJsonDecodeError error
Log.error $ "Failed to decode body " <> Argonaut.stringify body <> "\n with error: " <> printed
Expand Down
19 changes: 9 additions & 10 deletions app/src/App/Effect/Source.purs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import Registry.App.Prelude

import Affjax.Node as Affjax.Node
import Affjax.ResponseFormat as ResponseFormat
import Affjax.StatusCode (StatusCode(..))
import Data.Array as Array
import Data.DateTime (DateTime)
import Data.HTTP.Method (Method(..))
Expand Down Expand Up @@ -79,10 +78,10 @@ handle = case _ of
clonePackageAtTag = do
let url = Array.fold [ "https://github.com/", owner, "/", repo ]
let args = [ "clone", url, "--branch", ref, "--single-branch", "-c", "advice.detachedHead=false", repoDir ]
withBackoff' (Git.gitCLI args Nothing) >>= case _ of
Nothing -> Aff.throwError $ Aff.error $ "Timed out attempting to clone git tag: " <> url <> " " <> ref
Just (Left err) -> Aff.throwError $ Aff.error err
Just (Right _) -> pure unit
withRetryOnTimeout (Git.gitCLI args Nothing) >>= case _ of
Cancelled -> Aff.throwError $ Aff.error $ "Timed out attempting to clone git tag: " <> url <> " " <> ref
Failed err -> Aff.throwError $ Aff.error err
Succeeded _ -> pure unit

Run.liftAff (Aff.attempt clonePackageAtTag) >>= case _ of
Left error -> do
Expand Down Expand Up @@ -131,23 +130,23 @@ handle = case _ of
let archiveUrl = "https://github.com/" <> owner <> "/" <> repo <> "/archive/" <> tarballName
Log.debug $ "Fetching tarball from GitHub: " <> archiveUrl

response <- Run.liftAff $ withBackoff' $ Affjax.Node.request $ Affjax.Node.defaultRequest
response <- Run.liftAff $ withRetryRequest' $ Affjax.Node.defaultRequest
{ method = Left GET
, responseFormat = ResponseFormat.arrayBuffer
, url = archiveUrl
}

case response of
Nothing -> Except.throw $ "Could not download " <> archiveUrl
Just (Left error) -> do
Cancelled -> Except.throw $ "Could not download " <> archiveUrl
Failed (AffjaxError error) -> do
Log.error $ "Failed to download " <> archiveUrl <> " because of an HTTP error: " <> Affjax.Node.printError error
Except.throw $ "Could not download " <> archiveUrl
Just (Right { status, body }) | status /= StatusCode 200 -> do
Failed (StatusError { status, body }) -> do
buffer <- Run.liftEffect $ Buffer.fromArrayBuffer body
bodyString <- Run.liftEffect $ Buffer.toString UTF8 (buffer :: Buffer)
Log.error $ "Failed to download " <> archiveUrl <> " because of a non-200 status code (" <> show status <> ") with body " <> bodyString
Except.throw $ "Could not download " <> archiveUrl
Just (Right { body }) -> do
Succeeded { body } -> do
Log.debug $ "Successfully downloaded " <> archiveUrl <> " into a buffer."
buffer <- Run.liftEffect $ Buffer.fromArrayBuffer body
Run.liftAff (Aff.attempt (FS.Aff.writeFile absoluteTarballPath buffer)) >>= case _ of
Expand Down
86 changes: 43 additions & 43 deletions app/src/App/Effect/Storage.purs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import Registry.App.Prelude

import Affjax.Node as Affjax.Node
import Affjax.ResponseFormat as ResponseFormat
import Affjax.StatusCode (StatusCode(..))
import Data.Array as Array
import Data.Exists as Exists
import Data.HTTP.Method (Method(..))
Expand All @@ -34,6 +33,7 @@ import Registry.App.Effect.Cache as Cache
import Registry.App.Effect.Log (LOG)
import Registry.App.Effect.Log as Log
import Registry.Constants as Constants
import Registry.Foreign.S3 (Space)
import Registry.Foreign.S3 as S3
import Registry.PackageName as PackageName
import Registry.Version as Version
Expand Down Expand Up @@ -106,13 +106,13 @@ connectS3 key = do
let bucket = "purescript-registry"
let space = "ams3.digitaloceanspaces.com"
Log.debug $ "Connecting to the bucket " <> bucket <> " at space " <> space <> " with public key " <> key.key
Run.liftAff (withBackoff' (Aff.attempt (S3.connect key "ams3.digitaloceanspaces.com" bucket))) >>= case _ of
Nothing ->
Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.connect key "ams3.digitaloceanspaces.com" bucket))) >>= case _ of
Cancelled ->
Except.throw "Timed out when attempting to connect to S3 storage backend."
Just (Left err) -> do
Failed err -> do
Log.error $ "Failed to connect to S3 due to an exception: " <> Aff.message err
Except.throw "Could not connect to storage backend."
Just (Right connection) -> do
Succeeded connection -> do
Log.debug "Connected to S3!"
pure connection

Expand All @@ -126,17 +126,10 @@ handleS3 :: forall r a. S3Env -> Storage a -> Run (LOG + AFF + EFFECT + r) a
handleS3 env = Cache.interpret _storageCache (Cache.handleFs env.cache) <<< case _ of
Query name reply -> map (map reply) Except.runExcept do
s3 <- connectS3 env.s3
resources <- Run.liftAff (withBackoff' (S3.listObjects s3 { prefix: PackageName.print name <> "/" })) >>= case _ of
Nothing -> do
Log.error $ "Failed to list S3 objects for " <> PackageName.print name <> " because the process timed out."
Except.throw $ "Could not upload package " <> PackageName.print name <> " due to an error connecting to the storage backend."
Just objects ->
pure $ map _.key objects
pure $ Set.fromFoldable
$ resources
>>= \resource -> do
{ name: parsedName, version } <- Array.fromFoldable $ parsePackagePath resource
version <$ guard (name == parsedName)
resources <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
pure $ Set.fromFoldable $ resources >>= \resource -> do
{ name: parsedName, version } <- Array.fromFoldable $ parsePackagePath resource
version <$ guard (name == parsedName)

Download name version path reply -> map (map reply) Except.runExcept do
let package = formatPackageVersion name version
Expand Down Expand Up @@ -167,24 +160,21 @@ handleS3 env = Cache.interpret _storageCache (Cache.handleFs env.cache) <<< case

Log.debug $ "Read file for " <> package <> ", now uploading to " <> packagePath <> "..."
s3 <- connectS3 env.s3
published <- Run.liftAff (withBackoff' (S3.listObjects s3 { prefix: PackageName.print name <> "/" })) >>= case _ of
Nothing -> do
Log.error $ "Failed to list S3 objects for " <> PackageName.print name <> " because the process timed out."
Except.throw $ "Could not upload package " <> package <> " due to an error connecting to the storage backend."
Just objects ->
pure $ map _.key objects

published <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
if Array.elem packagePath published then do
Log.error $ packagePath <> " already exists on S3."
Except.throw $ "Could not upload " <> package <> " because a package at " <> formatPackageUrl name version <> " already exists."
else do
Log.debug $ "Uploading release to the bucket at path " <> packagePath
let putParams = { key: packagePath, body: buffer, acl: S3.PublicRead }
Run.liftAff (withBackoff' (S3.putObject s3 putParams)) >>= case _ of
Nothing -> do
Log.error "Failed to put object to S3 because the process timed out."
Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.putObject s3 putParams))) >>= case _ of
Cancelled -> do
Log.error "Failed to upload object to S3 because the process timed out."
Except.throw $ "Could not upload package " <> package <> " due to an error connecting to the storage backend."
Failed error -> do
Log.error $ "Failed to upload object to S3 because of an exception: " <> Aff.message error
Except.throw $ "Could not upload package " <> package <> " due to an error connecting to the storage backend."
Just _ ->
Succeeded _ ->
Log.info $ "Uploaded " <> package <> " to the bucket at path " <> packagePath

Delete name version reply -> map (map reply) Except.runExcept do
Expand All @@ -194,21 +184,18 @@ handleS3 env = Cache.interpret _storageCache (Cache.handleFs env.cache) <<< case

Log.debug $ "Deleting " <> package
s3 <- connectS3 env.s3
published <- Run.liftAff (withBackoff' (S3.listObjects s3 { prefix: PackageName.print name <> "/" })) >>= case _ of
Nothing -> do
Log.error $ "Failed to delete " <> package <> " because the process timed out when attempting to list objects at " <> packagePath <> " from S3."
Except.throw $ "Could not delete " <> package <> " from the storage backend."
Just objects ->
pure $ map _.key objects

published <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
if Array.elem packagePath published then do
Log.debug $ "Deleting release from the bucket at path " <> packagePath
let deleteParams = { key: packagePath }
Run.liftAff (withBackoff' (S3.deleteObject s3 deleteParams)) >>= case _ of
Nothing -> do
Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.deleteObject s3 deleteParams))) >>= case _ of
Cancelled -> do
Log.error $ "Timed out when attempting to delete the release of " <> package <> " from S3 at the path " <> packagePath
Except.throw $ "Could not delete " <> package <> " from the storage backend."
Just _ -> do
Failed error -> do
Log.error $ "Failed to delete object from S3 because of an exception: " <> Aff.message error
Except.throw $ "Could not delete package " <> package <> " due to an error connecting to the storage backend."
Succeeded _ -> do
Log.debug $ "Deleted release of " <> package <> " from S3 at the path " <> packagePath
pure unit
else do
Expand Down Expand Up @@ -253,7 +240,7 @@ downloadS3 name version = do
packageUrl = formatPackageUrl name version

Log.debug $ "Downloading " <> package <> " from " <> packageUrl
response <- Run.liftAff $ withBackoff' $ Affjax.Node.request $ Affjax.Node.defaultRequest
response <- Run.liftAff $ withRetryRequest' $ Affjax.Node.defaultRequest
{ method = Left GET
, responseFormat = ResponseFormat.arrayBuffer
, url = packageUrl
Expand All @@ -262,22 +249,35 @@ downloadS3 name version = do
-- TODO: Rely on the metadata to check the size and hash? Or do we not care
-- for registry-internal operations?
case response of
Nothing -> do
Cancelled -> do
Log.error $ "Failed to download " <> package <> " from " <> packageUrl <> " because of a connection timeout."
Except.throw $ "Failed to download " <> package <> " from the storage backend."
Just (Left error) -> do
Failed (AffjaxError error) -> do
Log.error $ "Failed to download " <> package <> " from " <> packageUrl <> " because of an HTTP error: " <> Affjax.Node.printError error
Except.throw $ "Could not download " <> package <> " from the storage backend."
Just (Right { status, body }) | status /= StatusCode 200 -> do
Failed (StatusError { status, body }) -> do
buffer <- Run.liftEffect $ Buffer.fromArrayBuffer body
bodyString <- Run.liftEffect $ Buffer.toString UTF8 (buffer :: Buffer)
Log.error $ "Failed to download " <> package <> " from " <> packageUrl <> " because of a non-200 status code (" <> show status <> ") with body " <> bodyString
Log.error $ "Failed to download " <> package <> " from " <> packageUrl <> " because of a bad status code (" <> show status <> ") with body " <> bodyString
Except.throw $ "Could not download " <> package <> " from the storage backend."
Just (Right { body }) -> do
Succeeded { body } -> do
Log.debug $ "Successfully downloaded " <> package <> " into a buffer."
buffer :: Buffer <- Run.liftEffect $ Buffer.fromArrayBuffer body
pure buffer

withRetryListObjects :: Space -> PackageName -> Aff (Either String (Array String))
withRetryListObjects s3 name = do
let package = PackageName.print name
result <- withRetry (defaultRetry { retryOnFailure = \attempt _ -> attempt < 3 }) do
Aff.attempt (S3.listObjects s3 { prefix: package <> "/" })
pure $ case result of
Cancelled -> do
Left $ "Failed to list S3 objects for " <> package <> " because the process timed out."
Failed error -> do
Left $ "Failed to list S3 objects for " <> package <> " because of an exception: " <> Aff.message error
Succeeded objects ->
pure $ map _.key objects

-- | A key type for the storage cache. Only supports packages identified by
-- | their name and version.
data StorageCache (c :: Type -> Type -> Type) a = Package PackageName Version (c Buffer a)
Expand Down
Loading

0 comments on commit 38fc5d5

Please sign in to comment.