Skip to content

Commit

Permalink
refactor: optimize share table performance (#16218)
Browse files Browse the repository at this point in the history
* refactor: reuse http client in share endpoint client

* refactor: add global share cache manager

* refactor: add global share cache manager

* refactor: add share table by db name API

* refactor: reuse http client in share endpoint client

* refactor: add global share cache manager

* refactor: add global share cache manager

* refactor: add global share cache manager
  • Loading branch information
lichuang authored Aug 13, 2024
1 parent 3d294c7 commit c09bc28
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 82 deletions.
155 changes: 102 additions & 53 deletions src/query/service/src/catalogs/share/share_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use databend_common_meta_app::schema::CreateVirtualColumnReq;
use databend_common_meta_app::schema::DatabaseIdent;
use databend_common_meta_app::schema::DatabaseInfo;
use databend_common_meta_app::schema::DatabaseMeta;
use databend_common_meta_app::schema::DatabaseType;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::DropDatabaseReply;
use databend_common_meta_app::schema::DropDatabaseReq;
Expand Down Expand Up @@ -165,6 +166,8 @@ pub struct ShareCatalog {
info: Arc<CatalogInfo>,

option: ShareCatalogOption,

client: ShareEndpointClient,
}

impl Debug for ShareCatalog {
Expand All @@ -190,8 +193,14 @@ impl ShareCatalog {
},
disable_table_info_refresh: false,
};
let client = ShareEndpointClient::new();

Ok(Self { info, option, ctx })
Ok(Self {
info,
option,
ctx,
client,
})
}

async fn get_share_spec(&self) -> Result<ShareSpec> {
Expand Down Expand Up @@ -219,8 +228,8 @@ impl ShareCatalog {

// 2. check if ShareSpec exists using share endpoint
let share_endpoint_meta = &reply.share_endpoint_meta_vec[0].1;
let client = ShareEndpointClient::new();
let share_spec = client
let share_spec = self
.client
.get_share_spec_by_name(share_endpoint_meta, tenant, provider, share_name)
.await?;

Expand Down Expand Up @@ -390,72 +399,112 @@ impl Catalog for ShareCatalog {
db_name: &str,
table_name: &str,
) -> Result<Arc<dyn Table>> {
let share_spec = self.get_share_spec().await?;
if let Some(use_database) = &share_spec.use_database {
if use_database.name == db_name {
let db_info = self.generate_share_database_info(use_database);
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
db.get_table(table_name).await
} else {
Err(ErrorCode::UnknownDatabase(format!(
"cannot find database {} from share {}",
db_name, self.option.share_name,
)))
}
let share_option = &self.option;
let share_name = &share_option.share_name;
let share_endpoint = &share_option.share_endpoint;
let provider = &share_option.provider;
let tenant = &self.info.name_ident.tenant;

// 1. get share endpoint
let meta_api = UserApiProvider::instance().get_meta_store_client();
let req = GetShareEndpointReq {
tenant: Tenant {
tenant: tenant.to_owned(),
},
endpoint: Some(share_endpoint.clone()),
};
let reply = meta_api.get_share_endpoint(req).await?;
if reply.share_endpoint_meta_vec.is_empty() {
return Err(ErrorCode::UnknownShareEndpoint(format!(
"UnknownShareEndpoint {:?}",
share_endpoint
)));
}

// 2. fetch the table info of `db_name`.`table_name` through the share endpoint
let share_endpoint_meta = reply.share_endpoint_meta_vec[0].1.clone();
let mut table_info = self
.client
.get_share_table(
&share_endpoint_meta,
tenant,
provider,
share_name,
db_name,
table_name,
)
.await?;

let db_type = table_info.db_type.clone();
if let DatabaseType::ShareDB(params) = db_type {
let mut params = params;
params.share_endpoint_url = share_endpoint_meta.url.clone();
params.share_endpoint_credential = share_endpoint_meta.credential.clone().unwrap();
table_info.db_type = DatabaseType::ShareDB(params);

self.ctx.storage_factory.get_table(&table_info)
} else {
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
"share {}.{} has no granted database",
self.option.provider, self.option.share_name,
)))
unreachable!()
}
}

#[async_backtrace::framed]
async fn list_tables(&self, _tenant: &Tenant, db_name: &str) -> Result<Vec<Arc<dyn Table>>> {
let share_spec = self.get_share_spec().await?;
if let Some(use_database) = &share_spec.use_database {
if use_database.name == db_name {
let db_info = self.generate_share_database_info(use_database);
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
db.list_tables().await
let share_option = &self.option;
let share_name = &share_option.share_name;
let share_endpoint = &share_option.share_endpoint;
let provider = &share_option.provider;
let tenant = &self.info.name_ident.tenant;

// 1. get share endpoint
let meta_api = UserApiProvider::instance().get_meta_store_client();
let req = GetShareEndpointReq {
tenant: Tenant {
tenant: tenant.to_owned(),
},
endpoint: Some(share_endpoint.clone()),
};
let reply = meta_api.get_share_endpoint(req).await?;
if reply.share_endpoint_meta_vec.is_empty() {
return Err(ErrorCode::UnknownShareEndpoint(format!(
"UnknownShareEndpoint {:?}",
share_endpoint
)));
}

// 2. fetch the info of every table in `db_name` through the share endpoint
let share_endpoint_meta = reply.share_endpoint_meta_vec[0].1.clone();
let table_info_map = self
.client
.get_share_tables(&share_endpoint_meta, tenant, provider, share_name, db_name)
.await?;

let mut table_info_vec = vec![];
for info in table_info_map.values() {
let mut table_info = info.clone();
if let DatabaseType::ShareDB(params) = &table_info.db_type {
let mut params = params.clone();
params.share_endpoint_url = share_endpoint_meta.url.clone();
params.share_endpoint_credential = share_endpoint_meta.credential.clone().unwrap();
table_info.db_type = DatabaseType::ShareDB(params);

table_info_vec.push(self.ctx.storage_factory.get_table(&table_info)?);
} else {
Err(ErrorCode::UnknownDatabase(format!(
"cannot find database {} from share {}",
db_name, self.option.share_name,
)))
unreachable!()
}
} else {
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
"share {}.{} has no granted database",
self.option.provider, self.option.share_name,
)))
}
Ok(table_info_vec)
}

#[async_backtrace::framed]
async fn list_tables_history(
&self,
_tenant: &Tenant,
db_name: &str,
_db_name: &str,
) -> Result<Vec<Arc<dyn Table>>> {
let share_spec = self.get_share_spec().await?;
if let Some(use_database) = &share_spec.use_database {
if use_database.name == db_name {
let db_info = self.generate_share_database_info(use_database);
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
db.list_tables_history().await
} else {
Err(ErrorCode::UnknownDatabase(format!(
"cannot find database {} from share {}",
db_name, self.option.share_name,
)))
}
} else {
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
"share {}.{} has no granted database",
self.option.provider, self.option.share_name,
)))
}
Err(ErrorCode::PermissionDenied(
"Permission denied, cannot list table history from a shared database".to_string(),
))
}

#[async_backtrace::framed]
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/databases/share/share_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl ShareDatabase {

let client = ShareEndpointClient::new();
let table_info_map = client
.get_share_tables(
.get_share_tables_by_db_id(
&share_endpoint_meta,
self.get_tenant().tenant_name(),
from_share.tenant_name(),
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_config::InnerConfig;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CatalogType;
use databend_common_sharing::ShareEndpointManager;
use databend_common_sharing::SharePresignedCacheManager;
use databend_common_storage::DataOperator;
use databend_common_storage::ShareTableConfig;
use databend_common_storages_hive::HiveCreator;
Expand Down Expand Up @@ -113,6 +114,7 @@ impl GlobalServices {
SessionManager::init(config)?;
LockManager::init()?;
AuthMgr::init(config)?;
SharePresignedCacheManager::init()?;

// Init user manager.
// Builtin users and udfs are created here.
Expand Down
3 changes: 3 additions & 0 deletions src/query/sharing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ pub use signer::SharedSigner;

mod share_endpoint;
pub use share_endpoint::ShareEndpointManager;

mod share_presigned_cache_manager;
pub use share_presigned_cache_manager::SharePresignedCacheManager;
91 changes: 81 additions & 10 deletions src/query/sharing/src/share_endpoint_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ use ring::hmac;

use crate::signer::HMAC_AUTH_METHOD;

pub struct ShareEndpointClient {}
#[derive(Clone)]
pub struct ShareEndpointClient {
client: reqwest::Client,
}

impl ShareEndpointClient {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {}
Self {
client: reqwest::Client::new(),
}
}

pub fn generate_auth_headers(
Expand Down Expand Up @@ -80,8 +85,7 @@ impl ShareEndpointClient {
HeaderMap::new()
};

let client = reqwest::Client::new();
let resp = client.get(&uri).headers(headers).send().await;
let resp = self.client.get(&uri).headers(headers).send().await;

match resp {
Ok(resp) => {
Expand Down Expand Up @@ -117,8 +121,44 @@ impl ShareEndpointClient {
} else {
HeaderMap::new()
};
let client = reqwest::Client::new();
let resp = client.get(&uri).headers(headers).send().await;
let resp = self.client.get(&uri).headers(headers).send().await;

match resp {
Ok(resp) => {
let body = resp.text().await?;
let ret: TableInfo = serde_json::from_str(&body)?;
Ok(ret)
}
Err(err) => {
error!("get_share_spec_by_name fail: {:?}", err);
Err(err.into())
}
}
}

#[async_backtrace::framed]
pub async fn get_share_table(
&self,
share_endpoint_meta: &ShareEndpointMeta,
from_tenant: &str,
to_tenant: &str,
share_name: &str,
db_name: &str,
table_name: &str,
) -> Result<TableInfo> {
let path = format!(
"/{}/{}/{}/{}/v2/share_table",
to_tenant, share_name, db_name, table_name
);
// skip path first `/` char
let uri = format!("{}{}", share_endpoint_meta.url, &path[1..]);
let headers = if let Some(credential) = &share_endpoint_meta.credential {
Self::generate_auth_headers(&path, credential, from_tenant)
} else {
HeaderMap::new()
};

let resp = self.client.get(&uri).headers(headers).send().await;

match resp {
Ok(resp) => {
Expand All @@ -135,6 +175,39 @@ impl ShareEndpointClient {

/// Fetches the table infos of one shared database, addressed by database name.
///
/// Issues a GET request to `<endpoint url><to_tenant>/<share_name>/<db_name>/v2/share_tables`
/// using the client's reusable HTTP client, and decodes the JSON response body
/// into a `BTreeMap<String, TableInfo>`.
///
/// When `share_endpoint_meta.credential` is set, the request headers come from
/// `generate_auth_headers(path, credential, from_tenant)`; otherwise the request
/// is sent with an empty header map.
///
/// # Errors
///
/// Returns an error if sending the request fails (this case is also logged),
/// if the response body cannot be read, or if the body is not valid JSON for
/// the expected map.
#[async_backtrace::framed]
pub async fn get_share_tables(
    &self,
    share_endpoint_meta: &ShareEndpointMeta,
    from_tenant: &str,
    to_tenant: &str,
    share_name: &str,
    db_name: &str,
) -> Result<BTreeMap<String, TableInfo>> {
    let path = format!("/{}/{}/{}/v2/share_tables", to_tenant, share_name, db_name);
    // The auth headers are generated from the full path, while the URI drops the
    // path's leading `/` (presumably the endpoint url already ends with one —
    // TODO confirm).
    let uri = format!("{}{}", share_endpoint_meta.url, &path[1..]);
    let headers = match &share_endpoint_meta.credential {
        Some(credential) => Self::generate_auth_headers(&path, credential, from_tenant),
        None => HeaderMap::new(),
    };

    // Only a failure to send is logged here; later failures propagate via `?`.
    let response = match self.client.get(&uri).headers(headers).send().await {
        Ok(response) => response,
        Err(err) => {
            error!("get_share_tables fail: {:?}", err);
            return Err(err.into());
        }
    };

    let body = response.text().await?;
    let tables: BTreeMap<String, TableInfo> = serde_json::from_str(&body)?;
    Ok(tables)
}

#[async_backtrace::framed]
pub async fn get_share_tables_by_db_id(
&self,
share_endpoint_meta: &ShareEndpointMeta,
from_tenant: &str,
Expand All @@ -153,8 +226,7 @@ impl ShareEndpointClient {
} else {
HeaderMap::new()
};
let client = reqwest::Client::new();
let resp = client.get(&uri).headers(headers).send().await;
let resp = self.client.get(&uri).headers(headers).send().await;

match resp {
Ok(resp) => {
Expand Down Expand Up @@ -190,8 +262,7 @@ impl ShareEndpointClient {
&share_params.share_endpoint_credential,
from_tenant,
);
let client = reqwest::Client::new();
let resp = client.get(&uri).headers(headers).send().await;
let resp = self.client.get(&uri).headers(headers).send().await;

match resp {
Ok(resp) => {
Expand Down
Loading

0 comments on commit c09bc28

Please sign in to comment.