Skip to content

Commit

Permalink
chore: avoid sending create table requests for already existing tables (
Browse files Browse the repository at this point in the history
#5347)

* chore: avoid sending create table requests for already existing tables

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jan 15, 2025
1 parent bd37e08 commit 57f8afc
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
9 changes: 7 additions & 2 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -143,7 +143,12 @@ impl CreateLogicalTablesProcedure {

for peer in leaders {
let requester = self.context.node_manager.datanode(&peer).await;
let request = self.make_request(&peer, region_routes)?;
let Some(request) = self.make_request(&peer, region_routes)? else {
debug!("no region request to send to datanode {}", peer);
// We can skip the rest of the datanodes,
// the rest of the datanodes should have the same result.
break;
};

create_region_tasks.push(async move {
requester
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_logical_tables/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl CreateLogicalTablesProcedure {
Ok(())
}

pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> {
pub async fn check_tables_already_exist(&mut self) -> Result<()> {
let table_name_keys = self
.data
.all_create_table_exprs()
Expand Down
18 changes: 14 additions & 4 deletions src/common/meta/src/ddl/create_logical_tables/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;

Expand All @@ -31,11 +32,15 @@ impl CreateLogicalTablesProcedure {
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
) -> Result<Option<RegionRequest>> {
let tasks = &self.data.tasks;
let table_ids_already_exists = &self.data.table_ids_already_exists;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for task in tasks {
for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
if table_id_already_exists.is_some() {
continue;
}
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
Expand All @@ -51,13 +56,18 @@ impl CreateLogicalTablesProcedure {
}
}

Ok(RegionRequest {
if requests.is_empty() {
debug!("no region request to send to datanodes");
return Ok(None);
}

Ok(Some(RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Creates(CreateRequests { requests })),
})
}))
}

fn create_region_request_builder(
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn test_on_datanode_create_logical_regions() {
}
});

procedure.check_tables_already_exist().await.unwrap();
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
Expand Down

0 comments on commit 57f8afc

Please sign in to comment.