From 092b385c612bc57211574cd7a6ec5b563aa8094f Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 08:20:08 +0500 Subject: [PATCH 1/3] use an unbounded channel to process transactions --- Cargo.lock | 15 ++++++++ core/src/db.rs | 2 +- indexer/Cargo.toml | 2 +- indexer/src/handler.rs | 75 ++++++++++++++++++++++++++++++++++++---- indexer/src/lib.rs | 3 ++ indexer/src/processor.rs | 45 +++--------------------- 6 files changed, 93 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63769e3..cd49ec5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,20 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5406,6 +5420,7 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", + "crossbeam", "dashmap", "futures", "hex", diff --git a/core/src/db.rs b/core/src/db.rs index 2cc9c7f..18ce10b 100644 --- a/core/src/db.rs +++ b/core/src/db.rs @@ -12,7 +12,7 @@ pub struct DbArgs { pub connection_timeout: u64, #[arg(long, env, default_value_t = 10)] pub acquire_timeout: u64, - #[arg(long, env, default_value_t = 60)] + #[arg(long, env, default_value_t = 10)] pub idle_timeout: u64, #[arg(long, env)] pub database_url: String, diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index c9f3173..7b97ec1 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,7 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } - +crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index f70c3ee..223dd7e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,16 +1,23 @@ use std::sync::Arc; +use crossbeam::channel::{self, Receiver, Sender}; +use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; use hub_core::{ backon::{ExponentialBuilder, Retryable}, prelude::*, producer::Producer, - tokio, + tokio::{ + self, + task::{self, JoinSet}, + }, }; use solana_client::rpc_client::RpcClient; use yellowstone_grpc_client::GeyserGrpcClientError; -use yellowstone_grpc_proto::prelude::SubscribeRequest; +use yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction, +}; use crate::{processor::Processor, Args, GeyserGrpcConnector}; @@ -18,6 +25,10 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, + dashmap: DashMap>, + tx: Sender, + rx: Receiver, + parallelism: usize, } impl MessageHandler { @@ -26,6 +37,7 @@ impl MessageHandler { dragon_mouth_endpoint, dragon_mouth_x_token, solana_endpoint, + parallelism, db, } = args; @@ -35,12 +47,16 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - + let (tx, rx) = channel::unbounded(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, + dashmap: DashMap::new(), + tx, + rx, + parallelism, }) } @@ -54,9 +70,23 @@ impl MessageHandler { .map_err(GeyserGrpcClientError::SubscribeSendError)?; while let Some(message) = stream.next().await { - self.processor.process(message).await?; + match message { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Transaction(tx)) => { + self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + }, + Some(UpdateOneof::Slot(slot)) => { + if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + for tx in transactions { + self.tx.send(tx)?; + } + } + }, + _ => {}, + }, + Err(error) => bail!("stream error: {:?}", error), + }; } - Ok(()) }) .retry( @@ -81,12 +111,42 @@ impl MessageHandler { }); let mpl_bubblegum_stream = tokio::spawn({ + let handler = self.clone(); async move { - self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) + handler + .connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) .await } }); + let rx = self.rx.clone(); + let processor = self.processor; + + let process_task = task::spawn(async move { + let mut set = JoinSet::new(); + + loop { + let processor = processor.clone(); + + while set.len() >= self.parallelism { + match set.join_next().await { + Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Ok(_)) | None => (), + } + } + + match rx.recv() { + Ok(tx) => { + set.spawn(processor.process_transaction(tx)); + }, + Err(e) => { + error!("{:?}", e); + return Result::<(), Error>::Err(e.into()); + }, + } + } + }); + tokio::select! { Err(e) = spl_token_stream => { bail!("spl token stream error: {:?}", e) @@ -94,6 +154,9 @@ impl MessageHandler { Err(e) = mpl_bubblegum_stream => { bail!("mpl bumblegum stream error: {:?}", e) } + Err(e) = process_task => { + bail!("Receiver err: {:?}", e) + } } } } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 5b66883..fbee2fe 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -18,6 +18,9 @@ pub struct Args { #[arg(long, env)] pub solana_endpoint: String, + #[arg(long, short = 'p', env, default_value_t = 8)] + pub parallelism: usize, + #[command(flatten)] pub db: db::DbArgs, } diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs index 71bc0f2..6847de8 100644 --- a/indexer/src/processor.rs +++ b/indexer/src/processor.rs @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc}; use anchor_lang::AnchorDeserialize; use backoff::ExponentialBackoff; -use dashmap::DashMap; use holaplex_hub_nfts_solana_core::{ db::Connection, proto::{ @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{ CollectionMint, CompressionLeaf, }; use holaplex_hub_nfts_solana_entity::compression_leafs; -use hub_core::{prelude::*, producer::Producer, tokio::task}; +use hub_core::{prelude::*, producer::Producer}; use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; use solana_program::program_pack::Pack; use solana_sdk::{pubkey::Pubkey, signature::Signature}; use spl_token::{instruction::TokenInstruction, state::Account}; -use yellowstone_grpc_proto::{ - prelude::{ - subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, - }, - tonic::Status, -}; +use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction}; #[derive(Clone)] pub struct Processor { db: Connection, rpc: Arc, producer: Producer, - dashmap: DashMap>, } impl Processor { @@ -40,38 +33,7 @@ impl Processor { rpc: Arc, producer: Producer, ) -> Self { - Self { - db, - rpc, - producer, - dashmap: DashMap::new(), - } - } - - pub(crate) async fn process(&self, message: Result) -> Result<()> { - match message { - Ok(msg) => match msg.update_oneof { - Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); - }, - Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { - let handles: Vec<_> = transactions - .into_iter() - .map(|tx| task::spawn(self.clone().process_transaction(tx))) - .collect(); - - for handle in handles { - handle.await?? - } - } - }, - _ => {}, - }, - Err(error) => bail!("stream error: {:?}", error), - }; - - Ok(()) + Self { db, rpc, producer } } pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { @@ -140,6 +102,7 @@ impl Processor { if compression_leaf.is_none() { return Ok(()); } + let compression_leaf = compression_leaf.context("Compression leaf not found")?; let collection_mint_id = compression_leaf.id; From 6f7d4c298c91ccdc53986bb8d6310a4af7e29b10 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 09:30:51 +0500 Subject: [PATCH 2/3] use tokio unbounded channel --- Cargo.lock | 15 --------------- indexer/Cargo.toml | 1 - indexer/src/handler.rs | 40 ++++++++++++++++++++-------------------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd49ec5..63769e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,20 +1458,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5420,7 +5406,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "crossbeam", "dashmap", "futures", "hex", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 7b97ec1..23ff826 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,6 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } -crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index 223dd7e..2e6b21e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crossbeam::channel::{self, Receiver, Sender}; use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; @@ -10,6 +9,10 @@ use hub_core::{ producer::Producer, tokio::{ self, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, + }, task::{self, JoinSet}, }, }; @@ -25,9 +28,8 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, - dashmap: DashMap>, - tx: Sender, - rx: Receiver, + tx: UnboundedSender, + rx: Arc>>, parallelism: usize, } @@ -47,15 +49,14 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - let (tx, rx) = channel::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, - dashmap: DashMap::new(), tx, - rx, + rx: Arc::new(Mutex::new(rx)), parallelism, }) } @@ -63,7 +64,7 @@ impl MessageHandler { async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - + let dashmap = DashMap::new(); subscribe_tx .send(request.clone()) .await @@ -73,10 +74,10 @@ impl MessageHandler { match message { Ok(msg) => match msg.update_oneof { Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); }, Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + if let Some((_, transactions)) = dashmap.remove(&slot.slot) { for tx in transactions { self.tx.send(tx)?; } @@ -119,7 +120,6 @@ impl MessageHandler { } }); - let rx = self.rx.clone(); let processor = self.processor; let process_task = task::spawn(async move { @@ -127,22 +127,22 @@ impl MessageHandler { loop { let processor = processor.clone(); + let mut rx = self.rx.lock().await; while set.len() >= self.parallelism { match set.join_next().await { - Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Err(e)) => { + return Result::<(), Error>::Err(anyhow!( + "failed to join task {:?}", + e + )); + }, Some(Ok(_)) | None => (), } } - match rx.recv() { - Ok(tx) => { - set.spawn(processor.process_transaction(tx)); - }, - Err(e) => { - error!("{:?}", e); - return Result::<(), Error>::Err(e.into()); - }, + if let Some(tx) = rx.recv().await { + set.spawn(processor.process_transaction(tx)); } } }); From 3091f2aef9e7699891499897aa08b6e6cc700e38 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 12:15:44 +0500 Subject: [PATCH 3/3] replace dashmap with hashmap --- Cargo.lock | 14 -------------- indexer/Cargo.toml | 1 - indexer/src/handler.rs | 7 +++---- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63769e3..a14fdfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1595,19 +1595,6 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown 0.14.0", - "lock_api", - "once_cell", - "parking_lot_core 0.9.8", -] - [[package]] name = "data-encoding" version = "2.4.0" @@ -5406,7 +5393,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "dashmap", "futures", "hex", "holaplex-hub-core", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 23ff826..3837b4c 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -25,7 +25,6 @@ solana-program = "1.14" anchor-lang = "0.26.0" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } -dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index 2e6b21e..0abc9c0 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; use hub_core::{ @@ -64,7 +63,7 @@ impl MessageHandler { async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - let dashmap = DashMap::new(); + let mut hashmap = std::collections::HashMap::new(); subscribe_tx .send(request.clone()) .await @@ -74,10 +73,10 @@ impl MessageHandler { match message { Ok(msg) => match msg.update_oneof { Some(UpdateOneof::Transaction(tx)) => { - dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + hashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); }, Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = dashmap.remove(&slot.slot) { + if let Some(transactions) = hashmap.remove(&slot.slot) { for tx in transactions { self.tx.send(tx)?; }