Skip to content

Commit

Permalink
chore: move file iterators to fluvio_storage crate (#3613)
Browse files Browse the repository at this point in the history
  • Loading branch information
morenol committed Oct 20, 2023
1 parent bf4e866 commit fe61757
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 359 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-spu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ chrono = { workspace = true }
# Fluvio dependencies
fluvio = { workspace = true }
fluvio-types = { workspace = true, features = ["events"] }
fluvio-storage = { workspace = true }
fluvio-storage = { workspace = true, features = ["iterators"] }
fluvio-compression = { workspace = true }
fluvio-controlplane = { workspace = true }
fluvio-controlplane-metadata = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-spu/src/services/public/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ use std::time::Instant;
use tracing::{debug, error, instrument, trace, warn};
use tokio::select;

use fluvio_compression::CompressionError;
use fluvio_controlplane_metadata::partition::ReplicaKey;
use fluvio_types::event::{StickyEvent, offsets::OffsetPublisher};
use fluvio_future::task::spawn;
use fluvio_socket::{ExclusiveFlvSink, SocketError};
use fluvio_protocol::{
api::{RequestMessage, RequestHeader},
record::{RecordSet, Offset, RawRecords},
};
use fluvio_protocol::link::{ErrorCode, smartmodule::SmartModuleTransformRuntimeError};
use fluvio_compression::CompressionError;
use fluvio_protocol::record::Batch;
use fluvio_socket::{ExclusiveFlvSink, SocketError};
use fluvio_storage::iterators::FileBatchIterator;
use fluvio_spu_schema::{
server::stream_fetch::{
DefaultStreamFetchRequest, FileStreamFetchRequest, StreamFetchRequest, StreamFetchResponse,
Expand All @@ -23,15 +25,13 @@ use fluvio_spu_schema::{
file::FileRecordSet,
};
use fluvio_types::event::offsets::OffsetChangeListener;
use fluvio_protocol::record::Batch;

use crate::core::{DefaultSharedGlobalContext, metrics::IncreaseValue};
use crate::replication::leader::SharedFileLeaderState;
use crate::services::public::conn_context::ConnectionContext;
use crate::services::public::stream_fetch::publishers::INIT_OFFSET;
use crate::smartengine::context::SmartModuleContext;
use crate::smartengine::batch::process_batch;
use crate::smartengine::file_batch::FileBatchIterator;
use crate::core::metrics::SpuMetrics;
use crate::traffic::TrafficType;

Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-spu/src/services/public/tests/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{env::temp_dir, time::Duration};
use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind};
use fluvio_controlplane::replica::Replica;
use fluvio_smartmodule::{Record, dataplane::smartmodule::Lookback};
use fluvio_storage::FileReplica;
use fluvio_storage::{FileReplica, iterators::FileBatchIterator};
use tracing::debug;

use fluvio_protocol::{
Expand Down Expand Up @@ -34,7 +34,6 @@ use crate::{
},
},
replication::leader::LeaderReplicaState,
smartengine::file_batch::FileBatchIterator,
};

#[fluvio_future::test(ignore)]
Expand Down
4 changes: 1 addition & 3 deletions crates/fluvio-spu/src/smartengine/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use fluvio_protocol::link::ErrorCode;
use fluvio_smartmodule::Record;
use fluvio_spu_schema::server::smartmodule::{SmartModuleInvocation, SmartModuleInvocationWasm};
use fluvio_storage::ReplicaStorage;
use fluvio_storage::iterators::{FileBatch, FileBatchIterator, FileRecordIterator, RecordItem};
use fluvio_types::Timestamp;
use tracing::{debug, trace, error};

Expand All @@ -15,14 +16,11 @@ use crate::core::metrics::SpuMetrics;
use crate::replication::leader::LeaderReplicaState;

use crate::smartengine::chain;
use crate::smartengine::file_batch::{FileRecordIterator, FileBatchIterator};
use crate::smartengine::Lookback;
use crate::smartengine::SmartModuleChainBuilder;
use crate::smartengine::SmartModuleChainInstance;
use crate::smartengine::Version;

use super::file_batch::{FileBatch, RecordItem};

#[derive(Debug)]
pub struct SmartModuleContext {
chain: SmartModuleChainInstance,
Expand Down
Loading

0 comments on commit fe61757

Please sign in to comment.