From 4a2cef30c450ac333790b61340d8a45017e974ac Mon Sep 17 00:00:00 2001 From: Luis Moreno Date: Fri, 20 Oct 2023 15:02:37 -0400 Subject: [PATCH] chore: move file iterators to fluvio_storage crate --- Cargo.lock | 1 + crates/fluvio-spu/Cargo.toml | 2 +- .../src/services/public/stream_fetch.rs | 2 +- .../src/services/public/tests/produce.rs | 3 +- crates/fluvio-spu/src/smartengine/context.rs | 4 +- .../fluvio-spu/src/smartengine/file_batch.rs | 346 +----------------- crates/fluvio-spu/src/storage/mod.rs | 4 +- crates/fluvio-storage/Cargo.toml | 3 + crates/fluvio-storage/src/file.rs | 4 +- crates/fluvio-storage/src/iterators.rs | 336 +++++++++++++++++ crates/fluvio-storage/src/lib.rs | 2 + 11 files changed, 352 insertions(+), 355 deletions(-) create mode 100644 crates/fluvio-storage/src/iterators.rs diff --git a/Cargo.lock b/Cargo.lock index 6db630f5fd..db3527888d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3217,6 +3217,7 @@ dependencies = [ "bytes 1.5.0", "clap", "derive_builder", + "fluvio-compression", "fluvio-controlplane", "fluvio-controlplane-metadata", "fluvio-future", diff --git a/crates/fluvio-spu/Cargo.toml b/crates/fluvio-spu/Cargo.toml index a0a9ca2482..39b8bdfbe2 100644 --- a/crates/fluvio-spu/Cargo.toml +++ b/crates/fluvio-spu/Cargo.toml @@ -51,7 +51,7 @@ chrono = { workspace = true } # Fluvio dependencies fluvio = { workspace = true } fluvio-types = { workspace = true, features = ["events"] } -fluvio-storage = { workspace = true } +fluvio-storage = { workspace = true, features = ["iterators"] } fluvio-compression = { workspace = true } fluvio-controlplane = { workspace = true } fluvio-controlplane-metadata = { workspace = true } diff --git a/crates/fluvio-spu/src/services/public/stream_fetch.rs b/crates/fluvio-spu/src/services/public/stream_fetch.rs index 4d0898f976..a2f1ef2ef5 100644 --- a/crates/fluvio-spu/src/services/public/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/stream_fetch.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use 
std::time::Instant; +use fluvio_storage::iterators::FileBatchIterator; use tracing::{debug, error, instrument, trace, warn}; use tokio::select; @@ -31,7 +32,6 @@ use crate::services::public::conn_context::ConnectionContext; use crate::services::public::stream_fetch::publishers::INIT_OFFSET; use crate::smartengine::context::SmartModuleContext; use crate::smartengine::batch::process_batch; -use crate::smartengine::file_batch::FileBatchIterator; use crate::core::metrics::SpuMetrics; use crate::traffic::TrafficType; diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 7658f10e07..4189461a45 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -3,7 +3,7 @@ use std::{env::temp_dir, time::Duration}; use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind}; use fluvio_controlplane::replica::Replica; use fluvio_smartmodule::{Record, dataplane::smartmodule::Lookback}; -use fluvio_storage::FileReplica; +use fluvio_storage::{FileReplica, iterators::FileBatchIterator}; use tracing::debug; use fluvio_protocol::{ @@ -34,7 +34,6 @@ use crate::{ }, }, replication::leader::LeaderReplicaState, - smartengine::file_batch::FileBatchIterator, }; #[fluvio_future::test(ignore)] diff --git a/crates/fluvio-spu/src/smartengine/context.rs b/crates/fluvio-spu/src/smartengine/context.rs index a3dca52738..da857fe384 100644 --- a/crates/fluvio-spu/src/smartengine/context.rs +++ b/crates/fluvio-spu/src/smartengine/context.rs @@ -7,6 +7,7 @@ use fluvio_protocol::link::ErrorCode; use fluvio_smartmodule::Record; use fluvio_spu_schema::server::smartmodule::{SmartModuleInvocation, SmartModuleInvocationWasm}; use fluvio_storage::ReplicaStorage; +use fluvio_storage::iterators::{FileBatch, FileBatchIterator, FileRecordIterator, RecordItem}; use fluvio_types::Timestamp; use tracing::{debug, trace, error}; @@ -15,14 +16,11 @@ use 
crate::core::metrics::SpuMetrics; use crate::replication::leader::LeaderReplicaState; use crate::smartengine::chain; -use crate::smartengine::file_batch::{FileRecordIterator, FileBatchIterator}; use crate::smartengine::Lookback; use crate::smartengine::SmartModuleChainBuilder; use crate::smartengine::SmartModuleChainInstance; use crate::smartengine::Version; -use super::file_batch::{FileBatch, RecordItem}; - #[derive(Debug)] pub struct SmartModuleContext { chain: SmartModuleChainInstance, diff --git a/crates/fluvio-spu/src/smartengine/file_batch.rs b/crates/fluvio-spu/src/smartengine/file_batch.rs index e42cb78e86..b5d21b7802 100644 --- a/crates/fluvio-spu/src/smartengine/file_batch.rs +++ b/crates/fluvio-spu/src/smartengine/file_batch.rs @@ -1,25 +1,11 @@ -use std::collections::VecDeque; -use std::os::fd::BorrowedFd; -use std::os::unix::io::RawFd; -use std::io::{Error as IoError, ErrorKind, Cursor}; - -use fluvio_protocol::{Decoder, Version}; -use fluvio_smartmodule::Record; use fluvio_types::Timestamp; -use tracing::{warn, debug}; -use nix::sys::uio::pread; -use fluvio_protocol::record::{Batch, Offset, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE}; -use fluvio_future::file_slice::AsyncFileSlice; +use fluvio_protocol::record::Offset; use fluvio_compression::{Compression, CompressionError}; use super::batch::SmartModuleInputBatch; -// only encode information necessary to decode batches efficiently -pub struct FileBatch { - pub(crate) batch: Batch, - pub(crate) records: Vec, -} +use fluvio_storage::iterators::FileBatch; impl SmartModuleInputBatch for FileBatch { fn records(&self) -> &Vec { @@ -42,331 +28,3 @@ impl SmartModuleInputBatch for FileBatch { self.batch.get_compression() } } - -/// Iterator that returns batch from file -pub struct FileBatchIterator { - fd: RawFd, - offset: Offset, - end: i64, -} - -impl FileBatchIterator { - #[allow(unused)] - pub fn new(fd: RawFd, offset: Offset, len: i64) -> Self { - Self { - fd, - offset, - end: offset + len, - } - } - - 
pub fn from_raw_slice(slice: AsyncFileSlice) -> Self { - use std::os::unix::io::AsRawFd; - let offset = slice.position() as i64; - Self { - fd: slice.as_raw_fd(), - offset, - end: offset + slice.len() as i64, - } - } -} - -impl Iterator for FileBatchIterator { - type Item = Result; - - fn next(&mut self) -> Option { - if self.offset >= self.end { - return None; - } - - let offset = self.offset; - - // ugly hack for armv7 pread offset = i32 - // needed for gnu but not zig musl - #[cfg(all(target_pointer_width = "32", target_env = "gnu"))] - let offset: i32 = offset.try_into().unwrap(); - - let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE]; - let bytes_read = match pread( - unsafe { BorrowedFd::borrow_raw(self.fd) }, - &mut header, - offset, - ) - .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {err}"))) - { - Ok(bytes) => bytes, - Err(err) => return Some(Err(err)), - }; - - if bytes_read < header.len() { - warn!(bytes_read, header_len = header.len()); - return Some(Err(IoError::new( - ErrorKind::UnexpectedEof, - format!( - "not eough for batch header {} out of {}", - bytes_read, - header.len() - ), - ))); - } - - let mut batch: Batch = Batch::default(); - if let Err(err) = batch.decode_from_file_buf(&mut Cursor::new(header), 0) { - return Some(Err(IoError::new( - ErrorKind::Other, - format!("decodinge batch header error {err}"), - ))); - } - - let remainder = batch.batch_len as usize - BATCH_HEADER_SIZE; - - debug!( - file_offset = self.offset, - base_offset = batch.base_offset, - "fbatch header" - ); - - let mut raw_records = vec![0u8; remainder]; - - self.offset += BATCH_FILE_HEADER_SIZE as i64; - - let offset = self.offset; - - // ugly hack for armv7 pread offset = i32 - // needed for gnu but not zig musl - #[cfg(all(target_pointer_width = "32", target_env = "gnu"))] - let offset: i32 = offset.try_into().unwrap(); - - let bytes_read = match pread( - unsafe { BorrowedFd::borrow_raw(self.fd) }, - &mut raw_records, - offset, - ) - .map_err(|err| 
IoError::new(ErrorKind::Other, format!("pread error {err}"))) - { - Ok(bytes) => bytes, - Err(err) => return Some(Err(err)), - }; - - if bytes_read < raw_records.len() { - warn!(bytes_read, record_len = raw_records.len()); - return Some(Err(IoError::new( - ErrorKind::UnexpectedEof, - format!( - "not enough for batch records {} out of {}", - bytes_read, - raw_records.len() - ), - ))); - } - - let compression = match batch.get_compression() { - Ok(compression) => compression, - Err(err) => { - return Some(Err(IoError::new( - ErrorKind::Other, - format!("unknown compression value for batch {err}"), - ))) - } - }; - - let records = match compression.uncompress(&raw_records) { - Ok(Some(records)) => records, - Ok(None) => raw_records, - Err(err) => { - return Some(Err(IoError::new( - ErrorKind::Other, - format!("uncompress error {err}"), - ))) - } - }; - - self.offset += bytes_read as i64; - - debug!(file_offset = self.offset, "fbatch end"); - - Some(Ok(FileBatch { batch, records })) - } -} - -/// Iterator that converts an iterator over file batches to an iterator over record items. -/// RecordItem is a record with resolved offset and timestamp. -pub struct FileRecordIterator>> { - batch_iterator: T, - batch: VecDeque, - version: Version, -} - -impl>> FileRecordIterator { - pub fn new(batch_iterator: T, version: Version) -> Self { - Self { - batch_iterator, - batch: VecDeque::new(), - version, - } - } -} - -impl>> Iterator for FileRecordIterator { - type Item = Result; - - fn next(&mut self) -> Option { - match self.batch.pop_front() { - Some(r) => Some(Ok(r)), - None => { - let next_batch = match self.batch_iterator.next()? 
{ - Ok(b) => b, - Err(err) => return Some(Err(err)), - }; - - let base_offset = next_batch.batch.base_offset; - let base_timestamp = next_batch.batch.header.first_timestamp; - - let mut records: Vec = vec![]; - if let Err(err) = Decoder::decode( - &mut records, - &mut std::io::Cursor::new(next_batch.records), - self.version, - ) { - return Some(Err(err)); - } - self.batch.append( - &mut records - .into_iter() - .map(|record| { - let offset = base_offset + record.get_header().offset_delta(); - let timestamp = - base_timestamp + record.get_header().get_timestamp_delta(); - RecordItem { - record, - offset, - timestamp, - } - }) - .collect(), - ); - self.batch.pop_front().map(Ok) - } - } - } -} - -#[derive(Debug)] -pub struct RecordItem { - pub record: Record, - pub offset: Offset, - pub timestamp: Timestamp, -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - use std::time::SystemTime; - use std::fs::File; - use std::env::temp_dir; - use std::os::unix::io::AsRawFd; - - use fluvio_future::task::run_block_on; - use fluvio_protocol::record::RecordSet; - use fluvio_storage::{FileReplica, ReplicaStorage}; - use fluvio_storage::config::{StorageConfigBuilder, ReplicaConfigBuilder}; - - use super::*; - - #[test] - fn test_file_record_iterator() -> anyhow::Result<()> { - //given - let base_dir = temp_dir().join("test_file_record_iterator"); - let mut replica = run_block_on(FileReplica::create_or_load_with_storage( - format!( - "test_file_record_iterator_{}", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH)? 
- .as_millis() - ), - Default::default(), - Default::default(), - ReplicaConfigBuilder::default().base_dir(base_dir).build(), - Arc::new(StorageConfigBuilder::default().build()?), - ))?; - - let mut batch1 = Batch::default(); - batch1.header.first_timestamp = 100; - - batch1.add_record(Record::new("1")); - batch1.add_record(Record::new("2")); - batch1.mut_records()[0].preamble.set_timestamp_delta(1); - batch1.mut_records()[1].preamble.set_timestamp_delta(2); - - let mut batch2 = Batch::default(); - batch2.header.first_timestamp = 200; - batch2.base_offset = 2; - - batch2.add_record(Record::new("3")); - batch2.add_record(Record::new("4")); - batch2.mut_records()[0].preamble.set_timestamp_delta(1); - batch2.mut_records()[1].preamble.set_timestamp_delta(2); - - let mut records = RecordSet { - batches: vec![batch1, batch2], - }; - run_block_on(replica.write_recordset(&mut records, false))?; - - //when - let slice = run_block_on(replica.read_partition_slice( - 0, - u32::MAX, - fluvio::Isolation::ReadUncommitted, - ))?; - let file_slice = slice - .file_slice - .ok_or_else(|| anyhow::anyhow!("expected file slice"))?; - - let record_iter = FileRecordIterator::new(FileBatchIterator::from_raw_slice(file_slice), 0); - let records: Vec = - record_iter.collect::, std::io::Error>>()?; - - //then - assert_eq!(records.len(), 4); - assert_eq!(std::str::from_utf8(records[0].record.value())?, "1"); - assert_eq!(records[0].offset, 0); - assert_eq!(records[0].timestamp, 101); - - assert_eq!(std::str::from_utf8(records[1].record.value())?, "2"); - assert_eq!(records[1].offset, 1); - assert_eq!(records[1].timestamp, 102); - - assert_eq!(std::str::from_utf8(records[2].record.value())?, "3"); - assert_eq!(records[2].offset, 2); - assert_eq!(records[2].timestamp, 201); - - assert_eq!(std::str::from_utf8(records[3].record.value())?, "4"); - assert_eq!(records[3].offset, 3); - assert_eq!(records[3].timestamp, 202); - - Ok(()) - } - - #[test] - fn test_file_record_iterator_error_propagated() 
-> anyhow::Result<()> { - //given - let path = temp_dir().join("test_file_record_iterator_error_propagated"); - let bytes = b"some data"; - std::fs::write(&path, bytes)?; - - let read_only = File::open(path)?; - let fd = read_only.as_raw_fd(); - - let file_slice = AsyncFileSlice::new(fd, 0, bytes.len() as u64); - - //when - let record_iter = FileRecordIterator::new(FileBatchIterator::from_raw_slice(file_slice), 0); - let res = record_iter.collect::, std::io::Error>>(); - - //then - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); - - Ok(()) - } -} diff --git a/crates/fluvio-spu/src/storage/mod.rs b/crates/fluvio-spu/src/storage/mod.rs index 93dfccafbe..a6c299eff8 100644 --- a/crates/fluvio-spu/src/storage/mod.rs +++ b/crates/fluvio-spu/src/storage/mod.rs @@ -7,13 +7,13 @@ use async_rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use anyhow::Result; use fluvio_protocol::record::BatchRecords; -use fluvio_controlplane_metadata::partition::{ReplicaKey}; +use fluvio_controlplane_metadata::partition::ReplicaKey; use fluvio_spu_schema::Isolation; use fluvio_protocol::Encoder; use fluvio_protocol::record::{Offset, RecordSet}; use fluvio_protocol::link::ErrorCode; use fluvio_storage::{ReplicaStorage, StorageError, OffsetInfo, ReplicaSlice}; -use fluvio_types::{event::offsets::OffsetChangeListener}; +use fluvio_types::event::offsets::OffsetChangeListener; use fluvio_types::event::offsets::OffsetPublisher; pub const REMOVAL_START: Offset = -1000; // indicate that storage about to be removed diff --git a/crates/fluvio-storage/Cargo.toml b/crates/fluvio-storage/Cargo.toml index 215898475c..a18512e496 100644 --- a/crates/fluvio-storage/Cargo.toml +++ b/crates/fluvio-storage/Cargo.toml @@ -15,7 +15,9 @@ doc = false required-features = ["cli", "fluvio-future/subscriber"] [features] +default = ["iterators"] cli = ["clap"] +iterators = [] fixture = [] @@ -56,6 +58,7 @@ fluvio-future = { workspace = true, features = ["fixture"] } 
flv-util = { workspace = true, features = ["fixture"] } fluvio-socket = { workspace = true, features = ["file"] } fluvio-protocol = { workspace = true, features = ["fixture"] } +fluvio-compression = { workspace = true, features = ["compress"] } [package.metadata.cargo-udeps.ignore] development = ["fluvio-socket"] diff --git a/crates/fluvio-storage/src/file.rs b/crates/fluvio-storage/src/file.rs index deb3d5f0f3..c815c466e9 100644 --- a/crates/fluvio-storage/src/file.rs +++ b/crates/fluvio-storage/src/file.rs @@ -7,7 +7,7 @@ use blocking::unblock; use bytes::{BytesMut, Bytes}; use fluvio_protocol::record::Size; -use libc::{c_void}; +use libc::c_void; use nix::errno::Errno; use nix::Result as NixResult; use tracing::{debug, instrument, trace}; @@ -328,7 +328,7 @@ mod tests { use futures_lite::AsyncWriteExt; - use fluvio_future::{fs::File}; + use fluvio_future::fs::File; use super::*; diff --git a/crates/fluvio-storage/src/iterators.rs b/crates/fluvio-storage/src/iterators.rs new file mode 100644 index 0000000000..50ff30d70d --- /dev/null +++ b/crates/fluvio-storage/src/iterators.rs @@ -0,0 +1,336 @@ +use std::collections::VecDeque; +use std::os::fd::BorrowedFd; +use std::os::unix::io::RawFd; +use std::io::{Error as IoError, ErrorKind, Cursor}; + +use nix::sys::uio::pread; + +use fluvio_protocol::types::Timestamp; +use fluvio_protocol::{Decoder, Version}; + +use fluvio_protocol::record::{Batch, Offset, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE, Record}; +use fluvio_future::file_slice::AsyncFileSlice; + +// only encode information necessary to decode batches efficiently +pub struct FileBatch { + pub batch: Batch, + pub records: Vec<u8>, +} + +#[derive(Debug)] +pub struct RecordItem { + pub record: Record, + pub offset: Offset, + pub timestamp: Timestamp, +} + +/// Iterator that returns batch from file +pub struct FileBatchIterator { + fd: RawFd, + offset: Offset, + end: i64, +} + +impl FileBatchIterator { + #[allow(unused)] + pub fn new(fd: RawFd, offset: Offset, len: i64) 
-> Self { + Self { + fd, + offset, + end: offset + len, + } + } + + pub fn from_raw_slice(slice: AsyncFileSlice) -> Self { + use std::os::unix::io::AsRawFd; + let offset = slice.position() as i64; + Self { + fd: slice.as_raw_fd(), + offset, + end: offset + slice.len() as i64, + } + } +} + +impl Iterator for FileBatchIterator { + type Item = Result<FileBatch, IoError>; + + fn next(&mut self) -> Option<Self::Item> { + if self.offset >= self.end { + return None; + } + + let offset = self.offset; + + // ugly hack for armv7 pread offset = i32 + // needed for gnu but not zig musl + #[cfg(all(target_pointer_width = "32", target_env = "gnu"))] + let offset: i32 = offset.try_into().unwrap(); + + let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE]; + let bytes_read = match pread( + unsafe { BorrowedFd::borrow_raw(self.fd) }, + &mut header, + offset, + ) + .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {err}"))) + { + Ok(bytes) => bytes, + Err(err) => return Some(Err(err)), + }; + + if bytes_read < header.len() { + return Some(Err(IoError::new( + ErrorKind::UnexpectedEof, + format!( + "not enough for batch header {} out of {}", + bytes_read, + header.len() + ), + ))); + } + + let mut batch: Batch = Batch::default(); + if let Err(err) = batch.decode_from_file_buf(&mut Cursor::new(header), 0) { + return Some(Err(IoError::new( + ErrorKind::Other, + format!("decoding batch header error {err}"), + ))); + } + + let remainder = batch.batch_len as usize - BATCH_HEADER_SIZE; + + let mut raw_records = vec![0u8; remainder]; + + self.offset += BATCH_FILE_HEADER_SIZE as i64; + + let offset = self.offset; + + // ugly hack for armv7 pread offset = i32 + // needed for gnu but not zig musl + #[cfg(all(target_pointer_width = "32", target_env = "gnu"))] + let offset: i32 = offset.try_into().unwrap(); + + let bytes_read = match pread( + unsafe { BorrowedFd::borrow_raw(self.fd) }, + &mut raw_records, + offset, + ) + .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {err}"))) + { + Ok(bytes) 
=> bytes, + Err(err) => return Some(Err(err)), + }; + + if bytes_read < raw_records.len() { + return Some(Err(IoError::new( + ErrorKind::UnexpectedEof, + format!( + "not enough for batch records {} out of {}", + bytes_read, + raw_records.len() + ), + ))); + } + + let compression = match batch.get_compression() { + Ok(compression) => compression, + Err(err) => { + return Some(Err(IoError::new( + ErrorKind::Other, + format!("unknown compression value for batch {err}"), + ))) + } + }; + + let records = match compression.uncompress(&raw_records) { + Ok(Some(records)) => records, + Ok(None) => raw_records, + Err(err) => { + return Some(Err(IoError::new( + ErrorKind::Other, + format!("uncompress error {err}"), + ))) + } + }; + + self.offset += bytes_read as i64; + + Some(Ok(FileBatch { batch, records })) + } +} + +/// Iterator that converts an iterator over file batches to an iterator over record items. +/// RecordItem is a record with resolved offset and timestamp. +pub struct FileRecordIterator<T: Iterator<Item = Result<FileBatch, IoError>>> { + batch_iterator: T, + batch: VecDeque<RecordItem>, + version: Version, +} + +impl<T: Iterator<Item = Result<FileBatch, IoError>>> FileRecordIterator<T> { + pub fn new(batch_iterator: T, version: Version) -> Self { + Self { + batch_iterator, + batch: VecDeque::new(), + version, + } + } +} + +impl<T: Iterator<Item = Result<FileBatch, IoError>>> Iterator for FileRecordIterator<T> { + type Item = Result<RecordItem, IoError>; + + fn next(&mut self) -> Option<Self::Item> { + match self.batch.pop_front() { + Some(r) => Some(Ok(r)), + None => { + let next_batch = match self.batch_iterator.next()?
{ + Ok(b) => b, + Err(err) => return Some(Err(err)), + }; + + let base_offset = next_batch.batch.base_offset; + let base_timestamp = next_batch.batch.header.first_timestamp; + + let mut records: Vec<Record> = vec![]; + if let Err(err) = Decoder::decode( + &mut records, + &mut std::io::Cursor::new(next_batch.records), + self.version, + ) { + return Some(Err(err)); + } + self.batch.append( + &mut records + .into_iter() + .map(|record| { + let offset = base_offset + record.get_header().offset_delta(); + let timestamp = + base_timestamp + record.get_header().get_timestamp_delta(); + RecordItem { + record, + offset, + timestamp, + } + }) + .collect(), + ); + self.batch.pop_front().map(Ok) + } + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + use std::time::SystemTime; + use std::fs::File; + use std::env::temp_dir; + use std::os::unix::io::AsRawFd; + + use fluvio_future::task::run_block_on; + use fluvio_protocol::record::RecordSet; + use crate::{FileReplica, ReplicaStorage}; + use crate::config::{StorageConfigBuilder, ReplicaConfigBuilder}; + + use super::*; + + #[test] + fn test_file_record_iterator() -> anyhow::Result<()> { + //given + let base_dir = temp_dir().join("test_file_record_iterator"); + let mut replica = run_block_on(FileReplica::create_or_load_with_storage( + format!( + "test_file_record_iterator_{}", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_millis() + ), + Default::default(), + Default::default(), + ReplicaConfigBuilder::default().base_dir(base_dir).build(), + Arc::new(StorageConfigBuilder::default().build()?), + ))?; + + let mut batch1 = Batch::default(); + batch1.header.first_timestamp = 100; + + batch1.add_record(Record::new("1")); + batch1.add_record(Record::new("2")); + batch1.mut_records()[0].preamble.set_timestamp_delta(1); + batch1.mut_records()[1].preamble.set_timestamp_delta(2); + + let mut batch2 = Batch::default(); + batch2.header.first_timestamp = 200; + batch2.base_offset = 2; + + batch2.add_record(Record::new("3")); + batch2.add_record(Record::new("4")); + batch2.mut_records()[0].preamble.set_timestamp_delta(1); + batch2.mut_records()[1].preamble.set_timestamp_delta(2); + + let mut records = RecordSet { + batches: vec![batch1, batch2], + }; + run_block_on(replica.write_recordset(&mut records, false))?; + + //when + let slice = run_block_on(replica.read_partition_slice( + 0, + u32::MAX, + fluvio_spu_schema::Isolation::ReadUncommitted, + ))?; + let file_slice = slice + .file_slice + .ok_or_else(|| anyhow::anyhow!("expected file slice"))?; + + let record_iter = FileRecordIterator::new(FileBatchIterator::from_raw_slice(file_slice), 0); + let records: Vec<RecordItem> = + record_iter.collect::<Result<Vec<RecordItem>, std::io::Error>>()?; + + //then + assert_eq!(records.len(), 4); + assert_eq!(std::str::from_utf8(records[0].record.value())?, "1"); + assert_eq!(records[0].offset, 0); + assert_eq!(records[0].timestamp, 101); + + assert_eq!(std::str::from_utf8(records[1].record.value())?, "2"); + assert_eq!(records[1].offset, 1); + assert_eq!(records[1].timestamp, 102); + + assert_eq!(std::str::from_utf8(records[2].record.value())?, "3"); + assert_eq!(records[2].offset, 2); + assert_eq!(records[2].timestamp, 201); + + assert_eq!(std::str::from_utf8(records[3].record.value())?, "4"); + assert_eq!(records[3].offset, 3); + assert_eq!(records[3].timestamp, 202); + + Ok(()) + } + + #[test] + fn 
test_file_record_iterator_error_propagated() -> anyhow::Result<()> { + //given + let path = temp_dir().join("test_file_record_iterator_error_propagated"); + let bytes = b"some data"; + std::fs::write(&path, bytes)?; + + let read_only = File::open(path)?; + let fd = read_only.as_raw_fd(); + + let file_slice = AsyncFileSlice::new(fd, 0, bytes.len() as u64); + + //when + let record_iter = FileRecordIterator::new(FileBatchIterator::from_raw_slice(file_slice), 0); + let res = record_iter.collect::<Result<Vec<RecordItem>, std::io::Error>>(); + + //then + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); + + Ok(()) + } +} diff --git a/crates/fluvio-storage/src/lib.rs b/crates/fluvio-storage/src/lib.rs index dda7583d2d..ea2a8605b9 100644 --- a/crates/fluvio-storage/src/lib.rs +++ b/crates/fluvio-storage/src/lib.rs @@ -13,6 +13,8 @@ mod util; mod validator; mod file; pub mod config; +#[cfg(feature = "iterators")] +pub mod iterators; #[cfg(feature = "fixture")] pub mod fixture;