Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): first check privilege in SystemEngine get_full_data #16421

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -223,6 +224,13 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
// Get the db name by meta id.
async fn get_db_name_by_id(&self, db_ids: MetaId) -> Result<String>;

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>>;

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
33 changes: 33 additions & 0 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -338,6 +339,38 @@ impl Catalog for DatabaseCatalog {
}
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let sys_dbs = self.immutable_catalog.list_databases(tenant).await?;
let sys_db_names: Vec<_> = sys_dbs
.iter()
.map(|sys_db| sys_db.get_db_info().name_ident.database_name())
.collect();

let mut mut_db_names: Vec<_> = Vec::new();
for db_name in db_names {
if !sys_db_names.contains(&db_name.database_name()) {
mut_db_names.push(db_name.clone());
}
}

let mut dbs = self
.immutable_catalog
.mget_databases(tenant, db_names)
.await?;

let other = self
.mutable_catalog
.mget_databases(tenant, &mut_db_names)
.await?;

dbs.extend(other);
Ok(dbs)
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
18 changes: 18 additions & 0 deletions src/query/service/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_catalog::catalog::Catalog;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -232,6 +233,23 @@ impl Catalog for ImmutableCatalog {
}
}

async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let mut res: Vec<Arc<dyn Database>> = vec![];
for db_name in db_names {
let db_name = db_name.database_name();
if db_name == "system" {
res.push(self.sys_db.clone());
} else if db_name == "information_schema" {
res.push(self.info_schema_db.clone());
}
}
Ok(res)
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
29 changes: 29 additions & 0 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_catalog::catalog::Catalog;
use databend_common_config::InnerConfig;
use databend_common_exception::Result;
use databend_common_meta_api::kv_app_error::KVAppError;
use databend_common_meta_api::name_id_value_api::NameIdValueApiCompat;
use databend_common_meta_api::SchemaApi;
use databend_common_meta_api::SequenceApi;
use databend_common_meta_app::app_error::AppError;
Expand Down Expand Up @@ -422,6 +423,34 @@ impl Catalog for MutableCatalog {
Ok(res)
}

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let res = self
.ctx
.meta
.mget_id_value_compat(db_names.iter().cloned())
.await?;
let dbs = res
.map(|(name_ident, database_id, meta)| {
Arc::new(DatabaseInfo {
database_id,
name_ident,
meta,
})
})
.collect::<Vec<Arc<DatabaseInfo>>>();

dbs.iter().try_fold(vec![], |mut acc, item| {
let db = self.build_db_instance(item)?;
acc.push(db);
Ok(acc)
})
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_function::TableFunction;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -305,6 +306,14 @@ impl Catalog for SessionCatalog {
self.inner.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.inner.mget_databases(tenant, db_names).await
}

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -208,6 +209,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

async fn mget_database_names_by_ids(
&self,
tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -957,6 +958,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/hive/hive/src/hive_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -385,6 +386,16 @@ impl Catalog for HiveCatalog {
))
}

async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in HIVE catalog",
))
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
10 changes: 10 additions & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -295,6 +296,15 @@ impl Catalog for IcebergCatalog {
"Cannot get db name by id in ICEBERG catalog",
))
}
async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in ICEBERG catalog",
))
}

async fn mget_database_names_by_ids(
&self,
Expand Down
63 changes: 52 additions & 11 deletions src/query/storages/system/src/columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -277,16 +278,7 @@ pub(crate) async fn dump_tables(

let mut final_dbs: Vec<(String, u64)> = Vec::new();

if databases.is_empty() {
let all_databases = catalog.list_databases(&tenant).await?;
for db in all_databases {
let db_id = db.get_db_info().database_id.db_id;
let db_name = db.name();
if visibility_checker.check_database_visibility(CATALOG_DEFAULT, db_name, db_id) {
final_dbs.push((db_name.to_string(), db_id));
}
}
} else {
if !databases.is_empty() {
for db in databases {
let db_id = catalog
.get_database(&tenant, &db)
Expand All @@ -298,12 +290,61 @@ pub(crate) async fn dump_tables(
final_dbs.push((db.to_string(), db_id));
}
}
} else {
let catalog_dbs = visibility_checker.get_visibility_database();
// None means has global level privileges
if let Some(catalog_dbs) = catalog_dbs {
for (catalog_name, dbs) in catalog_dbs {
if catalog_name == CATALOG_DEFAULT {
let mut catalog_db_ids = vec![];
let mut catalog_db_names = vec![];
catalog_db_names.extend(
dbs.iter()
.filter_map(|(db_name, _)| *db_name)
.map(|db_name| db_name.to_string()),
);
catalog_db_ids.extend(dbs.iter().filter_map(|(_, db_id)| *db_id));
if let Ok(databases) = catalog
.mget_database_names_by_ids(&tenant, &catalog_db_ids)
.await
{
catalog_db_names.extend(databases.into_iter().flatten());
} else {
let msg = format!("Failed to get database name by id: {}", catalog.name());
warn!("{}", msg);
}
let db_idents = catalog_db_names
.iter()
.map(|name| DatabaseNameIdent::new(&tenant, name))
.collect::<Vec<DatabaseNameIdent>>();
let dbs: Vec<(String, u64)> = catalog
.mget_databases(&tenant, &db_idents)
.await?
.iter()
.map(|db| (db.name().to_string(), db.get_db_info().database_id.db_id))
.collect();
final_dbs.extend(dbs);
}
}
} else {
let all_databases = catalog.list_databases(&tenant).await?;
for db in all_databases {
let db_id = db.get_db_info().database_id.db_id;
let db_name = db.name();
if visibility_checker.check_database_visibility(CATALOG_DEFAULT, db_name, db_id) {
final_dbs.push((db_name.to_string(), db_id));
}
}
}
}

let mut final_tables: Vec<(String, Vec<Arc<dyn Table>>)> = Vec::with_capacity(final_dbs.len());
for (database, db_id) in final_dbs {
let tables = if tables.is_empty() {
(catalog.list_tables(&tenant, &database).await).unwrap_or_default()
catalog
.list_tables(&tenant, &database)
.await
.unwrap_or_default()
} else {
let mut res = Vec::new();
for table in &tables {
Expand Down
Loading
Loading