Skip to content

Commit

Permalink
Feature: Raft::client_write_ff() ff for fire-and-forget
Browse files Browse the repository at this point in the history
`Raft<C>::client_write_ff() -> C::Responder::Receiver` submit a client
request to Raft to update the state machine, returns an application
defined response receiver `Responder::Receiver` to receive the response.

`_ff` means fire and forget.

It is same as [`Raft::client_write`] but does not wait for the response.
When using this method, it is the application's responsibility for
defining mechanism building and consuming the `Responder::Receiver`.

- Part of databendlabs#1068
  • Loading branch information
drmingdrmer committed Apr 8, 2024
1 parent fe565b4 commit e594261
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
18 changes: 16 additions & 2 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,15 +689,29 @@ where C: RaftTypeConfig
ResponderReceiverOf<C>: Future<Output = Result<WriteResult<C>, E>>,
E: Error + OptionalSend,
{
let (app_data, tx, rx) = ResponderOf::<C>::from_app_data(app_data);
let rx = self.client_write_ff(app_data).await?;

self.inner.send_msg(RaftMsg::ClientWriteRequest { app_data, tx }).await?;
let res: WriteResult<C> = self.inner.recv_msg(rx).await?;

let client_write_response = res.map_err(|e| RaftError::APIError(e))?;
Ok(client_write_response)
}

/// Submit a mutating client request to Raft to update the state machine, returns an application
/// defined response receiver [`Responder::Receiver`].
///
/// `_ff` means fire and forget.
///
/// It is same as [`Raft::client_write`] but does not wait for the response.
#[tracing::instrument(level = "debug", skip(self, app_data))]
pub async fn client_write_ff(&self, app_data: C::D) -> Result<ResponderReceiverOf<C>, Fatal<C>> {
let (app_data, tx, rx) = ResponderOf::<C>::from_app_data(app_data);

self.inner.send_msg(RaftMsg::ClientWriteRequest { app_data, tx }).await?;

Ok(rx)
}

/// Return `true` if this node is already initialized and can not be initialized again with
/// [`Raft::initialize`]
pub async fn is_initialized(&self) -> Result<bool, Fatal<C>> {
Expand Down
40 changes: 36 additions & 4 deletions tests/tests/client_api/t10_client_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use std::sync::Arc;
use anyhow::Result;
use futures::prelude::*;
use maplit::btreeset;
use openraft::raft::ClientWriteResponse;
use openraft::CommittedLeaderId;
use openraft::Config;
use openraft::LogId;
use openraft::SnapshotPolicy;
use openraft_memstore::ClientRequest;
use openraft_memstore::IntoMemClientRequest;
use openraft_memstore::TypeConfig;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Client write tests.
///
/// What does this test do?
///
/// - create a stable 3-node cluster.
/// - write a lot of data to it.
/// - assert that the cluster stayed stable and has all of the expected data.
Expand Down Expand Up @@ -63,3 +63,35 @@ async fn client_writes() -> Result<()> {

Ok(())
}

/// Test Raft::client_write_ff,
///
/// Manually receive the client-write response via the returned `Responder::Receiver`
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn client_write_ff() -> Result<()> {
let config = Arc::new(
Config {
enable_tick: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;
let _ = log_index;

let n0 = router.get_raft_handle(&0)?;

let resp_rx = n0.client_write_ff(ClientRequest::make_request("foo", 2)).await?;
let got: ClientWriteResponse<TypeConfig> = resp_rx.await??;
assert_eq!(None, got.response().0.as_deref());

let resp_rx = n0.client_write_ff(ClientRequest::make_request("foo", 3)).await?;
let got: ClientWriteResponse<TypeConfig> = resp_rx.await??;
assert_eq!(Some("request-2"), got.response().0.as_deref());

Ok(())
}

0 comments on commit e594261

Please sign in to comment.