diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 85bca0d09995..404e6b7f1cf2 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -2684,7 +2684,7 @@ impl + ?Sized> SchemaApi for KV { let table_infos = do_get_table_history(self, db_filter, capacity).await?; // A DB can be removed only when all its tables are removed. - if vacuum_db && capacity >= table_infos.len() { + if vacuum_db && capacity > table_infos.len() { vacuum_ids.push(DroppedId::Db { db_id: db_info.database_id.db_id, db_name: db_info.name_ident.database_name().to_string(), @@ -3578,31 +3578,6 @@ async fn batch_filter_table_info( Ok(()) } -type TableFilterInfoList<'a> = Vec<( - Range>>, - &'a Arc, - u64, - String, -)>; - -#[logcall::logcall(input = "")] -#[fastrace::trace] -async fn get_gc_table_info( - kv_api: &(impl kvapi::KVApi + ?Sized), - limit: usize, - table_id_list: &TableFilterInfoList<'_>, -) -> Result, u64)>, KVAppError> { - let table_id_list = &table_id_list[..std::cmp::min(limit, table_id_list.len())]; - - let mut filter_tb_infos = vec![]; - - for chunk in table_id_list.chunks(DEFAULT_MGET_SIZE) { - batch_filter_table_info(kv_api, chunk, &mut filter_tb_infos).await?; - } - - Ok(filter_tb_infos) -} - #[logcall::logcall(input = "")] #[fastrace::trace] async fn do_get_table_history( @@ -3610,88 +3585,35 @@ async fn do_get_table_history( db_filter: (Range>>, Arc), limit: usize, ) -> Result, u64)>, KVAppError> { - let mut filter_tb_infos = vec![]; - - // step 1: list db table name with db id - let mut filter_db_info_with_table_id_key_list: Vec<_> = vec![]; - let (drop_time_range, db_info) = db_filter; let db_id = db_info.database_id.db_id; - // List tables by tenant, db_id, table_name. let dbid_tbname_idlist = TableIdHistoryIdent { database_id: db_id, table_name: "dummy".to_string(), }; let dir_name = DirName::new(dbid_tbname_idlist); - let strm = kv_api.list_pb_keys(&dir_name).await?; - let table_id_list_keys = strm.try_collect::>().await?; - - let keys = table_id_list_keys - .iter() - .map(|table_id_list_key| (drop_time_range.clone(), &db_info, table_id_list_key.clone())) - .collect::>(); - - filter_db_info_with_table_id_key_list.extend(keys); - - // step 2: list all table id of table by table name - let keys = filter_db_info_with_table_id_key_list - .iter() - .map(|(_, db_info, table_id_list_key)| TableIdHistoryIdent { - database_id: db_info.database_id.db_id, - table_name: table_id_list_key.table_name.clone(), - }) - .collect::>(); - - let mut filter_db_info_with_table_id_list: TableFilterInfoList<'_> = vec![]; - let mut table_id_list_keys_iter = filter_db_info_with_table_id_key_list.into_iter(); - for c in keys.chunks(DEFAULT_MGET_SIZE) { - let strm = kv_api.get_pb_values(c.to_vec()).await?; - let table_id_list_vec = strm - .try_filter_map(|x| async move { Ok(x) }) - .try_collect::>() - .await?; + let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?; - for seq_table_id_list in table_id_list_vec { - let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap(); - let tb_id_list = seq_table_id_list.data; - - let id_list: Vec<_> = tb_id_list - .id_list - .iter() - .map(|id| { - ( - filter.clone(), - db_info, - *id, - table_id_list_key.table_name.clone(), - ) - }) - .collect(); + let mut the_list = vec![]; - filter_db_info_with_table_id_list.extend(id_list); - if filter_db_info_with_table_id_list.len() < DEFAULT_MGET_SIZE { - continue; - } - - let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?; - filter_tb_infos.extend(ret); - filter_db_info_with_table_id_list.clear(); - - if filter_tb_infos.len() >= limit { - return Ok(filter_tb_infos); - } + for (ident, table_history) in table_history_kvs { + for table_id in table_history.id_list.iter() { + the_list.push(( + drop_time_range.clone(), + &db_info, + *table_id, + ident.table_name.clone(), + )); } + } - if !filter_db_info_with_table_id_list.is_empty() { - let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?; - filter_tb_infos.extend(ret); - filter_db_info_with_table_id_list.clear(); + let mut filter_tb_infos = vec![]; - if filter_tb_infos.len() >= limit { - return Ok(filter_tb_infos); - } - } + for c in the_list[..std::cmp::min(limit, the_list.len())].chunks(DEFAULT_MGET_SIZE) { + let mut infos = vec![]; + batch_filter_table_info(kv_api, c, &mut infos).await?; + filter_tb_infos.extend(infos); } Ok(filter_tb_infos) @@ -3880,6 +3802,7 @@ async fn update_txn_to_remove_table_history( txn.if_then .push(txn_op_put_pb(table_id_history_ident, &history, None)?); } + Ok(()) }