diff --git a/Cargo.lock b/Cargo.lock
index 74d3192a279e4..5af5cc46444f3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3702,6 +3702,7 @@ version = "0.1.0"
 dependencies = [
  "anyerror",
  "anyhow",
+ "async-backtrace",
  "databend-common-arrow",
  "databend-common-base",
  "databend-common-building",
diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs
index 17c3890fdf90a..bd34e051ef9a9 100644
--- a/src/common/base/src/runtime/mod.rs
+++ b/src/common/base/src/runtime/mod.rs
@@ -42,6 +42,7 @@ pub use runtime::execute_futures_in_parallel;
 pub use runtime::spawn;
 pub use runtime::spawn_blocking;
 pub use runtime::spawn_local;
+pub use runtime::spawn_named;
 pub use runtime::try_block_on;
 pub use runtime::try_spawn_blocking;
 pub use runtime::Dropper;
diff --git a/src/common/base/src/runtime/runtime.rs b/src/common/base/src/runtime/runtime.rs
index 920c342ebc8f3..204e37ae1e735 100644
--- a/src/common/base/src/runtime/runtime.rs
+++ b/src/common/base/src/runtime/runtime.rs
@@ -27,6 +27,7 @@ use databend_common_exception::Result;
 use databend_common_exception::ResultExt;
 use futures::future;
 use futures::FutureExt;
+use log::info;
 use log::warn;
 use tokio::runtime::Builder;
 use tokio::runtime::Handle;
@@ -34,7 +35,6 @@ use tokio::sync::oneshot;
 use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 
-// use tokio::task::JoinHandle;
 use crate::runtime::catch_unwind::CatchUnwindFuture;
 use crate::runtime::drop_guard;
 use crate::runtime::memory::MemStat;
@@ -88,7 +88,7 @@ pub trait TrySpawn {
     ///
     /// It allows to return an error before spawning the task.
     #[track_caller]
-    fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
+    fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
     where
         T: Future + Send + 'static,
         T::Output: Send + 'static;
@@ -102,18 +102,18 @@ pub trait TrySpawn {
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
-        self.try_spawn(task).unwrap()
+        self.try_spawn(task, None).unwrap()
     }
 }
 
 impl<S: TrySpawn> TrySpawn for Arc<S> {
     #[track_caller]
-    fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
+    fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
     where
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
-        self.as_ref().try_spawn(task)
+        self.as_ref().try_spawn(task, name)
     }
 
     #[track_caller]
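The trait change above is the core API shift: `try_spawn` now takes an optional task name, and the `spawn` convenience wrapper forwards `None`. A minimal sketch of the two resulting call styles (hypothetical call site, not part of this patch):

```rust
use databend_common_base::runtime::{Runtime, TrySpawn};

fn demo(rt: &Runtime) {
    // Observability-minded callers label the task; the name becomes the
    // async-backtrace frame for the spawned future.
    let _named = rt
        .try_spawn(async { 1 + 1 }, Some("demo-task".to_string()))
        .unwrap();

    // Existing callers keep using `spawn`, which forwards `None`; the frame
    // name then falls back to the query id or the global task description.
    let _unnamed = rt.spawn(async { 1 + 1 });
}
```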
@@ -149,10 +149,14 @@ impl Runtime {
 
         let handle = runtime.handle().clone();
 
+        let n = name.clone();
         // Block the runtime to shutdown.
         let join_handler = Thread::spawn(move || {
-            // We ignore channel is closed.
-            let _ = runtime.block_on(recv_stop);
+            let _ = recv_stop.blocking_recv();
+            info!(
+                "Runtime({:?}) received shutdown signal, shutting down",
+                n
+            );
 
             match !cfg!(debug_assertions) {
                 true => false,
@@ -257,7 +261,11 @@ impl Runtime {
         #[allow(clippy::disallowed_methods)]
         tokio::task::block_in_place(|| {
             self.handle
-                .block_on(location_future(future, std::panic::Location::caller()))
+                .block_on(location_future(
+                    future,
+                    std::panic::Location::caller(),
+                    None,
+                ))
                 .with_context(|| "failed to block on future".to_string())
                 .flatten()
         })
@@ -348,20 +356,28 @@ impl Runtime {
 
 impl TrySpawn for Runtime {
     #[track_caller]
-    fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
+    fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
     where
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
         let task = ThreadTracker::tracking_future(task);
-        let task = match ThreadTracker::query_id() {
-            None => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task),
-            Some(query_id) => {
-                async_backtrace::location!(format!("Running query {} spawn task", query_id))
-                    .frame(task)
+
+        let location_name = {
+            if let Some(name) = name {
+                name
+            } else {
+                match ThreadTracker::query_id() {
+                    None => String::from(GLOBAL_TASK_DESC),
+                    Some(query_id) => {
+                        format!("Running query {} spawn task", query_id)
+                    }
+                }
             }
         };
+
+        let task = async_backtrace::location!(location_name).frame(task);
+
         #[expect(clippy::disallowed_methods)]
         Ok(JoinHandle::create(self.handle.spawn(task)))
     }
@@ -380,6 +396,7 @@ impl Drop for Dropper {
         // Send a signal to say i am dropping.
         if let Some(close_sender) = self.close.take() {
             if close_sender.send(()).is_ok() {
+                info!("close_sender to shut down Runtime is sent");
                 match self.join_handler.take().unwrap().join() {
                     Err(e) => warn!("Runtime dropper panic, {:?}", e),
                     Ok(true) => {
@@ -436,7 +453,25 @@ where
     F::Output: Send + 'static,
 {
     #[expect(clippy::disallowed_methods)]
-    tokio::spawn(location_future(future, std::panic::Location::caller()))
+    tokio::spawn(location_future(
+        future,
+        std::panic::Location::caller(),
+        None,
+    ))
+}
+
+#[track_caller]
+pub fn spawn_named<F>(future: F, name: String) -> tokio::task::JoinHandle<F::Output>
+where
+    F: Future + Send + 'static,
+    F::Output: Send + 'static,
+{
+    #[expect(clippy::disallowed_methods)]
+    tokio::spawn(location_future(
+        future,
+        std::panic::Location::caller(),
+        Some(name),
+    ))
 }
 
 #[track_caller]
@@ -446,7 +481,11 @@ where
     F::Output: Send + 'static,
 {
     #[expect(clippy::disallowed_methods)]
-    tokio::task::spawn_local(location_future(future, std::panic::Location::caller()))
+    tokio::task::spawn_local(location_future(
+        future,
+        std::panic::Location::caller(),
+        None,
+    ))
 }
 
 #[track_caller]
@@ -476,8 +515,11 @@ where
 pub fn block_on<F: Future>(future: F) -> F::Output {
     #[expect(clippy::disallowed_methods)]
     tokio::task::block_in_place(|| {
-        tokio::runtime::Handle::current()
-            .block_on(location_future(future, std::panic::Location::caller()))
+        tokio::runtime::Handle::current().block_on(location_future(
+            future,
+            std::panic::Location::caller(),
+            None,
+        ))
     })
 }
 
@@ -487,7 +529,11 @@ pub fn try_block_on<F: Future>(future: F) -> std::result::Result<F::Output, F> {
         Err(_) => Err(future),
         #[expect(clippy::disallowed_methods)]
         Ok(handler) => Ok(tokio::task::block_in_place(|| {
-            handler.block_on(location_future(future, std::panic::Location::caller()))
+            handler.block_on(location_future(
+                future,
+                std::panic::Location::caller(),
+                None,
+            ))
         })),
     }
 }
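`spawn_named` mirrors `spawn` but feeds `Some(name)` into `location_future`, so the label overrides the `type_name`-derived frame. A sketch of intended usage (hypothetical caller):

```rust
use databend_common_base::runtime::spawn_named;

async fn start_background_job() {
    let handle = spawn_named(
        async { /* background work */ },
        "my-module::background_job()".to_string(),
    );
    // The task now shows up under this label in async-backtrace dumps,
    // instead of an anonymous `...::{{closure}}` type name.
    let _ = handle.await;
}
```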
@@ -495,6 +541,7 @@ pub fn try_block_on<F: Future>(future: F) -> std::result::Result<F::Output, F> {
 fn location_future<F>(
     future: F,
     frame_location: &'static Location,
+    frame_name: Option<String>,
 ) -> impl Future<Output = F::Output>
 where
     F: Future,
@@ -506,9 +553,13 @@ where
     // TODO: tracking payload
     let future = ThreadTracker::tracking_future(future);
 
-    let frame_name = std::any::type_name::<F>()
-        .trim_end_matches("::{{closure}}")
-        .to_string();
+    let frame_name = if let Some(n) = frame_name {
+        n
+    } else {
+        std::any::type_name::<F>()
+            .trim_end_matches("::{{closure}}")
+            .to_string()
+    };
 
     async_backtrace::location!(
         frame_name,
diff --git a/src/meta/client/Cargo.toml b/src/meta/client/Cargo.toml
index a84fb13dec351..0a9f431bc7ab9 100644
--- a/src/meta/client/Cargo.toml
+++ b/src/meta/client/Cargo.toml
@@ -15,6 +15,7 @@ test = true
 
 [dependencies]
 anyerror = { workspace = true }
+async-backtrace = { workspace = true }
 databend-common-arrow = { workspace = true }
 databend-common-base = { workspace = true }
 databend-common-grpc = { workspace = true }
diff --git a/src/meta/client/src/established_client.rs b/src/meta/client/src/established_client.rs
index 770da606b2832..5aa9dd9850b7f 100644
--- a/src/meta/client/src/established_client.rs
+++ b/src/meta/client/src/established_client.rs
@@ -147,6 +147,7 @@ impl EstablishedClient {
         self.error.lock().take()
     }
 
+    #[async_backtrace::framed]
     pub async fn kv_api(
         &mut self,
         request: impl tonic::IntoRequest<RaftRequest>,
@@ -154,6 +155,7 @@ impl EstablishedClient {
         self.client.kv_api(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn kv_read_v1(
         &mut self,
         request: impl tonic::IntoRequest<RaftRequest>,
@@ -162,6 +164,7 @@ impl EstablishedClient {
         let resp = self.client.kv_read_v1(request).await;
         resp.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn export(
         &mut self,
         request: impl tonic::IntoRequest<Empty>,
@@ -169,6 +172,7 @@ impl EstablishedClient {
         self.client.export(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn export_v1(
         &mut self,
         request: impl tonic::IntoRequest<ExportRequest>,
@@ -176,6 +180,7 @@ impl EstablishedClient {
         self.client.export_v1(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn watch(
         &mut self,
         request: impl tonic::IntoRequest<WatchRequest>,
@@ -183,6 +188,7 @@ impl EstablishedClient {
         self.client.watch(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn transaction(
         &mut self,
         request: impl tonic::IntoRequest<TxnRequest>,
@@ -190,6 +196,7 @@ impl EstablishedClient {
         self.client.transaction(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn member_list(
         &mut self,
         request: impl tonic::IntoRequest<MemberListRequest>,
@@ -197,6 +204,7 @@ impl EstablishedClient {
         self.client.member_list(request).await.update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn get_cluster_status(
         &mut self,
         request: impl tonic::IntoRequest<Empty>,
@@ -207,6 +215,7 @@ impl EstablishedClient {
             .get_cluster_status(request)
             .await
             .update_client(self)
     }
 
+    #[async_backtrace::framed]
     pub async fn get_client_info(
         &mut self,
         request: impl tonic::IntoRequest<Empty>,
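Each wrapper above gains `#[async_backtrace::framed]`, which records the call as a frame in the task tree while the RPC is pending. A standalone sketch against the upstream `async-backtrace` crate (the workspace pins its own revision, so details may differ):

```rust
#[async_backtrace::framed]
async fn kv_call() {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

#[tokio::main]
async fn main() {
    tokio::spawn(async_backtrace::frame!(kv_call()));
    tokio::task::yield_now().await;
    // `true` waits for currently-running tasks to reach an await point;
    // the dump then contains a `kv_call` frame for the sleeping task.
    println!("{}", async_backtrace::taskdump_tree(true));
}
```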
diff --git a/src/meta/client/src/grpc_client.rs b/src/meta/client/src/grpc_client.rs
index f2afbe0e9116f..6d9bbfe011d2a 100644
--- a/src/meta/client/src/grpc_client.rs
+++ b/src/meta/client/src/grpc_client.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt;
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fmt::Formatter;
@@ -28,7 +29,6 @@ use databend_common_base::base::tokio::sync::mpsc;
 use databend_common_base::base::tokio::sync::mpsc::UnboundedReceiver;
 use databend_common_base::base::tokio::sync::mpsc::UnboundedSender;
 use databend_common_base::base::tokio::sync::oneshot;
-use databend_common_base::base::tokio::sync::oneshot::Receiver as OneRecv;
 use databend_common_base::base::tokio::sync::oneshot::Sender as OneSend;
 use databend_common_base::base::tokio::time::sleep;
 use databend_common_base::containers::ItemManager;
@@ -65,6 +65,7 @@ use databend_common_meta_types::MetaNetworkError;
 use databend_common_meta_types::TxnReply;
 use databend_common_meta_types::TxnRequest;
 use databend_common_metrics::count::Count;
+use fastrace::func_name;
 use fastrace::func_path;
 use fastrace::future::FutureExt as MTFutureExt;
 use fastrace::Span;
@@ -139,6 +140,7 @@ impl MetaChannelManager {
         }
     }
 
+    #[async_backtrace::framed]
     async fn new_established_client(
         &self,
         addr: &String,
@@ -198,6 +200,7 @@ impl MetaChannelManager {
         (client, once)
     }
 
+    #[async_backtrace::framed]
     async fn build_channel(&self, addr: &String) -> Result<Channel, MetaNetworkError> {
         info!("MetaChannelManager::build_channel to {}", addr);
 
@@ -226,12 +229,14 @@ impl ItemManager for MetaChannelManager {
 
     #[logcall::logcall(err = "debug")]
     #[fastrace::trace]
+    #[async_backtrace::framed]
     async fn build(&self, addr: &Self::Key) -> Result<Self::Item, Self::Error> {
         self.new_established_client(addr).await
     }
 
     #[logcall::logcall(err = "debug")]
     #[fastrace::trace]
+    #[async_backtrace::framed]
     async fn check(&self, ch: Self::Item) -> Result<Self::Item, Self::Error> {
         // The underlying `tonic::transport::channel::Channel` reconnects when server is down.
         // But we still need to assert the readiness, e.g., when handshake token expires
@@ -246,17 +251,52 @@ impl ItemManager for MetaChannelManager {
 /// A handle to access meta-client worker.
 /// The worker will be actually running in a dedicated runtime: `MetaGrpcClient.rt`.
 pub struct ClientHandle {
+    /// For debug purpose only.
+    pub endpoints: Vec<String>,
     /// For sending request to meta-client worker.
     pub(crate) req_tx: UnboundedSender<ClientWorkerRequest>,
     /// Notify auto sync to stop.
     /// `oneshot::Receiver` impl `Drop` by sending a closed notification to the `Sender` half.
     #[allow(dead_code)]
-    cancel_auto_sync_rx: OneRecv<()>,
+    cancel_auto_sync_tx: oneshot::Sender<()>,
+
+    /// The reference to the dedicated runtime.
+    ///
+    /// If all `ClientHandle`s are dropped, the runtime will be destroyed.
+    ///
+    /// This is in order not to let a blocking operation (such as calling the new `PipelinePullingExecutor`)
+    /// in a tokio runtime block meta-client background tasks:
+    /// if a background task is blocked, no meta-client will be able to proceed when the meta-client is reused.
+    ///
+    /// Note that a thread_pool tokio runtime does not help:
+    /// a scheduled tokio-task residing in `filo_slot` won't be stolen by other tokio-workers.
+    ///
+    /// This `rt` was previously stored in `MetaGrpcClient`, which led to a deadlock:
+    /// - When all `ClientHandle`s are dropped, the two workers `worker_loop()` and `auto_sync_endpoints()`
+    ///   will quit.
+    /// - These two futures both hold a reference to `MetaGrpcClient`.
+    /// - The last of these two (say, `F`) will drop `MetaGrpcClient.rt`, and `Runtime::_dropper`
+    ///   will block waiting for the runtime to shut down.
+    /// - But `F` is still held, so a deadlock occurs.
+    _rt: Arc<Runtime>,
+}
+
+impl Display for ClientHandle {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "ClientHandle({})", self.endpoints.join(","))
+    }
+}
+
+impl Drop for ClientHandle {
+    fn drop(&mut self) {
+        info!("{} handle dropped", self);
+    }
 }
 
 impl ClientHandle {
     /// Send a request to the internal worker task, which will be running in another runtime.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub async fn request<Req, E>(&self, req: Req) -> Result<Req::Reply, E>
     where
         Req: RequestFor,
@@ -311,8 +351,8 @@
         };
 
         debug!(
-            worker_request :? =(&worker_request);
-            "Meta ClientHandle send request to meta client worker"
+            "{} send request to meta client worker: request: {:?}",
+            self, worker_request
         );
 
         self.req_tx.send(worker_request).map_err(|e| {
@@ -358,19 +398,23 @@ impl ClientHandle {
         res
     }
 
+    #[async_backtrace::framed]
     pub async fn get_cluster_status(&self) -> Result<ClusterStatus, MetaError> {
         self.request(message::GetClusterStatus {}).await
    }
 
+    #[async_backtrace::framed]
     pub async fn get_client_info(&self) -> Result<ClientInfo, MetaError> {
         self.request(message::GetClientInfo {}).await
     }
 
+    #[async_backtrace::framed]
     pub async fn make_established_client(&self) -> Result<EstablishedClient, MetaClientError> {
         self.request(message::MakeEstablishedClient {}).await
     }
 
     /// Return the endpoints list cached on this client.
+    #[async_backtrace::framed]
     pub async fn get_cached_endpoints(&self) -> Result<Vec<String>, MetaError> {
         self.request(message::GetEndpoints {}).await
     }
@@ -388,16 +432,8 @@ impl ClientHandle {
 pub struct MetaGrpcClient {
     conn_pool: Pool<MetaChannelManager>,
     endpoints: Arc<Mutex<Endpoints>>,
+    endpoints_str: Vec<String>,
     auto_sync_interval: Option<Duration>,
-
-    /// Dedicated runtime to support meta client background tasks.
-    ///
-    /// In order not to let a blocking operation(such as calling the new PipelinePullingExecutor) in a tokio runtime block meta-client background tasks.
-    /// If a background task is blocked, no meta-client will be able to proceed if meta-client is reused.
-    ///
-    /// Note that a thread_pool tokio runtime does not help: a scheduled tokio-task resides in `filo_slot` won't be stolen by other tokio-workers.
-    #[allow(dead_code)]
-    rt: Arc<Runtime>,
 }
 
 impl Debug for MetaGrpcClient {
@@ -409,6 +445,12 @@ impl Debug for MetaGrpcClient {
     }
 }
 
+impl fmt::Display for MetaGrpcClient {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "MetaGrpcClient({})", self.endpoints_str.join(","))
+    }
+}
+
 impl MetaGrpcClient {
     /// Create a new client of metasrv.
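The `_rt` field plus the new `Display`/`Drop` impls are the actual deadlock fix described in the doc comment. A reduced model of the ownership change (toy types, not the real ones):

```rust
use std::sync::Arc;

struct Rt; // stand-in for Runtime, whose dropper blocks until shutdown ends

struct Handle {
    // After the patch: handles own the runtime, so the final drop happens on
    // a caller's thread, never inside one of the runtime's own worker tasks.
    _rt: Arc<Rt>,
}

struct Worker;
// Before the patch, the worker futures (via `MetaGrpcClient.rt`) could hold
// the last `Arc<Rt>`; dropping it from inside the runtime then blocked on
// shutting down the very runtime polling that future: a deadlock.
```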
     ///
@@ -433,26 +475,29 @@ impl MetaGrpcClient {
     #[fastrace::trace]
     pub fn try_create(
-        endpoints: Vec<String>,
+        endpoints_str: Vec<String>,
         username: &str,
         password: &str,
         timeout: Option<Duration>,
         auto_sync_interval: Option<Duration>,
         tls_config: Option<RpcClientTlsConfig>,
     ) -> Result<Arc<ClientHandle>, MetaClientError> {
-        Self::endpoints_non_empty(&endpoints)?;
+        Self::endpoints_non_empty(&endpoints_str)?;
 
-        let endpoints = Arc::new(Mutex::new(Endpoints::new(endpoints)));
+        let endpoints = Arc::new(Mutex::new(Endpoints::new(endpoints_str.clone())));
 
         let mgr =
             MetaChannelManager::new(username, password, timeout, tls_config, endpoints.clone());
 
-        let rt =
-            Runtime::with_worker_threads(1, Some("meta-client-rt".to_string())).map_err(|e| {
-                MetaClientError::ClientRuntimeError(
-                    AnyError::new(&e).add_context(|| "when creating meta-client"),
-                )
-            })?;
+        let rt = Runtime::with_worker_threads(
+            1,
+            Some(format!("meta-client-rt-{}", endpoints_str.join(","))),
+        )
+        .map_err(|e| {
+            MetaClientError::ClientRuntimeError(
+                AnyError::new(&e).add_context(|| "when creating meta-client"),
+            )
+        })?;
         let rt = Arc::new(rt);
 
         // Build the handle-worker pair
@@ -461,41 +506,50 @@ impl MetaGrpcClient {
         let (one_tx, one_rx) = oneshot::channel::<()>();
 
         let handle = Arc::new(ClientHandle {
+            endpoints: endpoints_str.clone(),
             req_tx: tx,
-            cancel_auto_sync_rx: one_rx,
+            cancel_auto_sync_tx: one_tx,
+            _rt: rt.clone(),
         });
 
         let worker = Arc::new(Self {
             conn_pool: Pool::new(mgr, Duration::from_millis(50)),
             endpoints,
+            endpoints_str,
             auto_sync_interval,
-            rt: rt.clone(),
         });
 
-        rt.spawn(UnlimitedFuture::create(Self::worker_loop(
-            worker.clone(),
-            rx,
-        )));
-        rt.spawn(UnlimitedFuture::create(Self::auto_sync_endpoints(
-            worker, one_tx,
-        )));
+        let worker_name = worker.to_string();
+
+        rt.try_spawn(
+            UnlimitedFuture::create(Self::worker_loop(worker.clone(), rx)),
+            Some(format!("{}::worker_loop()", worker_name)),
+        )
+        .unwrap();
+
+        rt.try_spawn(
+            UnlimitedFuture::create(Self::auto_sync_endpoints(worker, one_rx)),
+            Some(format!("{}::auto_sync_endpoints()", worker_name)),
+        )
+        .unwrap();
 
         Ok(handle)
     }
 
     /// A worker runs a receiving-loop to accept user-request to metasrv and deals with request in the dedicated runtime.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     async fn worker_loop(self: Arc<Self>, mut req_rx: UnboundedReceiver<ClientWorkerRequest>) {
-        info!("MetaGrpcClient::worker spawned");
+        info!("{}::worker spawned", self);
 
         loop {
             let recv_res = req_rx.recv().await;
 
             let Some(mut worker_request) = recv_res else {
-                warn!("MetaGrpcClient handle closed. worker quit");
+                warn!("{} handle closed. worker quit", self);
                 return;
             };
 
-            debug!(worker_request :? =(&worker_request); "MetaGrpcClient worker handle request");
+            debug!(worker_request :? =(&worker_request); "{} worker handle request", self);
 
             let _guard = ThreadTracker::tracking(worker_request.tracking_payload.take().unwrap());
             let span = Span::enter_with_parent(func_path!(), &worker_request.span);
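`worker_loop` is a classic actor: an unbounded mpsc channel carries requests in, and each request carries its own oneshot sender for the reply. The shape in isolation (plain tokio, hypothetical types):

```rust
use tokio::sync::{mpsc, oneshot};

struct Request {
    payload: u64,
    resp_tx: oneshot::Sender<u64>,
}

async fn worker_loop(mut req_rx: mpsc::UnboundedReceiver<Request>) {
    // `recv()` yields `None` once every sender (every handle) is dropped,
    // which is exactly how the meta-client worker learns it should quit.
    while let Some(req) = req_rx.recv().await {
        // Skip requests whose caller has already gone away.
        if req.resp_tx.is_closed() {
            continue;
        }
        let _ = req.resp_tx.send(req.payload * 2);
    }
}
```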
@@ -503,7 +557,7 @@
             if worker_request.resp_tx.is_closed() {
                 info!(
                     req :? =(&worker_request.req);
-                    "MetaGrpcClient request.resp_tx is closed, cancel handling this request"
+                    "{} request.resp_tx is closed, cancel handling this request", self
                 );
                 continue;
             }
@@ -514,22 +568,29 @@
             match worker_request.req {
                 message::Request::GetEndpoints(_) => {
                     let endpoints = self.get_all_endpoints();
                     let resp = Response::GetEndpoints(Ok(endpoints));
-                    Self::send_response(worker_request.resp_tx, worker_request.request_id, resp);
+                    Self::send_response(
+                        self.clone(),
+                        worker_request.resp_tx,
+                        worker_request.request_id,
+                        resp,
+                    );
                     continue;
                 }
                 _ => {}
             }
 
-            databend_common_base::runtime::spawn(
+            databend_common_base::runtime::spawn_named(
                 self.clone()
                     .handle_rpc_request(worker_request)
                     .in_span(span),
+                format!("{}::handle_rpc_request()", self),
             );
         }
     }
 
     /// Handle a RPC request in a separate task.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     async fn handle_rpc_request(self: Arc<Self>, worker_request: ClientWorkerRequest) {
         let request_id = worker_request.request_id;
         let resp_tx = worker_request.resp_tx;
@@ -600,22 +661,20 @@
 
         self.update_rpc_metrics(req_name, &req_str, request_id, start, resp.err());
 
-        Self::send_response(resp_tx, request_id, resp);
+        Self::send_response(self.clone(), resp_tx, request_id, resp);
     }
 
-    fn send_response(tx: OneSend<Response>, request_id: u64, resp: Response) {
+    fn send_response(self: Arc<Self>, tx: OneSend<Response>, request_id: u64, resp: Response) {
         debug!(
-            request_id :? =(&request_id),
-            resp :? =(&resp);
-            "MetaGrpcClient send response to the handle"
+            "{} send response to the handle; request_id={}, resp={:?}",
+            self, request_id, resp
         );
 
         let send_res = tx.send(resp);
         if let Err(err) = send_res {
             error!(
-                request_id :% =(request_id),
-                err :? =(&err);
-                "MetaGrpcClient failed to send response to the handle. recv-end closed"
+                "{} failed to send response to the handle. recv-end closed; request_id={}, error={:?}",
+                self, request_id, err
             );
         }
     }
@@ -641,12 +700,8 @@
 
         if elapsed > 1000_f64 {
             warn!(
-                request_id :% =(request_id);
-                "MetaGrpcClient slow request {} to {} takes {} ms: {}",
-                req_name,
-                endpoint,
-                elapsed,
-                req_str,
+                "{} slow request {} to {} takes {} ms; request_id={}; request: {}",
+                self, req_name, endpoint, elapsed, request_id, req_str,
             );
         }
 
@@ -654,10 +709,7 @@
         // Error metrics
         if let Some(err) = resp_err {
             grpc_metrics::incr_meta_grpc_client_request_failed(&endpoint, req_name, err);
-            error!(
-                request_id :% =(request_id);
-                "MetaGrpcClient error: {:?}", err
-            );
+            error!("{} request_id={} error: {:?}", self, request_id, err);
         } else {
             grpc_metrics::incr_meta_grpc_client_request_success(&endpoint, req_name);
         }
     }
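The slow-request branch fires for calls above one second. Its timing logic, extracted into a freestanding sketch (names hypothetical, threshold as in the patch):

```rust
use std::time::Instant;

fn observe<T>(req_name: &str, endpoint: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let res = f();
    let elapsed = start.elapsed().as_secs_f64() * 1000.0;
    if elapsed > 1000_f64 {
        log::warn!("slow request {} to {} takes {} ms", req_name, endpoint, elapsed);
    }
    res
}
```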
@@ -665,6 +717,7 @@ impl MetaGrpcClient {
 
     /// Return a client for communication, and a server version in form of `{major:03}.{minor:03}.{patch:03}`.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub async fn get_established_client(&self) -> Result<EstablishedClient, MetaClientError> {
         let (endpoints_str, n) = {
             let eps = self.endpoints.lock();
@@ -672,7 +725,7 @@
         };
         debug_assert!(n > 0);
 
-        debug!("meta-service endpoints: {}", endpoints_str);
+        debug!("{}::{}; endpoints: {}", self, func_name!(), endpoints_str);
 
         let mut last_err = None::<MetaClientError>;
 
@@ -682,7 +735,7 @@
                 es.current_or_next().to_string()
             };
 
-            debug!("get or build ReadClient to {}", addr);
+            debug!("{} get or build ReadClient to {}", self, addr);
 
             let res = self.conn_pool.get(&addr).await;
 
@@ -730,6 +783,7 @@
     }
 
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub async fn set_endpoints(&self, endpoints: Vec<String>) -> Result<(), MetaError> {
         Self::endpoints_non_empty(&endpoints)?;
 
@@ -753,6 +807,7 @@
     }
 
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub async fn sync_endpoints(&self) -> Result<(), MetaError> {
         let mut client = self.get_established_client().await?;
         let result = client
@@ -782,21 +837,27 @@
         Ok(())
     }
 
-    async fn auto_sync_endpoints(self: Arc<Self>, mut cancel_tx: OneSend<()>) {
+    #[fastrace::trace]
+    #[async_backtrace::framed]
+    async fn auto_sync_endpoints(self: Arc<Self>, mut cancel_rx: oneshot::Receiver<()>) {
         info!(
-            "start auto sync endpoints: interval: {:?}",
-            self.auto_sync_interval
+            "{} start auto_sync_endpoints: interval: {:?}",
+            self, self.auto_sync_interval
         );
+
         if let Some(interval) = self.auto_sync_interval {
             loop {
+                debug!("{} auto_sync_endpoints loop start", self);
+
                 select! {
-                    _ = cancel_tx.closed() => {
+                    _ = &mut cancel_rx => {
+                        info!("{} auto_sync_endpoints received quit signal, quit", self);
                         return;
                     }
                     _ = sleep(interval) => {
                         let r = self.sync_endpoints().await;
                         if let Err(e) = r {
-                            warn!("auto sync endpoints failed: {:?}", e);
+                            warn!("{} auto_sync_endpoints failed: {:?}", self, e);
                         }
                     }
                 }
@@ -841,6 +902,7 @@
     ///   S.ver: 2 3 4
     /// ```
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub async fn handshake(
         client: &mut RealClient,
         client_ver: &Version,
@@ -916,14 +978,12 @@
 
     /// Create a watching stream that receives KV change events.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn watch(
         &self,
         watch_request: WatchRequest,
     ) -> Result<Streaming<WatchResponse>, MetaError> {
-        debug!(
-            watch_request :? =(&watch_request);
-            "MetaGrpcClient worker: handle watch request"
-        );
+        debug!("{}: handle watch request: {:?}", self, watch_request);
 
         let mut client = self.get_established_client().await?;
         let res = client.watch(watch_request).await?;
@@ -932,13 +992,14 @@
 
     /// Export all data in json from metasrv.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn export(
         &self,
         export_request: message::ExportReq,
     ) -> Result<Streaming<ExportedChunk>, MetaError> {
         debug!(
-            export_request :? =(&export_request);
-            "MetaGrpcClient worker: handle export request"
+            "{} worker: handle export request: {:?}",
+            self, export_request
         );
 
         let mut client = self.get_established_client().await?;
@@ -957,8 +1018,9 @@
 
     /// Get cluster status
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn get_cluster_status(&self) -> Result<ClusterStatus, MetaError> {
-        debug!("MetaGrpcClient::get_cluster_status");
+        debug!("{}::get_cluster_status", self);
 
         let mut client = self.get_established_client().await?;
         let res = client.get_cluster_status(Empty {}).await?;
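Swapping the stored `Receiver` for a stored `Sender` inverts the cancellation flow: the loop now owns the receiver and polls it directly in `select!`. A runnable miniature of the new `auto_sync_endpoints` shape:

```rust
use std::time::Duration;
use tokio::select;
use tokio::sync::oneshot;
use tokio::time::sleep;

async fn auto_sync(mut cancel_rx: oneshot::Receiver<()>, interval: Duration) {
    loop {
        select! {
            // Fires when the sender side is consumed or dropped,
            // i.e. when the owning handle shuts down.
            _ = &mut cancel_rx => return,
            _ = sleep(interval) => {
                // sync_endpoints() would run here.
            }
        }
    }
}
```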
@@ -967,8 +1029,9 @@
 
     /// Export all data in json from metasrv.
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn get_client_info(&self) -> Result<ClientInfo, MetaError> {
-        debug!("MetaGrpcClient::get_client_info");
+        debug!("{}::get_client_info", self);
 
         let mut client = self.get_established_client().await?;
         let res = client.get_client_info(Empty {}).await?;
@@ -976,6 +1039,7 @@
     }
 
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn kv_api<T>(&self, v: T) -> Result<T::Reply, MetaError>
     where
         T: RequestFor,
         T: Into<MetaGrpcReq>,
     {
         let grpc_req: MetaGrpcReq = v.into();
 
-        debug!(
-            req :? =(&grpc_req);
-            "MetaGrpcClient::kv_api request"
-        );
+        debug!("{}::kv_api request: {:?}", self, grpc_req);
 
         let raft_req: RaftRequest = grpc_req.into();
 
@@ -1047,14 +1108,12 @@
     }
 
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn kv_read_v1(
         &self,
         grpc_req: MetaGrpcReadReq,
     ) -> Result<Streaming<StreamItem>, MetaError> {
-        debug!(
-            req :? =(&grpc_req);
-            "MetaGrpcClient::kv_read_v1 request"
-        );
+        debug!("{}::kv_read_v1 request: {:?}", self, grpc_req);
 
         let mut failures = vec![];
 
@@ -1076,17 +1135,18 @@
                 .await;
 
             debug!(
-                result :? =(&result);
-                "MetaGrpcClient::kv_read_v1 result, {}-th try", i
+                "{}::kv_read_v1 result, {}-th try; result: {:?}",
+                self, i, result
             );
 
             if let Err(ref e) = result {
                 warn!(
-                    req :? =(&grpc_req),
-                    error :? =(&e);
-                    "MetaGrpcClient::kv_read_v1 error, retryable: {}, target={}",
+                    "{}::kv_read_v1 error, retryable: {}, target={}; error: {:?}; request: {:?}",
+                    self,
                     is_status_retryable(e),
-                    established_client.target_endpoint()
+                    established_client.target_endpoint(),
+                    e,
+                    grpc_req
                 );
 
                 if is_status_retryable(e) {
@@ -1113,13 +1173,11 @@
     }
 
     #[fastrace::trace]
+    #[async_backtrace::framed]
     pub(crate) async fn transaction(&self, req: TxnRequest) -> Result<TxnReply, MetaError> {
         let txn: TxnRequest = req;
 
-        debug!(
-            req :% =(&txn);
-            "MetaGrpcClient::transaction request"
-        );
+        debug!("{}::transaction request: {}", self, txn);
 
         let req = traced_req(txn.clone());
 
@@ -1143,10 +1201,7 @@
 
         let reply = result?;
 
-        debug!(
-            reply :% =(&reply);
-            "MetaGrpcClient::transaction reply"
-        );
+        debug!("{}::transaction reply: {}", self, reply);
 
         Ok(reply)
     }
@@ -1162,7 +1217,7 @@
             es.choose_next().to_string()
         };
 
-        info!("MetaGrpcClient choose_next_endpoint: {}", next);
+        info!("{} choose_next_endpoint: {}", self, next);
     }
 }
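`kv_read_v1` retries once per configured endpoint, rotating with `choose_next_endpoint` and keeping every failure for the final error. The control flow, reduced to a synchronous sketch (hypothetical helper):

```rust
fn with_retry<T, E>(
    n_endpoints: u64,
    mut call: impl FnMut() -> Result<T, E>,
    is_retryable: impl Fn(&E) -> bool,
) -> Result<T, Vec<E>> {
    let mut failures = vec![];
    for _i in 0..n_endpoints {
        match call() {
            Ok(v) => return Ok(v),
            // Retryable errors rotate the endpoint and are recorded.
            Err(e) if is_retryable(&e) => failures.push(e),
            // Non-retryable errors abort immediately.
            Err(e) => return Err(vec![e]),
        }
    }
    Err(failures)
}
```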
diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs
index f8a8e99e56a26..734fea6f44b4f 100644
--- a/src/query/service/src/servers/http/clickhouse_handler.rs
+++ b/src/query/service/src/servers/http/clickhouse_handler.rs
@@ -141,74 +141,82 @@ async fn execute(
     //
     // P.S. I think it will be better/more reasonable if we could avoid using pthread_join inside an async stack.
-    ctx.try_spawn({
-        let ctx = ctx.clone();
-        async move {
-            let mut data_stream = interpreter.execute(ctx.clone()).await?;
-            let table_schema = infer_table_schema(&schema)?;
-            let mut output_format = FileFormatOptionsExt::get_output_format_from_clickhouse_format(
-                format,
-                table_schema,
-                &ctx.get_settings(),
-            )?;
-
-            let prefix = Ok(output_format.serialize_prefix()?);
-
-            let compress_fn = move |rb: Result<Vec<u8>>| -> Result<Vec<u8>> {
-                if params.compress() {
-                    match rb {
-                        Ok(b) => compress_block(b),
-                        Err(e) => Err(e),
-                    }
-                } else {
-                    rb
-                }
-            };
-
-            // try to catch runtime error before http response, so user can client can get http 500
-            let first_block = match data_stream.next().await {
-                Some(block) => match block {
-                    Ok(block) => Some(compress_fn(output_format.serialize_block(&block))),
-                    Err(err) => return Err(err),
-                },
-                None => None,
-            };
-
-            let session = ctx.get_current_session();
-            let stream = stream! {
-                yield compress_fn(prefix);
-                let mut ok = true;
-                // do not pull data_stream if we already meet a None
-                if let Some(block) = first_block {
-                    yield block;
-                    while let Some(block) = data_stream.next().await {
-                        match block{
-                            Ok(block) => {
-                                yield compress_fn(output_format.serialize_block(&block));
-                            },
-                            Err(err) => {
-                                let message = format!("{}", err);
-                                yield compress_fn(Ok(message.into_bytes()));
-                                ok = false;
-                                break
-                            }
-                        };
-                    }
-                }
-                if ok {
-                    yield compress_fn(output_format.finalize());
-                }
-                // to hold session ref until stream is all consumed
-                let _ = session.get_id();
-            };
-            if let Some(handle) = handle {
-                handle.await.expect("must")
-            }
-            let stream = stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
-            Ok(Body::from_bytes_stream(stream).with_content_type(format_typ.get_content_type()))
-        }
-    })?
+    ctx.try_spawn(
+        {
+            let ctx = ctx.clone();
+            async move {
+                let mut data_stream = interpreter.execute(ctx.clone()).await?;
+                let table_schema = infer_table_schema(&schema)?;
+                let mut output_format =
+                    FileFormatOptionsExt::get_output_format_from_clickhouse_format(
+                        format,
+                        table_schema,
+                        &ctx.get_settings(),
+                    )?;
+
+                let prefix = Ok(output_format.serialize_prefix()?);
+
+                let compress_fn = move |rb: Result<Vec<u8>>| -> Result<Vec<u8>> {
+                    if params.compress() {
+                        match rb {
+                            Ok(b) => compress_block(b),
+                            Err(e) => Err(e),
+                        }
+                    } else {
+                        rb
+                    }
+                };
+
+                // Try to catch runtime errors before the HTTP response is sent, so the client can get an HTTP 500.
+                let first_block = match data_stream.next().await {
+                    Some(block) => match block {
+                        Ok(block) => Some(compress_fn(output_format.serialize_block(&block))),
+                        Err(err) => return Err(err),
+                    },
+                    None => None,
+                };
+
+                let session = ctx.get_current_session();
+                let stream = stream! {
+                    yield compress_fn(prefix);
+                    let mut ok = true;
+                    // do not pull data_stream if we already meet a None
+                    if let Some(block) = first_block {
+                        yield block;
+                        while let Some(block) = data_stream.next().await {
+                            match block{
+                                Ok(block) => {
+                                    yield compress_fn(output_format.serialize_block(&block));
+                                },
+                                Err(err) => {
+                                    let message = format!("{}", err);
+                                    yield compress_fn(Ok(message.into_bytes()));
+                                    ok = false;
+                                    break
+                                }
+                            };
+                        }
+                    }
+                    if ok {
+                        yield compress_fn(output_format.finalize());
+                    }
+                    // Hold the session reference until the stream is fully consumed.
+                    let _ = session.get_id();
+                };
+                if let Some(handle) = handle {
+                    handle.await.expect("must")
+                }
+                let stream =
+                    stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
+                Ok(
+                    Body::from_bytes_stream(stream)
+                        .with_content_type(format_typ.get_content_type()),
+                )
+            }
+        },
+        None,
+    )?
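Indentation aside, the interesting part of this handler is unchanged: it awaits the first block before building the response, so an immediate failure still becomes an HTTP 500 rather than a truncated 200. That trick in isolation (any fallible `Stream`, hypothetical helper):

```rust
use futures::{Stream, StreamExt};

async fn body_stream<S, T, E>(mut data: S) -> Result<impl Stream<Item = Result<T, E>>, E>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    // Pull the first item eagerly: an early error can still abort the request...
    let first = match data.next().await {
        Some(Err(e)) => return Err(e),
        other => other,
    };
    // ...later errors can only be reported in-band, inside the body.
    Ok(futures::stream::iter(first).chain(data))
}
```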
     .await
     .map_err(|err| {
         ErrorCode::from_string(format!(
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index c42e654f6d580..88af5abffa3b6 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -566,6 +566,7 @@
                 }
             }
             .in_span(span),
+            None,
         )?;
 
         let data = Arc::new(TokioMutex::new(PageManager::new(
diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
index 35046f59b8cd5..8856572abacf7 100644
--- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
+++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -408,24 +408,27 @@
     )> {
         let instant = Instant::now();
 
-        let query_result = context.try_spawn({
-            let ctx = context.clone();
-            async move {
-                let mut data_stream = interpreter.execute(ctx.clone()).await?;
-                observe_mysql_interpreter_used_time(instant.elapsed());
-
-                // Wrap the data stream, log finish event at the end of stream
-                let intercepted_stream = async_stream::stream! {
-
-                    while let Some(item) = data_stream.next().await {
-                        yield item
-                    };
-                };
-
-                Ok::<_, ErrorCode>(intercepted_stream.boxed())
-            }
-            .in_span(Span::enter_with_local_parent(func_path!()))
-        })?;
+        let query_result = context.try_spawn(
+            {
+                let ctx = context.clone();
+                async move {
+                    let mut data_stream = interpreter.execute(ctx.clone()).await?;
+                    observe_mysql_interpreter_used_time(instant.elapsed());
+
+                    // Wrap the data stream, log finish event at the end of stream
+                    let intercepted_stream = async_stream::stream! {
+
+                        while let Some(item) = data_stream.next().await {
+                            yield item
+                        };
+                    };
+
+                    Ok::<_, ErrorCode>(intercepted_stream.boxed())
+                }
+                .in_span(Span::enter_with_local_parent(func_path!()))
+            },
+            None,
+        )?;
 
         let query_result = query_result.await.map_err_to_code(
             ErrorCode::TokioError,
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index c3987e5bea23b..8bd57e9229813 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -1426,12 +1426,12 @@ impl TableContext for QueryContext {
 impl TrySpawn for QueryContext {
     /// Spawns a new asynchronous task, returning a tokio::JoinHandle for it.
     /// The task will run in the current context thread_pool not the global.
-    fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
+    fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
     where
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
-        Ok(self.shared.try_get_runtime()?.spawn(task))
+        self.shared.try_get_runtime()?.try_spawn(task, name)
     }
 }
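The `QueryContext` change is a one-liner but matters: it used to route through `spawn`, which discarded any name. The delegation pattern in miniature (toy types, not the real trait):

```rust
use std::future::Future;
use std::sync::Arc;

struct Rt;

impl Rt {
    fn try_spawn<T: Future>(&self, _task: T, name: Option<String>) -> Result<(), String> {
        println!("spawning task named {:?}", name);
        Ok(())
    }
}

struct Ctx {
    rt: Arc<Rt>,
}

impl Ctx {
    fn try_spawn<T: Future>(&self, task: T, name: Option<String>) -> Result<(), String> {
        // Forward `name` instead of dropping it on the floor.
        self.rt.try_spawn(task, name)
    }
}
```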
{}", e))) } diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs index 8a958662c6f80..0de5484253890 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs @@ -64,7 +64,7 @@ where #[async_backtrace::framed] async fn execute_in_runtime(self, runtime: &Runtime) -> Result { runtime - .try_spawn(self)? + .try_spawn(self, None)? .await .map_err(|e| ErrorCode::TokioError(format!("runtime join error. {}", e))) }