diff --git a/src/meta/api/src/base_api.rs b/src/meta/api/src/base_api.rs new file mode 100644 index 000000000000..96612c57bbcf --- /dev/null +++ b/src/meta/api/src/base_api.rs @@ -0,0 +1,134 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_app::data_id::DataId; +use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::tenant_key::ident::TIdent; +use databend_common_meta_app::tenant_key::resource::TenantResource; +use databend_common_meta_app::KeyWithTenant; +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_types::MetaError; +use databend_common_meta_types::SeqV; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnRequest; +use databend_common_proto_conv::FromToProto; +use fastrace::func_name; +use log::debug; + +use crate::kv_pb_api::KVPbApi; +use crate::meta_txn_error::MetaTxnError; +use crate::txn_backoff::txn_backoff; +use crate::txn_op_del; +use crate::util::fetch_id; +use crate::util::send_txn; +use crate::util::txn_cond_eq_seq; +use crate::util::txn_op_put_pb; + +/// BaseApi provide several generic meta-service access pattern implementations +/// +/// These implementations are used by other meta-service APIs. +#[tonic::async_trait] +pub trait BaseApi: KVApi { + /// Create a two level `name -> id -> value` mapping. + /// + /// `IdResource` is the Resource definition for id. + /// `associated_ops` is used to generate additional key-values to add or remove along with the main operation. + /// Such operations do not have any condition constraints. + /// For example, a `name -> id` mapping can have a reverse `id -> name` mapping. + /// + /// If there is already a `name_ident` exists, return the existing id in a `Ok(Err(exist))`. + /// Otherwise, create `name -> id -> value` and returns the created id in a `Ok(Ok(created))`. + async fn create_id_value( + &self, + name_ident: &K, + value: &V, + override_exist: bool, + associated_records: A, + ) -> Result, SeqV>>, MetaTxnError> + where + K: kvapi::Key>, + K: KeyWithTenant, + K: Sync, + IdResource: TenantResource + Send + Sync + 'static, + ::ValueType: FromToProto, + TIdent>: kvapi::Key, + V: FromToProto + Sync + 'static, + A: Fn(DataId) -> Vec<(String, Vec)> + Send, + { + debug!(name_ident :? =name_ident; "SchemaApi: {}", func_name!()); + + let tenant = name_ident.tenant(); + + let mut trials = txn_backoff(None, func_name!()); + loop { + trials.next().unwrap()?.await; + + let mut txn = TxnRequest::default(); + + let mut current_id_seq = 0; + + { + let get_res = self.get_id_and_value(name_ident).await?; + + if let Some((seq_id, seq_meta)) = get_res { + if override_exist { + // Override take place only when the id -> value does not change. + // If it does not override, no such condition is required. + let id_ident = seq_id.data.into_t_ident(tenant); + txn.condition.push(txn_cond_eq_seq(&id_ident, seq_meta.seq)); + txn.if_then.push(txn_op_del(&id_ident)); + + // Following txn must match this seq to proceed. + current_id_seq = seq_id.seq; + + // Remove existing associated. + let kvs = associated_records(seq_id.data); + for (k, _v) in kvs { + txn.if_then.push(TxnOp::delete(k)); + } + } else { + return Ok(Err(seq_id)); + } + }; + } + + let idu64 = fetch_id(self, IdGenerator::generic()).await?; + let id = DataId::::new(idu64); + let id_ident = id.into_t_ident(name_ident.tenant()); + debug!(id :? = id,name_ident :? =name_ident; "new id"); + + txn.condition + .extend(vec![txn_cond_eq_seq(name_ident, current_id_seq)]); + + txn.if_then.push(txn_op_put_pb(name_ident, &id)?); // (tenant, name) -> id + txn.if_then.push(txn_op_put_pb(&id_ident, value)?); // (id) -> value + + // Add associated + let kvs = associated_records(id); + for (k, v) in kvs { + txn.if_then.push(TxnOp::put(k, v)); + } + + let (succ, _responses) = send_txn(self, txn).await?; + debug!(name_ident :? =name_ident, id :? =&id_ident,succ = succ; "{}", func_name!()); + + if succ { + return Ok(Ok(id)); + } + } + } +} + +impl BaseApi for T where T: KVApi + ?Sized {} diff --git a/src/meta/api/src/kv_app_error.rs b/src/meta/api/src/kv_app_error.rs index 2f0e29d858c6..57f1ce1d35b5 100644 --- a/src/meta/api/src/kv_app_error.rs +++ b/src/meta/api/src/kv_app_error.rs @@ -27,6 +27,8 @@ use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaNetworkError; use tonic::Status; +use crate::meta_txn_error::MetaTxnError; + /// Errors for a kvapi::KVApi based application, such SchemaApi, ShareApi. /// /// There are three subset of errors in it: @@ -61,6 +63,15 @@ impl From for ErrorCode { } } +impl From for KVAppError { + fn from(value: MetaTxnError) -> Self { + match value { + MetaTxnError::TxnRetryMaxTimes(e) => Self::AppError(AppError::from(e)), + MetaTxnError::MetaError(e) => Self::MetaError(e), + } + } +} + impl From for KVAppError { fn from(value: TxnRetryMaxTimes) -> Self { KVAppError::AppError(AppError::from(value)) diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index 84a513242845..94ed76cc7c79 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -21,10 +21,12 @@ extern crate databend_common_meta_types; mod background_api; mod background_api_impl; mod background_api_test_suite; +pub mod base_api; mod data_mask_api; mod data_mask_api_impl; pub mod kv_app_error; pub mod kv_pb_api; +pub mod meta_txn_error; pub mod reply; mod schema_api; mod schema_api_impl; diff --git a/src/meta/api/src/meta_txn_error.rs b/src/meta/api/src/meta_txn_error.rs new file mode 100644 index 000000000000..a6db0a044fa7 --- /dev/null +++ b/src/meta/api/src/meta_txn_error.rs @@ -0,0 +1,35 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_app::app_error::TxnRetryMaxTimes; +use databend_common_meta_types::InvalidArgument; +use databend_common_meta_types::MetaError; +use databend_common_meta_types::MetaNetworkError; + +/// A non-business error occurs when executing a meta-service transaction. +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum MetaTxnError { + #[error(transparent)] + TxnRetryMaxTimes(#[from] TxnRetryMaxTimes), + + #[error("fail to access meta-store: {0}")] + MetaError(#[from] MetaError), +} + +impl From for MetaTxnError { + fn from(value: InvalidArgument) -> Self { + let network_error = MetaNetworkError::from(value); + Self::MetaError(MetaError::from(network_error)) + } +} diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index 065f55ac88b6..13b23897b37b 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use databend_common_meta_app::schema::catalog_id_ident::CatalogId; +use databend_common_meta_app::schema::index_id_ident::IndexId; use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogInfo; use databend_common_meta_app::schema::CatalogMeta; @@ -39,8 +40,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; -use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; @@ -58,6 +57,7 @@ use databend_common_meta_app::schema::GetTableCopiedFileReply; use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::GetTableReq; use databend_common_meta_app::schema::IndexMeta; +use databend_common_meta_app::schema::IndexNameIdent; use databend_common_meta_app::schema::ListCatalogReq; use databend_common_meta_app::schema::ListDatabaseReq; use databend_common_meta_app::schema::ListDictionaryReq; @@ -143,7 +143,13 @@ pub trait SchemaApi: Send + Sync { async fn create_index(&self, req: CreateIndexReq) -> Result; - async fn drop_index(&self, req: DropIndexReq) -> Result; + /// Drop index and returns the dropped id and meta. + /// + /// If there is no such record, it returns `Ok(None)`. + async fn drop_index( + &self, + name_ident: &IndexNameIdent, + ) -> Result, SeqV)>, KVAppError>; async fn get_index(&self, req: GetIndexReq) -> Result; diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 774bea460556..aef546ba45f8 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -28,16 +28,13 @@ use databend_common_meta_app::app_error::CannotAccessShareTable; use databend_common_meta_app::app_error::CommitTableMetaError; use databend_common_meta_app::app_error::CreateAsDropTableWithoutDropTime; use databend_common_meta_app::app_error::CreateDatabaseWithDropTime; -use databend_common_meta_app::app_error::CreateIndexWithDropTime; use databend_common_meta_app::app_error::CreateTableWithDropTime; use databend_common_meta_app::app_error::DatabaseAlreadyExists; use databend_common_meta_app::app_error::DictionaryAlreadyExists; use databend_common_meta_app::app_error::DropDbWithDropTime; -use databend_common_meta_app::app_error::DropIndexWithDropTime; use databend_common_meta_app::app_error::DropTableWithDropTime; use databend_common_meta_app::app_error::DuplicatedIndexColumnId; use databend_common_meta_app::app_error::GetIndexWithDropTime; -use databend_common_meta_app::app_error::IndexAlreadyExists; use databend_common_meta_app::app_error::IndexColumnIdNotFound; use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed; use databend_common_meta_app::app_error::ShareHasNoGrantedPrivilege; @@ -53,7 +50,6 @@ use databend_common_meta_app::app_error::UndropTableHasNoHistory; use databend_common_meta_app::app_error::UndropTableWithNoDropTime; use databend_common_meta_app::app_error::UnknownDatabaseId; use databend_common_meta_app::app_error::UnknownDictionary; -use databend_common_meta_app::app_error::UnknownIndex; use databend_common_meta_app::app_error::UnknownStreamId; use databend_common_meta_app::app_error::UnknownTable; use databend_common_meta_app::app_error::UnknownTableId; @@ -65,6 +61,10 @@ use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::schema::catalog_id_ident::CatalogId; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; +use databend_common_meta_app::schema::index_id_ident::IndexId; +use databend_common_meta_app::schema::index_id_ident::IndexIdIdent; +use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent; +use databend_common_meta_app::schema::index_name_ident::IndexName; use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogIdIdent; use databend_common_meta_app::schema::CatalogIdToNameIdent; @@ -102,8 +102,6 @@ use databend_common_meta_app::schema::DictionaryIdentity; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; -use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; @@ -121,8 +119,6 @@ use databend_common_meta_app::schema::GetLVTReq; use databend_common_meta_app::schema::GetTableCopiedFileReply; use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::GetTableReq; -use databend_common_meta_app::schema::IndexId; -use databend_common_meta_app::schema::IndexIdToName; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::IndexNameIdent; use databend_common_meta_app::schema::IndexNameIdentRaw; @@ -192,11 +188,11 @@ use databend_common_meta_app::share::ShareObject; use databend_common_meta_app::share::ShareSpec; use databend_common_meta_app::share::ShareVecTableInfo; use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::tenant_key::errors::UnknownError; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; -use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::anyerror::AnyError; use databend_common_meta_types::protobuf as pb; use databend_common_meta_types::seq_value::KVMeta; @@ -225,6 +221,7 @@ use log::warn; use ConditionResult::Eq; use crate::assert_table_exist; +use crate::base_api::BaseApi; use crate::convert_share_meta_to_spec; use crate::db_has_to_exist; use crate::deserialize_struct; @@ -237,6 +234,7 @@ use crate::get_u64_value; use crate::is_db_need_to_be_remove; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; +use crate::kv_pb_api::UpsertPB; use crate::list_keys; use crate::list_u64_value; use crate::remove_db_from_share; @@ -963,105 +961,45 @@ impl + ?Sized> SchemaApi for KV { async fn create_index(&self, req: CreateIndexReq) -> Result { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - let tenant_index = &req.name_ident; - - if req.meta.dropped_on.is_some() { - return Err(KVAppError::AppError(AppError::CreateIndexWithDropTime( - CreateIndexWithDropTime::new(tenant_index.index_name()), - ))); - } - - let mut trials = txn_backoff(None, func_name!()); - loop { - trials.next().unwrap()?.await; - - // Get index by name to ensure absence - let (index_id_seq, index_id) = get_u64_value(self, tenant_index).await?; - debug!( - index_id_seq = index_id_seq, - index_id = index_id, - tenant_index :? =(tenant_index); - "get_index_seq_id" - ); - - let mut txn = TxnRequest::default(); + let name_ident = &req.name_ident; + let meta = &req.meta; + let overriding = req.create_option.is_overriding(); + let name_ident_raw = serialize_struct(&IndexNameIdentRaw::from(name_ident))?; + + let create_res = self + .create_id_value(name_ident, meta, overriding, |id| { + vec![( + IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(), + name_ident_raw.clone(), + )] + }) + .await?; - let (_, index_id_seq) = if index_id_seq > 0 { - match req.create_option { - CreateOption::Create => { - return Err(KVAppError::AppError(AppError::IndexAlreadyExists( - IndexAlreadyExists::new( - tenant_index.index_name(), - format!("create index with tenant: {}", tenant_index.tenant_name()), - ), - ))); - } - CreateOption::CreateIfNotExists => { - return Ok(CreateIndexReply { index_id }); - } - CreateOption::CreateOrReplace => { - construct_drop_index_txn_operations( - self, - tenant_index, - false, - false, - &mut txn, - ) - .await? - } + match create_res { + Ok(id) => Ok(CreateIndexReply { index_id: *id }), + Err(existent) => match req.create_option { + CreateOption::Create => { + Err(AppError::from(name_ident.exist_error(func_name!())).into()) } - } else { - (0, 0) - }; - - // Create index by inserting these record: - // (tenant, index_name) -> index_id - // (index_id) -> index_meta - // (index_id) -> (tenant,index_name) - - let index_id = fetch_id(self, IdGenerator::index_id()).await?; - let id_key = IndexId { index_id }; - let id_to_name_key = IndexIdToName { index_id }; - - debug!( - index_id = index_id, - index_key :? =(tenant_index); - "new index id" - ); - - { - txn.condition.extend(vec![ - txn_cond_seq(tenant_index, Eq, index_id_seq), - txn_cond_seq(&id_to_name_key, Eq, 0), - ]); - txn.if_then.extend(vec![ - txn_op_put(tenant_index, serialize_u64(index_id)?), /* (tenant, index_name) -> index_id */ - txn_op_put(&id_key, serialize_struct(&req.meta)?), // (index_id) -> index_meta - txn_op_put(&id_to_name_key, serialize_struct(&IndexNameIdentRaw::from(tenant_index))?), /* __fd_index_id_to_name/ -> (tenant,index_name) */ - ]); - - let (succ, _responses) = send_txn(self, txn).await?; - - debug!( - index_name :? =(tenant_index), - id :? =(&id_key), - succ = succ; - "create_index" - ); - - if succ { - return Ok(CreateIndexReply { index_id }); + CreateOption::CreateIfNotExists => Ok(CreateIndexReply { + index_id: *existent.data, + }), + CreateOption::CreateOrReplace => { + unreachable!( + "create_index: CreateOrReplace should never conflict with existent" + ); } - } + }, } } #[logcall::logcall] #[fastrace::trace] - async fn drop_index(&self, req: DropIndexReq) -> Result { - debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - - let tenant_index = &req.name_ident; + async fn drop_index( + &self, + name_ident: &IndexNameIdent, + ) -> Result, SeqV)>, KVAppError> { + debug!(req :? =(&name_ident); "SchemaApi: {}", func_name!()); let mut trials = txn_backoff(None, func_name!()); loop { @@ -1069,33 +1007,26 @@ impl + ?Sized> SchemaApi for KV { let mut txn = TxnRequest::default(); - let (index_id, index_id_seq) = construct_drop_index_txn_operations( - self, - tenant_index, - req.if_exists, - true, - &mut txn, - ) - .await?; + let get_res = self.get_id_and_value(name_ident).await?; + let Some((seq_id, seq_meta)) = get_res else { + return Ok(None); + }; - if index_id_seq == 0 { - return Ok(DropIndexReply {}); - } + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + let id_to_name_ident = + IndexIdToNameIdent::new_generic(name_ident.tenant(), seq_id.data); - let (succ, _responses) = send_txn(self, txn).await?; + txn_delete_exact(&mut txn, name_ident, seq_id.seq); + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + txn.if_then.push(txn_op_del(&id_to_name_ident)); - debug!( - name :? =(tenant_index), - id :? =(&IndexId { index_id }), - succ = succ; - "drop_index" - ); + let (succ, _responses) = send_txn(self, txn).await?; + debug!(name_ident :? =name_ident,id :? = seq_id,succ = succ;"drop_index"); if succ { - break; + return Ok(Some((seq_id, seq_meta))); } } - Ok(DropIndexReply {}) } #[logcall::logcall] @@ -1110,8 +1041,8 @@ impl + ?Sized> SchemaApi for KV { let (index_id_seq, index_id, _, index_meta) = res; if index_id_seq == 0 { - return Err(KVAppError::AppError(AppError::UnknownIndex( - UnknownIndex::new(tenant_index.index_name(), "get_index"), + return Err(KVAppError::AppError(AppError::from( + tenant_index.unknown_error("get_index"), ))); } @@ -1142,22 +1073,22 @@ impl + ?Sized> SchemaApi for KV { async fn update_index(&self, req: UpdateIndexReq) -> Result { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - let index_id_key = IndexId { - index_id: req.index_id, - }; + let tenant = &req.tenant; + let index_id = IndexId::new(req.index_id); + let id_ident = IndexIdIdent::new_generic(tenant, index_id); let reply = self - .upsert_kv(UpsertKVReq::new( - index_id_key.to_string_key(), + .upsert_pb(&UpsertPB::new( + id_ident, MatchSeq::GE(1), - Operation::Update(serialize_struct(&req.index_meta)?), + Operation::Update(req.index_meta), None, )) .await?; if !reply.is_changed() { Err(KVAppError::AppError(AppError::UnknownIndex( - UnknownIndex::new(&req.index_name, "update_index"), + UnknownError::new(index_id.to_string(), func_name!()), ))) } else { Ok(UpdateIndexReply {}) @@ -1194,7 +1125,7 @@ impl + ?Sized> SchemaApi for KV { // filter the dropped indexes. let index_metas = { - let index_metas = get_index_metas_by_ids(self, id_name_list).await?; + let index_metas = get_index_metas_by_ids(self, &req.tenant, id_name_list).await?; index_metas .into_iter() .filter(|(_, _, meta)| { @@ -1238,7 +1169,7 @@ impl + ?Sized> SchemaApi for KV { } let index_ids = { - let index_metas = get_index_metas_by_ids(self, id_name_list).await?; + let index_metas = get_index_metas_by_ids(self, &req.tenant, id_name_list).await?; index_metas .into_iter() .filter(|(_, _, meta)| req.table_id == meta.table_id) @@ -1277,7 +1208,7 @@ impl + ?Sized> SchemaApi for KV { } let indexes = { - let index_metas = get_index_metas_by_ids(self, id_name_list).await?; + let index_metas = get_index_metas_by_ids(self, &req.tenant, id_name_list).await?; index_metas .into_iter() .filter(|(_, _, meta)| req.table_id == meta.table_id) @@ -3315,9 +3246,10 @@ impl + ?Sized> SchemaApi for KV { if indexes.contains_key(&req.name) { match req.create_option { CreateOption::Create => { - return Err(KVAppError::AppError(AppError::IndexAlreadyExists( - IndexAlreadyExists::new(&req.name, "create table index".to_string()), - ))); + return Err(AppError::IndexAlreadyExists( + IndexNameIdent::new(&req.tenant, &req.name).exist_error(func_name!()), + ) + .into()); } CreateOption::CreateIfNotExists => { return Ok(()); @@ -3421,7 +3353,7 @@ impl + ?Sized> SchemaApi for KV { let indexes = &mut table_meta.indexes; if !indexes.contains_key(&req.name) && !req.if_exists { return Err(KVAppError::AppError(AppError::UnknownIndex( - UnknownIndex::new(&req.name, "drop table index".to_string()), + UnknownError::::new(req.name.clone(), "drop table index"), ))); } indexes.remove(&req.name); @@ -3438,12 +3370,7 @@ impl + ?Sized> SchemaApi for KV { }; let (succ, _responses) = send_txn(self, txn_req).await?; - - debug!( - id :? =(&tbid), - succ = succ; - "drop_table_index" - ); + debug!(id :? =(&tbid),succ = succ;"drop_table_index"); if succ { return Ok(()); @@ -4459,60 +4386,6 @@ async fn construct_drop_virtual_column_txn_operations( Ok(seq) } -async fn construct_drop_index_txn_operations( - kv_api: &(impl kvapi::KVApi + ?Sized), - tenant_index: &IndexNameIdent, - drop_if_exists: bool, - if_delete: bool, - txn: &mut TxnRequest, -) -> Result<(u64, u64), KVAppError> { - let res = get_index_or_err(kv_api, tenant_index).await?; - - let (index_id_seq, index_id, index_meta_seq, index_meta) = res; - - if index_id_seq == 0 { - return if drop_if_exists { - Ok((index_id, index_id_seq)) - } else { - return Err(KVAppError::AppError(AppError::UnknownIndex( - UnknownIndex::new(tenant_index.index_name(), "drop_index"), - ))); - }; - } - - let index_id_key = IndexId { index_id }; - // Safe unwrap(): index_meta_seq > 0 implies index_meta is not None. - let mut index_meta = index_meta.unwrap(); - - debug!(index_id = index_id, name_key :? =(tenant_index); "drop_index"); - - // drop an index with drop time - if index_meta.dropped_on.is_some() { - return Err(KVAppError::AppError(AppError::DropIndexWithDropTime( - DropIndexWithDropTime::new(tenant_index.index_name()), - ))); - } - // update drop on time - index_meta.dropped_on = Some(Utc::now()); - - // Delete index by these operations: - // del (tenant, index_name) -> index_id - // set index_meta.drop_on = now and update (index_id) -> index_meta - txn.condition - .push(txn_cond_seq(&index_id_key, Eq, index_meta_seq)); - // (index_id) -> index_meta - txn.if_then - .push(txn_op_put(&index_id_key, serialize_struct(&index_meta)?)); - if if_delete { - txn.condition - .push(txn_cond_seq(tenant_index, Eq, index_id_seq)); - // (tenant, index_name) -> index_id - txn.if_then.push(txn_op_del(tenant_index)); - } - - Ok((index_id, index_id_seq)) -} - async fn construct_drop_table_txn_operations( kv_api: &(impl kvapi::KVApi + ?Sized), table_name: String, @@ -5362,7 +5235,7 @@ pub(crate) async fn get_index_or_err( name_key: &IndexNameIdent, ) -> Result<(u64, u64, u64, Option), KVAppError> { let (index_id_seq, index_id) = get_u64_value(kv_api, name_key).await?; - let id_key = IndexId { index_id }; + let id_key = IndexIdIdent::new_generic(name_key.tenant(), IndexId::new(index_id)); let (index_meta_seq, index_meta) = get_pb_value(kv_api, &id_key).await?; Ok((index_id_seq, index_id, index_meta_seq, index_meta)) @@ -5610,7 +5483,7 @@ async fn gc_dropped_table_index( // Get index ids of this table let index_ids = { - let index_metas = get_index_metas_by_ids(kv_api, id_name_list).await?; + let index_metas = get_index_metas_by_ids(kv_api, tenant, id_name_list).await?; index_metas .into_iter() .filter(|(_, _, meta)| table_id == meta.table_id) @@ -5620,7 +5493,7 @@ async fn gc_dropped_table_index( let id_to_name_keys = index_ids .iter() - .map(|id| IndexIdToName { index_id: *id }.to_string_key()) + .map(|id| IndexIdToNameIdent::new_generic(tenant, IndexId::new(*id)).to_string_key()) .collect::>(); // Get (tenant, index_name) list by index ids @@ -5641,14 +5514,13 @@ async fn gc_dropped_table_index( debug_assert_eq!(index_ids.len(), index_name_list.len()); for (index_id, index_name_ident_raw) in index_ids.iter().zip(index_name_list.iter()) { - let id_key = IndexId { - index_id: *index_id, - }; - let id_to_name_key = IndexIdToName { - index_id: *index_id, - }; - txn.if_then.push(txn_op_del(&id_key)); // (index_id) -> index_meta - txn.if_then.push(txn_op_del(&id_to_name_key)); // __fd_index_id_to_name/ -> (tenant,index_name) + let index_id = IndexId::new(*index_id); + + let id_ident = IndexIdIdent::new_generic(tenant, index_id); + let id_to_name_ident = IndexIdToNameIdent::new_generic(tenant, index_id); + + txn.if_then.push(txn_op_del(&id_ident)); // (index_id) -> index_meta + txn.if_then.push(txn_op_del(&id_to_name_ident)); // __fd_index_id_to_name/ -> (tenant,index_name) let index_name_ident = index_name_ident_raw.clone().to_tident(()); txn.if_then.push(txn_op_del(&index_name_ident)); // (tenant, index_name) -> index_id diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 50dff3490bfa..3574f2ae443e 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -40,6 +40,9 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; +use databend_common_meta_app::schema::index_id_ident::IndexId; +use databend_common_meta_app::schema::index_id_ident::IndexIdIdent; +use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent; use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogMeta; use databend_common_meta_app::schema::CatalogNameIdent; @@ -67,7 +70,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryIdentity; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReq; use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; @@ -84,8 +86,6 @@ use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::GetTableReq; use databend_common_meta_app::schema::IcebergCatalogOption; use databend_common_meta_app::schema::IcebergRestCatalogOption; -use databend_common_meta_app::schema::IndexId; -use databend_common_meta_app::schema::IndexIdToName; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::IndexNameIdent; use databend_common_meta_app::schema::IndexNameIdentRaw; @@ -3921,11 +3921,12 @@ impl SchemaApiTestSuite { // check table's indexes have been cleaned { - let id_key = IndexId { index_id }; - let id_to_name_key = IndexIdToName { index_id }; + let index_id = IndexId::new(index_id); + let id_ident = IndexIdIdent::new_generic(&tenant, index_id); + let id_to_name_key = IndexIdToNameIdent::new_generic(tenant, index_id); let agg_index_meta: Result = - get_kv_data(mt.as_kv_api(), &id_key).await; + get_kv_data(mt.as_kv_api(), &id_ident).await; let agg_index_name_ident: Result = get_kv_data(mt.as_kv_api(), &id_to_name_key).await; @@ -6078,6 +6079,7 @@ impl SchemaApiTestSuite { info!("--- create table index 1"); let req = CreateTableIndexReq { create_option: CreateOption::Create, + tenant: tenant.clone(), table_id, name: index_name_1.clone(), column_ids: index_column_ids_1.clone(), @@ -6091,6 +6093,7 @@ impl SchemaApiTestSuite { let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant: tenant.clone(), name: index_name_2.clone(), column_ids: index_column_ids_1.clone(), sync_creation: true, @@ -6103,6 +6106,7 @@ impl SchemaApiTestSuite { let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant: tenant.clone(), name: index_name_2.clone(), column_ids: index_column_ids_2.clone(), sync_creation: true, @@ -6117,6 +6121,7 @@ impl SchemaApiTestSuite { let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant: tenant.clone(), name: index_name_1.clone(), column_ids: index_column_ids_1.clone(), sync_creation: true, @@ -6136,6 +6141,7 @@ impl SchemaApiTestSuite { let req = CreateTableIndexReq { create_option: CreateOption::CreateIfNotExists, table_id, + tenant: tenant.clone(), name: index_name_1.clone(), column_ids: index_column_ids_1.clone(), sync_creation: true, @@ -6151,6 +6157,7 @@ impl SchemaApiTestSuite { let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant: tenant.clone(), name: index_name_3.clone(), column_ids: index_column_ids_3.clone(), sync_creation: true, @@ -6248,8 +6255,8 @@ impl SchemaApiTestSuite { created_on, dropped_on: None, updated_on: None, - original_query: "SELECT a, SUM(b) FROM tb1 WHERE a > 1 GROUP BY b".to_string(), - query: "SELECT a, SUM(b) FROM tb1 WHERE a > 1 GROUP BY b".to_string(), + original_query: "SELECT a".to_string(), + query: "SELECT b".to_string(), sync_creation: false, }; @@ -6288,6 +6295,14 @@ impl SchemaApiTestSuite { let res = mt.create_index(req).await?; index_id = res.index_id; + // check reverse index id -> name + { + let index_id = IndexId::new(index_id); + let id_ident = IndexIdToNameIdent::new_generic(&tenant, index_id); + let raw_name: IndexNameIdentRaw = get_kv_data(mt.as_kv_api(), &id_ident).await?; + assert_eq!(name_ident_1.to_raw(), raw_name); + } + let req = CreateIndexReq { create_option: CreateOption::Create, name_ident: name_ident_2.clone(), @@ -6347,13 +6362,9 @@ impl SchemaApiTestSuite { { info!("--- drop index"); - let req = DropIndexReq { - if_exists: false, - name_ident: name_ident_2.clone(), - }; - let res = mt.drop_index(req).await; - assert!(res.is_ok()) + let res = mt.drop_index(&name_ident_2).await?; + assert!(res.is_some()) } { @@ -6378,13 +6389,8 @@ impl SchemaApiTestSuite { { info!("--- list index after drop all"); - let req = DropIndexReq { - if_exists: false, - name_ident: name_ident_1.clone(), - }; - - let res = mt.drop_index(req).await; - assert!(res.is_ok()); + let res = mt.drop_index(&name_ident_1).await?; + assert!(res.is_some()); let req = ListIndexesReq::new(&tenant, Some(table_id)); @@ -6393,25 +6399,9 @@ impl SchemaApiTestSuite { } { - info!("--- drop index with if exists = false"); - let req = DropIndexReq { - if_exists: false, - name_ident: name_ident_1.clone(), - }; - - let res = mt.drop_index(req).await; - assert!(res.is_err()) - } - - { - info!("--- drop index with if exists = true"); - let req = DropIndexReq { - if_exists: true, - name_ident: name_ident_1.clone(), - }; - - let res = mt.drop_index(req).await; - assert!(res.is_ok()) + info!("--- drop unknown index"); + let res = mt.drop_index(&name_ident_1).await?; + assert!(res.is_none()) } // create or replace index @@ -6430,11 +6420,10 @@ impl SchemaApiTestSuite { }; let res = mt.create_index(req).await?; - let old_index_id = res.index_id; - let old_index_id_key = IndexId { - index_id: old_index_id, - }; - let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &old_index_id_key).await?; + let old_index_id = IndexId::new(res.index_id); + let old_index_id_ident = old_index_id.into_t_ident(&tenant); + + let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &old_index_id_ident).await?; assert_eq!(meta, index_meta_1); let resp = mt.get_index(get_req.clone()).await?; @@ -6450,13 +6439,28 @@ impl SchemaApiTestSuite { let res = mt.create_index(req).await?; // assert old index id key has been deleted - let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &old_index_id_key).await?; - assert!(meta.dropped_on.is_some()); + let meta: Result = get_kv_data(mt.as_kv_api(), &old_index_id_ident).await; + assert_eq!( + meta.unwrap_err().to_string(), + "fail to access meta-store: fail to get_kv_data: not found, source: " + ); + + // assert old id-to-name has been deleted. + { + let old_index_id_to_name_ident = + IndexIdToNameIdent::new_generic(&tenant, old_index_id); + let meta: Result = + get_kv_data(mt.as_kv_api(), &old_index_id_to_name_ident).await; + assert_eq!( + meta.unwrap_err().to_string(), + "fail to access meta-store: fail to get_kv_data: not found, source: " + ); + } // assert new index id key has been created - let index_id = res.index_id; - let index_id_key = IndexId { index_id }; - let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &index_id_key).await?; + let index_id = IndexId::new(res.index_id); + let index_id_ident = index_id.into_t_ident(&tenant); + let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &index_id_ident).await?; assert_eq!(meta, index_meta_2); let resp = mt.get_index(get_req).await?; diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index cc51fd7cce37..d457b024cde1 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -33,12 +33,13 @@ use databend_common_meta_app::app_error::WrongShareObject; use databend_common_meta_app::primitive::Id; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; +use databend_common_meta_app::schema::index_id_ident::IndexId; +use databend_common_meta_app::schema::index_id_ident::IndexIdIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseIdToName; use databend_common_meta_app::schema::DatabaseMeta; use databend_common_meta_app::schema::DatabaseType; -use databend_common_meta_app::schema::IndexId; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::ShareDBParams; use databend_common_meta_app::schema::TableId; @@ -70,6 +71,7 @@ use databend_common_meta_app::share::ShareReferenceTable; use databend_common_meta_app::share::ShareSpec; use databend_common_meta_app::share::ShareTable; use databend_common_meta_app::share::ShareTableSpec; +use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -279,7 +281,7 @@ pub fn deserialize_u64(v: &[u8]) -> Result { pub async fn fetch_id( kv_api: &(impl kvapi::KVApi + ?Sized), generator: T, -) -> Result { +) -> Result { let res = kv_api .upsert_kv(UpsertKVReq { key: generator.to_string_key(), @@ -325,10 +327,11 @@ where T: FromToProto { pub async fn send_txn( kv_api: &(impl kvapi::KVApi + ?Sized), txn_req: TxnRequest, -) -> Result<(bool, Vec), KVAppError> { +) -> Result<(bool, Vec), MetaError> { debug!("send txn: {}", txn_req); let tx_reply = kv_api.transaction(txn_req).await?; let (succ, responses) = txn_reply_to_api_result(tx_reply)?; + debug!("txn success: {}", succ); Ok((succ, responses)) } @@ -1277,13 +1280,15 @@ pub async fn get_table_info_by_share( pub async fn get_index_metas_by_ids( kv_api: &(impl kvapi::KVApi + ?Sized), + tenant: &Tenant, id_name_list: Vec<(u64, String)>, ) -> Result, KVAppError> { let mut index_meta_keys = Vec::with_capacity(id_name_list.len()); for (id, _) in id_name_list.iter() { - let index_id = IndexId { index_id: *id }; + let index_id = IndexId::new(*id); + let id_ident = IndexIdIdent::new_generic(tenant, index_id); - index_meta_keys.push(index_id.to_string_key()); + index_meta_keys.push(id_ident.to_string_key()); } let seq_index_metas = kv_api.mget_kv(&index_meta_keys).await?; diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index d527bf6f59da..efe8b4b9671b 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -20,6 +20,7 @@ use databend_common_meta_types::MatchSeq; use crate::background::job_ident; use crate::data_mask::data_mask_name_ident; use crate::schema::catalog_name_ident; +use crate::schema::index_name_ident; use crate::tenant_key::errors::ExistError; use crate::tenant_key::errors::UnknownError; use crate::tenant_key::ident::TIdent; @@ -814,38 +815,6 @@ impl CreateIndexWithDropTime { } } -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[error("IndexAlreadyExists: `{index_name}` while `{context}`")] -pub struct IndexAlreadyExists { - index_name: String, - context: String, -} - -impl IndexAlreadyExists { - pub fn new(index_name: impl Into, context: impl Into) -> Self { - Self { - index_name: index_name.into(), - context: context.into(), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[error("UnknownIndex: `{index_name}` while `{context}`")] -pub struct UnknownIndex { - index_name: String, - context: String, -} - -impl UnknownIndex { - pub fn new(index_name: impl Into, context: impl Into) -> Self { - Self { - index_name: index_name.into(), - context: context.into(), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("DropIndexWithDropTime: drop {index_name} with drop time")] pub struct DropIndexWithDropTime { @@ -1178,10 +1147,10 @@ pub enum AppError { CreateIndexWithDropTime(#[from] CreateIndexWithDropTime), #[error(transparent)] - IndexAlreadyExists(#[from] IndexAlreadyExists), + IndexAlreadyExists(#[from] ExistError), #[error(transparent)] - UnknownIndex(#[from] UnknownIndex), + UnknownIndex(#[from] UnknownError), #[error(transparent)] DropIndexWithDropTime(#[from] DropIndexWithDropTime), @@ -1546,18 +1515,6 @@ impl AppErrorMessage for CreateIndexWithDropTime { } } -impl AppErrorMessage for IndexAlreadyExists { - fn message(&self) -> String { - format!("Index '{}' already exists", self.index_name) - } -} - -impl AppErrorMessage for UnknownIndex { - fn message(&self) -> String { - format!("Unknown index '{}'", self.index_name) - } -} - impl AppErrorMessage for DropIndexWithDropTime { fn message(&self) -> String { format!("Drop Index '{}' with drop time", self.index_name) diff --git a/src/meta/app/src/id_generator.rs b/src/meta/app/src/id_generator.rs index d9ab6602e5fa..5d08f98fac47 100644 --- a/src/meta/app/src/id_generator.rs +++ b/src/meta/app/src/id_generator.rs @@ -14,6 +14,7 @@ use databend_common_meta_kvapi::kvapi; +pub(crate) const ID_GEN_GENERIC: &str = "generic"; pub(crate) const ID_GEN_TABLE: &str = "table_id"; pub(crate) const ID_GEN_DATABASE: &str = "database_id"; pub(crate) const ID_GEN_TABLE_LOCK: &str = "table_lock_id"; @@ -39,6 +40,13 @@ pub struct IdGenerator { } impl IdGenerator { + /// Create a key for generating generic id + pub fn generic() -> Self { + Self { + resource: ID_GEN_GENERIC.to_string(), + } + } + /// Create a key for generating table id with kvapi::KVApi pub fn table_id() -> Self { Self { diff --git a/src/meta/app/src/schema/catalog_id_ident.rs b/src/meta/app/src/schema/catalog_id_ident.rs index 577f37ad2486..55be4bfb5f2b 100644 --- a/src/meta/app/src/schema/catalog_id_ident.rs +++ b/src/meta/app/src/schema/catalog_id_ident.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::data_id::DataId; +use crate::tenant::ToTenant; use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; @@ -22,9 +24,6 @@ pub type CatalogIdIdentRaw = TIdentRaw; pub use kvapi_impl::Resource; -use crate::data_id::DataId; -use crate::tenant::ToTenant; - impl CatalogIdIdent { pub fn new(tenant: impl ToTenant, catalog_id: u64) -> Self { Self::new_generic(tenant, CatalogId::new(catalog_id)) diff --git a/src/meta/app/src/schema/create_option.rs b/src/meta/app/src/schema/create_option.rs index 9c1144c0cce2..8902b9653f4e 100644 --- a/src/meta/app/src/schema/create_option.rs +++ b/src/meta/app/src/schema/create_option.rs @@ -48,3 +48,13 @@ impl From for MatchSeq { } } } + +impl CreateOption { + pub fn is_overriding(&self) -> bool { + matches!(self, CreateOption::CreateOrReplace) + } + + pub fn if_return_error(&self) -> bool { + matches!(self, CreateOption::Create) + } +} diff --git a/src/meta/app/src/schema/index.rs b/src/meta/app/src/schema/index.rs index 06494b46cfb0..cf1511adf7fc 100644 --- a/src/meta/app/src/schema/index.rs +++ b/src/meta/app/src/schema/index.rs @@ -26,34 +26,6 @@ use crate::tenant::Tenant; use crate::tenant::ToTenant; use crate::KeyWithTenant; -#[derive(Clone, Debug, Eq, PartialEq, Default)] -pub struct IndexIdToName { - pub index_id: u64, -} - -impl Display for IndexIdToName { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "{}", self.index_id) - } -} - -#[derive(Clone, Debug, Default, Eq, PartialEq)] -pub struct IndexId { - pub index_id: u64, -} - -impl IndexId { - pub fn new(index_id: u64) -> IndexId { - IndexId { index_id } - } -} - -impl Display for IndexId { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.index_id) - } -} - #[derive( serde::Serialize, serde::Deserialize, @@ -167,9 +139,6 @@ impl Display for DropIndexReq { } } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct DropIndexReply {} - #[derive(Clone, Debug, PartialEq, Eq)] pub struct GetIndexReq { pub name_ident: IndexNameIdent, @@ -186,20 +155,20 @@ impl Display for GetIndexReq { } } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct GetIndexReply { pub index_id: u64, pub index_meta: IndexMeta, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct UpdateIndexReq { + pub tenant: Tenant, pub index_id: u64, - pub index_name: String, pub index_meta: IndexMeta, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct UpdateIndexReply {} #[derive(Clone, Debug, PartialEq, Eq)] @@ -231,72 +200,3 @@ impl ListIndexesByIdReq { } } } - -mod kvapi_key_impl { - use databend_common_meta_kvapi::kvapi; - - use crate::schema::IndexId; - use crate::schema::IndexIdToName; - use crate::schema::IndexMeta; - use crate::schema::IndexNameIdentRaw; - - impl kvapi::KeyCodec for IndexId { - fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { - b.push_u64(self.index_id) - } - - fn decode_key(parser: &mut kvapi::KeyParser) -> Result { - let index_id = parser.next_u64()?; - - Ok(Self { index_id }) - } - } - - /// "/" - impl kvapi::Key for IndexId { - const PREFIX: &'static str = "__fd_index_by_id"; - - type ValueType = IndexMeta; - - fn parent(&self) -> Option { - None - } - } - - impl kvapi::KeyCodec for IndexIdToName { - fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { - b.push_u64(self.index_id) - } - - fn decode_key(parser: &mut kvapi::KeyParser) -> Result { - let index_id = parser.next_u64()?; - - Ok(Self { index_id }) - } - } - - /// "/ -> IndexNameIdentRaw" - impl kvapi::Key for IndexIdToName { - const PREFIX: &'static str = "__fd_index_id_to_name"; - - type ValueType = IndexNameIdentRaw; - - fn parent(&self) -> Option { - Some(IndexId::new(self.index_id).to_string_key()) - } - } - - impl kvapi::Value for IndexMeta { - type KeyType = IndexId; - fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { - [] - } - } - - impl kvapi::Value for IndexNameIdentRaw { - type KeyType = IndexIdToName; - fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { - [] - } - } -} diff --git a/src/meta/app/src/schema/index_id_ident.rs b/src/meta/app/src/schema/index_id_ident.rs new file mode 100644 index 000000000000..9f58edb47b42 --- /dev/null +++ b/src/meta/app/src/schema/index_id_ident.rs @@ -0,0 +1,82 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::data_id::DataId; +use crate::tenant_key::ident::TIdent; +use crate::tenant_key::raw::TIdentRaw; + +pub type IndexId = DataId; + +pub type IndexIdIdent = TIdent; +pub type IndexIdIdentRaw = TIdentRaw; + +pub use kvapi_impl::IndexIdResource; + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::schema::index_id_ident::IndexIdIdent; + use crate::schema::IndexMeta; + use crate::tenant_key::resource::TenantResource; + + pub struct IndexIdResource; + impl TenantResource for IndexIdResource { + const PREFIX: &'static str = "__fd_index_by_id"; + const TYPE: &'static str = "IndexIdIdent"; + const HAS_TENANT: bool = false; + type ValueType = IndexMeta; + } + + impl kvapi::Value for IndexMeta { + type KeyType = IndexIdIdent; + + // todo!("IndexId being parent of IndexIdToName can be described with dependency_keys") + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::IndexId; + use super::IndexIdIdent; + use crate::tenant::Tenant; + + #[test] + fn test_index_id_ident() { + let tenant = Tenant::new_literal("dummy"); + let ident = IndexIdIdent::new_generic(tenant, IndexId::new(3)); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_index_by_id/3"); + + assert_eq!(ident, IndexIdIdent::from_str_key(&key).unwrap()); + } + + #[test] + fn test_background_job_id_ident_with_key_space() { + // TODO(xp): implement this test + // let tenant = Tenant::new_literal("test"); + // let ident = IndexIdIdent::new(tenant, 3); + // + // let key = ident.to_string_key(); + // assert_eq!(key, "__fd_catalog_by_id/3"); + // + // assert_eq!(ident, IndexIdIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/app/src/schema/index_id_to_name_ident.rs b/src/meta/app/src/schema/index_id_to_name_ident.rs new file mode 100644 index 000000000000..6d2a8094f05d --- /dev/null +++ b/src/meta/app/src/schema/index_id_to_name_ident.rs @@ -0,0 +1,79 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::schema::index_id_ident::IndexId; +use crate::tenant_key::ident::TIdent; +use crate::tenant_key::raw::TIdentRaw; + +pub type IndexIdToNameIdent = TIdent; +pub type IndexIdToNameIdentRaw = TIdentRaw; + +pub use kvapi_impl::IndexIdToName; + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::schema::index_id_to_name_ident::IndexIdToNameIdent; + use crate::schema::IndexNameIdentRaw; + use crate::tenant_key::resource::TenantResource; + + pub struct IndexIdToName; + impl TenantResource for IndexIdToName { + const PREFIX: &'static str = "__fd_index_id_to_name"; + const TYPE: &'static str = "IndexIdToNameIdent"; + const HAS_TENANT: bool = false; + type ValueType = IndexNameIdentRaw; + } + + impl kvapi::Value for IndexNameIdentRaw { + type KeyType = IndexIdToNameIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::IndexId; + use super::IndexIdToNameIdent; + use crate::tenant::Tenant; + + #[test] + fn test_index_id_to_name_ident() { + let tenant = Tenant::new_literal("dummy"); + let ident = IndexIdToNameIdent::new_generic(tenant, IndexId::new(3)); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_index_id_to_name/3"); + + assert_eq!(ident, IndexIdToNameIdent::from_str_key(&key).unwrap()); + } + + #[test] + fn test_index_id_to_name_ident_with_key_space() { + // TODO(xp): implement this test + // let tenant = Tenant::new_literal("test"); + // let ident = IndexIdIdent::new(tenant, 3); + // + // let key = ident.to_string_key(); + // assert_eq!(key, "__fd_catalog_by_id/3"); + // + // assert_eq!(ident, IndexIdIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/app/src/schema/index_name_ident.rs b/src/meta/app/src/schema/index_name_ident.rs index 92232e68be5d..28070e1b9d99 100644 --- a/src/meta/app/src/schema/index_name_ident.rs +++ b/src/meta/app/src/schema/index_name_ident.rs @@ -16,38 +16,37 @@ use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; /// Index name as meta-service key -pub type IndexNameIdent = TIdent; +pub type IndexNameIdent = TIdent; /// Index name as value. -pub type IndexNameIdentRaw = TIdentRaw; +pub type IndexNameIdentRaw = TIdentRaw; -pub use kvapi_impl::Resource; +pub use kvapi_impl::IndexName; -impl TIdent { +impl TIdent { pub fn index_name(&self) -> &str { self.name() } } -impl TIdentRaw { +impl TIdentRaw { pub fn index_name(&self) -> &str { self.name() } } mod kvapi_impl { - use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; - use crate::schema::IndexId; + use crate::schema::index_id_ident::IndexId; use crate::schema::IndexNameIdent; use crate::tenant_key::resource::TenantResource; + use crate::KeyWithTenant; - pub struct Resource; - impl TenantResource for Resource { + pub struct IndexName; + impl TenantResource for IndexName { const PREFIX: &'static str = "__fd_index"; - const TYPE: &'static str = "IndexNameIdent"; const HAS_TENANT: bool = true; type ValueType = IndexId; } @@ -55,14 +54,10 @@ mod kvapi_impl { impl kvapi::Value for IndexId { type KeyType = IndexNameIdent; /// IndexId is id of the two level `name->id,id->value` mapping - fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { - [self.to_string_key()] + fn dependency_keys(&self, key: &Self::KeyType) -> impl IntoIterator { + [self.into_t_ident(key.tenant()).to_string_key()] } } - - // // Use these error types to replace usage of ErrorCode if possible. - // impl From> for ErrorCode { - // impl From> for ErrorCode { } #[cfg(test)] diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index bdf526477a6a..8400100ced99 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -21,6 +21,8 @@ pub mod catalog_name_ident; pub mod database_id; pub mod database_id_history_ident; pub mod database_name_ident; +pub mod index_id_ident; +pub mod index_id_to_name_ident; pub mod index_name_ident; pub mod table_lock_ident; pub mod tenant_dictionary_ident; diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs index 4d5946d12baf..96be81ef59bd 100644 --- a/src/meta/app/src/schema/table.rs +++ b/src/meta/app/src/schema/table.rs @@ -817,6 +817,7 @@ pub struct UpdateTableMetaReply { #[derive(Clone, Debug, PartialEq, Eq)] pub struct CreateTableIndexReq { pub create_option: CreateOption, + pub tenant: Tenant, pub table_id: u64, pub name: String, pub column_ids: Vec, diff --git a/src/meta/app/src/tenant_key/ident.rs b/src/meta/app/src/tenant_key/ident.rs index 8391ebd68a35..480062897c74 100644 --- a/src/meta/app/src/tenant_key/ident.rs +++ b/src/meta/app/src/tenant_key/ident.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::type_name; use std::fmt; use std::fmt::Debug; use std::fmt::Display; @@ -39,14 +40,8 @@ where R: TenantResource, N: Debug, { - fn fmt(&self, f: &mut fmt::Formatter) -> std::fmt::Result { - // If there is a specified type name for this alias, use it. - // Otherwise use the default name - let type_name = if R::TYPE.is_empty() { - "TIdent" - } else { - R::TYPE - }; + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let type_name = self.type_name(); f.debug_struct(type_name) .field("tenant", &self.tenant) @@ -61,11 +56,8 @@ where R: TenantResource, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let type_name = if R::TYPE.is_empty() { - "TIdent" - } else { - R::TYPE - }; + let type_name = self.type_name(); + write!( f, "{}({}/{})", @@ -150,6 +142,17 @@ impl TIdent { &self.name } + /// If there is a specified type name for this alias, use it. + /// Otherwise, use the default name + pub fn type_name(&self) -> &'static str + where R: TenantResource { + if R::TYPE.is_empty() { + type_name::().rsplit("::").next().unwrap_or("TIdent") + } else { + R::TYPE + } + } + /// Create a display-able instance. pub fn display(&self) -> impl fmt::Display + '_ where N: fmt::Display { @@ -282,6 +285,18 @@ mod tests { assert_eq!(key, "foo/test/test1"); assert_eq!(ident, TIdent::::from_str_key(&key).unwrap()); + + // Test debug + + assert_eq!( + format!("{:?}", ident), + r#"Foo { tenant: Tenant { tenant: "test" }, name: "test1" }"#, + "debug" + ); + + // Test display + + assert_eq!(format!("{}", ident), "Foo(test/test1)", "display"); } #[test] diff --git a/src/query/catalog/src/catalog/interface.rs b/src/query/catalog/src/catalog/interface.rs index 564eb0690df4..c2edada40944 100644 --- a/src/query/catalog/src/catalog/interface.rs +++ b/src/query/catalog/src/catalog/interface.rs @@ -43,7 +43,6 @@ use databend_common_meta_app::schema::DictionaryIdentity; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -167,7 +166,7 @@ pub trait Catalog: DynClone + Send + Sync + Debug { async fn create_index(&self, req: CreateIndexReq) -> Result; - async fn drop_index(&self, req: DropIndexReq) -> Result; + async fn drop_index(&self, req: DropIndexReq) -> Result<()>; async fn get_index(&self, req: GetIndexReq) -> Result; diff --git a/src/query/ee/src/aggregating_index/aggregating_index_handler.rs b/src/query/ee/src/aggregating_index/aggregating_index_handler.rs index 734e1ab34d10..1b1b3d9fecd3 100644 --- a/src/query/ee/src/aggregating_index/aggregating_index_handler.rs +++ b/src/query/ee/src/aggregating_index/aggregating_index_handler.rs @@ -19,7 +19,6 @@ use databend_common_catalog::catalog::Catalog; use databend_common_exception::Result; use databend_common_meta_app::schema::CreateIndexReply; use databend_common_meta_app::schema::CreateIndexReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -42,11 +41,7 @@ impl AggregatingIndexHandler for RealAggregatingIndexHandler { } #[async_backtrace::framed] - async fn do_drop_index( - &self, - catalog: Arc, - req: DropIndexReq, - ) -> Result { + async fn do_drop_index(&self, catalog: Arc, req: DropIndexReq) -> Result<()> { catalog.drop_index(req).await } diff --git a/src/query/ee/tests/it/inverted_index/index_refresh.rs b/src/query/ee/tests/it/inverted_index/index_refresh.rs index e401a642a1fa..c761b8292b2a 100644 --- a/src/query/ee/tests/it/inverted_index/index_refresh.rs +++ b/src/query/ee/tests/it/inverted_index/index_refresh.rs @@ -67,10 +67,12 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { "filters".to_string(), "english_stop,english_stemmer,chinese_stop".to_string(), ); + let tenant = ctx.get_tenant(); let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant, name: index_name.clone(), column_ids: vec![0, 1], sync_creation: false, diff --git a/src/query/ee/tests/it/inverted_index/pruning.rs b/src/query/ee/tests/it/inverted_index/pruning.rs index 66809f4b761c..ea9009584722 100644 --- a/src/query/ee/tests/it/inverted_index/pruning.rs +++ b/src/query/ee/tests/it/inverted_index/pruning.rs @@ -499,10 +499,12 @@ async fn test_block_pruner() -> Result<()> { "filters".to_string(), "english_stop,english_stemmer,chinese_stop".to_string(), ); + let tenant = ctx.get_tenant(); let req = CreateTableIndexReq { create_option: CreateOption::Create, table_id, + tenant, name: index_name.clone(), column_ids: vec![1, 2, 3], sync_creation: false, diff --git a/src/query/ee_features/aggregating_index/src/aggregating_index_handler.rs b/src/query/ee_features/aggregating_index/src/aggregating_index_handler.rs index e37600ff0629..94bfde468ecb 100644 --- a/src/query/ee_features/aggregating_index/src/aggregating_index_handler.rs +++ b/src/query/ee_features/aggregating_index/src/aggregating_index_handler.rs @@ -19,7 +19,6 @@ use databend_common_catalog::catalog::Catalog; use databend_common_exception::Result; use databend_common_meta_app::schema::CreateIndexReply; use databend_common_meta_app::schema::CreateIndexReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -34,11 +33,7 @@ pub trait AggregatingIndexHandler: Sync + Send { req: CreateIndexReq, ) -> Result; - async fn do_drop_index( - &self, - catalog: Arc, - req: DropIndexReq, - ) -> Result; + async fn do_drop_index(&self, catalog: Arc, req: DropIndexReq) -> Result<()>; async fn do_get_index( &self, @@ -72,11 +67,7 @@ impl AggregatingIndexHandlerWrapper { } #[async_backtrace::framed] - pub async fn do_drop_index( - &self, - catalog: Arc, - req: DropIndexReq, - ) -> Result { + pub async fn do_drop_index(&self, catalog: Arc, req: DropIndexReq) -> Result<()> { self.handler.do_drop_index(catalog, req).await } diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index 49bf995eb731..2e967474d352 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -48,7 +48,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -599,7 +598,7 @@ impl Catalog for DatabaseCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, req: DropIndexReq) -> Result { + async fn drop_index(&self, req: DropIndexReq) -> Result<()> { self.mutable_catalog.drop_index(req).await } diff --git a/src/query/service/src/catalogs/default/immutable_catalog.rs b/src/query/service/src/catalogs/default/immutable_catalog.rs index 160e56e90edb..d51b7c1c8f63 100644 --- a/src/query/service/src/catalogs/default/immutable_catalog.rs +++ b/src/query/service/src/catalogs/default/immutable_catalog.rs @@ -44,7 +44,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -433,7 +432,7 @@ impl Catalog for ImmutableCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() } diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 16f2671dd949..afaf2e2d8095 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -24,6 +24,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_api::SchemaApi; use databend_common_meta_api::SequenceApi; +use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent; use databend_common_meta_app::schema::CatalogInfo; @@ -52,7 +53,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -294,8 +294,16 @@ impl Catalog for MutableCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, req: DropIndexReq) -> Result { - Ok(self.ctx.meta.drop_index(req).await?) + async fn drop_index(&self, req: DropIndexReq) -> Result<()> { + let dropped = self.ctx.meta.drop_index(&req.name_ident).await?; + if dropped.is_none() { + if req.if_exists { + // Alright + } else { + return Err(AppError::from(req.name_ident.unknown_error("drop_index")).into()); + } + } + Ok(()) } #[async_backtrace::framed] diff --git a/src/query/service/src/catalogs/default/session_catalog.rs b/src/query/service/src/catalogs/default/session_catalog.rs index 1f8dfa905550..f45474b75940 100644 --- a/src/query/service/src/catalogs/default/session_catalog.rs +++ b/src/query/service/src/catalogs/default/session_catalog.rs @@ -46,7 +46,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -184,7 +183,7 @@ impl Catalog for SessionCatalog { self.inner.create_index(req).await } - async fn drop_index(&self, req: DropIndexReq) -> Result { + async fn drop_index(&self, req: DropIndexReq) -> Result<()> { self.inner.drop_index(req).await } diff --git a/src/query/service/src/catalogs/share/share_catalog.rs b/src/query/service/src/catalogs/share/share_catalog.rs index 811c4bd57b15..b96b2ccdfab4 100644 --- a/src/query/service/src/catalogs/share/share_catalog.rs +++ b/src/query/service/src/catalogs/share/share_catalog.rs @@ -57,7 +57,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -644,7 +643,7 @@ impl Catalog for ShareCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() } diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index dcc8d1469dad..94022966a006 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -353,9 +353,10 @@ impl Interpreter for RefreshIndexInterpreter { })?; let ctx = self.ctx.clone(); + let tenant = ctx.get_tenant(); let req = UpdateIndexReq { + tenant, index_id: self.plan.index_id, - index_name: self.plan.index_name.clone(), index_meta: new_index_meta, }; diff --git a/src/query/service/src/interpreters/interpreter_table_index_create.rs b/src/query/service/src/interpreters/interpreter_table_index_create.rs index 1d2923c970f0..f50c99be85cc 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_create.rs @@ -59,9 +59,11 @@ impl Interpreter for CreateTableIndexInterpreter { let sync_creation = self.plan.sync_creation; let table_id = self.plan.table_id; let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; + let tenant = self.ctx.get_tenant(); let create_index_req = CreateTableIndexReq { create_option: self.plan.create_option, + tenant, table_id, name: index_name, column_ids, diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index ba7bb343d08b..5283aa840f9e 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -77,7 +77,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -313,7 +312,7 @@ impl Catalog for FakedCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index b0b0e3779b6e..37dc21610c27 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -76,7 +76,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -1049,7 +1048,7 @@ impl Catalog for FakedCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() } diff --git a/src/query/storages/hive/hive/src/hive_catalog.rs b/src/query/storages/hive/hive/src/hive_catalog.rs index ad4509411e39..44b9472b8619 100644 --- a/src/query/storages/hive/hive/src/hive_catalog.rs +++ b/src/query/storages/hive/hive/src/hive_catalog.rs @@ -52,7 +52,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -605,7 +604,7 @@ impl Catalog for HiveCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() } diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index d772a2e5b316..eed935d8d6bf 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -50,7 +50,6 @@ use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::DictionaryMeta; use databend_common_meta_app::schema::DropDatabaseReply; use databend_common_meta_app::schema::DropDatabaseReq; -use databend_common_meta_app::schema::DropIndexReply; use databend_common_meta_app::schema::DropIndexReq; use databend_common_meta_app::schema::DropSequenceReply; use databend_common_meta_app::schema::DropSequenceReq; @@ -442,7 +441,7 @@ impl Catalog for IcebergCatalog { } #[async_backtrace::framed] - async fn drop_index(&self, _req: DropIndexReq) -> Result { + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { unimplemented!() }