Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gRPC log stream #8730

Merged
merged 38 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9ae58ca
Implement message proxy client and sink wrapper
jprochazk Jan 16, 2025
539248d
client does not exist in wasm
jprochazk Jan 16, 2025
ee1ecf9
unused dependencies
jprochazk Jan 16, 2025
8041aa4
formatting
jprochazk Jan 16, 2025
fc05632
friendly error when `connect_grpc` is unavailable
jprochazk Jan 17, 2025
d6ba393
add doc comments
jprochazk Jan 17, 2025
83a023f
python fmt
jprochazk Jan 17, 2025
c44ce18
Merge branch 'main' into jan/grpc-log-sink
jprochazk Jan 17, 2025
e5ca91e
Merge branch 'main' into jan/grpc-log-sink
jprochazk Jan 17, 2025
e4ef1af
Merge branch 'main' into jan/grpc-log-sink
jprochazk Jan 17, 2025
1bd445a
Merge branch 'main' into jan/grpc-log-sink
jprochazk Jan 19, 2025
cc6e789
Implement gRPC log stream
jprochazk Jan 19, 2025
bb67740
fix wasm build
jprochazk Jan 19, 2025
3e15121
fmt
jprochazk Jan 19, 2025
0cc63ca
Merge branch 'main' into jan/grpc-log-sink
jprochazk Jan 21, 2025
048fe55
Merge branch 'jan/grpc-log-sink' into jan/grpc-log-stream
jprochazk Jan 21, 2025
20527b0
remove unneeded pin
jprochazk Jan 22, 2025
b5ac6a5
update comment
jprochazk Jan 22, 2025
5dcb056
remove tokio dependency on web
jprochazk Jan 22, 2025
52e0e8b
add more todos
jprochazk Jan 22, 2025
510fca0
logging for re_grpc_server binary
jprochazk Jan 22, 2025
a45f594
more todos
jprochazk Jan 22, 2025
7b023f7
show messageproxy source in top panel
jprochazk Jan 22, 2025
ff3aec2
more todos
jprochazk Jan 22, 2025
c4403a9
Merge branch 'main' into jan/grpc-log-stream
jprochazk Jan 22, 2025
0c28273
fix wasm builds
jprochazk Jan 22, 2025
a82ed71
add envoy conf
jprochazk Jan 22, 2025
4527150
remove envoy timeout
jprochazk Jan 22, 2025
7135f70
update readme
jprochazk Jan 22, 2025
a32da77
remove unneeded cors
jprochazk Jan 22, 2025
6224260
Remove `envoy` config in favor of `tonic-web`
jprochazk Jan 22, 2025
5f6860d
`e` -> `err`
jprochazk Jan 22, 2025
f1f0fe5
fix duplicate `tower-http` dependency
jprochazk Jan 22, 2025
4566b75
Merge branch 'main' into jan/grpc-log-stream
jprochazk Jan 22, 2025
ca72d56
move todo
jprochazk Jan 22, 2025
1387d18
remove usage from readme
jprochazk Jan 22, 2025
d9fcf0d
put port/addr in const
jprochazk Jan 22, 2025
858c844
sort
jprochazk Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6077,6 +6077,8 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tonic",
"tonic-web",
"tower-http",
]

[[package]]
Expand Down Expand Up @@ -9012,6 +9014,26 @@ dependencies = [
"syn 2.0.87",
]

[[package]]
name = "tonic-web"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5299dd20801ad736dccb4a5ea0da7376e59cd98f213bf1c3d478cf53f4834b58"
dependencies = [
"base64 0.22.1",
"bytes",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"pin-project",
"tokio-stream",
"tonic",
"tower-http",
"tower-layer",
"tower-service",
"tracing",
]

[[package]]
name = "tonic-web-wasm-client"
version = "0.6.0"
Expand Down Expand Up @@ -9071,6 +9093,22 @@ dependencies = [
"tower-service",
]

[[package]]
name = "tower-http"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
dependencies = [
"bitflags 2.8.0",
"bytes",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"pin-project-lite",
"tower-layer",
"tower-service",
]

[[package]]
name = "tower-layer"
version = "0.3.3"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ tokio-util = { version = "0.7.12", default-features = false }
toml = { version = "0.8.10", default-features = false }
tonic = { version = "0.12.3", default-features = false }
tonic-build = { version = "0.12.3", default-features = false }
tonic-web = "0.12"
tonic-web-wasm-client = "0.6"
tower-http = "0.5"
tracing = { version = "0.1", default-features = false }
tungstenite = { version = "0.24", default-features = false }
type-map = "0.5"
Expand Down
17 changes: 17 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub enum DataSource {
/// over `rerun://` gRPC interface.
#[cfg(feature = "grpc")]
RerunGrpcUrl { url: String },

/// A stream of messages over gRPC, relayed from the SDK.
#[cfg(feature = "grpc")]
MessageProxy { url: String },
}

impl DataSource {
Expand Down Expand Up @@ -95,6 +99,12 @@ impl DataSource {
return Self::RerunGrpcUrl { url: uri };
}

// TODO(#8761): URL prefix
#[cfg(feature = "grpc")]
if uri.starts_with("temp://") {
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
return Self::MessageProxy { url: uri };
}

if uri.starts_with("file://") || path.exists() {
Self::FilePath(file_source, path)
} else if uri.starts_with("http://")
Expand Down Expand Up @@ -138,6 +148,8 @@ impl DataSource {
Self::Stdin => None,
#[cfg(feature = "grpc")]
Self::RerunGrpcUrl { .. } => None, // TODO(jleibs): This needs to come from the server.
#[cfg(feature = "grpc")]
Self::MessageProxy { .. } => None,
}
}

Expand Down Expand Up @@ -249,6 +261,11 @@ impl DataSource {
Self::RerunGrpcUrl { url } => {
re_grpc_client::stream_from_redap(url, on_msg).map_err(|err| err.into())
}

#[cfg(feature = "grpc")]
Self::MessageProxy { url } => {
re_grpc_client::message_proxy::stream(url, on_msg).map_err(|err| err.into())
}
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions crates/store/re_grpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ re_arrow_util.workspace = true
re_chunk.workspace = true
re_error.workspace = true
re_log.workspace = true
re_log_encoding.workspace = true
re_log_encoding = { workspace = true, features = ["encoder", "decoder"] }
re_log_types.workspace = true
re_protos.workspace = true
re_smart_channel.workspace = true
Expand All @@ -38,9 +38,8 @@ url.workspace = true

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true
tonic = { workspace = true, default-features = false, features = ["transport"] }

tokio.workspace = true
jprochazk marked this conversation as resolved.
Show resolved Hide resolved

# Web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
10 changes: 5 additions & 5 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Communications with an Rerun Data Platform gRPC server.

#[cfg(not(target_arch = "wasm32"))]
pub mod message_proxy;

use std::{collections::HashMap, error::Error, sync::Arc};
Expand Down Expand Up @@ -92,6 +91,9 @@ pub enum StreamError {
#[error(transparent)]
ChunkError(#[from] re_chunk::ChunkError),

#[error(transparent)]
DecodeError(#[from] re_log_encoding::decoder::DecodeError),

#[error("Invalid URI: {0}")]
InvalidUri(String),
}
Expand Down Expand Up @@ -176,8 +178,7 @@ async fn stream_recording_async(
#[cfg(target_arch = "wasm32")]
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
redap_endpoint.to_string(),
tonic_web_wasm_client::options::FetchOptions::new()
.mode(tonic_web_wasm_client::options::Mode::Cors), // I'm not 100% sure this is needed, but it felt right.
emilk marked this conversation as resolved.
Show resolved Hide resolved
tonic_web_wasm_client::options::FetchOptions::new(),
);

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -340,8 +341,7 @@ async fn stream_catalog_async(
#[cfg(target_arch = "wasm32")]
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
redap_endpoint.to_string(),
tonic_web_wasm_client::options::FetchOptions::new()
.mode(tonic_web_wasm_client::options::Mode::Cors), // I'm not 100% sure this is needed, but it felt right.
tonic_web_wasm_client::options::FetchOptions::new(),
);

#[cfg(not(target_arch = "wasm32"))]
Expand Down
177 changes: 6 additions & 171 deletions crates/store/re_grpc_client/src/message_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,173 +1,8 @@
use std::thread;
use std::thread::JoinHandle;
pub mod read;
pub use read::stream;

use re_log_encoding::Compression;
use re_log_types::LogMsg;
use re_protos::sdk_comms::v0::message_proxy_client::MessageProxyClient;
use tokio::runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tonic::transport::Endpoint;
#[cfg(not(target_arch = "wasm32"))]
pub mod write;

enum Cmd {
LogMsg(LogMsg),
Flush(oneshot::Sender<()>),
}

#[derive(Clone)]
pub struct Options {
compression: Compression,
}

impl Default for Options {
fn default() -> Self {
Self {
compression: Compression::LZ4,
}
}
}

pub struct Client {
thread: Option<JoinHandle<()>>,
cmd_tx: UnboundedSender<Cmd>,
shutdown_tx: Sender<()>,
}

impl Client {
#[expect(clippy::needless_pass_by_value)]
pub fn new(addr: impl Into<String>, options: Options) -> Self {
let addr: String = addr.into();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

let thread = thread::Builder::new()
.name("message_proxy_client".to_owned())
.spawn(move || {
let mut runtime = runtime::Builder::new_current_thread();
runtime.enable_all();
runtime
.build()
.expect("Failed to build tokio runtime")
.block_on(message_proxy_client(
addr,
cmd_rx,
shutdown_rx,
options.compression,
));
})
.expect("Failed to spawn message proxy client thread");

Self {
thread: Some(thread),
cmd_tx,
shutdown_tx,
}
}

pub fn send(&self, msg: LogMsg) {
self.cmd_tx.send(Cmd::LogMsg(msg)).ok();
}

pub fn flush(&self) {
let (tx, rx) = oneshot::channel();
if self.cmd_tx.send(Cmd::Flush(tx)).is_err() {
re_log::debug!("Flush failed: already shut down.");
return;
};

match rx.blocking_recv() {
Ok(_) => {
re_log::debug!("Flush complete");
}
Err(_) => {
re_log::debug!("Flush failed, not all messages were sent");
}
}
}
}

impl Drop for Client {
fn drop(&mut self) {
re_log::debug!("Shutting down message proxy client");
// Wait for flush
self.flush();
// Quit immediately after that - no messages are left in the channel
self.shutdown_tx.try_send(()).ok();
// Wait for the shutdown
self.thread.take().map(|t| t.join().ok());
re_log::debug!("Message proxy client has shut down");
}
}

async fn message_proxy_client(
addr: String,
mut cmd_rx: UnboundedReceiver<Cmd>,
mut shutdown_rx: Receiver<()>,
compression: Compression,
) {
let endpoint = match Endpoint::from_shared(addr) {
Ok(endpoint) => endpoint,
Err(err) => {
re_log::error!("Failed to connect to message proxy server: {err}");
return;
}
};
let channel = match endpoint.connect().await {
Ok(channel) => channel,
Err(err) => {
re_log::error!("Failed to connect to message proxy server: {err}");
return;
}
};
let mut client = MessageProxyClient::new(channel);

let stream = async_stream::stream! {
loop {
tokio::select! {
cmd = cmd_rx.recv() => {
match cmd {
Some(Cmd::LogMsg(msg)) => {
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(msg, compression) {
Ok(msg) => msg,
Err(err) => {
re_log::error!("Failed to encode message: {err}");
break;
}
};

yield msg;
}

Some(Cmd::Flush(tx)) => {
// Messages are received in order, so once we receive a `flush`
// we know we've sent all messages before that flush through already.
re_log::debug!("Flush requested");
if tx.send(()).is_err() {
re_log::debug!("Failed to respond to flush: channel is closed");
return;
};
}

None => {
re_log::debug!("Channel closed");
break;
}
}
}

_ = shutdown_rx.recv() => {
re_log::debug!("Shutting down without flush");
return;
}
}
}
};

if let Err(err) = client.write_messages(stream).await {
re_log::error!("Write messages call failed: {err}");
};
}
#[cfg(not(target_arch = "wasm32"))]
pub use write::Client;
Loading
Loading