feat: allow lazily chunking unsorted iteration #55

Merged · 6 commits · Sep 6, 2024
benches/my_bench.rs — 1 addition, 1 deletion
@@ -95,7 +95,7 @@ fn main() {

// Open finished file
let all_items = if unsorted_read {
UnsortedShardReader::<T1>::open(tmp.path())?.collect::<Result<_, _>>()?
UnsortedShardReader::<T1>::open(tmp.path()).collect::<Result<_, _>>()?
} else {
let reader = ShardReader::<T1>::open(tmp.path())?;

src/lib.rs — 53 additions, 165 deletions
@@ -73,7 +73,7 @@
//! assert_eq!(all_items, all_items_sorted);
//!
//! // If you want to iterate through the items in unsorted order.
//! let unsorted_items: Vec<_> = UnsortedShardReader::<DataStruct>::open(filename)?.collect();
//! let unsorted_items: Vec<_> = UnsortedShardReader::<DataStruct>::open(filename).collect();
//! // You will get the items in the order they are written to disk.
//! assert_eq!(unsorted_items.len(), all_items.len());
//!
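The doc-comment hunk above shows the headline API change: `UnsortedShardReader::open` (and `open_set`) no longer return a `Result`, so opening cannot fail eagerly; any I/O or decode error instead surfaces as an `Err` item during iteration. A minimal sketch of the new calling convention (`DataStruct` and `filename` are the hypothetical names from the doc-comment above; `anyhow::Error` as used elsewhere in this crate):

    use anyhow::Error;
    use shardio::UnsortedShardReader;

    fn read_unsorted(filename: &str) -> Result<Vec<DataStruct>, Error> {
        // `open` is now infallible; errors are reported lazily, one
        // `Err` item per failed read, so `collect` still yields a Result.
        UnsortedShardReader::<DataStruct>::open(filename).collect()
    }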
@@ -84,6 +84,7 @@

#![deny(warnings)]
#![deny(missing_docs)]
use std::any::type_name;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fs::File;
@@ -94,7 +95,6 @@ use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread;
use std::{any::type_name, path::PathBuf};

use anyhow::{format_err, Error};
use bincode::{deserialize_from, serialize_into};
@@ -112,6 +112,9 @@ pub mod helper;
pub use crate::range::Range;
use range::Rorder;

mod unsorted;
pub use unsorted::*;

/// The size (in bytes) of a ShardIter object (mostly buffers)
// ? sizeof(T)
// + 8 usize items_remaining
@@ -1392,157 +1395,6 @@ where
}
}

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
/// A group of `len_items` items stored at position `offset`, using `len_bytes` bytes on disk.
/// Similar to `ShardRecord`, except that the sort key is not stored. Used by `UnsortedShardReader`.
struct KeylessShardRecord {
offset: usize,
len_bytes: usize,
len_items: usize,
}

/// Read from a collection of shardio files in the order in which items were written,
/// without considering the sort order.
///
/// Useful if you just want to iterate over all the items irrespective of the ordering.
///
#[allow(dead_code)]
pub struct UnsortedShardReader<T, S = DefaultSort>
where
S: SortKey<T>,
{
shard_files: Vec<PathBuf>,
// Which file among the shard_files are we reading from
active_file_num: usize,
// The chunk index records of the shard file we are currently reading
active_file_index: Vec<KeylessShardRecord>,
// Which KeylessShardRecord among the active_file_index are we reading now
active_index_num: usize,
// How many items within a compressed block have we read so far
active_index_items_read: usize,
decoder: Option<lz4::Decoder<BufReader<ReadAdapter<File, File>>>>,
phantom: PhantomData<(T, S)>,
}

impl<T, S> UnsortedShardReader<T, S>
where
T: DeserializeOwned,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
S: SortKey<T>,
{
/// Open a single shard file
pub fn open<P: AsRef<Path>>(shard_file: P) -> Result<Self, Error> {
UnsortedShardReader::open_set(&[shard_file])
}

/// Open a set of shard files
pub fn open_set<P: AsRef<Path>>(shard_files: &[P]) -> Result<Self, Error> {
let shard_files: Vec<_> = shard_files.iter().map(|f| f.as_ref().into()).collect();

Ok(UnsortedShardReader {
shard_files,
active_file_num: 0,
active_file_index: Vec::new(),
active_index_num: 0,
active_index_items_read: 0,
decoder: None,
phantom: PhantomData,
})
}
}

impl<T, S> Iterator for UnsortedShardReader<T, S>
where
T: DeserializeOwned,
<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
S: SortKey<T>,
{
type Item = Result<T, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.active_file_num >= self.shard_files.len() {
// We are done going through all the files
return None;
}
if self.decoder.is_none() {
// Open the next file
self.active_index_num = 0;
self.active_index_items_read = 0;

let reader = match ShardReaderSingle::<T, S>::open(
&self.shard_files[self.active_file_num],
) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
};
self.active_file_index = reader
.index
.into_iter()
.map(|r| KeylessShardRecord {
offset: r.offset,
len_bytes: r.len_bytes,
len_items: r.len_items,
})
.collect();

let decoder = match self.active_file_index.first() {
Some(rec) => lz4::Decoder::new(BufReader::new(ReadAdapter::new(
reader.file,
rec.offset,
rec.len_bytes,
))),
None => {
// There are no chunks in this file
self.active_file_num += 1;
continue;
}
};
self.decoder = match decoder {
Ok(d) => Some(d),
Err(e) => return Some(Err(e.into())),
};
}
if self.active_index_items_read
>= self.active_file_index[self.active_index_num].len_items
{
// We are done with this chunk
self.active_index_num += 1;
self.active_index_items_read = 0;

if self.active_index_num >= self.active_file_index.len() {
// We are done with this file
self.decoder = None;
self.active_file_num += 1;
self.active_index_num = 0;
} else {
// Load up the decoder for the next chunk
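// Finishing the LZ4 stream hands back the underlying reader, so the
// already-open file handle can be reused for the next chunk instead
// of reopening the file.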
let decoder = self.decoder.take().unwrap();
let (buf, _) = decoder.finish();
let file = buf.into_inner().file;
let rec = self.active_file_index[self.active_index_num];
let decoder = lz4::Decoder::new(BufReader::new(ReadAdapter::new(
file,
rec.offset,
rec.len_bytes,
)));
self.decoder = match decoder {
Ok(d) => Some(d),
Err(e) => return Some(Err(e.into())),
};
}
continue;
} else {
// Read the next item
self.active_index_items_read += 1;
match deserialize_from(self.decoder.as_mut().unwrap()) {
Ok(item) => return Some(Ok(item)),
Err(e) => return Some(Err(e.into())),
}
}
}
}
}

#[cfg(test)]
mod shard_tests {
use super::*;
@@ -1553,6 +1405,7 @@ mod shard_tests {
use std::fmt::Debug;
use std::hash::Hash;
use std::iter::{repeat, FromIterator};
use std::path::PathBuf;

#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug, PartialOrd, Ord, Hash)]
struct T1 {
@@ -1835,7 +1688,7 @@ mod shard_tests {
}

let unsorted_items =
UnsortedShardReader::<T, S>::open_set(&files)?.collect::<Result<Vec<_>, _>>()?;
UnsortedShardReader::<T, S>::open_set(&files).collect::<Result<Vec<_>, _>>()?;
assert!(set_compare(&out_items, &unsorted_items));

Ok(out_items)
@@ -1938,10 +1791,12 @@ mod shard_tests {
disk_chunk_size, producer_chunk_size, n_items
);

let tmp = tempfile::NamedTempFile::new()?;
// Write two files to check file set reading logic.

// Write and close file
let true_items = {
let create = || -> Result<_, Error> {
let tmp = tempfile::NamedTempFile::new()?;

// Write and close file
let mut writer: ShardWriter<T1> = ShardWriter::new(
tmp.path(),
producer_chunk_size,
@@ -1955,12 +1810,20 @@

writer.finish()?;
true_items.sort();
true_items
Ok((tmp, true_items))
};

let (tmp0, true_items0) = create()?;
let (tmp1, true_items1) = create()?;

let mut true_items = Vec::from_iter(true_items0.into_iter().chain(true_items1));
true_items.sort();

let file_set = [tmp0.path(), tmp1.path()];

if do_read {
// Open finished file
let reader = ShardReader::<T1>::open(tmp.path())?;
// Open finished files
let reader = ShardReader::<T1>::open_set(&file_set)?;
let iter = reader.iter_range(&Range::all())?;

let all_items_res: Result<Vec<_>, Error> = iter.collect();
@@ -1974,8 +1837,8 @@
}

for rc in [1, 3, 8, 15, 27].iter() {
// Open finished file & test chunked reads
let set_reader = ShardReader::<T1>::open(tmp.path())?;
// Open finished files & test chunked reads
let set_reader = ShardReader::<T1>::open_set(&file_set)?;
let mut all_items_chunks = Vec::new();

// Read in chunks
@@ -2000,10 +1863,29 @@
}

// Check the unsorted read
let unsorted_reader = UnsortedShardReader::<T1>::open(tmp.path())?;
assert_eq!(2 * n_items, UnsortedShardReader::<T1>::len(&file_set)?);
let unsorted_reader = UnsortedShardReader::<T1>::open_set(&file_set);
let all_items_res: Result<Vec<_>, Error> = unsorted_reader.collect();
let all_items = all_items_res?;
assert!(set_compare(&true_items, &all_items));

let check_unsorted_skip = |to_skip: usize| -> Result<(), Error> {
let mut unsorted_reader_skip = UnsortedShardReader::<T1>::open_set(&file_set);
let skipped = unsorted_reader_skip.skip_lazy(to_skip)?;
assert_eq!(to_skip, skipped);
let all_items_res_skip: Result<Vec<_>, Error> = unsorted_reader_skip.collect();
let all_items_skip = all_items_res_skip?;
assert_eq!(&all_items[to_skip..], &all_items_skip);
Ok(())
};

check_unsorted_skip(0)?;
check_unsorted_skip(1)?;
check_unsorted_skip(disk_chunk_size)?;
check_unsorted_skip((disk_chunk_size * 3) + 1)?;
check_unsorted_skip(n_items)?; // skip entire first file
check_unsorted_skip(n_items + 1)?; // skip entire first file plus next item
check_unsorted_skip(n_items * 2)?; // skip everything
}
Ok(())
}
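The test above exercises the two new entry points this PR adds: `UnsortedShardReader::len`, which totals the item count across a file set, and `skip_lazy`, which advances the reader by a given number of items — per the PR title, presumably skipping whole chunks via the index rather than deserializing each item. A sketch of resuming unsorted iteration at an offset (same `T1` and `file_set` assumptions as the test; `offset` is a hypothetical parameter):

    // Resume unsorted iteration `offset` items into the file set.
    let mut reader = UnsortedShardReader::<T1>::open_set(&file_set);
    let skipped = reader.skip_lazy(offset)?;
    assert_eq!(skipped, offset); // holds whenever offset <= total item count
    for item in reader {
        let item = item?;
        // handle items from `offset` onward, in on-disk write order
    }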
@@ -2084,7 +1966,7 @@ mod shard_tests {
assert!(set_compare(&true_items, &all_items_chunks));

// Check the unsorted read
let unsorted_reader = UnsortedShardReader::<T1, FieldDSort>::open(tmp.path())?;
let unsorted_reader = UnsortedShardReader::<T1, FieldDSort>::open(tmp.path());
let all_items_res: Result<Vec<_>, Error> = unsorted_reader.collect();
let all_items = all_items_res?;
assert!(set_compare(&true_items, &all_items));
@@ -2262,7 +2144,13 @@ mod shard_tests {
#[test]
fn test_empty_open_set() {
let shard_files = Vec::<PathBuf>::new();
let reader = UnsortedShardReader::<u8>::open_set(&shard_files).unwrap();
let reader = UnsortedShardReader::<u8>::open_set(&shard_files);
assert_eq!(reader.count(), 0);

// Test that skipping an empty set works correctly.
let mut reader = UnsortedShardReader::<u8>::open_set(&shard_files);
let skipped = reader.skip_lazy(10).unwrap();
assert_eq!(0, skipped);
assert_eq!(reader.count(), 0);
}
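One detail this test pins down: `skip_lazy` returns the number of items actually skipped, clamped to what exists, rather than erroring when the request runs past the end of the file set. Callers can use the shortfall to detect exhaustion (a sketch under the same assumptions as the test above):

    let mut reader = UnsortedShardReader::<u8>::open_set(&shard_files);
    let requested = 10;
    let skipped = reader.skip_lazy(requested).unwrap();
    if skipped < requested {
        // Fewer than `requested` items existed; the reader is now exhausted.
        assert_eq!(reader.count(), 0);
    }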
}