Skip to content

Commit

Permalink
feat(rs-transport): introduce {Serve,Invoke}Ext
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jul 10, 2024
1 parent ed1e162 commit b4de852
Show file tree
Hide file tree
Showing 14 changed files with 492 additions and 413 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ wasmtime = { version = "22", default-features = false }
wasmtime-wasi = { version = "22", default-features = false }
wit-bindgen = { version = "0.27", default-features = false }
wit-bindgen-core = { version = "0.27", default-features = false }
wit-bindgen-wrpc = { version = "0.4.5", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc = { version = "0.4.6", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc-go = { version = "0.2", default-features = false, path = "./crates/wit-bindgen-go" }
wit-bindgen-wrpc-rust = { version = "0.4.5", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.4.5", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-bindgen-wrpc-rust = { version = "0.4.6", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.4.6", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-component = { version = "0.212", default-features = false }
wit-parser = { version = "0.212", default-features = false }
wrpc-cli = { version = "0.2", path = "./crates/cli", default-features = false }
wrpc-introspect = { version = "0.1", default-features = false, path = "./crates/introspect" }
wrpc-runtime-wasmtime = { version = "0.19", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-transport = { version = "0.26.3", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.22", path = "./crates/transport-nats", default-features = false }
wrpc-transport = { version = "0.26.4", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.22.1", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.1", path = "./crates/transport-quic", default-features = false }
wrpc-wasmtime-nats-cli = { version = "0.3", path = "./crates/wasmtime-nats-cli", default-features = false }
2 changes: 1 addition & 1 deletion crates/transport-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport-nats"
version = "0.22.0"
version = "0.22.1"
description = "wRPC NATS transport"

authors.workspace = true
Expand Down
122 changes: 60 additions & 62 deletions crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,63 +921,6 @@ impl wrpc_transport::Invoke for Client {
}
}

#[instrument(level = "trace", skip_all)]
async fn serve_connection(
nats: Arc<async_nats::Client>,
Message {
reply: tx,
payload,
headers,
..
}: Message,
paths: &[impl AsRef<[Option<usize>]>],
) -> anyhow::Result<(Option<HeaderMap>, SubjectWriter, Reader)> {
let tx = tx.context("peer did not specify a reply subject")?;
let rx = nats.new_inbox();
let param_rx = Subject::from(param_subject(&rx));
let (param_rx, nested) = try_join!(
async {
trace!(
subject = param_rx.as_str(),
"subscribing on parameter subject"
);
nats.subscribe(param_rx.clone())
.await
.context("failed to subscribe on parameter subject")
},
try_join_all(paths.iter().map(|path| async {
let subject = subscribe_path(&param_rx, path.as_ref());
trace!(?subject, "subscribing on nested parameter subject");
nats.subscribe(Subject::from(subject))
.await
.context("failed to subscribe on nested parameter subject")
}))
)?;
let nested: SubscriberTree = zip(paths.iter(), nested).collect();
ensure!(
paths.is_empty() == nested.is_empty(),
"failed to construct subscription tree"
);
trace!("publishing handshake response");
nats.publish_with_reply(tx.clone(), rx, Bytes::default())
.await
.context("failed to publish handshake accept")?;
let result_tx = Subject::from(result_subject(&tx));
Ok((
headers,
SubjectWriter::new(
Arc::clone(&nats),
result_tx.clone(),
nats.publish_sink(result_tx),
),
Reader {
buffer: payload,
incoming: param_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
},
))
}

impl wrpc_transport::Serve for Client {
type Context = Option<HeaderMap>;
type Outgoing = SubjectWriter;
Expand All @@ -1004,10 +947,65 @@ impl wrpc_transport::Serve for Client {
};
let paths = paths.into();
let nats = Arc::clone(&self.nats);
Ok(sub.then(move |msg| {
let nats = Arc::clone(&nats);
let paths = Arc::clone(&paths);
async move { serve_connection(nats, msg, &paths).await }
}))
Ok(sub.then(
// NOTE: instrumenting this function causes a stack overflow
move |Message {
reply: tx,
payload,
headers,
..
}: Message| {
{
let nats = Arc::clone(&nats);
let paths = Arc::clone(&paths);
async move {
let tx = tx.context("peer did not specify a reply subject")?;
let rx = nats.new_inbox();
let param_rx = Subject::from(param_subject(&rx));
let (param_rx, nested) = try_join!(
async {
trace!(
subject = param_rx.as_str(),
"subscribing on parameter subject"
);
nats.subscribe(param_rx.clone())
.await
.context("failed to subscribe on parameter subject")
},
try_join_all(paths.iter().map(|path| async {
let subject = subscribe_path(&param_rx, path.as_ref());
trace!(?subject, "subscribing on nested parameter subject");
nats.subscribe(Subject::from(subject))
.await
.context("failed to subscribe on nested parameter subject")
}))
)?;
let nested: SubscriberTree = zip(paths.iter(), nested).collect();
ensure!(
paths.is_empty() == nested.is_empty(),
"failed to construct subscription tree"
);
trace!("publishing handshake response");
nats.publish_with_reply(tx.clone(), rx, Bytes::default())
.await
.context("failed to publish handshake accept")?;
let result_tx = Subject::from(result_subject(&tx));
Ok((
headers,
SubjectWriter::new(
Arc::clone(&nats),
result_tx.clone(),
nats.publish_sink(result_tx),
),
Reader {
buffer: payload,
incoming: param_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
},
))
}
}
},
))
}
}
4 changes: 2 additions & 2 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport"
version = "0.26.3"
version = "0.26.4"
description = "wRPC core transport functionality"

authors.workspace = true
Expand All @@ -20,7 +20,7 @@ io-std = ["tokio/io-std"]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
futures = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["macros", "rt"] }
tokio = { workspace = true, features = ["macros", "rt", "time"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["codec", "io"] }
tracing = { workspace = true, features = ["attributes"] }
Expand Down
Loading

0 comments on commit b4de852

Please sign in to comment.