Skip to content

Commit

Permalink
refactor: rename fs executor mods
Browse files Browse the repository at this point in the history
To make it more clear.

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 27, 2025
1 parent a1e5188 commit 5bbae1f
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 21 deletions.
8 changes: 7 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::source::cdc::MYSQL_CDC_CONNECTOR;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
UPSTREAM_SOURCE_KEY,
S3_CONNECTOR, UPSTREAM_SOURCE_KEY,
};

/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
Expand Down Expand Up @@ -153,6 +153,12 @@ pub trait WithPropertiesExt: Get + Sized {
!self.is_iceberg_connector()
}

fn is_legacy_fs_connector(&self) -> bool {
self.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.eq_ignore_ascii_case(S3_CONNECTOR))
.unwrap_or(false)
}

fn is_new_fs_connector(&self) -> bool {
self.get(UPSTREAM_SOURCE_KEY)
.map(|s| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,25 @@ use super::{barrier_to_message_stream, StreamSourceCore};
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

#[allow(dead_code)]
pub struct FsListExecutor<S: StateStore> {
actor_ctx: ActorContextRef,

/// Streaming source for external
stream_source_core: Option<StreamSourceCore<S>>,

/// Metrics for monitor.
#[expect(dead_code)]
metrics: Arc<StreamingMetrics>,

/// Receiver of barrier channel.
barrier_receiver: Option<UnboundedReceiver<Barrier>>,

/// System parameter reader to read barrier interval
#[expect(dead_code)]
system_params: SystemParamsReaderRef,

/// Rate limit in rows/s.
#[expect(dead_code)]
rate_limit_rps: Option<u32>,
}

Expand All @@ -65,7 +67,6 @@ impl<S: StateStore> FsListExecutor<S> {
}
}

#[allow(clippy::disallowed_types)]
fn build_chunked_paginate_stream(
&self,
source_desc: &SourceDesc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ use crate::executor::UpdateMutation;
/// some latencies in network and cost in meta.
const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;

/// [`FsSourceExecutor`] is a streaming source, fir external file systems
/// [`LegacyFsSourceExecutor`] is a streaming source, fir external file systems
/// such as s3.
pub struct FsSourceExecutor<S: StateStore> {
pub struct LegacyFsSourceExecutor<S: StateStore> {
actor_ctx: ActorContextRef,

/// Streaming source for external
Expand All @@ -68,7 +68,7 @@ pub struct FsSourceExecutor<S: StateStore> {
rate_limit_rps: Option<u32>,
}

impl<S: StateStore> FsSourceExecutor<S> {
impl<S: StateStore> LegacyFsSourceExecutor<S> {
pub fn new(
actor_ctx: ActorContextRef,
stream_source_core: StreamSourceCore<S>,
Expand Down Expand Up @@ -506,15 +506,15 @@ impl<S: StateStore> FsSourceExecutor<S> {
}
}

impl<S: StateStore> Execute for FsSourceExecutor<S> {
impl<S: StateStore> Execute for LegacyFsSourceExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.into_stream().boxed()
}
}

impl<S: StateStore> Debug for FsSourceExecutor<S> {
impl<S: StateStore> Debug for LegacyFsSourceExecutor<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FsSourceExecutor")
f.debug_struct("LegacyFsSourceExecutor")
.field("source_id", &self.stream_source_core.source_id)
.field("column_ids", &self.stream_source_core.column_ids)
.finish()
Expand Down
12 changes: 6 additions & 6 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ pub use state_table_handler::*;
mod executor_core;
pub use executor_core::StreamSourceCore;

mod fs_source_executor;
mod legacy_fs_source_executor;
#[expect(deprecated)]
pub use fs_source_executor::*;
pub use legacy_fs_source_executor::*;
mod source_executor;
pub use source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod list_executor;
pub use list_executor::*;
mod fetch_executor;
pub use fetch_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;

mod source_backfill_state_table;
pub use source_backfill_state_table::BackfillStateTableHandler;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/state_table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

/// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor)
/// this method should only be used by [`LegacyFsSourceExecutor`](super::LegacyFsSourceExecutor)
pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult<HashSet<SplitId>> {
let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(
COMPLETE_SPLIT_PREFIX,
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<S: StateStore> SourceStateTableHandler<S> {
}

/// set all complete
/// can only used by [`FsSourceExecutor`](super::FsSourceExecutor)
/// can only used by [`LegacyFsSourceExecutor`](super::LegacyFsSourceExecutor)
pub(crate) async fn set_all_complete(
&mut self,
states: Vec<SplitImpl>,
Expand Down
6 changes: 2 additions & 4 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::executor::source::{
};
use crate::executor::TroublemakerExecutor;

const FS_CONNECTORS: &[&str] = &["s3"];
pub struct SourceExecutorBuilder;

pub fn create_source_desc_builder(
Expand Down Expand Up @@ -197,13 +196,12 @@ impl ExecutorBuilder for SourceExecutorBuilder {
state_table_handler,
);

let connector = get_connector_name(&source.with_properties);
let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str());
let is_fs_connector = source.with_properties.is_legacy_fs_connector();
let is_fs_v2_connector = source.with_properties.is_new_fs_connector();

if is_fs_connector {
#[expect(deprecated)]
crate::executor::source::FsSourceExecutor::new(
crate::executor::source::LegacyFsSourceExecutor::new(
params.actor_context.clone(),
stream_source_core,
params.executor_stats,
Expand Down

0 comments on commit 5bbae1f

Please sign in to comment.