Skip to content

Commit

Permalink
configured via fragments per bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
segfault-magnet committed Jan 28, 2025
1 parent 461c24e commit cef7a89
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 25 deletions.
4 changes: 2 additions & 2 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ pub struct BundleConfig {
pub bytes_to_accumulate: NonZeroUsize,

/// TODO: rephrase
/// Max number of bytes the resulting bundle can be.
pub target_bundle_size: NonZeroUsize,
/// target number of fragments the resulting bundle can be.
pub target_fragments_per_bundle: NonZeroUsize,

/// Maximum duration allocated for determining the optimal bundle size.
///
Expand Down
4 changes: 2 additions & 2 deletions committer/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn block_bundler(
BlobEncoder,
bundle::Encoder::new(config.app.bundle.compression_level),
config.app.bundle.optimization_step,
config.app.bundle.target_bundle_size,
config.app.bundle.target_fragments_per_bundle,
);

let block_bundler = BlockBundler::new(
Expand All @@ -97,7 +97,7 @@ pub fn block_bundler(
optimization_time_limit: config.app.bundle.optimization_timeout,
accumulation_time_limit: config.app.bundle.accumulation_timeout,
bytes_to_accumulate: config.app.bundle.bytes_to_accumulate,
target_bundle_size: config.app.bundle.target_bundle_size,
target_fragments_per_bundle: config.app.bundle.target_fragments_per_bundle,
lookback_window: config.app.bundle.block_height_lookback,
max_bundles_per_optimization_run: num_cpus::get()
.try_into()
Expand Down
2 changes: 1 addition & 1 deletion e2e/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Committer {
"COMMITTER__APP__BUNDLE__BYTES_TO_ACCUMULATE",
get_field!(block_bytes_to_accumulate),
)
.env("COMMITTER__APP__BUNDLE__TARGET_BUNDLE_SIZE", "1572864")
.env("COMMITTER__APP__BUNDLE__TARGET_FRAGMENTS_PER_BUNDLE", "12")
.env(
"COMMITTER__APP__BUNDLE__OPTIMIZATION_TIMEOUT",
get_field!(bundle_optimization_timeout),
Expand Down
7 changes: 7 additions & 0 deletions packages/adapters/eth/src/blob_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ impl services::block_bundler::port::l1::FragmentEncoder for BlobEncoder {
fn gas_usage(&self, num_bytes: NonZeroUsize) -> u64 {
blob::Encoder::default().blobs_needed_to_encode(num_bytes.get()) as u64 * DATA_GAS_PER_BLOB
}

fn num_fragments_needed(&self, num_bytes: NonZeroUsize) -> NonZeroUsize {
blob::Encoder::default()
.blobs_needed_to_encode(num_bytes.get())
.try_into()
.expect("known to be non-zero")
}
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions packages/services/src/block_bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub mod service {
pub accumulation_time_limit: Duration,
pub bytes_to_accumulate: NonZeroUsize,
// TODO: segfault this is not implemented
pub target_bundle_size: NonZeroUsize,
pub target_fragments_per_bundle: NonZeroUsize,
pub lookback_window: u32,
}

Expand All @@ -34,7 +34,7 @@ pub mod service {
optimization_time_limit: Duration::from_secs(100),
accumulation_time_limit: Duration::from_secs(100),
bytes_to_accumulate: NonZeroUsize::new(1).unwrap(),
target_bundle_size: NonZeroUsize::MAX,
target_fragments_per_bundle: NonZeroUsize::MAX,
lookback_window: 1000,
max_bundles_per_optimization_run: 1.try_into().unwrap(),
}
Expand Down Expand Up @@ -307,6 +307,7 @@ pub mod port {
id: NonNegative<i32>,
) -> Result<NonEmpty<Fragment>>;
fn gas_usage(&self, num_bytes: NonZeroUsize) -> u64;
fn num_fragments_needed(&self, num_bytes: NonZeroUsize) -> NonZeroUsize;
}
}

Expand Down
28 changes: 17 additions & 11 deletions packages/services/src/block_bundler/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,21 @@ pub struct Factory<GasCalculator> {
gas_calc: GasCalculator,
bundle_encoder: bundle::Encoder,
step_size: NonZeroUsize,
target_bundle_size: NonZeroUsize,
target_num_fragments: NonZeroUsize,
}

impl<L1> Factory<L1> {
pub fn new(
gas_calc: L1,
bundle_encoder: bundle::Encoder,
step_size: NonZeroUsize,
target_bundle_size: NonZeroUsize,
target_num_fragments: NonZeroUsize,
) -> Self {
Self {
gas_calc,
bundle_encoder,
step_size,
target_bundle_size,
target_num_fragments,
}
}
}
Expand All @@ -120,7 +120,7 @@ where
self.bundle_encoder.clone(),
self.step_size,
id,
self.target_bundle_size,
self.target_num_fragments,
)
}
}
Expand Down Expand Up @@ -149,7 +149,7 @@ pub struct Bundler<FragmentEncoder> {
bundle_encoder: bundle::Encoder,
attempts: VecDeque<NonZeroUsize>,
bundle_id: NonNegative<i32>,
target_bundle_size: NonZeroUsize,
target_num_fragments: NonZeroUsize,
}

impl<T> Bundler<T> {
Expand All @@ -159,7 +159,7 @@ impl<T> Bundler<T> {
bundle_encoder: bundle::Encoder,
initial_step_size: NonZeroUsize,
bundle_id: NonNegative<i32>,
target_bundle_size: NonZeroUsize,
target_num_fragments: NonZeroUsize,
) -> Self {
let max_blocks = blocks.len();
let initial_step = initial_step_size;
Expand All @@ -174,7 +174,7 @@ impl<T> Bundler<T> {
bundle_encoder,
attempts,
bundle_id,
target_bundle_size,
target_num_fragments,
}
}

Expand Down Expand Up @@ -240,14 +240,19 @@ impl<T> Bundler<T> {
let fragment_encoder = self.fragment_encoder.clone();

// Needs to be wrapped in a blocking task to avoid blocking the executor
let target_bundle_size = self.target_bundle_size;
let target_num_fragments = self.target_num_fragments;
tokio::task::spawn_blocking(move || {
blocks_for_analyzing
.into_par_iter()
.map(|blocks| {
let fragment_encoder = fragment_encoder.clone();
let bundle_encoder = bundle_encoder.clone();
create_proposal(bundle_encoder, fragment_encoder, blocks, target_bundle_size)
create_proposal(
bundle_encoder,
fragment_encoder,
blocks,
target_num_fragments,
)
})
.collect::<Result<Vec<_>>>()
})
Expand Down Expand Up @@ -334,7 +339,7 @@ fn create_proposal(
bundle_encoder: bundle::Encoder,
fragment_encoder: impl crate::block_bundler::port::l1::FragmentEncoder,
bundle_blocks: NonEmpty<CompressedFuelBlock>,
target_bundle_size: NonZeroUsize,
target_num_fragments: NonZeroUsize,
) -> Result<Proposal> {
let block_heights = bundle_blocks.first().height..=bundle_blocks.last().height;

Expand All @@ -357,7 +362,8 @@ fn create_proposal(
let compressed_data = NonEmpty::from_vec(compressed_data)
.ok_or_else(|| crate::Error::Other("bundle encoder returned zero bytes".to_string()))?;

let meets_target = compressed_data.len_nonzero() <= target_bundle_size;
let meets_target = fragment_encoder.num_fragments_needed(compressed_data.len_nonzero())
<= target_num_fragments;

let gas_usage = fragment_encoder.gas_usage(compressed_data.len_nonzero());

Expand Down
14 changes: 7 additions & 7 deletions packages/services/tests/block_bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,10 @@ async fn test_fallback_to_smallest_invalid_proposal() {
async fn respects_target_bundle_size() {
// given
// in total 300B
let blocks = generate_storage_block_sequence(0..=2, 100);
let blocks = generate_storage_block_sequence(0..=1, enough_bytes_to_almost_fill_a_blob());

// can fit 2 blocks + encoding overhead, but not 3 blocks
let target_size = NonZeroUsize::new(240).unwrap();
// bundle should fit in 1 blob
let target_size = NonZeroUsize::new(1).unwrap();

let mut bundler = Bundler::new(
BlobEncoder,
Expand All @@ -818,12 +818,12 @@ async fn respects_target_bundle_size() {

assert_eq!(
final_bundle.metadata.block_heights,
0..=1,
"Expected only two blocks to fit in bundle"
0..=0,
"Expected one block to fit in bundle"
);

assert!(
final_bundle.metadata.compressed_data_size.get() <= target_size.get(),
final_bundle.metadata.num_fragments <= target_size,
"Bundle should not exceed target size"
);
}
Expand Down Expand Up @@ -854,7 +854,7 @@ async fn chooses_less_blocks_for_better_gas_usage() {
bundle::Encoder::new(CompressionLevel::Disabled),
NonZeroUsize::new(1).unwrap(),
1u16.into(),
NonZeroUsize::new(fragment_capacity * 4).unwrap(),
NonZeroUsize::new(4).unwrap(),
);
while bundler.advance(1.try_into().unwrap()).await.unwrap() {}

Expand Down

0 comments on commit cef7a89

Please sign in to comment.