Skip to content

Commit

Permalink
refactor: adopt get_pb() for get_background_task (#16308)
Browse files Browse the repository at this point in the history
* refactor: get_db_or_error returns SeqV<T>

* refactor: adopt `get_pb()` for get_background_task
  • Loading branch information
drmingdrmer authored Aug 22, 2024
1 parent 152589f commit 6bb25f2
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 69 deletions.
23 changes: 12 additions & 11 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
"BackgroundTaskApi: {}",
func_name!()
);
let name = &req.name;
let (_, resp) = get_background_task_by_name(self, name).await?;
Ok(GetBackgroundTaskReply { task_info: resp })

let resp = self.get_pb(&req.name).await?;
Ok(GetBackgroundTaskReply {
task_info: resp.into_value(),
})
}
}

Expand Down Expand Up @@ -332,19 +334,18 @@ pub fn assert_background_job_exist(
) -> Result<(), AppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
let unknown = UnknownBackgroundJob::new(name_ident.job_name(), format!("{:?}", name_ident));
Err(AppError::UnknownBackgroundJob(unknown))
let err = unknown_background_job(name_ident);
Err(err)
} else {
Ok(())
}
}

async fn get_background_task_by_name(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
id: &BackgroundTaskIdent,
) -> Result<(u64, Option<BackgroundTaskInfo>), KVAppError> {
let (seq, res) = get_pb_value(kv_api, id).await?;
Ok((seq, res))
pub fn unknown_background_job(name_ident: &BackgroundJobIdent) -> AppError {
AppError::UnknownBackgroundJob(UnknownBackgroundJob::new(
name_ident.job_name(),
format!("{:?}", name_ident),
))
}

async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
Expand Down
94 changes: 47 additions & 47 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ use databend_common_meta_app::app_error::VirtualColumnAlreadyExists;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::primitive::Id;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw;
use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent;
Expand Down Expand Up @@ -753,10 +752,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let name_key = &req.inner;

let (_, db_id, db_meta) = get_db_or_err(self, name_key, "get_database").await?;
let (seq_db_id, db_meta) = get_db_or_err(self, name_key, "get_database").await?;

let db = DatabaseInfo {
database_id: DatabaseId::new(db_id),
database_id: seq_db_id.data,
name_ident: name_key.clone(),
meta: db_meta,
};
Expand Down Expand Up @@ -1583,8 +1582,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let seq_db_id = get_db_id_or_err(self, &tenant_dbname, "create_table").await?;

// fixed
let key_dbid = seq_db_id.data.into_inner();
let save_db_id = seq_db_id.data.into_inner();
let key_dbid = seq_db_id.data;
let save_db_id = seq_db_id.data;

// fixed
let key_dbid_tbname = DBIdTableName {
Expand Down Expand Up @@ -1899,7 +1898,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// Get db by name to ensure presence

let (_, db_id, db_meta) = get_db_or_err(self, &tenant_dbname, "rename_table").await?;
let (seq_db_id, db_meta) = get_db_or_err(self, &tenant_dbname, "rename_table").await?;

// cannot operate on shared database
if let Some(from_share) = &db_meta.from_share {
Expand All @@ -1911,7 +1910,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// Get table by db_id, table_name to assert presence.

let dbid_tbname = DBIdTableName {
db_id,
db_id: *seq_db_id.data,
table_name: tenant_dbname_tbname.table_name.clone(),
};

Expand All @@ -1934,7 +1933,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// get table id list from _fd_table_id_list/db_id/table_name
let dbid_tbname_idlist = TableIdHistoryIdent {
database_id: db_id,
database_id: *seq_db_id.data,
table_name: req.name_ident.table_name.clone(),
};
let (tb_id_list_seq, tb_id_list_opt): (_, Option<TableIdList>) =
Expand Down Expand Up @@ -1981,20 +1980,20 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// Get the renaming target db to ensure presence.

let tenant_newdbname = DatabaseNameIdent::new(tenant_dbname.tenant(), &req.new_db_name);
let (_, new_db_id, new_db_meta) =
let (new_seq_db_id, new_db_meta) =
get_db_or_err(self, &tenant_newdbname, "rename_table: new db").await?;

// Get the renaming target table to ensure absence

let newdbid_newtbname = DBIdTableName {
db_id: new_db_id,
db_id: *new_seq_db_id.data,
table_name: req.new_table_name.clone(),
};
let (new_tb_id_seq, _new_tb_id) = get_u64_value(self, &newdbid_newtbname).await?;
table_has_to_not_exist(new_tb_id_seq, &tenant_newdbname_newtbname, "rename_table")?;

let new_dbid_tbname_idlist = TableIdHistoryIdent {
database_id: new_db_id,
database_id: *new_seq_db_id.data,
table_name: req.new_table_name.clone(),
};
let (new_tb_id_list_seq, new_tb_id_list_opt): (_, Option<TableIdList>) =
Expand All @@ -2012,7 +2011,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let (table_id_to_name_seq, _): (_, Option<DBIdTableName>) =
get_pb_value(self, &table_id_to_name_key).await?;
let db_id_table_name = DBIdTableName {
db_id: new_db_id,
db_id: *new_seq_db_id.data,
table_name: req.new_table_name.clone(),
};

Expand All @@ -2025,8 +2024,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
condition: vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta.seq),
txn_cond_seq(&DatabaseId { db_id: new_db_id }, Eq, new_db_meta.seq),
txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq),
txn_cond_seq(&new_seq_db_id.data, Eq, new_db_meta.seq),
// table_name->table_id does not change.
// Updating the table meta is ok.
txn_cond_seq(&dbid_tbname, Eq, tb_id_seq),
Expand All @@ -2041,18 +2040,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&DatabaseId { db_id }, serialize_struct(&*db_meta)?), /* (db_id) -> db_meta */
txn_op_put(&seq_db_id.data, serialize_struct(&*db_meta)?), /* (db_id) -> db_meta */
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/old_table_name -> tb_id_list */
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
],
else_then: vec![],
};

if db_id != new_db_id {
if *seq_db_id.data != *new_seq_db_id.data {
txn.if_then.push(
txn_op_put(
&DatabaseId { db_id: new_db_id },
&new_seq_db_id.data,
serialize_struct(&*new_db_meta)?,
), // (db_id) -> db_meta
);
Expand Down Expand Up @@ -2104,7 +2103,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn.if_then
.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));

let share_object = ReplyShareObject::Table(db_id, table_id);
let share_object = ReplyShareObject::Table(*seq_db_id.data, table_id);
Some((spec_vec, share_object))
} else {
None
Expand Down Expand Up @@ -2153,7 +2152,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (_db_id_seq, db_id, db_meta) = match res {
let (seq_db_id, db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
Expand All @@ -2176,7 +2175,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// Get table by tenant,db_id, table_name to assert presence.

let dbid_tbname = DBIdTableName {
db_id,
db_id: *seq_db_id.data,
table_name: tenant_dbname_tbname.table_name.clone(),
};

Expand Down Expand Up @@ -2245,7 +2244,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (_db_id_seq, db_id, _db_meta) = match res {
let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
Expand All @@ -2254,7 +2253,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// List tables by tenant, db_id, table_name.
let table_id_history_ident = TableIdHistoryIdent {
database_id: db_id,
database_id: *seq_db_id.data,
table_name: "dummy".to_string(),
};

Expand All @@ -2268,7 +2267,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.iter()
.map(|table_id_list_key| {
TableIdHistoryIdent {
database_id: db_id,
database_id: *seq_db_id.data,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
Expand Down Expand Up @@ -2373,15 +2372,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (_db_id_seq, db_id, db_meta) = match res {
let (seq_db_id, db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};

let tb_infos = match &db_meta.from_share {
None => list_tables_from_unshare_db(self, db_id, tenant_dbname).await?,
None => list_tables_from_unshare_db(self, *seq_db_id.data, tenant_dbname).await?,
Some(share) => {
let share_ident = share.clone().to_tident(());
error!(
Expand Down Expand Up @@ -3594,7 +3593,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (_db_id_seq, db_id, db_meta) = match res {
let (seq_db_id, db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
Expand All @@ -3610,7 +3609,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}

let db_info = Arc::new(DatabaseInfo {
database_id: DatabaseId::new(db_id),
database_id: seq_db_id.data,
name_ident: req.inner.clone(),
meta: db_meta,
});
Expand Down Expand Up @@ -4752,7 +4751,7 @@ async fn drop_database_meta(
)
.await;

let (db_id_seq, db_id, mut db_meta) = match res {
let (seq_db_id, mut db_meta) = match res {
Ok(x) => x,
Err(e) => {
if let KVAppError::AppError(AppError::UnknownDatabase(_)) = e {
Expand All @@ -4768,14 +4767,15 @@ async fn drop_database_meta(
// remove db_name -> db id
if drop_name_key {
txn.condition
.push(txn_cond_seq(tenant_dbname, Eq, db_id_seq));
.push(txn_cond_seq(tenant_dbname, Eq, seq_db_id.seq));
txn.if_then.push(txn_op_del(tenant_dbname)); // (tenant, db_name) -> db_id
}

// remove db from share
let mut share_specs = Vec::with_capacity(db_meta.shared_by.len());
for share_id in &db_meta.shared_by {
let res = remove_db_from_share(kv_api, *share_id, db_id, tenant_dbname, txn).await;
let res =
remove_db_from_share(kv_api, *share_id, *seq_db_id.data, tenant_dbname, txn).await;

match res {
Ok((share_name, share_meta)) => {
Expand All @@ -4800,7 +4800,7 @@ async fn drop_database_meta(

let (removed, _from_share) = is_db_need_to_be_remove(
kv_api,
db_id,
*seq_db_id.data,
// remove db directly if created from share
|db_meta| db_meta.from_share.is_some(),
txn,
Expand All @@ -4811,7 +4811,7 @@ async fn drop_database_meta(
// if db create from share then remove it directly and remove db id from share
debug!(
name :? =(tenant_dbname),
id :? =(&DatabaseId { db_id });
id :? =(&seq_db_id );
"drop_database from share"
);

Expand All @@ -4827,7 +4827,7 @@ async fn drop_database_meta(
db_id_list_opt.unwrap_or(DbIdList::new())
};
if let Some(last_db_id) = db_id_list.last() {
if *last_db_id == db_id {
if *last_db_id == *seq_db_id.data {
db_id_list.pop();
txn.condition
.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
Expand All @@ -4840,10 +4840,10 @@ async fn drop_database_meta(
// del (tenant, db_name) -> db_id
// set db_meta.drop_on = now and update (db_id) -> db_meta

let db_id_key = DatabaseId { db_id };
let db_id_key = seq_db_id.data;

debug!(
db_id = db_id,
seq_db_id :? = seq_db_id,
name_key :? =(tenant_dbname);
"drop_database"
);
Expand Down Expand Up @@ -4873,12 +4873,12 @@ async fn drop_database_meta(

if db_id_list_seq == 0 || db_id_list_opt.is_none() {
warn!(
"drop db:{:?}, db_id:{:?} has no DbIdListKey",
tenant_dbname, db_id
"drop db:{:?}, seq_db_id:{:?} has no DbIdListKey",
tenant_dbname, seq_db_id
);

let mut db_id_list = DbIdList::new();
db_id_list.append(db_id);
db_id_list.append(*seq_db_id.data);

txn.condition
.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
Expand All @@ -4889,9 +4889,9 @@ async fn drop_database_meta(
}

if share_specs.is_empty() {
Ok((db_id, None))
Ok((*seq_db_id.data, None))
} else {
Ok((db_id, Some(share_specs)))
Ok((*seq_db_id.data, Some(share_specs)))
}
}

Expand Down Expand Up @@ -4975,20 +4975,20 @@ pub(crate) async fn get_db_id_or_err(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DatabaseNameIdent,
msg: impl Display,
) -> Result<SeqV<Id<DatabaseId>>, KVAppError> {
) -> Result<SeqV<DatabaseId>, KVAppError> {
let seq_db_id = kv_api.get_pb(name_key).await?;

let seq_db_id = seq_db_id.ok_or_else(|| unknown_database_error(name_key, msg))?;
Ok(seq_db_id)

Ok(seq_db_id.map(|x| x.into_inner()))
}

/// Returns (db_id_seq, db_id, db_meta_seq, db_meta)
pub(crate) async fn get_db_or_err(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DatabaseNameIdent,
msg: impl Display,
) -> Result<(u64, u64, SeqV<DatabaseMeta>), KVAppError> {
// TODO: change to returning SeqV
) -> Result<(SeqV<DatabaseId>, SeqV<DatabaseMeta>), KVAppError> {
let seq_db_id = kv_api.get_pb(name_key).await?;
let seq_db_id = seq_db_id.ok_or_else(|| unknown_database_error(name_key, &msg))?;

Expand All @@ -4997,7 +4997,7 @@ pub(crate) async fn get_db_or_err(
let seq_db_meta = kv_api.get_pb(&id_key).await?;
let seq_db_meta = seq_db_meta.ok_or_else(|| unknown_database_error(name_key, &msg))?;

Ok((seq_db_id.seq, *seq_db_id.data, seq_db_meta))
Ok((seq_db_id.map(|x| x.into_inner()), seq_db_meta))
}

/// Returns (db_meta_seq, db_meta)
Expand Down Expand Up @@ -5867,9 +5867,9 @@ impl UndropTableStrategy for UndropTableReq {
kv_api: &'a (impl kvapi::KVApi<Error = MetaError> + ?Sized),
) -> Result<(u64, SeqV<DatabaseMeta>), KVAppError> {
// for plain un-drop table (by name), database meta is refreshed by name
let (_, db_id, db_meta) =
let (seq_db_id, db_meta) =
get_db_or_err(kv_api, &self.name_ident.db_name_ident(), "undrop_table").await?;
Ok((db_id, db_meta))
Ok((*seq_db_id.data, db_meta))
}

fn extract_and_validate_table_id(
Expand Down
Loading

0 comments on commit 6bb25f2

Please sign in to comment.