Skip to content

Commit

Permalink
feat(stats): improve indexing status err handling (#1114)
Browse files Browse the repository at this point in the history
* count retries + return err if passed

* panic whole service if update fails

* document conditional start
  • Loading branch information
bragov4ik authored Nov 12, 2024
1 parent 88062f3 commit 174a251
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 48 deletions.
7 changes: 6 additions & 1 deletion stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ by enabling word wrapping
| `STATS__CONCURRENT_​START_UPDATES` | | Number of concurrent chart updates on start | `3` |
| `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` |
| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. | `null` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
| `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checks (while the conditions are not yet satisfied) | `5` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` |
Expand All @@ -95,6 +95,11 @@ by enabling word wrapping

[anchor]: <> (anchors.envs.end.service)

##### Conditional start
In order to prevent incorrect statistics from being collected, there is an option to automatically delay chart updates. This is controlled by the `STATS__CONDITIONAL_​START__*` environment variables.

The service will periodically check the enabled start conditions and start updating charts once they are satisfied.

<details><summary>Server settings</summary>
<p>

Expand Down
72 changes: 29 additions & 43 deletions stats/stats-server/src/blockscout_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,10 @@ use crate::settings::{Settings, StartConditionSettings, ToggleableThreshold};

use anyhow::Context;
use blockscout_service_launcher::launcher::ConfigSettings;
use reqwest::StatusCode;
use tokio::time::sleep;
use tracing::{info, warn};

fn is_retryable_code(status_code: &reqwest::StatusCode) -> bool {
matches!(
*status_code,
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::IM_A_TEAPOT
)
}
const RETRIES: u64 = 10;

fn is_threshold_passed(
threshold: &ToggleableThreshold,
Expand Down Expand Up @@ -57,6 +47,7 @@ pub async fn wait_for_blockscout_indexing(
api_config: blockscout_client::Configuration,
wait_config: StartConditionSettings,
) -> Result<(), anyhow::Error> {
let mut consecutive_errors = 0;
loop {
match blockscout_client::apis::main_page_api::get_indexing_status(&api_config).await {
Ok(result)
Expand All @@ -75,16 +66,20 @@ pub async fn wait_for_blockscout_indexing(
info!("Blockscout indexing threshold passed");
return Ok(());
}
Ok(_) => {}
Err(blockscout_client::Error::ResponseError(r)) if is_retryable_code(&r.status) => {
warn!("Error from indexing status endpoint: {r:?}");
Ok(_) => {
info!("Blockscout indexing threshold is not passed");
consecutive_errors = 0;
}
Err(e) => {
return Err(e).context("Requesting indexing status");
if consecutive_errors >= RETRIES {
return Err(e).context("Requesting indexing status");
}
warn!("Error ({consecutive_errors}/{RETRIES}) requesting indexing status: {e:?}");
consecutive_errors += 1;
}
}
info!(
"Blockscout is not indexed enough. Checking again in {} secs",
"Rechecking indexing status in {} secs",
wait_config.check_period_secs
);
sleep(Duration::from_secs(wait_config.check_period_secs.into())).await;
Expand Down Expand Up @@ -139,11 +134,12 @@ mod tests {

async fn test_wait_indexing(
wait_config: StartConditionSettings,
timeout: Option<Duration>,
response: ResponseTemplate,
) -> Result<Result<(), anyhow::Error>, Elapsed> {
let server = mock_indexing_status(response).await;
tokio::time::timeout(
Duration::from_millis(500),
timeout.unwrap_or(Duration::from_millis(500)),
wait_for_blockscout_indexing(
blockscout_client::Configuration::new(Url::from_str(&server.uri()).unwrap()),
wait_config,
Expand All @@ -156,11 +152,12 @@ mod tests {
fn wait_config(
#[default(0.9)] blocks: f64,
#[default(0.9)] internal_transactions: f64,
#[default(0)] check_period_secs: u32,
) -> StartConditionSettings {
StartConditionSettings {
blocks_ratio: ToggleableThreshold::enabled(blocks),
internal_transactions_ratio: ToggleableThreshold::enabled(internal_transactions),
check_period_secs: 0,
check_period_secs,
}
}

Expand All @@ -171,6 +168,7 @@ mod tests {
) {
test_wait_indexing(
wait_config.clone(),
None,
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": true,
Expand All @@ -186,6 +184,7 @@ mod tests {

test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": false,
Expand All @@ -206,6 +205,7 @@ mod tests {
) {
test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200)
.set_body_string(
r#"{
Expand All @@ -229,6 +229,7 @@ mod tests {
) {
test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200)
.set_body_string(
r#"{
Expand All @@ -247,38 +248,23 @@ mod tests {
#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_retries_with_error_codes(
wait_config: StartConditionSettings,
#[with(0.9, 0.9, 1)] wait_config: StartConditionSettings,
) {
let timeout = Some(Duration::from_millis(1500));
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(429)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(500)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(503)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(504)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(429)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(500)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(503)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(504)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(400)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(403)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(404)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(405)),
]);
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
let test_result = server.unwrap();
test_result.expect_err("must time out");
}
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_fails_with_error_codes(
wait_config: StartConditionSettings,
) {
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(400)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(403)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(404)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(405)),
]);
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
let test_result = server.unwrap();
test_result
.expect("must fail immediately")
.expect_err("must report error");
}
}
}
21 changes: 17 additions & 4 deletions stats/stats-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
let update_service =
Arc::new(UpdateService::new(db.clone(), blockscout, charts.clone()).await?);

tokio::spawn(async move {
let update_service_handle = tokio::spawn(async move {
// Wait for blockscout to index, if necessary.
if let Some(config) = blockscout_api_config {
if let Err(e) = wait_for_blockscout_indexing(config, settings.conditional_start).await {
tracing::error!(error =? e, "Error starting update service. Failed while waiting for blockscout indexing");
return;
let error_msg =
"Error starting update service. Failed while waiting for blockscout indexing";
tracing::error!(error =? e, error_msg);
panic!("{}. {:?}", error_msg, e);
}
}

Expand All @@ -126,6 +128,7 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
settings.force_update_on_start,
)
.await;
Ok(())
});

if settings.metrics.enabled {
Expand All @@ -148,5 +151,15 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
metrics: settings.metrics,
};

launcher::launch(&launch_settings, http_router, grpc_router).await
let futures = vec![
update_service_handle,
tokio::spawn(
async move { launcher::launch(&launch_settings, http_router, grpc_router).await },
),
];
let (res, _, others) = futures::future::select_all(futures).await;
for future in others.into_iter() {
future.abort()
}
res?
}

0 comments on commit 174a251

Please sign in to comment.