From e910f6c0e5481777138c4c1ea7db7dea759fdc0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 24 Sep 2024 17:15:38 +0800 Subject: [PATCH] refactor: remove get_pb_value() --- src/meta/api/src/lib.rs | 1 - src/meta/api/src/schema_api_impl.rs | 54 ++++++++++------------------- src/meta/api/src/util.rs | 16 --------- 3 files changed, 18 insertions(+), 53 deletions(-) diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index 7df722587b65..4299067172b2 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -52,7 +52,6 @@ pub use util::db_has_to_exist; pub use util::deserialize_struct; pub use util::deserialize_u64; pub use util::fetch_id; -pub use util::get_pb_value; pub use util::get_u64_value; pub use util::list_keys; pub use util::list_u64_value; diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 9e84ab0aa1fd..dfcfb422ab1c 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -53,7 +53,6 @@ use databend_common_meta_app::app_error::UnknownTable; use databend_common_meta_app::app_error::UnknownTableId; use databend_common_meta_app::app_error::ViewAlreadyExists; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; -use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::schema::catalog_id_ident::CatalogId; use databend_common_meta_app::schema::catalog_name_ident::CatalogNameIdentRaw; @@ -195,7 +194,6 @@ use crate::assert_table_exist; use crate::db_has_to_exist; use crate::deserialize_struct; use crate::fetch_id; -use crate::get_pb_value; use crate::get_u64_value; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; @@ -220,6 +218,7 @@ use crate::util::deserialize_struct_get_response; use crate::util::mget_pb_values; use crate::util::txn_delete_exact; use crate::util::txn_op_put_pb; +use crate::util::txn_replace_exact; use crate::util::unknown_database_error; use crate::SchemaApi; use crate::DEFAULT_MGET_SIZE; @@ -3154,8 +3153,8 @@ async fn construct_drop_table_txn_operations( let tbid = TableId { table_id }; // Check if table exists. - let (tb_meta_seq, tb_meta): (_, Option) = get_pb_value(kv_api, &tbid).await?; - if tb_meta_seq == 0 || tb_meta.is_none() { + let (tb_meta_seq, tb_meta) = kv_api.get_pb_seq_and_value(&tbid).await?; + if tb_meta_seq == 0 { return Err(KVAppError::AppError(AppError::UnknownTableId( UnknownTableId::new(table_id, "drop_table_by_id failed to find valid tb_meta"), ))); @@ -3163,8 +3162,7 @@ async fn construct_drop_table_txn_operations( // Get db name, tenant name and related info for tx. let table_id_to_name = TableIdToName { table_id }; - let (_, table_name_opt): (_, Option) = - get_pb_value(kv_api, &table_id_to_name).await?; + let (_, table_name_opt) = kv_api.get_pb_seq_and_value(&table_id_to_name).await?; let dbid_tbname = if let Some(db_id_table_name) = table_name_opt { db_id_table_name @@ -3246,8 +3244,8 @@ async fn construct_drop_table_txn_operations( database_id: db_id, table_name: dbid_tbname.table_name.clone(), }; - let (tb_id_list_seq, _tb_id_list_opt): (_, Option) = - get_pb_value(kv_api, &dbid_tbname_idlist).await?; + let (tb_id_list_seq, _tb_id_list_opt) = + kv_api.get_pb_seq_and_value(&dbid_tbname_idlist).await?; if tb_id_list_seq == 0 { let mut tb_id_list = TableIdList::new(); tb_id_list.append(table_id); @@ -3334,8 +3332,7 @@ async fn drop_database_meta( // add DbIdListKey if not exists let dbid_idlist = DatabaseIdHistoryIdent::new(tenant_dbname.tenant(), tenant_dbname.database_name()); - let (db_id_list_seq, db_id_list_opt): (_, Option) = - get_pb_value(kv_api, &dbid_idlist).await?; + let (db_id_list_seq, db_id_list_opt) = kv_api.get_pb_seq_and_value(&dbid_idlist).await?; if db_id_list_seq == 0 || db_id_list_opt.is_none() { warn!( @@ -3482,7 +3479,7 @@ pub(crate) async fn get_db_by_id_or_err( ) -> Result<(u64, DatabaseMeta), KVAppError> { let id_key = DatabaseId { db_id }; - let (db_meta_seq, db_meta) = get_pb_value(kv_api, &id_key).await?; + let (db_meta_seq, db_meta) = kv_api.get_pb_seq_and_value(&id_key).await?; db_id_has_to_exist(db_meta_seq, db_id, msg)?; Ok(( @@ -3712,8 +3709,7 @@ async fn gc_dropped_db_by_id( ) -> Result<(), KVAppError> { // List tables by tenant, db_id, table_name. let dbid_idlist = DatabaseIdHistoryIdent::new(tenant, db_name); - let (db_id_list_seq, db_id_list_opt): (_, Option) = - get_pb_value(kv_api, &dbid_idlist).await?; + let (db_id_list_seq, db_id_list_opt) = kv_api.get_pb_seq_and_value(&dbid_idlist).await?; let mut db_id_list = match db_id_list_opt { Some(list) => list, @@ -3724,14 +3720,12 @@ async fn gc_dropped_db_by_id( continue; } let dbid = DatabaseId { db_id }; - let (db_meta_seq, _db_meta): (_, Option) = - get_pb_value(kv_api, &dbid).await?; + let (db_meta_seq, _db_meta) = kv_api.get_pb_seq_and_value(&dbid).await?; if db_meta_seq == 0 { return Ok(()); } let id_to_name = DatabaseIdToName { db_id }; - let (name_ident_seq, _name_ident): (_, Option) = - get_pb_value(kv_api, &id_to_name).await?; + let (name_ident_seq, _name_ident) = kv_api.get_pb_seq_and_value(&id_to_name).await?; if name_ident_seq == 0 { return Ok(()); } @@ -3946,18 +3940,13 @@ async fn update_mask_policy( key: MaskPolicyTableIdListIdent, f: impl FnOnce(&mut BTreeSet), ) -> Result<(), KVAppError> { - let (id_list_seq, id_list_opt): (_, Option) = - get_pb_value(kv_api, &key).await?; + let Some(mut seq_list) = kv_api.get_pb(&key).await? else { + return Ok(()); + }; - if let Some(mut id_list) = id_list_opt { - f(&mut id_list.id_list); + f(&mut seq_list.data.id_list); - txn_req.condition.push(txn_cond_seq(&key, Eq, id_list_seq)); - - txn_req - .if_then - .push(txn_op_put(&key, serialize_struct(&id_list)?)); - } + txn_replace_exact(txn_req, &key, seq_list.seq, &seq_list.data)?; Ok(()) } @@ -4127,20 +4116,13 @@ async fn handle_undrop_table( database_id: db_id, table_name: tenant_dbname_tbname.table_name.clone(), }; - let (tb_id_list_seq, tb_id_list_opt): (_, Option) = - get_pb_value(kv_api, &dbid_tbname_idlist).await?; - let mut tb_id_list = if tb_id_list_seq == 0 { + let Some(seq_list) = kv_api.get_pb(&dbid_tbname_idlist).await? else { return Err(KVAppError::AppError(AppError::UndropTableHasNoHistory( UndropTableHasNoHistory::new(&tenant_dbname_tbname.table_name), ))); - } else { - tb_id_list_opt.ok_or_else(|| { - KVAppError::AppError(AppError::UndropTableHasNoHistory( - UndropTableHasNoHistory::new(&tenant_dbname_tbname.table_name), - )) - })? }; + let mut tb_id_list = seq_list.data; let table_id = req.extract_and_validate_table_id(&mut tb_id_list)?; diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index 90debd670d70..72526372d0b7 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -28,7 +28,6 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::seq_value::SeqV; -use databend_common_meta_types::seq_value::SeqValue; use databend_common_meta_types::txn_condition::Target; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::InvalidArgument; @@ -122,21 +121,6 @@ where K: kvapi::Key { } } -/// Get value that are encoded with FromToProto. -/// -/// It returns seq number and the data. -pub async fn get_pb_value( - kv_api: &(impl kvapi::KVApi + ?Sized), - k: &K, -) -> Result<(u64, Option), MetaError> -where - K: kvapi::Key, - K::ValueType: FromToProto, -{ - let res = kv_api.get_pb(k).await?; - Ok((res.seq(), res.into_value())) -} - /// Batch get values that are encoded with FromToProto. pub async fn mget_pb_values( kv_api: &(impl kvapi::KVApi + ?Sized),