diff --git a/crates/rooch-da/src/chunk.rs b/crates/rooch-da/src/chunk.rs index b8825672c9..3f5789d113 100644 --- a/crates/rooch-da/src/chunk.rs +++ b/crates/rooch-da/src/chunk.rs @@ -97,6 +97,7 @@ impl Chunk for ChunkV0 { segment_number: i as u64, }, is_last: i == segments_count - 1, // extra info overhead is much smaller than max_block_size - max_segment_size + data_len: data.len() as u64, // *_checksum will be filled in to_bytes method of Segment data_checksum: 0, checksum: 0, diff --git a/crates/rooch-da/src/segment.rs b/crates/rooch-da/src/segment.rs index 354105d8ab..61cd75a285 100644 --- a/crates/rooch-da/src/segment.rs +++ b/crates/rooch-da/src/segment.rs @@ -16,13 +16,14 @@ pub trait Segment: fmt::Debug + Send { fn is_last(&self) -> bool; } -pub const SEGMENT_V0_DATA_OFFSET: usize = 42; -pub const SEGMENT_V0_CHECKSUM_OFFSET: usize = 34; +pub const SEGMENT_V0_DATA_OFFSET: usize = 50; +pub const SEGMENT_V0_CHECKSUM_OFFSET: usize = 42; #[derive(Serialize, Debug, PartialEq, Clone)] pub struct SegmentV0 { pub id: SegmentID, pub is_last: bool, // is last segment in chunk + pub data_len: u64, // length of data pub data_checksum: u64, // checksum of data, xxh3_64 pub checksum: u64, // checksum of above fields(exclude data) and version after to_bytes, xxh3_64 @@ -44,9 +45,20 @@ impl SegmentV0 { let chunk_id = u128::from_le_bytes(bytes[1..17].try_into()?); let segment_number = u64::from_le_bytes(bytes[17..25].try_into()?); let is_last = bytes[25] != 0; - let data_checksum = u64::from_le_bytes(bytes[26..34].try_into()?); - let checksum = u64::from_le_bytes(bytes[34..SEGMENT_V0_DATA_OFFSET].try_into()?); - let data = bytes[SEGMENT_V0_DATA_OFFSET..].to_vec(); + let data_len = u64::from_le_bytes(bytes[26..34].try_into()?); + let data_checksum = u64::from_le_bytes(bytes[34..42].try_into()?); + let checksum = u64::from_le_bytes(bytes[42..SEGMENT_V0_DATA_OFFSET].try_into()?); + // check bytes has enough length + if bytes.len() < SEGMENT_V0_DATA_OFFSET + data_len as usize { + return Err(anyhow::anyhow!(format!( + "segment_v0: bytes:{} less than exp header:{} + data:{}", + bytes.len(), + SEGMENT_V0_DATA_OFFSET, + data_len as usize + ))); + } + let data = + bytes[SEGMENT_V0_DATA_OFFSET..SEGMENT_V0_DATA_OFFSET + data_len as usize].to_vec(); let exp_checksum = xxh3_64(&bytes[0..SEGMENT_V0_CHECKSUM_OFFSET]); if exp_checksum != checksum { @@ -64,6 +76,7 @@ impl SegmentV0 { segment_number, }, is_last, + data_len, data_checksum, checksum, data, @@ -73,11 +86,12 @@ impl SegmentV0 { impl Segment for SegmentV0 { fn to_bytes(&self) -> Vec { - let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data.len()); + let mut bytes = Vec::with_capacity(SEGMENT_V0_DATA_OFFSET + self.data_len as usize); bytes.push(ChunkVersion::V0.into()); // version bytes.extend_from_slice(&self.id.chunk_id.to_le_bytes()); bytes.extend_from_slice(&self.id.segment_number.to_le_bytes()); bytes.push(self.is_last as u8); + bytes.extend_from_slice(&self.data_len.to_le_bytes()); let data_checksum = xxh3_64(&self.data); bytes.extend_from_slice(&data_checksum.to_le_bytes()); let checksum = xxh3_64(&bytes[0..SEGMENT_V0_CHECKSUM_OFFSET]); @@ -179,6 +193,7 @@ mod tests { segment_number: 12345678, }, is_last: true, + data_len: 5, data_checksum: 1234567890, checksum: 12345678, data: vec![1, 2, 3, 4, 5], diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/server/openda/actor/server.rs index 4db1bcd310..06b0a19d64 100644 --- a/crates/rooch-da/src/server/openda/actor/server.rs +++ b/crates/rooch-da/src/server/openda/actor/server.rs @@ -16,12 +16,14 @@ use crate::chunk::{Chunk, ChunkV0}; use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme}; use crate::messages::PutBatchInternalDAMessage; +use crate::segment::SegmentID; pub struct DAServerOpenDAActor { max_segment_size: usize, operator: Operator, } +pub const CHUNK_V0_PREFIX: &str = "chunk_v0"; pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; @@ -119,9 +121,9 @@ impl DAServerOpenDAActor { let segments = chunk.to_segments(self.max_segment_size); for segment in segments { let bytes = segment.to_bytes(); + match self - .operator - .write(&segment.get_id().to_string(), bytes) + .write_segment(segment.get_id(), bytes, CHUNK_V0_PREFIX.to_string()) .await { Ok(_) => { @@ -136,13 +138,26 @@ impl DAServerOpenDAActor { segment.get_id(), e, ); - return Err(e.into()); + return Err(e); } } } Ok(()) } + + async fn write_segment( + &self, + segment_id: SegmentID, + segment_bytes: Vec, + prefix: String, + ) -> Result<()> { + let path = format!("{}/{}", prefix, segment_id); + let mut w = self.operator.writer(&path).await?; + w.write(segment_bytes).await?; + w.close().await?; + Ok(()) + } } fn check_config_exist(