diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 88fa058c6b59..78c531573f18 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -54,7 +54,10 @@ use crate::region::options::MergeMode; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; /// Initial vector builder capacity. -const INITIAL_BUILDER_CAPACITY: usize = 0; +const INITIAL_BUILDER_CAPACITY: usize = 16; + +/// Vector builder capacity. +const BUILDER_CAPACITY: usize = 512; /// Builder to build [TimeSeriesMemtable]. #[derive(Debug, Default)] @@ -154,9 +157,7 @@ impl TimeSeriesMemtable { ); let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; - let fields = kv.fields().collect::>(); - stats.value_bytes += fields.iter().map(|v| v.data_size()).sum::(); let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded); stats.key_bytes += series_allocated; @@ -166,7 +167,8 @@ impl TimeSeriesMemtable { stats.max_ts = stats.max_ts.max(ts); let mut guard = series.write().unwrap(); - guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields); + let size = guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields()); + stats.value_bytes += size; Ok(()) } @@ -617,6 +619,7 @@ struct Series { pk_cache: Option>, active: ValueBuilder, frozen: Vec, + region_metadata: RegionMetadataRef, } impl Series { @@ -625,12 +628,24 @@ impl Series { pk_cache: None, active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY), frozen: vec![], + region_metadata: region_metadata.clone(), } } - /// Pushes a row of values into Series. - fn push(&mut self, ts: ValueRef, sequence: u64, op_type: OpType, values: Vec) { - self.active.push(ts, sequence, op_type as u8, values); + /// Pushes a row of values into Series. Return the size of values. + fn push<'a>( + &mut self, + ts: ValueRef<'a>, + sequence: u64, + op_type: OpType, + values: impl Iterator>, + ) -> usize { + // + 10 to avoid potential reallocation. + if self.active.len() + 10 > BUILDER_CAPACITY { + let region_metadata = self.region_metadata.clone(); + self.freeze(®ion_metadata); + } + self.active.push(ts, sequence, op_type as u8, values) } fn update_pk_cache(&mut self, pk_values: Vec) { @@ -725,26 +740,44 @@ impl ValueBuilder { /// Pushes a new row to `ValueBuilder`. /// We don't need primary keys since they've already be encoded. - fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec) { - debug_assert_eq!(fields.len(), self.fields.len()); - self.timestamp.push_value_ref(ts); - self.sequence.push_value_ref(ValueRef::UInt64(sequence)); - self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + /// Returns the size of field values. + /// + /// In this method, we don't check the data type of the value, because it is already checked in the caller. + fn push<'a>( + &mut self, + ts: ValueRef, + sequence: u64, + op_type: u8, + fields: impl Iterator>, + ) -> usize { + #[cfg(debug_assertions)] + let fields = { + let field_vec = fields.collect::>(); + debug_assert_eq!(field_vec.len(), self.fields.len()); + field_vec.into_iter() + }; + + let _ = self.timestamp.try_push_value_ref(ts); + let _ = self.sequence.try_push_value_ref(ValueRef::UInt64(sequence)); + let _ = self.op_type.try_push_value_ref(ValueRef::UInt8(op_type)); let num_rows = self.timestamp.len(); - for (idx, field_value) in fields.into_iter().enumerate() { + let mut size = 0; + for (idx, field_value) in fields.enumerate() { + size += field_value.data_size(); if !field_value.is_null() || self.fields[idx].is_some() { - self.fields[idx] - .get_or_insert_with(|| { - // lazy initialize on first non-null value - let mut mutable_vector = - self.field_types[idx].create_mutable_vector(num_rows); - // fill previous rows with nulls - mutable_vector.push_nulls(num_rows - 1); - mutable_vector - }) - .push_value_ref(field_value); + if let Some(field) = self.fields[idx].as_mut() { + let _ = field.try_push_value_ref(field_value); + } else { + let mut mutable_vector = self.field_types[idx] + .create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY)); + mutable_vector.push_nulls(num_rows - 1); + let _ = mutable_vector.try_push_value_ref(field_value); + self.fields[idx] = Some(mutable_vector); + } } } + + size } /// Returns the length of [ValueBuilder] @@ -951,8 +984,8 @@ mod tests { ValueRef::Timestamp(Timestamp::new_millisecond(val)) } - fn field_value_ref(v0: i64, v1: f64) -> Vec> { - vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))] + fn field_value_ref(v0: i64, v1: f64) -> impl Iterator> { + vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter() } fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) { @@ -1014,20 +1047,20 @@ mod tests { ts_value_ref(1), 0, OpType::Put, - vec![ValueRef::Null, ValueRef::Null], + vec![ValueRef::Null, ValueRef::Null].into_iter(), ); series.push( ts_value_ref(1), 0, OpType::Put, - vec![ValueRef::Int64(1), ValueRef::Null], + vec![ValueRef::Int64(1), ValueRef::Null].into_iter(), ); series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2)); series.push( ts_value_ref(1), 3, OpType::Put, - vec![ValueRef::Int64(2), ValueRef::Null], + vec![ValueRef::Int64(2), ValueRef::Null].into_iter(), ); assert_eq!(4, series.active.timestamp.len()); assert_eq!(0, series.frozen.len()); diff --git a/src/mito2/src/row_converter/dense.rs b/src/mito2/src/row_converter/dense.rs index 8c3d497d7e21..443c4ead12cd 100644 --- a/src/mito2/src/row_converter/dense.rs +++ b/src/mito2/src/row_converter/dense.rs @@ -348,8 +348,8 @@ impl DensePrimaryKeyCodec { I: Iterator>, { let mut serializer = Serializer::new(buffer); - for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) { - field.serialize(&mut serializer, &value)?; + for (idx, value) in row.enumerate() { + self.field_at(idx).serialize(&mut serializer, &value)?; } Ok(()) }