From 174a2516c5005d29451b68aac42158aeae16651f Mon Sep 17 00:00:00 2001 From: Kirill Ivanov <8144358+bragov4ik@users.noreply.github.com> Date: Tue, 12 Nov 2024 11:42:11 +0300 Subject: [PATCH] feat(stats): improve indexing status err handling (#1114) * count retries + return err if passed * panic whole service if update fails * document conditional start --- stats/README.md | 7 +- stats/stats-server/src/blockscout_waiter.rs | 72 +++++++++------------ stats/stats-server/src/server.rs | 21 ++++-- 3 files changed, 52 insertions(+), 48 deletions(-) diff --git a/stats/README.md b/stats/README.md index 44b1721ec..fa73445ea 100644 --- a/stats/README.md +++ b/stats/README.md @@ -84,7 +84,7 @@ by enabling word wrapping | `STATS__CONCURRENT_​START_UPDATES` | | Amount of concurrent charts update on start | `3` | | `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` | | `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` | -| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. | `null` | +| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` | | `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checking (if they are not satisfied) | `5` | | `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` | | `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` | @@ -95,6 +95,11 @@ by enabling word wrapping [anchor]: <> (anchors.envs.end.service) +##### Conditional start +In order to prevent incorrect statistics from being collected, there is an option to automatically delay chart update. This is controlled by `STATS_CONDITIONAL_​START_*` environmental variables. + +The service will periodically check the enabled start conditions and start updating charts once they are satisfied. +
Server settings

diff --git a/stats/stats-server/src/blockscout_waiter.rs b/stats/stats-server/src/blockscout_waiter.rs index 53924045e..6f46f1bae 100644 --- a/stats/stats-server/src/blockscout_waiter.rs +++ b/stats/stats-server/src/blockscout_waiter.rs @@ -4,20 +4,10 @@ use crate::settings::{Settings, StartConditionSettings, ToggleableThreshold}; use anyhow::Context; use blockscout_service_launcher::launcher::ConfigSettings; -use reqwest::StatusCode; use tokio::time::sleep; use tracing::{info, warn}; -fn is_retryable_code(status_code: &reqwest::StatusCode) -> bool { - matches!( - *status_code, - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT - | StatusCode::TOO_MANY_REQUESTS - | StatusCode::IM_A_TEAPOT - ) -} +const RETRIES: u64 = 10; fn is_threshold_passed( threshold: &ToggleableThreshold, @@ -57,6 +47,7 @@ pub async fn wait_for_blockscout_indexing( api_config: blockscout_client::Configuration, wait_config: StartConditionSettings, ) -> Result<(), anyhow::Error> { + let mut consecutive_errors = 0; loop { match blockscout_client::apis::main_page_api::get_indexing_status(&api_config).await { Ok(result) @@ -75,16 +66,20 @@ pub async fn wait_for_blockscout_indexing( info!("Blockscout indexing threshold passed"); return Ok(()); } - Ok(_) => {} - Err(blockscout_client::Error::ResponseError(r)) if is_retryable_code(&r.status) => { - warn!("Error from indexing status endpoint: {r:?}"); + Ok(_) => { + info!("Blockscout indexing threshold is not passed"); + consecutive_errors = 0; } Err(e) => { - return Err(e).context("Requesting indexing status"); + if consecutive_errors >= RETRIES { + return Err(e).context("Requesting indexing status"); + } + warn!("Error ({consecutive_errors}/{RETRIES}) requesting indexing status: {e:?}"); + consecutive_errors += 1; } } info!( - "Blockscout is not indexed enough. Checking again in {} secs", + "Rechecking indexing status in {} secs", wait_config.check_period_secs ); sleep(Duration::from_secs(wait_config.check_period_secs.into())).await; @@ -139,11 +134,12 @@ mod tests { async fn test_wait_indexing( wait_config: StartConditionSettings, + timeout: Option, response: ResponseTemplate, ) -> Result, Elapsed> { let server = mock_indexing_status(response).await; tokio::time::timeout( - Duration::from_millis(500), + timeout.unwrap_or(Duration::from_millis(500)), wait_for_blockscout_indexing( blockscout_client::Configuration::new(Url::from_str(&server.uri()).unwrap()), wait_config, @@ -156,11 +152,12 @@ mod tests { fn wait_config( #[default(0.9)] blocks: f64, #[default(0.9)] internal_transactions: f64, + #[default(0)] check_period_secs: u32, ) -> StartConditionSettings { StartConditionSettings { blocks_ratio: ToggleableThreshold::enabled(blocks), internal_transactions_ratio: ToggleableThreshold::enabled(internal_transactions), - check_period_secs: 0, + check_period_secs, } } @@ -171,6 +168,7 @@ mod tests { ) { test_wait_indexing( wait_config.clone(), + None, ResponseTemplate::new(200).set_body_string( r#"{ "finished_indexing": true, @@ -186,6 +184,7 @@ mod tests { test_wait_indexing( wait_config, + None, ResponseTemplate::new(200).set_body_string( r#"{ "finished_indexing": false, @@ -206,6 +205,7 @@ mod tests { ) { test_wait_indexing( wait_config, + None, ResponseTemplate::new(200) .set_body_string( r#"{ @@ -229,6 +229,7 @@ mod tests { ) { test_wait_indexing( wait_config, + None, ResponseTemplate::new(200) .set_body_string( r#"{ @@ -247,13 +248,18 @@ mod tests { #[rstest] #[tokio::test] async fn wait_for_blockscout_indexing_retries_with_error_codes( - wait_config: StartConditionSettings, + #[with(0.9, 0.9, 1)] wait_config: StartConditionSettings, ) { + let timeout = Some(Duration::from_millis(1500)); let mut error_servers = JoinSet::from_iter([ - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(429)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(500)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(503)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(504)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(429)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(500)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(503)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(504)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(400)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(403)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(404)), + test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(405)), ]); #[allow(for_loops_over_fallibles)] for server in error_servers.join_next().await { @@ -261,24 +267,4 @@ mod tests { test_result.expect_err("must time out"); } } - - #[rstest] - #[tokio::test] - async fn wait_for_blockscout_indexing_fails_with_error_codes( - wait_config: StartConditionSettings, - ) { - let mut error_servers = JoinSet::from_iter([ - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(400)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(403)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(404)), - test_wait_indexing(wait_config.clone(), ResponseTemplate::new(405)), - ]); - #[allow(for_loops_over_fallibles)] - for server in error_servers.join_next().await { - let test_result = server.unwrap(); - test_result - .expect("must fail immediately") - .expect_err("must report error"); - } - } } diff --git a/stats/stats-server/src/server.rs b/stats/stats-server/src/server.rs index 4051afcfa..0bd45a44f 100644 --- a/stats/stats-server/src/server.rs +++ b/stats/stats-server/src/server.rs @@ -110,12 +110,14 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> { let update_service = Arc::new(UpdateService::new(db.clone(), blockscout, charts.clone()).await?); - tokio::spawn(async move { + let update_service_handle = tokio::spawn(async move { // Wait for blockscout to index, if necessary. if let Some(config) = blockscout_api_config { if let Err(e) = wait_for_blockscout_indexing(config, settings.conditional_start).await { - tracing::error!(error =? e, "Error starting update service. Failed while waiting for blockscout indexing"); - return; + let error_msg = + "Error starting update service. Failed while waiting for blockscout indexing"; + tracing::error!(error =? e, error_msg); + panic!("{}. {:?}", error_msg, e); } } @@ -126,6 +128,7 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> { settings.force_update_on_start, ) .await; + Ok(()) }); if settings.metrics.enabled { @@ -148,5 +151,15 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> { metrics: settings.metrics, }; - launcher::launch(&launch_settings, http_router, grpc_router).await + let futures = vec![ + update_service_handle, + tokio::spawn( + async move { launcher::launch(&launch_settings, http_router, grpc_router).await }, + ), + ]; + let (res, _, others) = futures::future::select_all(futures).await; + for future in others.into_iter() { + future.abort() + } + res? }