Skip to content

Commit

Permalink
refactor: simplify least-visible-time schema API, add LeastVisibleTim…
Browse files Browse the repository at this point in the history
…eIdent
  • Loading branch information
drmingdrmer committed Sep 9, 2024
1 parent aa0c9ea commit 6b34d4f
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 166 deletions.
44 changes: 40 additions & 4 deletions src/meta/api/src/name_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_meta_app::tenant_key::ident::TIdent;
use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_types::Change;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
Expand All @@ -46,12 +47,12 @@ use crate::util::txn_op_put_pb;
pub trait NameValueApi<R, N>: KVApi<Error = MetaError>
where
R: TenantResource + Send + Sync + 'static,
R::ValueType: FromToProto + Send + Sync + 'static,
R::ValueType: FromToProto + Clone + Send + Sync + 'static,
N: KeyCodec,
N: fmt::Debug + Clone + Send + Sync + 'static,
{
/// Create a `name -> value` mapping.
async fn create_name_value(
async fn insert_name_value(
&self,
name_ident: TIdent<R, N>,
value: R::ValueType,
Expand All @@ -75,7 +76,7 @@ where
}

/// Create a `name -> value` mapping, with `CreateOption` support
async fn create_name_value_with_create_option(
async fn insert_name_value_with_create_option(
&self,
name_ident: TIdent<R, N>,
value: R::ValueType,
Expand All @@ -97,6 +98,41 @@ where
Ok(Ok(()))
}

/// Update or insert a `name -> value` mapping.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// If it outputs `None`, nothing is written back.
async fn upsert_name_value_with(
&self,
name_ident: &TIdent<R, N>,
update: impl Fn(Option<R::ValueType>) -> Option<R::ValueType> + Send,
) -> Result<Change<R::ValueType>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match update(seq_meta.clone().into_value()) {
Some(x) => x,
None => return Ok(Change::new(seq_meta.clone(), seq_meta)),
};

let transition = self
.upsert_pb(
&UpsertPB::insert(name_ident.clone(), updated).with(MatchSeq::Exact(seq)),
)
.await?;

if transition.is_changed() {
return Ok(transition);
}
}
}

/// Update an existent `name -> value` mapping.
///
/// The `update` function is called with the previous value
Expand Down Expand Up @@ -174,7 +210,7 @@ impl<R, N, T> NameValueApi<R, N> for T
where
T: KVApi<Error = MetaError> + ?Sized,
R: TenantResource + Send + Sync + 'static,
R::ValueType: FromToProto + Send + Sync + 'static,
R::ValueType: FromToProto + Clone + Send + Sync + 'static,
N: KeyCodec,
N: fmt::Debug + Clone + Send + Sync + 'static,
{
Expand Down
35 changes: 29 additions & 6 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_meta_app::schema::dictionary_id_ident::DictionaryId;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::index_id_ident::IndexId;
use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogMeta;
use databend_common_meta_app::schema::CatalogNameIdent;
Expand Down Expand Up @@ -49,13 +50,12 @@ use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetLVTReply;
use databend_common_meta_app::schema::GetLVTReq;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::IndexNameIdent;
use databend_common_meta_app::schema::LeastVisibleTime;
use databend_common_meta_app::schema::ListCatalogReq;
use databend_common_meta_app::schema::ListDatabaseReq;
use databend_common_meta_app::schema::ListDictionaryReq;
Expand All @@ -72,8 +72,6 @@ use databend_common_meta_app::schema::RenameDatabaseReply;
use databend_common_meta_app::schema::RenameDatabaseReq;
use databend_common_meta_app::schema::RenameTableReply;
use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SetLVTReply;
use databend_common_meta_app::schema::SetLVTReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::TableId;
Expand All @@ -95,10 +93,12 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq;
use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::Change;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_proto_conv::FromToProto;

use crate::kv_app_error::KVAppError;
use crate::meta_txn_error::MetaTxnError;
Expand Down Expand Up @@ -307,8 +307,23 @@ pub trait SchemaApi: Send + Sync {
-> Result<Vec<Arc<CatalogInfo>>, KVAppError>;

// least visible time
async fn set_table_lvt(&self, req: SetLVTReq) -> Result<SetLVTReply, KVAppError>;
async fn get_table_lvt(&self, req: GetLVTReq) -> Result<GetLVTReply, KVAppError>;

/// Updates the table's least visible time (LVT) only if the new value is greater than the existing one.
///
/// This function returns the updated LVT if changed, or the existing LVT if no update was necessary.
async fn set_table_lvt(
&self,
name_ident: &LeastVisibleTimeIdent,
value: &LeastVisibleTime,
) -> Result<LeastVisibleTime, KVAppError>;

#[deprecated(note = "use get::<K>() instead")]
async fn get_table_lvt(
&self,
name_ident: &LeastVisibleTimeIdent,
) -> Result<Option<LeastVisibleTime>, KVAppError> {
Ok(self.get(name_ident).await?)
}

fn name(&self) -> String;

Expand Down Expand Up @@ -337,4 +352,12 @@ pub trait SchemaApi: Send + Sync {
&self,
req: ListDictionaryReq,
) -> Result<Vec<(String, DictionaryMeta)>, KVAppError>;

/// Generic get() implementation for any kvapi::Key.
///
/// This method just return an `Option` of the value without seq number.
async fn get<K>(&self, name_ident: &K) -> Result<Option<K::ValueType>, MetaError>
where
K: kvapi::Key + Sync + 'static,
K::ValueType: FromToProto + 'static;
}
97 changes: 37 additions & 60 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::type_name;
use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
Expand Down Expand Up @@ -63,6 +64,7 @@ use databend_common_meta_app::schema::index_id_ident::IndexId;
use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
use databend_common_meta_app::schema::index_name_ident::IndexName;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::CatalogIdToNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogMeta;
Expand Down Expand Up @@ -105,16 +107,13 @@ use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::GetDatabaseReq;
use databend_common_meta_app::schema::GetIndexReply;
use databend_common_meta_app::schema::GetLVTReply;
use databend_common_meta_app::schema::GetLVTReq;
use databend_common_meta_app::schema::GetTableCopiedFileReply;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::GetTableReq;
use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::IndexNameIdent;
use databend_common_meta_app::schema::IndexNameIdentRaw;
use databend_common_meta_app::schema::LeastVisibleTime;
use databend_common_meta_app::schema::LeastVisibleTimeKey;
use databend_common_meta_app::schema::ListCatalogReq;
use databend_common_meta_app::schema::ListDatabaseReq;
use databend_common_meta_app::schema::ListDictionaryReq;
Expand All @@ -131,8 +130,6 @@ use databend_common_meta_app::schema::RenameDatabaseReply;
use databend_common_meta_app::schema::RenameDatabaseReq;
use databend_common_meta_app::schema::RenameTableReply;
use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SetLVTReply;
use databend_common_meta_app::schema::SetLVTReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
Expand Down Expand Up @@ -188,6 +185,7 @@ use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
use futures::TryStreamExt;
use log::debug;
Expand Down Expand Up @@ -955,7 +953,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
updated_on: None,
};

self.create_name_value_with_create_option(
self.insert_name_value_with_create_option(
req.name_ident.clone(),
virtual_column_meta,
req.create_option,
Expand Down Expand Up @@ -3029,7 +3027,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// Revision is unique. if it presents, consider it as success.
// Thus, we could just ignore create result
let _create_res = self
.create_name_value(key, lock_meta, Some(req.ttl))
.insert_name_value(key, lock_meta, Some(req.ttl))
.await?;

Ok(CreateLockRevReply { revision })
Expand Down Expand Up @@ -3186,66 +3184,38 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn set_table_lvt(&self, req: SetLVTReq) -> Result<SetLVTReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let table_id = req.table_id;

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let lvt_key = LeastVisibleTimeKey { table_id };
let (lvt_seq, lvt_opt): (_, Option<LeastVisibleTime>) =
get_pb_value(self, &lvt_key).await?;
let new_time = match lvt_opt {
Some(lvt) => {
if lvt.time >= req.time {
return Ok(SetLVTReply { time: lvt.time });
} else {
req.time
}
async fn set_table_lvt(
&self,
name_ident: &LeastVisibleTimeIdent,
value: &LeastVisibleTime,
) -> Result<LeastVisibleTime, KVAppError> {
debug!(req :? =(&name_ident, &value); "SchemaApi: {}", func_name!());

let transition = self
.upsert_name_value_with(name_ident, |t: Option<LeastVisibleTime>| {
let curr = t.unwrap_or_default();
if curr.time >= value.time {
None
} else {
Some(value.clone())
}
None => req.time,
};

let new_lvt = LeastVisibleTime { time: new_time };

let txn_req = TxnRequest {
condition: vec![txn_cond_seq(&lvt_key, Eq, lvt_seq)],
if_then: vec![txn_op_put(&lvt_key, serialize_struct(&new_lvt)?)],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
name :? =(req.table_id),
succ = succ;
"set_table_lvt"
);
})
.await?;

if succ {
return Ok(SetLVTReply { time: new_time });
}
}
return Ok(transition.result.into_value().unwrap_or_default());
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_lvt(&self, req: GetLVTReq) -> Result<GetLVTReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let table_id = req.table_id;

let lvt_key = LeastVisibleTimeKey { table_id };

let seq_lvt = self.get_pb(&lvt_key).await?;
let lvt_opt = seq_lvt.into_value();
async fn get<K>(&self, name_ident: &K) -> Result<Option<K::ValueType>, MetaError>
where
K: kvapi::Key + Sync + 'static,
K::ValueType: FromToProto + 'static,
{
debug!(req :? =(&name_ident); "SchemaApi::get::<{}>()", typ::<K>());

Ok(GetLVTReply {
time: lvt_opt.map(|time| time.time),
})
let seq_lvt = self.get_pb(name_ident).await?;
Ok(seq_lvt.into_value())
}

fn name(&self) -> String {
Expand Down Expand Up @@ -4553,3 +4523,10 @@ async fn append_update_stream_meta_requests(
}
Ok(())
}

fn typ<K>() -> &'static str {
type_name::<K>()
.rsplit("::")
.next()
.unwrap_or("UnknownType")
}
Loading

0 comments on commit 6b34d4f

Please sign in to comment.