Skip to content

Commit

Permalink
Add support for asynchronous decoding of rrd stream (#8705)
Browse files Browse the repository at this point in the history
### What

Add a streaming decoder for ``LogMsg``s.  

Commonalized some of the logic between ``StreamingDecoder`` and the
``Decoder``, but there is still some repetition that comes from the fact
we're working with a ``std::io::Read`` in one case and the buffer of
bytes in the other.
  • Loading branch information
zehiko authored Jan 20, 2025
1 parent 18ce378 commit 65de527
Show file tree
Hide file tree
Showing 8 changed files with 454 additions and 24 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"

[[package]]
name = "bytes"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"

[[package]]
name = "cacache"
Expand Down Expand Up @@ -3900,7 +3900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]

[[package]]
Expand Down Expand Up @@ -5218,7 +5218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.4.1",
"heck 0.5.0",
"itertools 0.13.0",
"log",
"multimap",
Expand Down Expand Up @@ -6092,6 +6092,7 @@ name = "re_log_encoding"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"bytes",
"criterion",
"ehttp",
"js-sys",
Expand All @@ -6111,6 +6112,8 @@ dependencies = [
"serde_test",
"similar-asserts",
"thiserror 1.0.65",
"tokio",
"tokio-stream",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ bit-vec = "0.8"
bitflags = { version = "2.4", features = ["bytemuck"] }
blackbox = "0.2.0"
bytemuck = { version = "1.18", features = ["extern_crate_alloc"] }
bytes = "1.0"
camino = "1.1"
cargo_metadata = "0.18"
cargo-run-wasm = "0.3.2"
Expand Down
12 changes: 11 additions & 1 deletion crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ all-features = true
default = []

## Enable loading data from an .rrd file.
decoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
decoder = [
"re_log_types/serde",
"dep:bytes",
"dep:lz4_flex",
"dep:rmp-serde",
"dep:tokio",
"dep:tokio-stream",
]

## Enable encoding of log messages to an .rrd file/stream.
encoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
Expand Down Expand Up @@ -57,9 +64,12 @@ parking_lot.workspace = true
thiserror.workspace = true

# Optional external dependencies:
bytes = { workspace = true, optional = true }
ehttp = { workspace = true, optional = true, features = ["streaming"] }
lz4_flex = { workspace = true, optional = true }
rmp-serde = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util"] }
tokio-stream = { workspace = true, optional = true }
web-time = { workspace = true, optional = true }

# Web dependencies:
Expand Down
22 changes: 14 additions & 8 deletions crates/store/re_log_encoding/src/codec/file/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@ use re_log_types::LogMsg;
use re_protos::missing_field;

pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMsg>), DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let mut read_bytes = 0u64;
let header = MessageHeader::decode(data)?;
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;

let mut buf = vec![0; header.len as usize];
data.read_exact(&mut buf[..])?;

let msg = match header.kind {
let msg = decode_bytes(header.kind, &buf)?;

Ok((read_bytes, msg))
}

pub fn decode_bytes(message_kind: MessageKind, buf: &[u8]) -> Result<Option<LogMsg>, DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let msg = match message_kind {
MessageKind::SetStoreInfo => {
let set_store_info = SetStoreInfo::decode(&buf[..])?;
let set_store_info = SetStoreInfo::decode(buf)?;
Some(LogMsg::SetStoreInfo(set_store_info.try_into()?))
}
MessageKind::ArrowMsg => {
let arrow_msg = ArrowMsg::decode(&buf[..])?;
let arrow_msg = ArrowMsg::decode(buf)?;
if arrow_msg.encoding() != Encoding::ArrowIpc {
return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
}
Expand All @@ -43,13 +49,13 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMs
Some(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}
MessageKind::BlueprintActivationCommand => {
let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?;
let blueprint_activation_command = BlueprintActivationCommand::decode(buf)?;
Some(LogMsg::BlueprintActivationCommand(
blueprint_activation_command.try_into()?,
))
}
MessageKind::End => None,
};

Ok((read_bytes, msg))
Ok(msg)
}
15 changes: 15 additions & 0 deletions crates/store/re_log_encoding/src/codec/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ impl MessageHeader {
let mut buf = [0; std::mem::size_of::<Self>()];
data.read_exact(&mut buf)?;

Self::from_bytes(&buf)
}

/// Decode a message header from a byte buffer. Input buffer must be exactly 16 bytes long.
/// TODO(zehiko) this should be public, we need to shuffle things around to ensure that #8726
#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
if buf.len() != 16 {
return Err(crate::decoder::DecodeError::Codec(
crate::codec::CodecError::HeaderDecoding(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid header length",
)),
));
}
#[allow(clippy::unwrap_used)] // cannot fail
let kind = u64::from_le_bytes(buf[0..8].try_into().unwrap());
let kind = match kind {
Expand Down
8 changes: 4 additions & 4 deletions crates/store/re_log_encoding/src/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Decoding [`LogMsg`]:es from `.rrd` files/streams.
pub mod stream;
#[cfg(feature = "decoder")]
pub mod streaming;

use std::io::BufRead as _;
use std::io::Read;
Expand Down Expand Up @@ -412,14 +414,14 @@ mod tests {
};

// TODO(#3741): remove this once we are all in on arrow-rs
fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
pub fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
log_msg
.into_iter()
.map(LogMsg::strip_arrow_extension_types)
.collect()
}

fn fake_log_messages() -> Vec<LogMsg> {
pub fn fake_log_messages() -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Blueprint);

let arrow_msg = re_chunk::Chunk::builder("test_entity".into())
Expand Down Expand Up @@ -527,8 +529,6 @@ mod tests {
];

for options in options {
println!("{options:?}");

let mut data = vec![];

// write "2 files" i.e. 2 streams that end with end-of-stream marker
Expand Down
Loading

0 comments on commit 65de527

Please sign in to comment.