Skip to content

Commit

Permalink
implement exponential backoff (#34)
Browse files Browse the repository at this point in the history
* implement exponential backoff

* backoff on 5xx errors

* fix cargo fmt
  • Loading branch information
Jayko001 authored May 30, 2024
1 parent 10903d1 commit 9132b59
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"]
pg_test = []

[dependencies]
backoff = { version = "0.4.0", features = ["tokio"] }
chrono = "0.4.26"
clerk-rs = "0.3.0"
pgrx = "=0.11.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false}
supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false }
tokio = { version = "1", features = ["full"] }
reqwest = "0.11"

[dev-dependencies]
pgrx-tests = "=0.11.3"
pgrx-tests = "=0.11.3"
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for Clerk.com Backend API"
homepage = "https://github.com/tembo-io/clerk_fdw"
documentation = "https://github.com/tembo-io/clerk_fdw"
categories = ["connectors"]
version = "0.3.0"
version = "0.3.1"

[build]
postgres_version = "15"
Expand Down
48 changes: 35 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use clerk_rs::{
apis::organizations_api::Organization, apis::users_api::User, clerk::Clerk, ClerkConfiguration,
};

// TODO: will have to incorporate offset at some point
use backoff::future::retry;
use backoff::ExponentialBackoff;

const PAGE_SIZE: usize = 500;

fn body_to_rows(
Expand Down Expand Up @@ -236,15 +238,38 @@ impl ForeignDataWrapper<ClerkFdwError> for ClerkFdw {
info!("clerk_fdw: received total organizations: {}", total_orgs);
let mut i_org = 0;
for org in org_res.data.iter() {
let membership_resp =
OrganizationMembership::list_organization_memberships(
&self.clerk_client,
&org.id,
Some(PAGE_SIZE as f32),
None,
)
.await;

let membership_resp = retry(ExponentialBackoff::default(), || {
async {
OrganizationMembership::list_organization_memberships(
&self.clerk_client,
&org.id,
Some(PAGE_SIZE as f32),
None,
)
.await
.map_err(|e| match e {
clerk_rs::apis::Error::Reqwest(ref reqwest_error) => {
if let Some(status_code) = reqwest_error.status() {
match status_code {
reqwest::StatusCode::TOO_MANY_REQUESTS |
reqwest::StatusCode::BAD_GATEWAY |
reqwest::StatusCode::SERVICE_UNAVAILABLE |
reqwest::StatusCode::GATEWAY_TIMEOUT |
reqwest::StatusCode::INTERNAL_SERVER_ERROR => {
info!("clerk_fdw: received {} error, backing off", status_code);
backoff::Error::transient(e)
},
_ => backoff::Error::Permanent(e),
}
} else {
backoff::Error::Permanent(e)
}
}
_ => backoff::Error::Permanent(e),
})
}
})
.await;
match membership_resp {
Ok(mem_res) => {
i_org += 1;
Expand All @@ -264,8 +289,6 @@ impl ForeignDataWrapper<ClerkFdwError> for ClerkFdw {
continue;
}
}
// Introduce a delay of 0.05 seconds
std::thread::sleep(std::time::Duration::from_millis(50));
}
if org_res.data.len() < PAGE_SIZE {
info!("clerk_fdw: finished fetching all memberships, total={}", result.len());
Expand All @@ -279,7 +302,6 @@ impl ForeignDataWrapper<ClerkFdwError> for ClerkFdw {
}
}
} else {
// this is where i need to make changes
let mut offset = 0;
loop {
let obj_js =
Expand Down

0 comments on commit 9132b59

Please sign in to comment.