Add simple test #9

Merged: 3 commits, May 28, 2024
39 changes: 39 additions & 0 deletions .github/workflows/Test.yml
@@ -0,0 +1,39 @@
name: Test

on:
  push:
    branches:
      - main
    tags:
      - "*"
  pull_request:

env:
  CARGO_TERM_COLOR: always

permissions:
  contents: read

jobs:
  build_and_test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        toolchain:
          - stable
    steps:
      - uses: actions/checkout@v4
      - name: Rustup
        run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
      - name: Install cargo-llvm-cov
        uses: taiki-e/install-action@cargo-llvm-cov
      - name: Build
        run: cargo build --verbose
      - name: Test
        run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          files: lcov.info
          fail_ci_if_error: true
157 changes: 78 additions & 79 deletions src/merge.rs
@@ -8,85 +8,84 @@ use log::warn;

use crate::{chunk::FileChunk, Result};

#[allow(dead_code)]
pub fn merge_chunks_by_naive_picking<K, E>(
    canceled: Arc<AtomicBool>,
    chunks: Vec<FileChunk<K>>,
    mut add_fn: impl FnMut((K, Vec<u8>)) -> Result<(), E>,
) -> Result<(), E>
where
    K: Ord + Pod + Copy + Send + Sync,
{
    let tmp_file_paths = chunks
        .iter()
        .map(|chunk| chunk.path().to_owned())
        .collect::<Vec<_>>();

    let mut chunk_iters = chunks
        .into_iter()
        .map(|chunk| Ok(chunk.iter()?.peekable()))
        .collect::<Result<Vec<_>, E>>()?;

    loop {
        let mut min_key = None;
        let mut min_key_idx = None;
        let mut found_ranout = false;

        if canceled.load(std::sync::atomic::Ordering::Relaxed) {
            break;
        }

        for (idx, iter) in chunk_iters.iter_mut().enumerate() {
            match iter.peek() {
                Some(Ok((key, _))) => {
                    if min_key.is_none()
                        || key < min_key.as_ref().expect("min_key should have value")
                    {
                        min_key = Some(*key);
                        min_key_idx = Some(idx);
                    }
                }
                Some(Err(_)) => {
                    min_key_idx = Some(idx);
                    break;
                }
                None => {
                    found_ranout = true;
                }
            }
        }

        if let Some(min_key_idx) = min_key_idx {
            match chunk_iters[min_key_idx].next() {
                Some(Ok((key, value))) => {
                    add_fn((key, value))?;
                }
                Some(Err(e)) => {
                    return Err(e);
                }
                None => unreachable!(),
            }
        } else {
            break;
        }

        if found_ranout {
            // remove ran-out iterators
            chunk_iters.retain_mut(|it| it.peek().is_some());
        }
    }

    for path in tmp_file_paths {
        if std::fs::remove_file(&path).is_err() {
            warn!("Failed to remove file: {:?}", path);
        }
    }

    match canceled.load(std::sync::atomic::Ordering::Relaxed) {
        false => Ok(()),
        true => Err(crate::Error::Canceled),
    }
}
// pub fn merge_chunks_by_naive_picking<K, E>(
//     canceled: Arc<AtomicBool>,
//     chunks: Vec<FileChunk<K>>,
//     mut add_fn: impl FnMut((K, Vec<u8>)) -> Result<(), E>,
// ) -> Result<(), E>
// where
//     K: Ord + Pod + Copy + Send + Sync,
// {
//     let tmp_file_paths = chunks
//         .iter()
//         .map(|chunk| chunk.path().to_owned())
//         .collect::<Vec<_>>();
//
//     let mut chunk_iters = chunks
//         .into_iter()
//         .map(|chunk| Ok(chunk.iter()?.peekable()))
//         .collect::<Result<Vec<_>, E>>()?;
//
//     loop {
//         let mut min_key = None;
//         let mut min_key_idx = None;
//         let mut found_ranout = false;
//
//         if canceled.load(std::sync::atomic::Ordering::Relaxed) {
//             break;
//         }
//
//         for (idx, iter) in chunk_iters.iter_mut().enumerate() {
//             match iter.peek() {
//                 Some(Ok((key, _))) => {
//                     if min_key.is_none()
//                         || key < min_key.as_ref().expect("min_key should have value")
//                     {
//                         min_key = Some(*key);
//                         min_key_idx = Some(idx);
//                     }
//                 }
//                 Some(Err(_)) => {
//                     min_key_idx = Some(idx);
//                     break;
//                 }
//                 None => {
//                     found_ranout = true;
//                 }
//             }
//         }
//
//         if let Some(min_key_idx) = min_key_idx {
//             match chunk_iters[min_key_idx].next() {
//                 Some(Ok((key, value))) => {
//                     add_fn((key, value))?;
//                 }
//                 Some(Err(e)) => {
//                     return Err(e);
//                 }
//                 None => unreachable!(),
//             }
//         } else {
//             break;
//         }
//
//         if found_ranout {
//             // remove ran-out iterators
//             chunk_iters.retain_mut(|it| it.peek().is_some());
//         }
//     }
//
//     for path in tmp_file_paths {
//         if std::fs::remove_file(&path).is_err() {
//             warn!("Failed to remove file: {:?}", path);
//         }
//     }
//
//     match canceled.load(std::sync::atomic::Ordering::Relaxed) {
//         false => Ok(()),
//         true => Err(crate::Error::Canceled),
//     }
// }

struct HeapItem<K: Ord> {
    key: K,
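The diff above comments out the naive linear-scan merge, and the HeapItem<K: Ord> struct that follows (truncated here) suggests a heap-based k-way merge takes its place. As background only, here is a minimal, self-contained sketch of that pattern using std::collections::BinaryHeap; the names are illustrative and the in-memory Vec runs stand in for the crate's file-backed chunk iterators, so this is not the crate's actual implementation.

use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

// One heap entry per input run, ordered by key only so values need no Ord bound.
struct HeapEntry<K: Ord, V> {
    key: K,
    value: V,
    run: usize, // index of the run this entry came from
}

impl<K: Ord, V> PartialEq for HeapEntry<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}
impl<K: Ord, V> Eq for HeapEntry<K, V> {}
impl<K: Ord, V> PartialOrd for HeapEntry<K, V> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<K: Ord, V> Ord for HeapEntry<K, V> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

/// Merge already-sorted runs into one sorted vector using a min-heap of run heads.
fn merge_sorted_runs<K: Ord, V>(runs: Vec<Vec<(K, V)>>) -> Vec<(K, V)> {
    let mut iters: Vec<_> = runs.into_iter().map(|r| r.into_iter()).collect();
    let mut heap = BinaryHeap::new();

    // Seed the heap with the head of every non-empty run.
    for (run, it) in iters.iter_mut().enumerate() {
        if let Some((key, value)) = it.next() {
            heap.push(Reverse(HeapEntry { key, value, run }));
        }
    }

    let mut out = Vec::new();
    // Pop the globally smallest head, then refill the heap from the same run.
    while let Some(Reverse(HeapEntry { key, value, run })) = heap.pop() {
        out.push((key, value));
        if let Some((key, value)) = iters[run].next() {
            heap.push(Reverse(HeapEntry { key, value, run }));
        }
    }
    out
}

With runs vec![vec![(1, "a"), (4, "d")], vec![(2, "b"), (3, "c")]] this yields the keys 1, 2, 3, 4. Each pop costs O(log k) for k runs, which is why a heap is preferable to scanning every run head per output record as the commented-out function did.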
4 changes: 2 additions & 2 deletions src/sorter.rs
@@ -233,8 +233,8 @@ fn start_merging_stage<K, E>(
    let mut num_running_merges = 0;

    let mut recv_select = Select::new();
    recv_select.recv(&chunk_rx); // 0
    recv_select.recv(&merged_rx); // 1
    recv_select.recv(&chunk_rx); // select index=0
    recv_select.recv(&merged_rx); // select index=1

    loop {
        let idx = recv_select.ready();
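For context on the renamed index comments: recv_select here looks like crossbeam-channel's Select, where each recv() registration returns an index in registration order and ready() later reports which registered operation can proceed. A minimal stand-alone sketch of that pattern, with hypothetical channels rather than this crate's code:

use crossbeam_channel::{unbounded, Select};

fn main() {
    let (tx_a, rx_a) = unbounded::<u32>();
    let (tx_b, rx_b) = unbounded::<u32>();
    tx_a.send(1).unwrap();
    tx_b.send(2).unwrap();

    let mut sel = Select::new();
    let idx_a = sel.recv(&rx_a); // select index = 0
    let idx_b = sel.recv(&rx_b); // select index = 1

    // ready() blocks until some registered operation can proceed and returns
    // its index; the receive itself still has to be performed afterwards.
    let idx = sel.ready();
    if idx == idx_a {
        println!("first channel ready: {:?}", rx_a.try_recv());
    } else if idx == idx_b {
        println!("second channel ready: {:?}", rx_b.try_recv());
    }
}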
68 changes: 68 additions & 0 deletions tests/simple.rs
@@ -0,0 +1,68 @@
use kv_extsort::{Result, SortConfig};
use log::{debug, error};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{convert::Infallible, time::Instant};

#[test]
fn test_simple() -> Result<(), Infallible> {
    env_logger::init();
Review comment: Logging is initialized correctly, but calling env_logger::init(); inside a test function is unusual. Consider initializing the logger once, before the tests run.
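One common way to follow that suggestion (a sketch only; the helper name is made up and it assumes the tests keep using env_logger) is a small function every test can call, relying on try_init() to ignore repeated initialization:

// Hypothetical helper, not part of this PR.
fn init_test_logger() {
    // try_init() returns Err if a logger is already installed, so calling this
    // from every #[test] is harmless even when tests run in parallel.
    // is_test(true) routes output through the test harness's capture.
    let _ = env_logger::builder().is_test(true).try_init();
}

Each test would then start with init_test_logger(); instead of env_logger::init();.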


    // Data source
    let size = 200_000;
    let make_iter = || {
        let body_size = 2046;

        let mut rng = SmallRng::from_entropy();
        type T = (i32, Vec<u8>);
        let source = (0..size).map(move |_| {
            let u: i32 = rng.gen();
            let s = vec![0; body_size];
            (u, s)
        });

        let head_size = std::mem::size_of::<T>();
        let total_size = size * (head_size + body_size);
        debug!("Total size: {} MiB", total_size / 1024 / 1024);

        source
    };

    let t = Instant::now();
    let config = SortConfig::new()
        .max_chunk_bytes(2 * 1024 * 1024)
        .concurrency(num_cpus::get())
        .merge_k(16);

    // Setup cancellation
    let canceled = config.get_cancel_flag();
    ctrlc::set_handler(move || {
        canceled.store(true, std::sync::atomic::Ordering::Relaxed);
    })
    .expect("Error setting Ctrl-C handler");

    // Sort
    let mut prev_key = None;
    let mut count = 0;
    for res in kv_extsort::sort(make_iter().map(Result::<_, Infallible>::Ok), config) {
        match res {
            Ok((_key, _value)) => {
                // validate if sorted
                count += 1;
                if let Some(p) = prev_key {
                    assert!(p <= _key);
                }
                prev_key = Some(_key);
            }
            Err(e) => {
                error!("{:?}", e);
                break;
            }
        }
    }

    debug!("count: {count}");
    assert!(count == size, "count: {} != size: {}", count, size);
debug!("Time elapsed: {:?}", t.elapsed());

Ok(())
}