From a65376e58982d2fb8a3b5d794a6ddb1820fd4147 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Thu, 3 Oct 2024 13:25:06 +0400 Subject: [PATCH] fix(user-ops-indexer): stop indexing from already closed ws connections (#1072) * fix(user-ops-indexer): stop indexing from already closed ws connections * chore: bump launcher --- user-ops-indexer/Cargo.lock | 4 ++-- user-ops-indexer/Cargo.toml | 2 +- .../src/indexer/base_indexer.rs | 20 ++++++++++++------- .../user-ops-indexer-server/src/indexer.rs | 7 ++++++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/user-ops-indexer/Cargo.lock b/user-ops-indexer/Cargo.lock index 092bf57b5..7eb3b8376 100644 --- a/user-ops-indexer/Cargo.lock +++ b/user-ops-indexer/Cargo.lock @@ -919,9 +919,9 @@ dependencies = [ [[package]] name = "blockscout-service-launcher" -version = "0.10.0" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "545355abddfc1617d40529141b7e849feed732328ccf54850f691dc31cf24856" +checksum = "b5cec0536a1cbb561e98f807bf3b5b913120edaa58f5caaa45d07fb58353e40e" dependencies = [ "actix-cors", "actix-web", diff --git a/user-ops-indexer/Cargo.toml b/user-ops-indexer/Cargo.toml index 9674e2dd3..343c415b6 100644 --- a/user-ops-indexer/Cargo.toml +++ b/user-ops-indexer/Cargo.toml @@ -9,4 +9,4 @@ members = [ ] [workspace.dependencies] -blockscout-service-launcher = "0.10.0" \ No newline at end of file +blockscout-service-launcher = "0.13.2" \ No newline at end of file diff --git a/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs b/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs index 48392569d..c9f6c3cfc 100644 --- a/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs +++ b/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs @@ -12,12 +12,12 @@ use ethers::prelude::{ abi::{AbiEncode, Error}, parse_log, types::{Address, Bytes, Filter, Log, TransactionReceipt}, - EthEvent, Middleware, NodeClient, Provider, ProviderError, H256, + EthEvent, Middleware, NodeClient, Provider, ProviderError, WsClientError, H256, }; use futures::{ stream, stream::{repeat_with, BoxStream}, - Stream, StreamExt, + Stream, StreamExt, TryStreamExt, }; use sea_orm::DatabaseConnection; use std::{future, num::NonZeroUsize, sync::Arc, time, time::Duration}; @@ -216,9 +216,16 @@ impl Indexer { .filter_map(|tx_hash| async move { tx_hash }); stream_txs - .for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move { + .map(Ok) + .try_for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move { let mut backoff = vec![5, 20, 120].into_iter().map(Duration::from_secs); - while let Err(err) = &self.handle_tx(tx, variant).await { + while let Err(err) = self.handle_tx(tx, variant).await { + // terminate stream if WS connection is closed, indexer will be restarted + if self.client.as_ref().supports_subscriptions() && err.to_string() == WsClientError::UnexpectedClose.to_string() { + tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, ws connection closed, exiting"); + return Err(err); + } + match backoff.next() { None => { tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, skipping"); @@ -230,10 +237,9 @@ impl Indexer { } }; } + Ok(()) }) - .await; - - Ok(()) + .await } async fn fetch_jobs_for_block_range( diff --git a/user-ops-indexer/user-ops-indexer-server/src/indexer.rs b/user-ops-indexer/user-ops-indexer-server/src/indexer.rs index ee8c8a952..d25519dff 100644 --- a/user-ops-indexer/user-ops-indexer-server/src/indexer.rs +++ b/user-ops-indexer/user-ops-indexer-server/src/indexer.rs @@ -70,7 +70,12 @@ async fn start_indexer_with_retries { - tracing::error!(error = ?err, version = L::version(), ?delay, "indexer startup failed, retrying"); + tracing::error!( + error = ?err, + version = L::version(), + ?delay, + "indexer stream ended with error, retrying" + ); } Ok(_) => { if !settings.realtime.enabled {