Skip to content

Commit

Permalink
refactor: add SchemaApi::get_table_in_db (#16582)
Browse files Browse the repository at this point in the history
In cases where the `database-id` is already known, use this method to
get a table as replacement of `get_table()`. `get_table()` re-fetch
database-id by database-name, thus leads to additional unnecessary delay
and potential consistency issues, because a database may already be
renamed, and using the original database-name just gets the wrong
database instance.
  • Loading branch information
drmingdrmer authored Oct 10, 2024
1 parent 530de62 commit b7acf5c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 33 deletions.
9 changes: 9 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReq;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseInfo;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::DictionaryMeta;
Expand Down Expand Up @@ -202,8 +203,16 @@ pub trait SchemaApi: Send + Sync {

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply, KVAppError>;

/// Get a [`TableInfo`] by `tenant, database_name, table_name`.
///
/// This method should be deprecated,
/// where the database-id is already known and there is no need to re-fetch db by database-name.
/// In this case, use [`Self::get_table_in_db`] instead.
async fn get_table(&self, req: GetTableReq) -> Result<Arc<TableInfo>, KVAppError>;

/// Get a [`TableNIV`] by `database_id, table_name`.
async fn get_table_in_db(&self, req: &DBIdTableName) -> Result<Option<TableNIV>, MetaError>;

async fn get_table_meta_history(
&self,
database_name: &str,
Expand Down
65 changes: 40 additions & 25 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1477,43 +1477,28 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
};

let table_id = {
// Get table by tenant,db_id, table_name to assert presence.

let dbid_tbname = DBIdTableName {
db_id: *seq_db_id.data,
table_name: tenant_dbname_tbname.table_name.clone(),
};

let (tb_id_seq, table_id) = get_u64_value(self, &dbid_tbname).await?;
assert_table_exist(tb_id_seq, tenant_dbname_tbname, "get_table")?;

table_id
let dbid_tbname = DBIdTableName {
db_id: *seq_db_id.data,
table_name: tenant_dbname_tbname.table_name.clone(),
};

let tbid = TableId { table_id };

let seq_meta = self.get_pb(&tbid).await?;
let table_niv = self.get_table_in_db(&dbid_tbname).await?;

let Some(seq_meta) = seq_meta else {
Err(AppError::from(UnknownTable::new(
let Some(table_niv) = table_niv else {
return Err(AppError::from(UnknownTable::new(
&tenant_dbname_tbname.table_name,
format!("get_table: {}", tenant_dbname_tbname),
)))?
))
.into());
};

debug!(
ident :% =(&tbid),
name :% =(tenant_dbname_tbname),
table_meta :? =(&seq_meta);
"get_table"
);
let (_name, id, seq_meta) = table_niv.unpack();

let db_type = DatabaseType::NormalDB;

let tb_info = TableInfo {
ident: TableIdent {
table_id: tbid.table_id,
table_id: id.table_id,
seq: seq_meta.seq,
},
desc: format!(
Expand All @@ -1530,6 +1515,36 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
return Ok(Arc::new(tb_info));
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_in_db(
&self,
name_ident: &DBIdTableName,
) -> Result<Option<TableNIV>, MetaError> {
debug!(req :? =(name_ident); "SchemaApi: {}", func_name!());

let table_id = {
// Get table by tenant, db_id, table_name to assert presence.

let (tb_id_seq, table_id) = get_u64_value(self, name_ident).await?;
if tb_id_seq == 0 {
return Ok(None);
}

table_id
};

let tbid = TableId { table_id };

let seq_meta = self.get_pb(&tbid).await?;

let Some(seq_meta) = seq_meta else {
return Ok(None);
};

Ok(Some(TableNIV::new(name_ident.clone(), tbid, seq_meta)))
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_meta_history(
Expand Down
35 changes: 27 additions & 8 deletions src/query/service/src/databases/default/default_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ use std::sync::Arc;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_meta_api::SchemaApi;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::UnknownTable;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
use databend_common_meta_app::schema::CommitTableMetaReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseInfo;
use databend_common_meta_app::schema::DatabaseType;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
use databend_common_meta_app::schema::ListTableReq;
use databend_common_meta_app::schema::RenameTableReply;
use databend_common_meta_app::schema::RenameTableReq;
Expand Down Expand Up @@ -137,15 +139,32 @@ impl Database for DefaultDatabase {
// Get one table by db and table name.
#[async_backtrace::framed]
async fn get_table(&self, table_name: &str) -> Result<Arc<dyn Table>> {
let table_info = self
.ctx
.meta
.get_table(GetTableReq::new(
self.get_tenant(),
self.get_db_name(),
let name_ident = DBIdTableName::new(self.get_db_info().database_id.db_id, table_name);
let table_niv = self.ctx.meta.get_table_in_db(&name_ident).await?;

let Some(table_niv) = table_niv else {
return Err(AppError::from(UnknownTable::new(
table_name,
format!("get_table: '{}'.'{}'", self.get_db_name(), table_name),
))
.await?;
.into());
};

let (_name, id, seq_meta) = table_niv.unpack();

let table_info = TableInfo {
ident: TableIdent {
table_id: id.table_id,
seq: seq_meta.seq,
},
desc: format!("'{}'.'{}'", self.get_db_name(), table_name),
name: table_name.to_string(),
meta: seq_meta.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};

let table_info = Arc::new(table_info);

let table_info = if self.ctx.disable_table_info_refresh {
table_info
Expand Down

0 comments on commit b7acf5c

Please sign in to comment.