Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Improve gc_dropped_table_by_id() method #16528

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 64 additions & 35 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2775,14 +2775,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table { name, id } => {
gc_dropped_table_by_id(
self,
&req.tenant,
name.db_id,
id.table_id,
name.table_name.clone(),
)
.await?
gc_dropped_table_by_id(self, &req.tenant, &name, &id).await?
}
}
}
Expand Down Expand Up @@ -3805,61 +3798,97 @@ async fn gc_dropped_db_by_id(
Ok(())
}

/// Permanently remove a dropped table from the meta-service.
///
/// The data of the table should already have been removed before calling this method.
async fn gc_dropped_table_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
tenant: &Tenant,
db_id: u64,
table_id: u64,
table_name: String,
db_id_table_name: &DBIdTableName,
table_id_ident: &TableId,
) -> Result<(), KVAppError> {
// first get TableIdList
let table_id_history_ident = TableIdHistoryIdent {
database_id: db_id,
table_name,
};
// First remove all copied files for the dropped table.
// These markers are not part of the table and can be removed in separate transactions.
remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?;

let seq_id_list = kv_api.get_pb(&table_id_history_ident).await?;
let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

// 1)
remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?;

// 2)
let table_id_history_ident = TableIdHistoryIdent {
database_id: db_id_table_name.db_id,
table_name: db_id_table_name.table_name.clone(),
};

update_txn_to_remove_table_history(
kv_api,
&mut txn,
&table_id_history_ident,
table_id_ident,
)
.await?;

// 3)
remove_index_for_dropped_table(kv_api, tenant, table_id_ident, &mut txn).await?;

let (succ, _responses) = send_txn(kv_api, txn).await?;

if succ {
return Ok(());
}
}
}

/// Fill in condition and operations to TxnRequest to remove table history.
///
/// If the table history does not exist or does not include the table id, do nothing.
///
/// This function does not submit the txn.
async fn update_txn_to_remove_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
txn: &mut TxnRequest,
table_id_history_ident: &TableIdHistoryIdent,
table_id_ident: &TableId,
) -> Result<(), MetaError> {
let seq_id_list = kv_api.get_pb(table_id_history_ident).await?;

let Some(seq_id_list) = seq_id_list else {
return Ok(());
};

let seq = seq_id_list.seq;
let mut tb_id_list = seq_id_list.data;
let mut history = seq_id_list.data;

// remove table_id from tb_id_list:
{
let index = tb_id_list.id_list.iter().position(|&x| x == table_id);
let Some(index) = index else {
let table_id = table_id_ident.table_id;
let pos = history.id_list.iter().position(|&x| x == table_id);
let Some(index) = pos else {
return Ok(());
};

tb_id_list.id_list.remove(index);
history.id_list.remove(index);
}

let mut txn = TxnRequest::default();

// construct the txn request
txn.condition.push(
// condition: table id list not changed
txn_cond_eq_seq(&table_id_history_ident, seq),
txn_cond_eq_seq(table_id_history_ident, seq),
);

if tb_id_list.id_list.is_empty() {
txn.if_then.push(txn_op_del(&table_id_history_ident));
if history.id_list.is_empty() {
txn.if_then.push(txn_op_del(table_id_history_ident));
} else {
// save new table id list
txn.if_then
.push(txn_op_put_pb(&table_id_history_ident, &tb_id_list, None)?);
.push(txn_op_put_pb(table_id_history_ident, &history, None)?);
}

let table_id_ident = TableId { table_id };
remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?;

let _resp = kv_api.transaction(txn).await?;

Ok(())
}

Expand Down
Loading