Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: mirror insert request to flownode in async #5444

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 120 additions & 110 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
Expand Down Expand Up @@ -338,50 +338,18 @@ impl Inserter {
instant_requests,
} = requests;

// Mirror requests for source table to flownode
match self
.mirror_flow_node_requests(
normal_requests
.requests
.iter()
.chain(instant_requests.requests.iter()),
)
.await
{
Ok(flow_requests) => {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_global(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.context(RequestInsertsSnafu)
})
});

match future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
Ok(ret) => {
let affected_rows = ret
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<u64>>()
.unwrap_or(0);
crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
}
Err(err) => {
warn!(err; "Failed to insert data into flownode");
}
}
}
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
normal_requests
.requests
.iter()
.chain(instant_requests.requests.iter()),
)
.await?;
flow_mirror_task.detach(self.node_manager.clone())?;

// Write requests to datanode and wait for response
let write_tasks = self
.group_requests_by_peer(normal_requests)
.await?
Expand Down Expand Up @@ -412,72 +380,6 @@ impl Inserter {
))
}

/// Mirror requests for source table to flownode
async fn mirror_flow_node_requests<'it, 'zelf: 'it>(
&'zelf self,
requests: impl Iterator<Item = &'it RegionInsertRequest>,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// store partial source table requests used by flow node(only store what's used)
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
match src_table_reqs.get_mut(&table_id) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
// already know this is not source table
Some(None) => continue,
_ => {
let peers = self
.table_flownode_set_cache
.get(table_id)
.await
.context(RequestInsertsSnafu)?
.unwrap_or_default()
.values()
.cloned()
.collect::<Vec<_>>();

if !peers.is_empty() {
let mut reqs = RegionInsertRequests::default();
reqs.requests.push(req.clone());
src_table_reqs.insert(table_id, Some((peers, reqs)));
} else {
// insert an empty entry to avoid repeated queries
src_table_reqs.insert(table_id, None);
}
}
}
}

let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

for (_table_id, (peers, reqs)) in src_table_reqs
.into_iter()
.filter_map(|(k, v)| v.map(|v| (k, v)))
{
if peers.len() == 1 {
// fast path, zero copy
inserts
.entry(peers[0].clone())
.or_default()
.requests
.extend(reqs.requests);
continue;
} else {
// TODO(discord9): need to split requests to multiple flownodes
for flownode in peers {
inserts
.entry(flownode.clone())
.or_default()
.requests
.extend(reqs.requests.clone());
}
}
}

Ok(inserts)
}

async fn group_requests_by_peer(
&self,
requests: RegionInsertRequests,
Expand Down Expand Up @@ -915,3 +817,111 @@ struct CreateAlterTableResult {
/// Table Info of the created tables.
table_infos: HashMap<TableId, Arc<TableInfo>>,
}

/// Buffered insert requests destined for flownodes, grouped by target peer.
///
/// Built by [`FlowMirrorTask::new`] from the incoming region insert requests
/// and consumed by [`FlowMirrorTask::detach`], which spawns the actual mirror
/// writes as background tasks.
struct FlowMirrorTask {
    // Insert requests grouped by the flownode peer that should receive them.
    requests: HashMap<Peer, RegionInsertRequests>,
    // Total number of input rows across all buffered requests; used to drive
    // the pending-mirror-rows gauge (`DIST_MIRROR_PENDING_ROW_COUNT`).
    num_rows: usize,
}

impl FlowMirrorTask {
    /// Collects the insert requests that target flow *source* tables,
    /// groups them by the flownode peer that should receive them, and
    /// records the total number of input rows that will be mirrored.
    ///
    /// Tables with no associated flownode are cached as `None` so the
    /// flownode-set cache is queried at most once per table id.
    async fn new(
        cache: &TableFlownodeSetCacheRef,
        requests: impl Iterator<Item = &RegionInsertRequest>,
    ) -> Result<Self> {
        // Per-table buffer. `None` marks "known not a source table" so we
        // skip repeated cache lookups for the same table id.
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();
        let mut num_rows = 0;

        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            // Row count of this single request. `rows` may be absent, so
            // avoid `unwrap()` and treat a missing payload as zero rows.
            let req_rows = req.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0);
            match src_table_reqs.get_mut(&table_id) {
                Some(Some((_peers, reqs))) => {
                    // Fix: count rows of every mirrored request, not only the
                    // first one per table, so the pending gauge is accurate.
                    num_rows += req_rows;
                    reqs.requests.push(req.clone());
                }
                // already know this is not a source table
                Some(None) => continue,
                _ => {
                    let peers = cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<Vec<_>>();

                    if !peers.is_empty() {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        num_rows += req_rows;
                        src_table_reqs.insert(table_id, Some((peers, reqs)));
                    } else {
                        // insert an empty entry to avoid repeated queries
                        src_table_reqs.insert(table_id, None);
                    }
                }
            }
        }

        // Regroup by destination peer.
        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (_table_id, (peers, reqs)) in src_table_reqs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
        {
            if peers.len() == 1 {
                // fast path, zero copy
                inserts
                    .entry(peers[0].clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
                continue;
            } else {
                // TODO(discord9): need to split requests to multiple flownodes
                for flownode in peers {
                    inserts
                        .entry(flownode.clone())
                        .or_default()
                        .requests
                        .extend(reqs.requests.clone());
                }
            }
        }

        Ok(Self {
            requests: inserts,
            num_rows,
        })
    }

    /// Spawns one detached task per target flownode and returns immediately.
    ///
    /// Mirroring is best-effort: failures are logged, never propagated.
    /// Each task adjusts the mirror metrics when it completes.
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            // Input row count of this batch, computed before `inserts` is
            // moved into the task, so the pending gauge can be released even
            // when the flownode call fails. Without this, failed batches
            // would leak pending rows in the gauge forever.
            let batch_rows: i64 = inserts
                .requests
                .iter()
                .map(|r| r.rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0) as i64)
                .sum();
            let node_manager = node_manager.clone();
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                match result {
                    Ok(resp) => {
                        let affected_rows = resp.affected_rows;
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                        // NOTE(review): assumes `affected_rows` equals the
                        // batch's input row count — verify against flownode.
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
                    }
                    Err(err) => {
                        // Fix: release this batch's pending rows on failure so
                        // the gauge does not grow without bound.
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(batch_rows);
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }

        Ok(())
    }
}
5 changes: 5 additions & 0 deletions src/operator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ lazy_static! {
"table operator mirror rows"
)
.unwrap();
pub static ref DIST_MIRROR_PENDING_ROW_COUNT: IntGauge = register_int_gauge!(
"greptime_table_operator_mirror_pending_rows",
"table operator mirror pending rows"
)
.unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_delete_rows",
"table operator delete rows"
Expand Down
Loading