Skip to content

Commit

Permalink
Revert "Replacing notify by cancellation token to correctly shutdown …
Browse files Browse the repository at this point in the history
…tasks"

This reverts commit d1abc34.
  • Loading branch information
godmodegalactus committed Mar 28, 2024
1 parent 0222906 commit 1e956a2
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 42 deletions.
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ dotenv = "0.15.0"
async-channel = "1.8.0"
merge-streams = "0.1.2"
jemallocator = "0.5"
tokio-util = "0.7.10"

quinn = "0.10.2"
quinn-proto = "0.10.5"
Expand Down
3 changes: 1 addition & 2 deletions cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "AGPL"

[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize5", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }

solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down Expand Up @@ -48,4 +48,3 @@ itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
tokio-util = { workspace = true }
24 changes: 12 additions & 12 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use solana_sdk::commitment_config::CommitmentConfig;

use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use tokio_util::sync::CancellationToken;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::debug_span;
Expand All @@ -30,7 +31,7 @@ use crate::grpc_subscription::from_grpc_block_update;
fn create_grpc_multiplex_processed_block_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
exit_notfier: CancellationToken,
exit_notify: Arc<Notify>,
) -> Vec<JoinHandle<()>> {
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();

Expand All @@ -42,7 +43,7 @@ fn create_grpc_multiplex_processed_block_task(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
autoconnect_tx.clone(),
exit_notfier.clone(),
exit_notify.clone(),
);
tasks.push(task);
}
Expand All @@ -65,7 +66,7 @@ fn create_grpc_multiplex_processed_block_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notfier.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand Down Expand Up @@ -129,7 +130,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
exit_notifier: CancellationToken,
exit_notify: Arc<Notify>,
) -> Vec<JoinHandle<()>> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
Expand All @@ -138,7 +139,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
autoconnect_tx.clone(),
exit_notifier.clone(),
exit_notify.clone(),
);
tasks.push(task);
}
Expand All @@ -150,7 +151,7 @@ fn create_grpc_multiplex_block_info_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notifier.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand Down Expand Up @@ -262,8 +263,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);

let exit_notify = CancellationToken::new();
let exit_notify = Arc::new(Notify::new());

let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
Expand Down Expand Up @@ -442,7 +442,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
} // -- END receiver loop
exit_notify.cancel();
exit_notify.notify_waiters();
futures::future::join_all(task_list).await;
} // -- END reconnect loop
});
Expand Down Expand Up @@ -474,7 +474,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
let jh_multiplex_task = tokio::spawn(async move {
loop {
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
let exit_notify = CancellationToken::new();
let exit_notify = Arc::new(Notify::new());

let tasks = grpc_sources
.clone()
Expand Down Expand Up @@ -537,7 +537,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
}
} // -- END receiver loop
exit_notify.cancel();
exit_notify.notify_waiters();
futures::future::join_all(tasks).await;
} // -- END reconnect loop
});
Expand Down
5 changes: 2 additions & 3 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use solana_sdk::{
transaction::TransactionError,
};
use solana_transaction_status::{Reward, RewardType};
use tokio_util::sync::CancellationToken;
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -276,7 +275,7 @@ pub fn create_block_processing_task(
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: CancellationToken,
exit_notfier: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -339,7 +338,7 @@ pub fn create_block_processing_task(
}
};
},
_ = exit_notfier.cancelled() => {
_ = exit_notfier.notified() => {
break;
}
}
Expand Down
1 change: 0 additions & 1 deletion services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ chrono = { workspace = true }
rustls = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
tracing = { workspace = true }
Expand Down
15 changes: 7 additions & 8 deletions services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio_util::sync::CancellationToken;
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

Expand All @@ -41,7 +40,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
Expand All @@ -52,7 +51,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -135,7 +134,7 @@ impl QuicConnection {
conn = self.get_connection() => {
conn
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand All @@ -150,7 +149,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand All @@ -165,7 +164,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand Down Expand Up @@ -248,7 +247,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
Expand Down
9 changes: 4 additions & 5 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tokio::{sync::Notify, time::timeout};

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
Expand Down Expand Up @@ -220,7 +219,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: CancellationToken,
exit_notified: Arc<Notify>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Expand All @@ -229,7 +228,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.cancelled() => {
_ = exit_notified.notified() => {
break;
}
}
Expand All @@ -239,7 +238,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.cancelled() => {
_ = exit_notified.notified() => {
break;
}
}
Expand Down
11 changes: 5 additions & 6 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
};
use tokio_util::sync::CancellationToken;

use crate::{
quic_connection::{PooledConnection, QuicConnectionPool},
Expand Down Expand Up @@ -49,7 +48,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: CancellationToken,
exit_notifier: Arc<Notify>,
}

impl ActiveConnection {
Expand All @@ -66,7 +65,7 @@ impl ActiveConnection {
identity,
data_cache,
connection_parameters,
exit_notifier: CancellationToken::new(),
exit_notifier: Arc::new(Notify::new()),
}
}

Expand Down Expand Up @@ -116,7 +115,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.cancelled() => {
_ = exit_notifier.notified() => {
break;
}
};
Expand Down Expand Up @@ -209,7 +208,7 @@ impl ActiveConnection {
});
}
},
_ = exit_notifier.cancelled() => {
_ = exit_notifier.notified() => {
break 'main_loop;
}
}
Expand Down Expand Up @@ -287,7 +286,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.cancel();
value.exit_notifier.notify_waiters();
false
} else {
true
Expand Down

0 comments on commit 1e956a2

Please sign in to comment.