Skip to content

Commit

Permalink
fix: Only when all tables are included, the db can be dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 28, 2024
1 parent 022c5a7 commit 8d878b0
Showing 1 changed file with 18 additions and 95 deletions.
113 changes: 18 additions & 95 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2684,7 +2684,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let table_infos = do_get_table_history(self, db_filter, capacity).await?;

// A DB can be removed only when all its tables are removed.
if vacuum_db && capacity >= table_infos.len() {
if vacuum_db && capacity > table_infos.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
Expand Down Expand Up @@ -3578,120 +3578,42 @@ async fn batch_filter_table_info(
Ok(())
}

type TableFilterInfoList<'a> = Vec<(
Range<Option<DateTime<Utc>>>,
&'a Arc<DatabaseInfo>,
u64,
String,
)>;

#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn get_gc_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
limit: usize,
table_id_list: &TableFilterInfoList<'_>,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let table_id_list = &table_id_list[..std::cmp::min(limit, table_id_list.len())];

let mut filter_tb_infos = vec![];

for chunk in table_id_list.chunks(DEFAULT_MGET_SIZE) {
batch_filter_table_info(kv_api, chunk, &mut filter_tb_infos).await?;
}

Ok(filter_tb_infos)
}

#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (Range<Option<DateTime<Utc>>>, Arc<DatabaseInfo>),
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];

// step 1: list db table name with db id
let mut filter_db_info_with_table_id_key_list: Vec<_> = vec![];

let (drop_time_range, db_info) = db_filter;
let db_id = db_info.database_id.db_id;

// List tables by tenant, db_id, table_name.
let dbid_tbname_idlist = TableIdHistoryIdent {
database_id: db_id,
table_name: "dummy".to_string(),
};
let dir_name = DirName::new(dbid_tbname_idlist);
let strm = kv_api.list_pb_keys(&dir_name).await?;
let table_id_list_keys = strm.try_collect::<Vec<_>>().await?;

let keys = table_id_list_keys
.iter()
.map(|table_id_list_key| (drop_time_range.clone(), &db_info, table_id_list_key.clone()))
.collect::<Vec<_>>();

filter_db_info_with_table_id_key_list.extend(keys);

// step 2: list all table id of table by table name
let keys = filter_db_info_with_table_id_key_list
.iter()
.map(|(_, db_info, table_id_list_key)| TableIdHistoryIdent {
database_id: db_info.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
})
.collect::<Vec<_>>();

let mut filter_db_info_with_table_id_list: TableFilterInfoList<'_> = vec![];
let mut table_id_list_keys_iter = filter_db_info_with_table_id_key_list.into_iter();
for c in keys.chunks(DEFAULT_MGET_SIZE) {
let strm = kv_api.get_pb_values(c.to_vec()).await?;
let table_id_list_vec = strm
.try_filter_map(|x| async move { Ok(x) })
.try_collect::<Vec<_>>()
.await?;
let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?;

for seq_table_id_list in table_id_list_vec {
let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap();
let tb_id_list = seq_table_id_list.data;

let id_list: Vec<_> = tb_id_list
.id_list
.iter()
.map(|id| {
(
filter.clone(),
db_info,
*id,
table_id_list_key.table_name.clone(),
)
})
.collect();
let mut the_list = vec![];

filter_db_info_with_table_id_list.extend(id_list);
if filter_db_info_with_table_id_list.len() < DEFAULT_MGET_SIZE {
continue;
}

let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?;
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
for (ident, table_history) in table_history_kvs {
for table_id in table_history.id_list.iter() {
the_list.push((
drop_time_range.clone(),
&db_info,
*table_id,
ident.table_name.clone(),
));
}
}

if !filter_db_info_with_table_id_list.is_empty() {
let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?;
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();
let mut filter_tb_infos = vec![];

if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}
for c in the_list[..std::cmp::min(limit, the_list.len())].chunks(DEFAULT_MGET_SIZE) {
let mut infos = vec![];
batch_filter_table_info(kv_api, c, &mut infos).await?;
filter_tb_infos.extend(infos);
}

Ok(filter_tb_infos)
Expand Down Expand Up @@ -3880,6 +3802,7 @@ async fn update_txn_to_remove_table_history(
txn.if_then
.push(txn_op_put_pb(table_id_history_ident, &history, None)?);
}

Ok(())
}

Expand Down

0 comments on commit 8d878b0

Please sign in to comment.