refactor(meta): do not store upstream actors in merge node #20222

Open · wants to merge 2 commits into base: main
4 changes: 2 additions & 2 deletions proto/stream_plan.proto
@@ -550,7 +550,7 @@ message MergeNode {
//
// `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
// See `compose_fragment`.
- repeated uint32 upstream_actor_id = 1;
+ repeated uint32 upstream_actor_id = 1 [deprecated = true];
Member commented: Can we add some explanation of the deprecation? Also for StreamActor.

uint32 upstream_fragment_id = 2;
// Type of the upstream dispatcher. If there's always one upstream according to this
// type, the compute node may use the `ReceiverExecutor` as an optimization.
@@ -972,7 +972,7 @@ message StreamActor {
// Note that upstream actor ids are also stored in the proto of merge nodes.
// It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
// We duplicate the information here to ease the parsing logic in stream manager.
- repeated uint32 upstream_actor_id = 6;
+ repeated uint32 upstream_actor_id = 6 [deprecated = true];
// Vnodes that the executors in this actor own.
// If the fragment is a singleton, this field will not be set and leave a `None`.
common.Buffer vnode_bitmap = 8;
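A side note on the semantics (not part of the diff): `[deprecated = true]` only annotates the field, so field number 1 stays on the wire and previously persisted fragments still decode; prost-generated Rust typically carries a `#[deprecated]` attribute, giving remaining readers compile-time warnings. A hypothetical follow-up, once no reader consumes the field, could retire the number for good:

```proto
message MergeNode {
  // Hypothetical future cleanup: drop the field and reserve its number
  // and name so they can never be reused with a different type.
  reserved 1;
  reserved "upstream_actor_id";

  uint32 upstream_fragment_id = 2;
}
```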
11 changes: 10 additions & 1 deletion proto/stream_service.proto
@@ -17,8 +17,17 @@
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;

+  message BuildActorInfo {
+    message UpstreamActors {
+      repeated uint32 actors = 1;
+    }
+
+    stream_plan.StreamActor actor = 1;
+    map<uint32, UpstreamActors> upstreams = 2;
Member commented: Can we name it fragment_upstreams?

+  }

repeated common.ActorInfo broadcast_info = 8;
- repeated stream_plan.StreamActor actors_to_build = 9;
+ repeated BuildActorInfo actors_to_build = 9;

Check failure on line 30 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "9" with name "actors_to_build" on message "InjectBarrierRequest" changed type from "stream_plan.StreamActor" to "stream_service.InjectBarrierRequest.BuildActorInfo".
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}
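To make the new wire shape concrete, here is a minimal sketch of assembling a `BuildActorInfo` from the prost-generated types in this file. The helper name `to_build_actor_info` and the `HashMap<u32, Vec<u32>>` input shape are illustrative assumptions, not code from this PR:

```rust
use std::collections::HashMap;

use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;

/// Bundle an actor with its upstream actor ids, keyed by upstream fragment
/// id, for the `InjectBarrierRequest.actors_to_build` field. The upstream
/// info now travels with the barrier-injection request instead of being
/// persisted in `MergeNode` / `StreamActor`.
fn to_build_actor_info(
    actor: StreamActor,
    upstreams_by_fragment: HashMap<u32, Vec<u32>>,
) -> BuildActorInfo {
    BuildActorInfo {
        actor: Some(actor),
        upstreams: upstreams_by_fragment
            .into_iter()
            .map(|(fragment_id, actors)| (fragment_id, UpstreamActors { actors }))
            .collect(),
    }
}
```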
20 changes: 1 addition & 19 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -115,32 +115,14 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
for actor in &fragment.actors {
if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
println!(
"\t\tActor{} ({} splits): [{}]{}",
"\t\tActor{} ({} splits): [{}]",
if ignore_id {
"".to_owned()
} else {
format!(" #{:<3}", actor.actor_id,)
},
split_count,
splits,
- if !actor.upstream_actor_id.is_empty() {
-     let upstream_splits = actor
-         .upstream_actor_id
-         .iter()
-         .find_map(|id| actor_splits_map.get(id))
-         .expect("should have one upstream source actor");
-     format!(
-         " <- Upstream Actor{}: [{}]",
-         if ignore_id {
-             "".to_owned()
-         } else {
-             format!(" #{}", actor.upstream_actor_id[0])
-         },
-         upstream_splits.1
-     )
- } else {
-     "".to_owned()
- }
);
} else {
println!(
11 changes: 7 additions & 4 deletions src/meta/src/barrier/checkpoint/creating_job/status.rs
@@ -21,14 +21,14 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
- use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::barrier_complete_response::{
CreateMviewProgress, PbCreateMviewProgress,
};
use tracing::warn;

use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
+ use crate::model::StreamActorWithUpstreams;

#[derive(Debug)]
pub(super) struct CreateMviewLogStoreProgressTracker {
@@ -110,7 +110,7 @@ pub(super) enum CreatingStreamingJobStatus {
pending_non_checkpoint_barriers: Vec<u64>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
- initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
+ initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
},
/// The creating job is consuming log store.
///
@@ -126,7 +126,7 @@

pub(super) struct CreatingJobInjectBarrierInfo {
pub barrier_info: BarrierInfo,
- pub new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+ pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
pub mutation: Option<Mutation>,
}

Expand Down Expand Up @@ -252,7 +252,10 @@ impl CreatingStreamingJobStatus {
pub(super) fn new_fake_barrier(
prev_epoch_fake_physical_time: &mut u64,
pending_non_checkpoint_barriers: &mut Vec<u64>,
- initial_barrier_info: &mut Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
+ initial_barrier_info: &mut Option<(
+     HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
+     Mutation,
+ )>,
is_checkpoint: bool,
) -> CreatingJobInjectBarrierInfo {
{
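The tuple access `actor_info.0.actor_id` in `rpc.rs` below suggests that `StreamActorWithUpstreams` pairs an actor with its per-fragment upstream actor ids. As an illustrative sketch only (the real definition lives in `crate::model`, and the exact collection types are assumptions):

```rust
use std::collections::{HashMap, HashSet};

use risingwave_pb::stream_plan::StreamActor;

/// Assumed shape: an actor plus its upstream actor ids, keyed by upstream
/// fragment id, carried alongside the actor instead of being persisted in
/// `MergeNode.upstream_actor_id` / `StreamActor.upstream_actor_id`.
pub type StreamActorWithUpstreams = (StreamActor, HashMap<u32, HashSet<u32>>);
```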
10 changes: 6 additions & 4 deletions src/meta/src/barrier/command.rs
@@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
- StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
+ StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;
@@ -49,7 +49,9 @@ use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
- use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments};
+ use crate::model::{
+     ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
+ };
use crate::stream::{
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
};
@@ -83,7 +85,7 @@ pub struct Reschedule {
/// `Source` and `SourceBackfill` are handled together here.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

- pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
+ pub newly_created_actors: Vec<(StreamActorWithUpstreams, PbActorStatus)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
@@ -952,7 +954,7 @@ impl Command {
mutation
}

- pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
+ pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
5 changes: 2 additions & 3 deletions src/meta/src/barrier/context/recovery.rs
@@ -23,7 +23,6 @@ use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_meta_model::StreamingParallelism;
- use risingwave_pb::stream_plan::StreamActor;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};
@@ -34,7 +33,7 @@ use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
- use crate::model::{ActorId, StreamJobFragments, TableParallelism};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments, TableParallelism};
use crate::stream::{
JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
RescheduleOptions, SourceChange,
@@ -724,7 +723,7 @@ impl GlobalBarrierWorkerContextImpl {
}

/// Update all actors in compute nodes.
- async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
+ async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
self.metadata_manager.all_active_actors().await
}
}
9 changes: 4 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -20,13 +20,12 @@ use risingwave_connector::source::SplitImpl;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
- use risingwave_pb::stream_plan::StreamActor;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
- use crate::model::{ActorId, StreamJobFragments};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod checkpoint;
@@ -104,7 +103,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
- stream_actors: HashMap<ActorId, StreamActor>,
+ stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
hummock_version_stats: HummockVersionStats,
@@ -115,7 +114,7 @@ impl BarrierWorkerRuntimeInfoSnapshot {
database_id: DatabaseId,
database_info: &InflightDatabaseInfo,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
- stream_actors: &HashMap<ActorId, StreamActor>,
+ stream_actors: &HashMap<ActorId, StreamActorWithUpstreams>,
state_table_committed_epochs: &HashMap<TableId, u64>,
) -> MetaResult<()> {
{
@@ -190,7 +189,7 @@ struct DatabaseRuntimeInfoSnapshot {
database_fragment_info: InflightDatabaseInfo,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_info: InflightSubscriptionInfo,
- stream_actors: HashMap<ActorId, StreamActor>,
+ stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
30 changes: 23 additions & 7 deletions src/meta/src/barrier/rpc.rs
@@ -32,9 +32,9 @@ use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
- use risingwave_pb::stream_plan::{
-     AddMutation, Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo,
- };
+ use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
+ use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
+ use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::{
CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
RemovePartialGraphRequest, ResetDatabaseRequest,
@@ -57,7 +57,7 @@ use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
- use crate::model::{ActorId, StreamJobFragments};
+ use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
use crate::stream::build_actor_connector_splits;
use crate::{MetaError, MetaResult};

@@ -320,7 +320,7 @@ impl ControlStreamManager {
database_id: DatabaseId,
info: InflightDatabaseInfo,
state_table_committed_epochs: &mut HashMap<TableId, u64>,
- stream_actors: &mut HashMap<ActorId, StreamActor>,
+ stream_actors: &mut HashMap<ActorId, StreamActorWithUpstreams>,
source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
subscription_info: InflightSubscriptionInfo,
@@ -455,7 +455,7 @@
barrier_info: &BarrierInfo,
pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
- mut new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
+ mut new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
) -> MetaResult<HashSet<WorkerId>> {
@@ -484,7 +484,7 @@
.flatten()
.flat_map(|(worker_id, actor_infos)| {
actor_infos.iter().map(|actor_info| ActorInfo {
- actor_id: actor_info.actor_id,
+ actor_id: actor_info.0.actor_id,
host: self
.nodes
.get(worker_id)
@@ -541,6 +541,22 @@
.into_iter()
.flatten()
.flatten()
+ .map(|(actor, upstreams)| BuildActorInfo {
+     actor: Some(actor),
+     upstreams: upstreams
+         .into_iter()
+         .map(|(fragment_id, upstreams)| {
+             (
+                 fragment_id,
+                 UpstreamActors {
+                     actors: upstreams.into_iter().collect(),
+                 },
+             )
+         })
+         .collect(),
+ })
+ .collect(),
subscriptions_to_add: subscriptions_to_add.clone(),
subscriptions_to_remove: subscriptions_to_remove.clone(),