From 17283cf73f5ddfb6f401ca9bd221679c8110a8aa Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 15:26:44 +0800 Subject: [PATCH] check lock revision before watch delete --- src/query/service/src/locks/lock_manager.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index 766f256ee79b..e1b86aa0f1c2 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -26,6 +26,7 @@ use databend_common_catalog::lock::Lock; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_app::schema::CreateLockRevReq; use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::ExtendLockRevReq; @@ -162,14 +163,15 @@ impl LockManager { break; } + let elapsed = start.elapsed(); // if no need retry, return error directly. - if !should_retry { + if !should_retry || elapsed >= duration { catalog .delete_lock_revision(delete_table_lock_req.clone()) .await?; return Err(ErrorCode::TableAlreadyLocked(format!( "table is locked by other session, please retry later(elapsed: {:?})", - start.elapsed() + elapsed ))); } @@ -182,8 +184,18 @@ impl LockManager { filter_type: FilterType::Delete.into(), }; let mut watch_stream = meta_api.watch(req).await?; + + let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; + if lock_meta.is_none() { + log::warn!( + "Lock revision '{}' already does not exist, skipping", + rev_list[position - 1] + ); + continue; + } + // Add a timeout period for watch. - match timeout(duration, async move { + match timeout(duration.abs_diff(elapsed), async move { while let Some(Ok(resp)) = watch_stream.next().await { if let Some(event) = resp.event { if event.current.is_none() {