Skip to content

Commit

Permalink
fix: move TableInfo building out of SchemaApi
Browse files Browse the repository at this point in the history
`SchemaApi` cannot provide enough information, such as `CatalogInfo` and
`DatabaseType`. Therefore `SchemaApi` should not build a `TableInfo`;
such a task is left to its callers.
  • Loading branch information
drmingdrmer committed Sep 30, 2024
1 parent 0ed932b commit ec474aa
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 44 deletions.
38 changes: 14 additions & 24 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2658,13 +2658,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await?;

let mut vacuum_table_infos = vec![];
let mut vacuum_tables = vec![];
let mut vacuum_ids = vec![];

for db_info in db_infos {
if vacuum_table_infos.len() >= the_limit {
if vacuum_tables.len() >= the_limit {
return Ok(ListDroppedTableResp {
drop_table_infos: vacuum_table_infos,
vacuum_tables,
drop_ids: vacuum_ids,
});
}
Expand All @@ -2679,7 +2679,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
drop_time_range.clone()
};

let capacity = the_limit - vacuum_table_infos.len();
let capacity = the_limit - vacuum_tables.len();
let table_nivs = get_history_tables_for_gc(
self,
table_drop_time_range,
Expand All @@ -2692,26 +2692,22 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
vacuum_ids.push(DroppedId::from(table_niv.clone()));
}

let db_name = db_info.name_ident.database_name().to_string();
let db_name_ident = db_info.name_ident.clone();

// A DB can be removed only when all its tables are removed.
if vacuum_db && capacity > table_nivs.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
db_name: db_name.clone(),
});
}

vacuum_table_infos.extend(table_nivs.iter().take(capacity).map(|niv| {
Arc::new(TableInfo::new(
db_info.name_ident.database_name(),
&niv.name().table_name,
TableIdent::new(niv.id().table_id, niv.value().seq),
niv.value().data.clone(),
))
}));
vacuum_tables.extend(std::iter::repeat(db_name_ident).zip(table_nivs));
}

return Ok(ListDroppedTableResp {
drop_table_infos: vacuum_table_infos,
vacuum_tables,
drop_ids: vacuum_ids,
});
}
Expand Down Expand Up @@ -2748,21 +2744,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.await?;

let mut drop_ids = vec![];
let mut drop_table_infos = vec![];
let mut vacuum_tables = vec![];

for niv in table_nivs.iter() {
for niv in table_nivs {
drop_ids.push(DroppedId::from(niv.clone()));

drop_table_infos.push(Arc::new(TableInfo::new(
db_info.name_ident.database_name(),
&niv.name().table_name,
TableIdent::new(niv.id().table_id, niv.value().seq),
niv.value().data.clone(),
)));
vacuum_tables.push((tenant_dbname.clone(), niv));
}

Ok(ListDroppedTableResp {
drop_table_infos,
vacuum_tables,
drop_ids,
})
}
Expand Down
47 changes: 31 additions & 16 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4077,6 +4077,7 @@ impl SchemaApiTestSuite {

// first create a database drop within filter time
info!("--- create db1");
let db1_id;
{
let db_name = DatabaseNameIdent::new(&tenant, "db1");
let req = CreateDatabaseReq {
Expand All @@ -4086,7 +4087,7 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(req).await?;
let db1_id = res.db_id.db_id;
db1_id = res.db_id;

let req = CreateTableReq {
create_option: CreateOption::Create,
Expand All @@ -4103,21 +4104,22 @@ impl SchemaApiTestSuite {
})
.await?;

drop_ids_boundary.push(DroppedId::new_table(db1_id, db1_tb1_id, "tb1"));
drop_ids_boundary.push(DroppedId::new_table(*db1_id, db1_tb1_id, "tb1"));
drop_ids_boundary.push(DroppedId::Db {
db_id: db1_id,
db_id: *db1_id,
db_name: db_name.database_name().to_string(),
});

drop_ids_no_boundary.push(DroppedId::new_table(db1_id, db1_tb1_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::new_table(*db1_id, db1_tb1_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::Db {
db_id: db1_id,
db_id: *db1_id,
db_name: db_name.database_name().to_string(),
});
}

// second create a database dropped out of filter time, but has a table drop within filter time
info!("--- create db2");
let db2_id;
{
let create_db_req = CreateDatabaseReq {
create_option: CreateOption::Create,
Expand All @@ -4126,7 +4128,7 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(create_db_req.clone()).await?;
let db2_id = res.db_id;
db2_id = res.db_id;
drop_ids_no_boundary.push(DroppedId::Db {
db_id: *db2_id,
db_name: "db2".to_string(),
Expand Down Expand Up @@ -4231,6 +4233,7 @@ impl SchemaApiTestSuite {
}

// third create a database not dropped, but has a table drop within filter time
let db3_id;
{
let create_db_req = CreateDatabaseReq {
create_option: CreateOption::Create,
Expand All @@ -4239,7 +4242,7 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
db3_id = res.db_id;

info!("--- create and drop db3.tb1");
{
Expand All @@ -4250,12 +4253,12 @@ impl SchemaApiTestSuite {
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
drop_ids_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb1"));
drop_ids_boundary.push(DroppedId::new_table(*db3_id, resp.table_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::new_table(*db3_id, resp.table_id, "tb1"));
mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db_id,
db_id: *db3_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand All @@ -4274,11 +4277,11 @@ impl SchemaApiTestSuite {
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
drop_ids_no_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb2"));
drop_ids_no_boundary.push(DroppedId::new_table(*db3_id, resp.table_id, "tb2"));
mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db_id,
db_id: *db3_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand Down Expand Up @@ -4331,9 +4334,15 @@ impl SchemaApiTestSuite {
.into_iter()
.collect();
let actual: BTreeSet<String> = resp
.drop_table_infos
.vacuum_tables
.iter()
.map(|table_info| table_info.desc.clone())
.map(|(db_name_ident, table_niv)| {
format!(
"'{}'.'{}'",
db_name_ident.database_name(),
&table_niv.name().table_name
)
})
.collect();
assert_eq!(expected, actual);
}
Expand Down Expand Up @@ -4368,9 +4377,15 @@ impl SchemaApiTestSuite {
.cloned()
.collect();
let actual: BTreeSet<String> = resp
.drop_table_infos
.vacuum_tables
.iter()
.map(|table_info| table_info.desc.clone())
.map(|(db_name_ident, table_niv)| {
format!(
"'{}'.'{}'",
db_name_ident.database_name(),
&table_niv.name().table_name
)
})
.collect();
assert_eq!(expected, actual);
}
Expand Down
25 changes: 24 additions & 1 deletion src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl DBIdTableName {
table_name: table_name.to_string(),
}
}
pub fn display(&self) -> impl Display {
format!("{}.'{}'", self.db_id, self.table_name)
}
}

impl Display for DBIdTableName {
Expand Down Expand Up @@ -342,6 +345,7 @@ impl TableInfo {
}
}

/// Deprecated: use `new_full()`. This method sets default values for some fields.
pub fn new(db_name: &str, table_name: &str, ident: TableIdent, meta: TableMeta) -> TableInfo {
TableInfo {
ident,
Expand All @@ -352,6 +356,24 @@ impl TableInfo {
}
}

pub fn new_full(
db_name: &str,
table_name: &str,
ident: TableIdent,
meta: TableMeta,
catalog_info: Arc<CatalogInfo>,
db_type: DatabaseType,
) -> TableInfo {
TableInfo {
ident,
desc: format!("'{}'.'{}'", db_name, table_name),
name: table_name.to_string(),
meta,
catalog_info,
db_type,
}
}

pub fn schema(&self) -> Arc<TableSchema> {
self.meta.schema.clone()
}
Expand Down Expand Up @@ -1021,7 +1043,8 @@ impl DroppedId {

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListDroppedTableResp {
pub drop_table_infos: Vec<Arc<TableInfo>>,
/// The **database_name, (name, id, value)** of a table to vacuum.
pub vacuum_tables: Vec<(DatabaseNameIdent, TableNIV)>,
pub drop_ids: Vec<DroppedId>,
}

Expand Down
14 changes: 11 additions & 3 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use databend_common_meta_app::schema::RenameTableReply;
use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TruncateTableReply;
Expand Down Expand Up @@ -524,13 +525,20 @@ impl Catalog for MutableCatalog {
let resp = ctx.meta.get_drop_table_infos(req).await?;

let drop_ids = resp.drop_ids.clone();
let drop_table_infos = resp.drop_table_infos;

let storage = ctx.storage_factory;

let mut tables = vec![];
for table_info in drop_table_infos {
tables.push(storage.get_table(table_info.as_ref())?);
for (db_name_ident, niv) in resp.vacuum_tables {
let table_info = TableInfo::new_full(
db_name_ident.database_name(),
&niv.name().table_name,
TableIdent::new(niv.id().table_id, niv.value().seq),
niv.value().data.clone(),
self.info(),
DatabaseType::NormalDB,
);
tables.push(storage.get_table(&table_info)?);
}
Ok((tables, drop_ids))
}
Expand Down

0 comments on commit ec474aa

Please sign in to comment.