Skip to content

Commit

Permalink
feat(rs-nats): switch to new PublishMessage sink
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Aug 20, 2024
1 parent 173426e commit ca1066a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 48 deletions.
25 changes: 12 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ wrpc-cli = { workspace = true }

[workspace.dependencies]
anyhow = { version = "1", default-features = false }
async-nats = { package = "async-nats-wrpc", version = "0.35.1", default-features = false }
async-nats = { git = "https://github.com/rvolosatovs/nats.rs", branch = "feat/command-sender", version = "0.35.1", default-features = false }
bitflags = { version = "2", default-features = false }
bytes = { version = "1", default-features = false }
clap = { version = "4", default-features = false }
Expand Down
52 changes: 18 additions & 34 deletions crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use core::{mem, str};
use std::sync::Arc;

use anyhow::{anyhow, ensure, Context as _};
use async_nats::client::Publisher;
use async_nats::{HeaderMap, Message, ServerInfo, StatusCode, Subject, Subscriber};
use async_nats::{HeaderMap, Message, PublishMessage, ServerInfo, StatusCode, Subject, Subscriber};
use bytes::{Buf as _, Bytes};
use futures::future::try_join_all;
use futures::sink::SinkExt as _;
Expand Down Expand Up @@ -379,30 +378,23 @@ impl AsyncRead for Reader {

#[derive(Clone, Debug)]
pub struct SubjectWriter {
nats: Arc<async_nats::Client>,
nats: async_nats::Client,
tx: Subject,
publisher: Publisher,
}

impl SubjectWriter {
fn new(nats: Arc<async_nats::Client>, tx: Subject, publisher: Publisher) -> Self {
Self {
nats,
tx,
publisher,
}
fn new(nats: async_nats::Client, tx: Subject) -> Self {
Self { nats, tx }
}
}

impl wrpc_transport::Index<Self> for SubjectWriter {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
let tx = Subject::from(index_path(self.tx.as_str(), path));
let publisher = self.nats.publish_sink(tx.clone());
Ok(Self {
nats: Arc::clone(&self.nats),
nats: self.nats.clone(),
tx,
publisher,
})
}
}
Expand All @@ -415,7 +407,7 @@ impl AsyncWrite for SubjectWriter {
mut buf: &[u8],
) -> Poll<std::io::Result<usize>> {
trace!("polling for readiness");
match self.publisher.poll_ready_unpin(cx) {
match self.nats.poll_ready_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(std::io::Error::new(
Expand All @@ -433,7 +425,13 @@ impl AsyncWrite for SubjectWriter {
(buf, _) = buf.split_at(max_payload);
}
trace!("starting send");
match self.publisher.start_send_unpin(Bytes::copy_from_slice(buf)) {
let subject = self.tx.clone();
match self.nats.start_send_unpin(PublishMessage {
subject,
payload: Bytes::copy_from_slice(buf),
reply: None,
headers: None,
}) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(err) => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
Expand All @@ -445,7 +443,7 @@ impl AsyncWrite for SubjectWriter {
#[instrument(level = "trace", skip_all, ret, fields(subject = self.tx.as_str()))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
trace!("flushing");
self.publisher
self.nats
.poll_flush_unpin(cx)
.map_err(|_| std::io::ErrorKind::BrokenPipe.into())
}
Expand All @@ -454,10 +452,7 @@ impl AsyncWrite for SubjectWriter {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
trace!("writing empty buffer to shut down stream");
ready!(self.as_mut().poll_write(cx, &[]))?;
trace!("closing");
self.publisher
.poll_close_unpin(cx)
.map_err(|_| std::io::ErrorKind::BrokenPipe.into())
Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -533,9 +528,7 @@ impl RootParamWriter {
else {
return Poll::Ready(Err(corrupted_memory_error()));
};
let param_tx = Subject::from(param_subject(&tx));
let param_pub = nats.publish_sink(param_tx.clone());
let tx = SubjectWriter::new(nats, param_tx, param_pub);
let tx = SubjectWriter::new(nats, Subject::from(param_subject(&tx)));
let indexed = indexed.into_inner().map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, err.to_string())
})?;
Expand Down Expand Up @@ -906,11 +899,7 @@ impl wrpc_transport::Invoke for Client {
.context("failed to send handshake")?;
Ok((
ParamWriter::Root(RootParamWriter::new(
SubjectWriter::new(
Arc::clone(&self.nats),
param_tx.clone(),
self.nats.publish_sink(param_tx),
),
SubjectWriter::new((*self.nats).clone(), param_tx),
handshake_rx,
params,
)),
Expand Down Expand Up @@ -991,14 +980,9 @@ impl wrpc_transport::Serve for Client {
nats.publish_with_reply(tx.clone(), rx, Bytes::default())
.await
.context("failed to publish handshake accept")?;
let result_tx = Subject::from(result_subject(&tx));
Ok((
headers,
SubjectWriter::new(
Arc::clone(&nats),
result_tx.clone(),
nats.publish_sink(result_tx),
),
SubjectWriter::new((*nats).clone(), Subject::from(result_subject(&tx))),
Reader {
buffer: payload,
incoming: param_rx,
Expand Down

0 comments on commit ca1066a

Please sign in to comment.