
Concurrent map #88

Open
wants to merge 2 commits into base: main
Conversation

@Boog900 (Member) commented Mar 5, 2024

This adds a concurrent hash map impl.

This concurrent map is optimized for:

  • multiple threads retrieving values concurrently,
  • the needed keys being known before the workers start.

It is not a full concurrent map. It is pretty much a concurrent map builder.

Some background:

In the consensus code, we batch the outputs we need into one DB request so the DB can split the request up as it sees fit. This is probably the biggest reason why our RPC scanner is so quick: it takes this huge list of needed outputs and splits it up between nodes, getting the maximum from each node (5,000).

The way this currently works is that when the shimmed db-rpc gets an output request, it will deconstruct the hashmap into multiple hashmaps, send them to different nodes, await the responses and combine the returned hashmaps.

This isn't very efficient, but it was good enough for the RPC scanner, where the majority of the wait is on the actual RPC request.

This may be a problem for our DB though, which is what this map impl tries to solve: we no longer have to iterate over, deconstruct and combine hashmaps.

Nothing here is final, but it would be good to have your thoughts @hinto-janai, @SyntheticBird45.

I did think of using 2 DashMaps, one for requests and one for returns, but I feel this custom solution would be quicker as we don't have to care about 99% of the things DashMap can do.

@Boog900 (Member, Author) commented Mar 5, 2024

I am not looking for reviews of this code (yet), but if you see some invalid logic then please say so.

@hinto-janai (Contributor)

What part of Cuprate exactly calls this? Is it something like:

  1. Outside Cuprate sends large output request to DB
  2. DB uses ConcurrentMapBuilder to fetch outputs
  3. DB responds with BuiltMap

@Boog900 (Member, Author) commented Mar 6, 2024

Yeah, that's a bit I am unsure about. I think the best way is for the caller to pass the IndexSet to the service, which will then create a builder. Then, with that builder, the service creates N requests to put in the channel, which would get picked up by different workers, and when they all return, the service will call try_create on the builder it has, returning the BuiltMap if successful.

If a request goes to the same worker that already dealt with it, the worker would call get_work, which would return None/Err, and the worker would just move on to another request.
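To make that flow concrete, here is a safe, simplified sketch of the kind of builder being described. The slot type (Mutex<Option<V>>) and the exact signatures are assumptions for illustration; the PR itself uses UnsafeCell/MaybeUninit internally.

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

struct MapBuilder<K, V> {
    keys: Vec<K>,
    slots: Vec<Mutex<Option<V>>>,
    next: AtomicUsize,
}

impl<K: Eq + Hash, V> MapBuilder<K, V> {
    fn new(keys: Vec<K>) -> Arc<Self> {
        let slots = keys.iter().map(|_| Mutex::new(None)).collect();
        Arc::new(Self { keys, slots, next: AtomicUsize::new(0) })
    }

    // A worker claims the next unclaimed key, or gets `None` once all work is taken.
    fn get_work(&self) -> Option<(usize, &K)> {
        let i = self.next.fetch_add(1, Ordering::Relaxed);
        self.keys.get(i).map(|k| (i, k))
    }

    // A worker stores the value it retrieved for the key it claimed.
    fn put(&self, index: usize, value: V) {
        *self.slots[index].lock().unwrap() = Some(value);
    }

    // Called by the service once every worker has returned; fails if any slot
    // was left unfilled or if other handles to the builder still exist.
    fn try_create(self: Arc<Self>) -> Option<HashMap<K, V>> {
        let this = Arc::into_inner(self)?;
        this.keys
            .into_iter()
            .zip(this.slots)
            .map(|(k, slot)| slot.into_inner().unwrap().map(|v| (k, v)))
            .collect()
    }
}

A worker loop would then just be while let Some((i, key)) = builder.get_work() { builder.put(i, fetch(key)) }, with fetch standing in for whatever the actual DB lookup ends up being.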

@hinto-janai (Contributor)

Could we use rayon + par_iter().collect() here instead of our own impl?

  1. Caller sends IndexSet to DB service
  2. DB thread picks up request, starts par_iter().collect() in rayon pool
  3. rayon returns result to DB thread
  4. DB sends response back to caller

The 1 DB thread acts as the single joiner; the rayon pool does the actual work.

              |-------------------|
              |  Outside Cuprate  |
              |-------------------|
                        ^
                        |
                   DB Service
                        |
       /---------------/
      v
|-----------------------------------------------|
| DbThread1 | DbThread2 | DbThread3 | DbThread4 |
|-----------------------------------------------|
    ^   |
    |   v
|-----------------------------------------------|
|           global rayon thread pool            |
|-----------------------------------------------|

From my perspective, the DB thread-pool in the service isn't necessarily supposed to be doing work; those threads exist essentially to route things around. They could all use the global rayon thread-pool as the work "backend" and take advantage of par_iter and such.
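A minimal sketch of what step 2 could look like on the rayon side (the ID/output types and the get_output lookup here are placeholders, not Cuprate's real types):

use rayon::prelude::*;
use std::collections::HashMap;

// Placeholder types standing in for the real output ID/output types.
type OutputId = u64;
type Output = Vec<u8>;

fn get_output(id: &OutputId) -> Output {
    // placeholder for the real DB lookup
    id.to_le_bytes().to_vec()
}

// One DB thread receives the request and fans it out over the global rayon
// pool, collecting the results straight into a HashMap.
fn handle_output_request(needed: &[OutputId]) -> HashMap<OutputId, Output> {
    needed.par_iter().map(|id| (*id, get_output(id))).collect()
}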

@Boog900 (Member, Author) commented Mar 6, 2024

I don't think this expands the scope of the service too much; it is still routing the request to the threads to do the work.

I don't think we can use rayon, as we would be doing DB operations (getting outputs) in the par_iter.

We could use a normal thread pool instead of rayon and have each thread use the MapBuilder, but then we are spawning more threads when we already have DB threads to do the work.

@Boog900 (Member, Author) commented Mar 6, 2024

It would be good to have a way to send a request to one or all DB threads though, instead of needing to send the same request multiple times and hoping it gets picked up by different threads.

@hinto-janai (Contributor)

I don't think this expands the scope of the service too much

Some thread has to split up and distribute the work. If not the caller (by sending multiple requests), then wouldn't it be the 1 DB thread that received the initial request, distributing it to all the other DB threads?

Setting up the channel+select wiring would work, but I think "1 DB thread receives the request + rayon" would be simpler to implement.

I don't think we can use rayon, as we would be doing DB operations (getting outputs) in the par_iter

rayon::scope() + pass in &ConcreteEnv? Or a custom rayon::ThreadPool that has Arc<ConcreteEnv> that drops alongside the DB threadpool.
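Rough sketches of those two options (ConcreteEnv::get_output, OutputId and Output are stand-ins here, not the real DB API):

use rayon::prelude::*;
use std::sync::Arc;

// Option 1: borrow the env and run on the global pool inside a scope.
fn get_outs_scoped(env: &ConcreteEnv, needed: &[OutputId]) -> Vec<Output> {
    rayon::scope(|_| needed.par_iter().map(|id| env.get_output(id)).collect())
}

// Option 2: a rayon pool owned by the DB, sharing the env through an Arc that
// drops alongside the pool.
fn get_outs_pooled(
    pool: &rayon::ThreadPool,
    env: Arc<ConcreteEnv>,
    needed: Vec<OutputId>,
) -> Vec<Output> {
    pool.install(move || needed.par_iter().map(|id| env.get_output(id)).collect())
}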

For the question: "who is responsible for DB parallelization?"

  • The caller (splits up the work, sends multiple requests, receives multiple responses)
  • The DB threadpool (1 DB thread receives request, splits work across the other threads)
  • rayon (1 DB thread receives request, splits work using rayon)

I don't think it should ever be the caller, as all requests (even 1) should saturate the DB as much as possible.

@Boog900 (Member, Author) commented Mar 6, 2024

I had a bit more of a think.

The reason I didn't want to use rayon is that one of the reasons for doing what we are doing with the DB is to remove all DB operations from the async runtime, so it feels weird to then put them in a rayon runtime, which should also not have blocking operations.

But if we were to use a second rayon pool just for the DB, we would not starve other parts of Cuprate or the rest of the DB: if one part of the DB is blocking and we were to move on to another request, that request is likely to block as well, so we don't lose anything.

We can also take using rayon a step further by removing our reader threads completely, just using the custom rayon thread pool to execute requests.

Instead of the service sending the request down a channel with a oneshot return it would call spawn on the runtime with the request and oneshot.

This would allow using rayon's par_iter for all DB requests, without spawning excess threads.

@hinto-janai (Contributor)

if one part of the DB is blocking and we were to move on to another request, that request is likely to block as well

What about a situation where there's 1 large request, then shortly after 500 small requests? Shouldn't those 500 requests be responded to (dictated by rayon's scheduler I guess) instead of being stalled on the 1?

We can also take using rayon a step further by removing our reader threads completely, just using the custom rayon thread pool to execute requests

A single "manager" thread (1 DB reader thread) splitting work with a shared rayon threadpool is what I would do on first glance, but if rayon threads can use their own threadpool then sure (what is rayon's behavior with nested par_*()? bad perf? deadlocks?).

Instead of the service sending the request down a channel with a oneshot return it would call spawn on the runtime with the request and oneshot

Can you write code or a graph of what this would look like? How does the work get split after spawn()? How does it get joined and sent back?

@Boog900 (Member, Author) commented Mar 7, 2024

What about a situation where there's 1 large request, then shortly after 500 small requests? Shouldn't those 500 requests be responded to (dictated by rayon's scheduler I guess) instead of being stalled on the 1?

This is true of our current reader threads as well: a big request may take a thread out, but we still have other rayon threads that can be doing work.

A single "manager" thread (1 DB reader thread) splitting work with a shared rayon threadpool is what I would do on first glance, but if rayon threads can use their own threadpool then sure (what is rayon's behavior with nested par_*()? bad perf? deadlocks?).

No, it's perfectly fine to do nested parallelism within rayon; the rayon thread which made the call will also work on the par_iter, so no deadlocks.
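A tiny self-contained demonstration of that: the outer par_iter's threads also help drive the inner one, so this finishes without deadlocking even on a small pool.

use rayon::prelude::*;

fn main() {
    let totals: Vec<u64> = (0u64..100)
        .into_par_iter()
        .map(|i| {
            // nested parallel work inside an already-parallel iterator
            (0u64..1_000).into_par_iter().map(|j| i * j).sum()
        })
        .collect();
    assert_eq!(totals.len(), 100);
}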

Can you write code or a graph of what this would look like? How does the work get split after spawn()? How does it get joined and sent back?

Here is some rough code:

use rayon::prelude::*;
use tokio::sync::oneshot;

// the DB service's call function
// (`DBReq`, `OutputIDs`, `Outputs`, `get_output` and `self.pool` are placeholders)
fn call(&self, req: DBReq) -> oneshot::Receiver<Outputs> {
    let (tx, rx) = oneshot::channel();
    // this will put a job in the pool without blocking the current thread.
    self.pool.spawn(move || handle_req(tx, req));

    rx
}

fn handle_req(tx: oneshot::Sender<Outputs>, req: DBReq) {
    match req {
        DBReq::Outputs(needed) => {
            let _ = tx.send(get_outs(needed));
        }
    }
}

fn get_outs(needed: OutputIDs) -> Outputs {
    // the current rayon thread will also make progress on this; this will not deadlock.
    needed.par_iter().map(|id| get_output(id)).collect()
}

We would need to add some sort of back pressure, like we have with the current channels, so the database does not get overrun with tasks, but that will be easy to do with a semaphore, so we can ignore it for now.
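For reference, one way that back pressure could look (an assumption, not something decided here): a tokio semaphore capping in-flight jobs, with the permit dropped when the job finishes. DBReq, Outputs and handle_req are the placeholders from the sketch above.

use std::sync::Arc;
use tokio::sync::{oneshot, Semaphore};

async fn call(
    pool: &rayon::ThreadPool,
    sem: Arc<Semaphore>,
    req: DBReq,
) -> oneshot::Receiver<Outputs> {
    // wait here if too many requests are already in the pool
    let permit = sem.acquire_owned().await.unwrap();
    let (tx, rx) = oneshot::channel();
    pool.spawn(move || {
        // the permit is dropped (freeing a slot) when the job is done
        let _permit = permit;
        handle_req(tx, req);
    });
    rx
}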

@hinto-janai (Contributor)

No, it's perfectly fine to do nested parallelism within rayon; the rayon thread which made the call will also work on the par_iter, so no deadlocks.

Swapping our current DB reader thread-pool with rayon seems good then. I don't think our locks will deadlock either since only read guards are taken.

Here is some rough code

Yeah this looks good too. Is the question now if get_output() should use this PR's concurrent map or do a bunch of Vec flattens or similar? I think this PR would be faster although I'm not a fan of introducing unsafe and maintaining it. Maybe some perf tests could settle the difference?

@SyntheticBird45 (Member)

I was thinking about the similarities between tokio's blocking threadpool and the rayon one. Someone on Reddit described the difference between the two. Despite tokio supposedly being more suitable for our purpose, I think the rayon API is much easier and more straightforward to use and test. So yeah, I agree with the idea of using a rayon threadpool.

@SyntheticBird45 (Member)

So, unless I'm stupid, the proposed concurrent hashmap is optimized for multiple DB threads to concurrently write results into it?

@Boog900 (Member, Author) commented Mar 7, 2024

Yeah this looks good too. Is the question now if get_output() should use this PR's concurrent map or do a bunch of Vec flattens or similar? I think this PR would be faster although I'm not a fan of introducing unsafe and maintaining it. Maybe some perf tests could settle the difference?

Yep, we need benchmarks from the DB. I don't think there will be too much difference though; rayon is very efficient.

So, unless I'm stupid, the proposed concurrent hashmap is optimized for multiple DB threads to concurrently write results into it?

pretty much

@SyntheticBird45 (Member) commented Mar 7, 2024

Yep, we need benchmarks from the DB. I don't think there will be too much difference though; rayon is very efficient.

If this PR isn't urgent, we could write some benchmark code with criterion and try it out with an RNG for the data and a random thread sleep to simulate a blocking file op.
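Something like this could be a starting point (the workload, numbers and names here are made up for illustration; a second bench for the PR's map would sit next to it):

use criterion::{criterion_group, criterion_main, Criterion};
use rand::Rng;
use rayon::prelude::*;
use std::collections::HashMap;
use std::{thread, time::Duration};

// stand-in for a blocking file/DB read
fn fake_db_read(key: u64) -> u64 {
    thread::sleep(Duration::from_micros(50));
    key.wrapping_mul(31)
}

fn bench_output_fetch(c: &mut Criterion) {
    let mut rng = rand::thread_rng();
    let keys: Vec<u64> = (0..1_000).map(|_| rng.gen()).collect();

    c.bench_function("rayon par_iter + collect", |b| {
        b.iter(|| {
            keys.par_iter()
                .map(|&k| (k, fake_db_read(k)))
                .collect::<HashMap<_, _>>()
        })
    });
    // a "concurrent map builder" bench_function would go here for comparison
}

criterion_group!(benches, bench_output_fetch);
criterion_main!(benches);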

@Boog900 (Member, Author) commented Mar 7, 2024

This PR isn't urgent, so yeah, we could.

transmuting the Vec is not safe as Rust could make layout optimisations for `Vec<T>` that it can't for `Vec<UnsafeCell<MaybeUninit<V>>>`
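For illustration, one way to avoid that transmute entirely (not necessarily the fix used here) is to convert element by element, which makes no assumptions about Vec's internal layout; assume_init is still unsafe and still requires every slot to have been written:

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;

// SAFETY: the caller must guarantee every element has been initialized.
unsafe fn into_values<V>(slots: Vec<UnsafeCell<MaybeUninit<V>>>) -> Vec<V> {
    slots
        .into_iter()
        // SAFETY: the caller guarantees every slot was written.
        .map(|slot| unsafe { slot.into_inner().assume_init() })
        .collect()
}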