Skip to content

Commit

Permalink
feat(cluster): allow specify a client timeout on creating flight clie…
Browse files Browse the repository at this point in the history
…nt (databendlabs#12529)

* add timeout for create_client

* feat: add timeout for rpc client

* chore: refactor rpc client initialization

* chore: add log about ready time for connections

* fix: ut
  • Loading branch information
flaneur2020 authored and andylokandy committed Nov 27, 2023
1 parent 23a2269 commit dfead6f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
7 changes: 6 additions & 1 deletion src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
precheck_services(conf).await?;

let mut shutdown_handle = ShutdownHandle::create()?;
let start_time = std::time::Instant::now();

info!("Databend Query start with config: {:?}", conf);

Expand Down Expand Up @@ -346,7 +347,11 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
println!(" {}={}", k, v);
}

info!("Ready for connections.");
info!(
"Ready for connections after {}s.",
start_time.elapsed().as_secs_f32()
);

if conf.background.enable {
println!("Start background service");
get_background_service_handler().start().await?;
Expand Down
5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,9 @@ pub struct QueryConfig {
#[clap(long, default_value = "localhost")]
pub rpc_tls_query_service_domain_name: String,

#[clap(long, default_value = "0")]
pub rpc_client_timeout_secs: u64,

/// Table engine memory enabled
#[clap(long, parse(try_from_str), default_value = "true")]
pub table_engine_memory_enabled: bool,
Expand Down Expand Up @@ -1548,6 +1551,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
rpc_tls_server_key: self.rpc_tls_server_key,
rpc_tls_query_server_root_ca_cert: self.rpc_tls_query_server_root_ca_cert,
rpc_tls_query_service_domain_name: self.rpc_tls_query_service_domain_name,
rpc_client_timeout_secs: self.rpc_client_timeout_secs,
table_engine_memory_enabled: self.table_engine_memory_enabled,
wait_timeout_mills: self.wait_timeout_mills,
max_query_log_size: self.max_query_log_size,
Expand Down Expand Up @@ -1621,6 +1625,7 @@ impl From<InnerQueryConfig> for QueryConfig {
rpc_tls_server_key: inner.rpc_tls_server_key,
rpc_tls_query_server_root_ca_cert: inner.rpc_tls_query_server_root_ca_cert,
rpc_tls_query_service_domain_name: inner.rpc_tls_query_service_domain_name,
rpc_client_timeout_secs: inner.rpc_client_timeout_secs,
table_engine_memory_enabled: inner.table_engine_memory_enabled,
wait_timeout_mills: inner.wait_timeout_mills,
max_query_log_size: inner.max_query_log_size,
Expand Down
2 changes: 2 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub struct QueryConfig {
/// Certificate for client to identify query rpc server
pub rpc_tls_query_server_root_ca_cert: String,
pub rpc_tls_query_service_domain_name: String,
pub rpc_client_timeout_secs: u64,
/// Table engine memory enabled
pub table_engine_memory_enabled: bool,
pub wait_timeout_mills: u64,
Expand Down Expand Up @@ -247,6 +248,7 @@ impl Default for QueryConfig {
rpc_tls_server_key: "".to_string(),
rpc_tls_query_server_root_ca_cert: "".to_string(),
rpc_tls_query_service_domain_name: "localhost".to_string(),
rpc_client_timeout_secs: 0,
table_engine_memory_enabled: true,
wait_timeout_mills: 5000,
max_query_log_size: 10_000,
Expand Down
36 changes: 21 additions & 15 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient;
use common_base::base::tokio;
Expand Down Expand Up @@ -235,10 +236,13 @@ impl ClusterDiscovery {
let mut res = Vec::with_capacity(cluster_nodes.len());
for node in &cluster_nodes {
if node.id != self.local_id {
let start_at = Instant::now();
if let Err(cause) = create_client(config, &node.flight_address).await {
warn!(
"Cannot connect node [{:?}], remove it in query. cause: {:?}",
node.flight_address, cause
"Cannot connect node [{:?}] after {:?}s, remove it in query. cause: {:?}",
node.flight_address,
start_at.elapsed().as_secs_f32(),
cause
);

continue;
Expand Down Expand Up @@ -519,17 +523,19 @@ impl ClusterHeartbeat {

#[async_backtrace::framed]
pub async fn create_client(config: &InnerConfig, address: &str) -> Result<FlightClient> {
match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
}
let timeout = if config.query.rpc_client_timeout_secs > 0 {
Some(Duration::from_secs(config.query.rpc_client_timeout_secs))
} else {
None
};

let rpc_tls_config = if config.tls_query_cli_enabled() {
Some(config.query.to_rpc_client_tls_config())
} else {
None
};

Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), timeout, rpc_tls_config).await?,
)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'openai_api_version' | '' | '' |
| 'query' | 'parquet_fast_read_bytes' | 'null' | '' |
| 'query' | 'quota' | 'null' | '' |
| 'query' | 'rpc_client_timeout_secs' | '0' | '' |
| 'query' | 'rpc_tls_query_server_root_ca_cert' | '' | '' |
| 'query' | 'rpc_tls_query_service_domain_name' | 'localhost' | '' |
| 'query' | 'rpc_tls_server_cert' | '' | '' |
Expand Down

0 comments on commit dfead6f

Please sign in to comment.