What would users' code look like for simple use-cases? (After dropping async, and instead only using channels and Rayon) #104

Open
Tracked by #93
JackKelly opened this issue Mar 15, 2024 · 9 comments

@JackKelly

JackKelly commented Mar 15, 2024

For reference: Here's a quick code sketch I wrote, back when I was planning to use Tokio with Rayon.

Use cases

1) File conversion & re-chunking

  1. Send LSIO a list of millions of chunks to read.
  2. Have a Rayon threadpool which decompresses those chunks as soon as they arrive from disk. Tell the IO code to re-use IO buffers as soon as the data has been decompressed from an IO buffer.
  3. Have a thread which "reduces" and creates output chunks. Let's say that each output chunk contains 10 input chunks.
  4. Rayon threadpool compresses those chunks.
  5. And writes those compressed chunks back into the io_uring channel for writing to disk.
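
The five steps above can be sketched as a chain of bounded channels. This is only a rough sketch under heavy assumptions: `std::sync::mpsc` and plain threads stand in for io_uring, crossbeam and Rayon so that it runs without dependencies, and `decompress`, `compress` and `rechunk` are hypothetical placeholders (no real codec or disk IO is involved).

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a real decompressor: doubles every byte.
fn decompress(chunk: Vec<u8>) -> Vec<u8> {
    chunk.into_iter().map(|b| b.wrapping_mul(2)).collect()
}

/// Hypothetical stand-in for a real compressor: keeps only the byte sum.
fn compress(chunk: Vec<u8>) -> u64 {
    chunk.iter().map(|&b| u64::from(b)).sum()
}

/// Read -> decompress -> merge `group_size` inputs per output -> compress.
/// Returns the "compressed" outputs in place of writing them to disk.
fn rechunk(inputs: Vec<Vec<u8>>, group_size: usize) -> Vec<u64> {
    // Bounded channels provide back-pressure between the stages.
    let (raw_tx, raw_rx) = mpsc::sync_channel::<Vec<u8>>(4);
    let (dec_tx, dec_rx) = mpsc::sync_channel::<Vec<u8>>(4);

    // Stage 1: the "IO" thread delivers chunks as they are "read".
    let reader = thread::spawn(move || {
        for chunk in inputs {
            raw_tx.send(chunk).unwrap();
        }
        // `raw_tx` is dropped here, which closes the channel.
    });

    // Stage 2: decompress each chunk as soon as it arrives.
    let decompressor = thread::spawn(move || {
        for chunk in raw_rx {
            dec_tx.send(decompress(chunk)).unwrap();
        }
    });

    // Stages 3 to 5: merge `group_size` inputs into one output, then compress.
    let mut outputs = Vec::new();
    let mut merged = Vec::new();
    let mut n_in_group = 0;
    for chunk in dec_rx {
        merged.extend(chunk);
        n_in_group += 1;
        if n_in_group == group_size {
            outputs.push(compress(std::mem::take(&mut merged)));
            n_in_group = 0;
        }
    }
    if n_in_group > 0 {
        outputs.push(compress(merged)); // final, partially filled output
    }
    reader.join().unwrap();
    decompressor.join().unwrap();
    outputs
}
```

Calling `rechunk(inputs, 10)` would match the "10 input chunks per output chunk" example. A single decompression thread keeps the chunks in order here; a real threadpool would not, which is the ordering problem discussed further down this thread.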

2) ML training: Load random crops of Zarr data from disk.

3) Vincent's use-case: Loading timeseries (added 2024-04-04)

Imagine the chunks are arranged like this:

0.0   0.1   0.2   0.3
1.0   1.1   1.2   1.3
2.0   2.1   2.2   2.3
3.0   3.1   3.2   3.3

Each row is a timeseries. So, for example, chunks 0.0, 0.1, 0.2, 0.3 have to be given to the user in order.

But the decompression could happen out-of-order.

Perhaps the solution is for the user to group each row. And then, within a row, decompress chunks in any order. And then re-order the decompressed chunks before delivering to the user?
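
One way the re-ordering step might look: decompression finishes in any order, and a small buffer holds chunks back until the next expected index arrives. This is a sketch, not LSIO code. `reorder` and `demo_row` are hypothetical names, the chunk indices are assumed to run 0, 1, 2, ... without gaps, and a std channel stands in for whatever queue LSIO ends up using.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{self, Receiver};

/// Receives `(index, chunk)` pairs in any order and returns the chunks
/// sorted by index. A chunk is released as soon as all of its
/// predecessors have arrived.
fn reorder<T>(rx: Receiver<(usize, T)>) -> Vec<T> {
    let mut waiting = BTreeMap::new(); // chunks that arrived too early
    let mut next: usize = 0; // index of the next chunk to deliver
    let mut delivered = Vec::new();
    for (index, chunk) in rx {
        waiting.insert(index, chunk);
        // Deliver every chunk that is now contiguous with the output.
        while let Some(chunk) = waiting.remove(&next) {
            delivered.push(chunk);
            next += 1;
        }
    }
    delivered
}

/// Simulates one row (one timeseries) whose chunks finish out of order.
fn demo_row() -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel();
    for item in [(2, "0.2"), (0, "0.0"), (3, "0.3"), (1, "0.1")] {
        tx.send(item).unwrap();
    }
    drop(tx); // close the channel so that `reorder` terminates
    reorder(rx)
}
```

In a streaming version, `delivered.push(chunk)` would become a send to the user's channel, so the user sees chunk 0.0 as soon as it is ready, even if 0.3 finished first.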

@JackKelly

JackKelly commented Mar 15, 2024

UPDATE: This attempt is pretty dumb 🙂. See the comment below for a new, improved approach!


Here's an attempt at making use-case 1 work:

Inputs:

  • src_filenames: Receiver<Vec<CString>>: The million files to load, divided into 100,000 Vec<CString>s, where each Vec<CString> is a single group. Group n will finish loading before loading group n+1.

const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

// Start loading the files in a separate threadpool. Completed buffers will
// appear in `rx_of_filled_buffers`. We apply back-pressure by either letting
// `rx_of_filled_buffers` fill to `MAX_N_BUFFERS`, or by not telling `uring_local`
// that it can re-use buffers. This will not block. This will start loading immediately.
let rx_of_filled_buffers = uring_local.get(src_filenames);

// Decompress buffers:
// This will run `decompression_function` on each chunk using separate Rayon threadpool:
let decompressor = ChannelProcessor::new();
let rx_of_decompressed_buffers = decompressor.map(rx_of_filled_buffers, decompression_function);

// Tell io_uring to re-use IO buffers as soon as decompression has finished for each chunk:
// This function passes-through each completed chunk:
let rx_of_decompressed_buffers = uring_local.reuse_buffers(rx_of_decompressed_buffers);

// Reduce:
let rx_of_reduced_buffers = reduce(rx_of_decompressed_buffers, 10);

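// Compress (using a second `ChannelProcessor`, mirroring the decompressor):
let compressor = ChannelProcessor::new();
let rx_of_compressed_output_buffers = compressor.map(rx_of_reduced_buffers, compression_function);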

// Write. Behind the scenes, this will be added to the same uring threadpool's list of operations, I guess?
uring_local.put(rx_of_compressed_output_buffers);

@JackKelly

JackKelly commented Mar 15, 2024

Writing the code in the comment above gave me an idea that I've summarised in issue #106

@JackKelly

JackKelly commented Mar 15, 2024

UPDATE 2024-04-03: The code below is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs


Let me try again, now that I've figured out that we can just use iterators (see this code sketch)!

Structs

struct ByteRange<M> {
    byte_range: Range<isize>,

    // metadata is used to identify this byte range.
    // For example, in Zarr, this would be used to identify the
    // location at which this chunk appears in the merged array.
    metadata: M,
}

enum OperationKind {
    GetRanges,
    PutRanges,
}

struct Operation<M> {
    operation_kind: OperationKind,
    buffers: Option<Vec<AlignedBuffer>>, // For PutRanges
    byte_ranges: Option<Vec<ByteRange<M>>>,  // For GetRanges and PutRanges
    filename: CString,  // For GetRanges and PutRanges
}

impl<M> Operation<M> {
    /// If the user submits a GetRanges operation with an invalid filename then
    /// the user will receive a single Err(std::io::ErrorKind::NotFound) with context
    /// that describes the filename that failed.
    /// If a subset of the `byte_ranges` results in an error (e.g. reading beyond
    /// end of the file) then the user will receive a mixture of `Ok(Output::Buffer)`
    /// and `Err`, where the `Err` will include context such as the filename and byte_range.
    fn get_ranges(filename: CString, byte_ranges: Vec<ByteRange<M>>) -> Self {
        todo!()
    }

    fn put_ranges(
        filename: CString,
        byte_ranges: Vec<ByteRange<M>>,
        buffers: Vec<AlignedBuffer>,
    ) -> Self {
        // TODO: Maybe we also need a `slices: &[u8]` field, which gives one slice
        // per `byte_range`, whilst also having a `buffers` field to own the `AlignedBuffer`.
        todo!()
    }
}

struct OpGroup<M> {
    operations: Receiver<Operation<M>>,

    // Metadata for the whole group. Such as the filename of the merged output.
    metadata: M,
}

struct Output<M> {
    // Each `byte_range` within an `Operation::GetRanges` returns a `Buffer`.
    operation_kind: OperationKind,
    buffer: Option<AlignedBuffer>,
    byte_range: Option<ByteRange<M>>,
}

struct OutputGroup<GROUPMETA, OUTPUTMETA> {
    // We use a `Receiver` so we can process the next `Buffer` as soon as the producing
    // thread finishes each `Buffer`:
    // 
    outputs: Receiver<Result<Output<OUTPUTMETA>>>,

    // Metadata for the group (e.g. the output filename).
    metadata: GROUPMETA,
}

User code

const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

let mut submission_queue: Sender<OpGroup> = uring_local.submission();

// Define operations to get a bunch of files:
let get_group_0 = OpGroup::new()
    .extend(vec![
        Operation::get_ranges("foo.0.0", 0..-1),
        Operation::get_ranges("foo.0.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_0"));

// Define operations to get a second bunch of files:
let get_group_1 = OpGroup::new()
    .extend(vec![
        Operation::get_ranges("foo.1.0", 0..-1),
        Operation::get_ranges("foo.1.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_1"));

// Start loading the files in a separate threadpool:
submission_queue.send(get_group_0).unwrap();
submission_queue.send(get_group_1).unwrap();

// uring_local will load all operations from `get_group_0`. And then from `get_group_1`.
// Now we can wait on the completed items.

let completion_queue: Receiver<OutputGroup> = uring_local.completion();

let mut buffer_recycling_queue = uring_local.buffer_recycling_queue();

completion_queue.into_iter().par_bridge().for_each(|output_group: OutputGroup| {
    let out = output_group.outputs.into_iter().par_bridge()
        .map(|output| {
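            // Each item is a `Result<Output>`. A real implementation would handle the `Err`.
            let mut output = output.unwrap();
            assert_eq!(output.operation_kind, OperationKind::GetRanges);
            // Take the buffer once, decompress from it, then hand it back for re-use:
            let buffer = output.buffer.take().unwrap();
            let decompressed = decompress(&buffer);
            buffer_recycling_queue.send(buffer).unwrap();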
            decompressed
        })
        .reduce(reduce_func);
    let out = compress(out);

    // Write `out` to disk:
    let put_op = Operation::put_ranges(output_group.metadata.output_filename, 0..-1, out);
    let op_group = OpGroup::new().append(put_op);
    submission_queue.send(op_group);  // Does not block.
});

@JackKelly

JackKelly commented Mar 15, 2024

Actually, I don't think it's acceptable for the output channel to be a Receiver<Vec<Chunk>>, because we want to start processing chunks as soon as they're ready. Maybe a channel of channels?! Or a channel of crossbeam queues?

But we probably do want the input channel to be a Sender<Vec<IoOperation>>. Or - perhaps better - a Sender<IoOpGroup> (so we can attach metadata for the whole group).
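
To make the "channel of channels" idea concrete, here is a rough sketch in which each group carries its own metadata plus an inner channel of chunks. Everything here is an assumption for illustration: `Group`, `Chunk`, `produce` and `consume` are hypothetical names, and `std::sync::mpsc` stands in for crossbeam so that the sketch has no dependencies.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical chunk type: just some bytes.
type Chunk = Vec<u8>;

/// One group of results: group-wide metadata, plus a channel that yields
/// each chunk as soon as it is ready.
struct Group {
    output_filename: String,
    chunks: Receiver<Chunk>,
}

/// Producer: sends each group to the outer channel *before* filling it,
/// so the consumer can start on a group while it is still being produced.
fn produce(n_groups: u8, chunks_per_group: u8) -> Receiver<Group> {
    let (outer_tx, outer_rx) = mpsc::channel();
    thread::spawn(move || {
        for g in 0..n_groups {
            let (inner_tx, inner_rx) = mpsc::channel();
            let group = Group {
                output_filename: format!("foo_{g}"),
                chunks: inner_rx,
            };
            outer_tx.send(group).unwrap();
            for c in 0..chunks_per_group {
                inner_tx.send(vec![g, c]).unwrap();
            }
            // `inner_tx` is dropped here, which marks the end of the group.
        }
    });
    outer_rx
}

/// Consumer: returns (output filename, number of chunks) for each group.
fn consume(groups: Receiver<Group>) -> Vec<(String, usize)> {
    groups
        .into_iter()
        .map(|group| (group.output_filename, group.chunks.into_iter().count()))
        .collect()
}
```

The end of a group is signalled by the inner sender being dropped, so no explicit end-of-group marker is needed in the stream.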

UPDATE: I've updated the code in the comment above

@JackKelly

UPDATE 2024-04-03: The code above is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs

@JackKelly

JackKelly commented Apr 3, 2024

I need to think more about what the user code would look like (to satisfy the use-cases above) with the new code sketch. Some questions that spring to mind:

  • Can we use something like a Sender<IoOpGroup> to send groups to the IO threadpool, so we can attach group-wide metadata (such as the final output filename for that whole group)?
  • Or, can we use a flat structure, and every Chunk struct would have a group: Arc<GROUP>? Operations for a single group would be submitted together. The GROUP object would contain group metadata (such as the output filename).
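
The second option (a flat structure where every chunk holds an `Arc` to its group) might look roughly like this. It is a sketch under assumptions: `GroupMeta`, `Chunk`, `make_flat_list` and `bytes_per_output` are hypothetical names, not part of the LSIO code sketch.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Group-wide metadata, shared by every chunk in the group.
#[derive(Debug)]
struct GroupMeta {
    output_filename: String,
}

/// A chunk in a flat list. Each chunk points back at its group.
#[derive(Debug)]
struct Chunk {
    data: Vec<u8>,
    group: Arc<GroupMeta>,
}

/// Builds a flat list of chunks for `n_groups` groups.
fn make_flat_list(n_groups: u8, chunks_per_group: u8) -> Vec<Chunk> {
    let mut chunks = Vec::new();
    for g in 0..n_groups {
        let group = Arc::new(GroupMeta {
            output_filename: format!("foo_{g}"),
        });
        for c in 0..chunks_per_group {
            // Cloning an `Arc` only bumps a reference count.
            chunks.push(Chunk {
                data: vec![g, c],
                group: Arc::clone(&group),
            });
        }
    }
    chunks
}

/// Re-assembles groups from chunks that may arrive in any order, and
/// returns the total number of bytes per output filename.
fn bytes_per_output(chunks: Vec<Chunk>) -> BTreeMap<String, usize> {
    let mut totals = BTreeMap::new();
    for chunk in chunks {
        *totals
            .entry(chunk.group.output_filename.clone())
            .or_insert(0) += chunk.data.len();
    }
    totals
}
```

The metadata travels with every chunk, so out-of-order processing cannot separate a chunk from its group. What this sketch does not address is how a consumer learns that a group is complete.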

@JackKelly

can we use a flat structure

We can't have a flat list, where list items define the end of groups, because Rayon processes things out-of-order. So the following code never seems to print 0, 1, 2. Instead it prints 0, 1. Or even just 0.

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
    EndOfGroup(u8),
}

fn main() {
    let ops = vec![
        Operation::Get(0),
        Operation::Get(1),
        Operation::Get(2),
        Operation::EndOfGroup(0),
        Operation::Get(10),
        Operation::Get(11),
        Operation::EndOfGroup(1),
    ];

    // Oops: Rayon may process these items out-of-order. So we might start loading group 1 before
    // we hit the EndOfGroup(0).
    ops.into_par_iter()
        .map(|op| {
            if matches!(op, Operation::Get { .. }) {
                Some(op)
            } else {
                None
            }
        })
        .while_some()
        .for_each(|op| println!("{:?}", op));
}

So I think we may have to use a channel of channels for the completion queue!

@JackKelly

I should test the channel of channels idea in my Rust playground

@JackKelly

Yup, using channels of channels works for both submission and completion!

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
}

fn main() {
    let (completion_tx, completion_rx) = crossbeam::channel::bounded(4);

    {
        let (submission_tx, submission_rx) = crossbeam::channel::bounded(4);

        // Send the first group of operations:
        let (inner_submission_tx_0, inner_submission_rx_0) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(1),
            Operation::Get(2),
            Operation::Get(3),
            Operation::Get(4),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_0.send(op).unwrap());
        submission_tx.send(inner_submission_rx_0).unwrap();
        drop(inner_submission_tx_0);

        // Send the second group of operations:
        let (inner_submission_tx_1, inner_submission_rx_1) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(6),
            Operation::Get(7),
            Operation::Get(8),
            Operation::Get(9),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_1.send(op).unwrap());
        submission_tx.send(inner_submission_rx_1).unwrap();
        drop(inner_submission_tx_1);

        // Hang up the submission_tx, otherwise we'll never finish!
        drop(submission_tx);

        // "Process" the submission queue, and send data to the completion queue:
        submission_rx.into_iter().par_bridge().for_each(|inner| {
            let (inner_completion_tx, inner_completion_rx) =
                crossbeam::channel::bounded::<Operation>(4);
            inner
                .into_iter()
                .par_bridge()
                .for_each(|Operation::Get(x)| {
                    inner_completion_tx.send(Operation::Get(x * 10)).unwrap()
                });
            completion_tx.send(inner_completion_rx).unwrap();
        });
    }

    drop(completion_tx);

    completion_rx.into_iter().for_each(|inner| {
        println!("GROUP:");
        inner.into_iter().for_each(|op| println!("{op:?}"));
    });
}

From: https://github.com/JackKelly/rust-playground/blob/493da54d3f5bd2373501374f7cc71d70c7c8de4a/groups-within-iterators/src/main.rs
