feat: continue vacuum drop table on per-table cleanup failures #16424

Merged
15 commits, merged Sep 11, 2024
36 changes: 20 additions & 16 deletions src/meta/api/src/schema_api_impl.rs
@@ -2884,6 +2884,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
Arc::new(vec![]),
));
}
if num == left_num {
@@ -2893,27 +2894,30 @@
});
}
} else {
table_infos.iter().for_each(|(table_info, db_id)| {
if !drop_db {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
))
}
});
drop_table_infos.extend(
table_infos
.into_iter()
.map(|(table_info, _)| table_info)
.collect::<Vec<_>>(),
);
if drop_db {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
Arc::new(
table_infos
.iter()
.map(|(table_info, _)| {
(table_info.ident.table_id, table_info.name.clone())
})
.collect(),
),
));
} else {
table_infos.iter().for_each(|(table_info, db_id)| {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
))
});
}
drop_table_infos
.extend(table_infos.into_iter().map(|(table_info, _)| table_info));
}
}

@@ -2974,7 +2978,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db(db_id, db_name) => {
DroppedId::Db(db_id, db_name, _) => {
gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
}
DroppedId::Table(db_id, table_id, table_name) => {
29 changes: 23 additions & 6 deletions src/meta/api/src/schema_api_test_suite.rs
@@ -4023,10 +4023,12 @@ impl SchemaApiTestSuite {
drop_ids_1.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
Arc::new(vec![]),
));
drop_ids_2.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
Arc::new(vec![]),
));

let req = CreateTableReq {
@@ -4063,7 +4065,7 @@

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string()));
drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string(), Arc::new(vec![])));

info!("--- create and drop db2.tb1");
{
@@ -4262,17 +4264,26 @@
left_table_id.cmp(right_table_id)
}
}
(DroppedId::Db(left_db_id, _), DroppedId::Db(right_db_id, _)) => {
(DroppedId::Db(left_db_id, _, _), DroppedId::Db(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Db(left_db_id, _), DroppedId::Table(right_db_id, _, _)) => {
(DroppedId::Db(left_db_id, _, _), DroppedId::Table(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _)) => {
(DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
}
}
fn is_dropped_id_eq(l: &DroppedId, r: &DroppedId) -> bool {
match (l, r) {
(
DroppedId::Db(left_db_id, left_db_name, _),
DroppedId::Db(right_db_id, right_db_name, _),
) => left_db_id == right_db_id && left_db_name == right_db_name,
_ => l == r,
}
}
// case 1: test AllDroppedTables with filter time
{
let now = Utc::now();
@@ -4285,7 +4296,10 @@
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_1);
assert_eq!(sort_drop_ids.len(), drop_ids_1.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_1.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
@@ -4314,7 +4328,10 @@
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_2);
assert_eq!(sort_drop_ids.len(), drop_ids_2.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_2.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
21 changes: 18 additions & 3 deletions src/meta/app/src/schema/table.rs
@@ -25,6 +25,7 @@ use std::time::Duration;
use anyerror::func_name;
use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;
use databend_common_expression::TableField;
@@ -201,6 +202,20 @@ pub struct TableInfo {
pub db_type: DatabaseType,
}

impl TableInfo {
pub fn database_name(&self) -> Result<&str> {
if self.engine() != "FUSE" {
return Err(ErrorCode::Internal(format!(
"Invalid engine: {}",
self.engine()
)));
}
let database_name = self.desc.split('.').next().unwrap();
let database_name = &database_name[1..database_name.len() - 1];
Ok(database_name)
}
}
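For reference, a minimal standalone sketch of the `desc` parsing the new helper relies on, assuming the FUSE convention of `desc == "'<db>'.'<table>'"`. The function name below is illustrative and not part of the crate; the real method additionally rejects non-FUSE engines and returns `ErrorCode::Internal`.

```rust
// Sketch only: extract the database name from a quoted "'db'.'table'" desc.
fn database_name_from_desc(desc: &str) -> Option<&str> {
    let quoted = desc.split('.').next()?;         // "'db1'"
    quoted.strip_prefix('\'')?.strip_suffix('\'') // "db1"
}

fn main() {
    assert_eq!(database_name_from_desc("'db1'.'tb1'"), Some("db1"));
}
```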

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableStatistics {
/// Number of rows
@@ -360,7 +375,7 @@ impl Default for TableMeta {
fn default() -> Self {
TableMeta {
schema: Arc::new(TableSchema::empty()),
engine: "".to_string(),
engine: "FUSE".to_string(),
engine_options: BTreeMap::new(),
storage_params: None,
part_prefix: "".to_string(),
@@ -907,8 +922,8 @@ pub struct ListDroppedTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DroppedId {
// db id, db name
Db(u64, String),
// db id, db name, (table id, table name)s
Db(u64, String, Arc<Vec<(u64, String)>>),
// db id, table id, table name
Table(u64, u64, String),
}
5 changes: 2 additions & 3 deletions src/query/ee/src/storages/fuse/operations/handler.rs
@@ -22,14 +22,13 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
use databend_enterprise_vacuum_handler::VacuumHandler;
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;

use crate::storages::fuse::do_vacuum;
use crate::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files;
use crate::storages::fuse::vacuum_drop_tables;

pub struct RealVacuumHandler {}

#[async_trait::async_trait]
@@ -49,7 +48,7 @@ impl VacuumHandler for RealVacuumHandler {
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
) -> VacuumDropTablesResult {
vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
}
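This hunk only shows the return type swap from `Result<Option<Vec<VacuumDropFileInfo>>>` to `VacuumDropTablesResult`; the alias itself is defined elsewhere in the vacuum handler crate. A hypothetical sketch of what such an alias could bundle so that per-table cleanup failures are reported instead of aborting the whole vacuum; every name and field shape here is an assumption, not the crate's actual definition.

```rust
use std::collections::HashSet;

// Assumed shape of a dry-run entry: (table name, file path, file size).
type VacuumDropFileInfo = (String, String, u64);

// Assumed response: dry-run listing plus ids of tables whose cleanup failed.
struct VacuumDropTablesResp {
    files: Option<Vec<VacuumDropFileInfo>>, // Some(..) only in dry-run mode
    failed_tables: HashSet<u64>,            // tables skipped due to cleanup errors
}

// Assumed alias; the real one would use databend's own Result/error types.
type VacuumDropTablesResult = Result<VacuumDropTablesResp, Box<dyn std::error::Error>>;
```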
