Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize time series memtable ingestion #5451

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};

/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 16;

/// Vector builder capacity; a series' active builder is frozen once its
/// length approaches this bound (see `Series::push`).
const BUILDER_CAPACITY: usize = 512;
waynexia marked this conversation as resolved.
Show resolved Hide resolved

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
Expand Down Expand Up @@ -154,9 +157,7 @@ impl TimeSeriesMemtable {
);

let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();

stats.value_bytes += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
stats.key_bytes += series_allocated;

Expand All @@ -166,7 +167,8 @@ impl TimeSeriesMemtable {
stats.max_ts = stats.max_ts.max(ts);

let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
let size = guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), kv.fields());
stats.value_bytes += size;

Ok(())
}
Expand Down Expand Up @@ -617,6 +619,7 @@ struct Series {
pk_cache: Option<Vec<Value>>,
active: ValueBuilder,
frozen: Vec<Values>,
region_metadata: RegionMetadataRef,
}

impl Series {
Expand All @@ -625,12 +628,24 @@ impl Series {
pk_cache: None,
active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY),
frozen: vec![],
region_metadata: region_metadata.clone(),
}
}

/// Pushes a row of values into Series.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: OpType, values: Vec<ValueRef>) {
self.active.push(ts, sequence, op_type as u8, values);
/// Pushes a row of values into Series. Return the size of values.
fn push<'a>(
&mut self,
ts: ValueRef<'a>,
sequence: u64,
op_type: OpType,
values: impl Iterator<Item = ValueRef<'a>>,
) -> usize {
// + 10 to avoid potential reallocation.
if self.active.len() + 10 > BUILDER_CAPACITY {
let region_metadata = self.region_metadata.clone();
self.freeze(&region_metadata);
}
self.active.push(ts, sequence, op_type as u8, values)
}

fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
Expand Down Expand Up @@ -725,26 +740,44 @@ impl ValueBuilder {

/// Pushes a new row to `ValueBuilder`.
/// We don't need primary keys since they've already be encoded.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec<ValueRef>) {
debug_assert_eq!(fields.len(), self.fields.len());
self.timestamp.push_value_ref(ts);
self.sequence.push_value_ref(ValueRef::UInt64(sequence));
self.op_type.push_value_ref(ValueRef::UInt8(op_type));
/// Returns the size of field values.
///
/// In this method, we don't check the data type of the value, because it is already checked in the caller.
fn push<'a>(
&mut self,
ts: ValueRef,
sequence: u64,
op_type: u8,
fields: impl Iterator<Item = ValueRef<'a>>,
) -> usize {
#[cfg(debug_assertions)]
let fields = {
let field_vec = fields.collect::<Vec<_>>();
debug_assert_eq!(field_vec.len(), self.fields.len());
field_vec.into_iter()
};

let _ = self.timestamp.try_push_value_ref(ts);
let _ = self.sequence.try_push_value_ref(ValueRef::UInt64(sequence));
let _ = self.op_type.try_push_value_ref(ValueRef::UInt8(op_type));
let num_rows = self.timestamp.len();
for (idx, field_value) in fields.into_iter().enumerate() {
let mut size = 0;
for (idx, field_value) in fields.enumerate() {
size += field_value.data_size();
if !field_value.is_null() || self.fields[idx].is_some() {
self.fields[idx]
.get_or_insert_with(|| {
// lazy initialize on first non-null value
let mut mutable_vector =
self.field_types[idx].create_mutable_vector(num_rows);
// fill previous rows with nulls
mutable_vector.push_nulls(num_rows - 1);
mutable_vector
})
.push_value_ref(field_value);
if let Some(field) = self.fields[idx].as_mut() {
let _ = field.try_push_value_ref(field_value);
} else {
let mut mutable_vector = self.field_types[idx]
.create_mutable_vector(num_rows.max(INITIAL_BUILDER_CAPACITY));
mutable_vector.push_nulls(num_rows - 1);
let _ = mutable_vector.try_push_value_ref(field_value);
self.fields[idx] = Some(mutable_vector);
}
}
}

size
}

/// Returns the length of [ValueBuilder]
Expand Down Expand Up @@ -951,8 +984,8 @@ mod tests {
ValueRef::Timestamp(Timestamp::new_millisecond(val))
}

fn field_value_ref(v0: i64, v1: f64) -> Vec<ValueRef<'static>> {
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))]
fn field_value_ref(v0: i64, v1: f64) -> impl Iterator<Item = ValueRef<'static>> {
vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))].into_iter()
}

fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) {
Expand Down Expand Up @@ -1014,20 +1047,20 @@ mod tests {
ts_value_ref(1),
0,
OpType::Put,
vec![ValueRef::Null, ValueRef::Null],
vec![ValueRef::Null, ValueRef::Null].into_iter(),
);
series.push(
ts_value_ref(1),
0,
OpType::Put,
vec![ValueRef::Int64(1), ValueRef::Null],
vec![ValueRef::Int64(1), ValueRef::Null].into_iter(),
);
series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
series.push(
ts_value_ref(1),
3,
OpType::Put,
vec![ValueRef::Int64(2), ValueRef::Null],
vec![ValueRef::Int64(2), ValueRef::Null].into_iter(),
);
assert_eq!(4, series.active.timestamp.len());
assert_eq!(0, series.frozen.len());
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/row_converter/dense.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ impl DensePrimaryKeyCodec {
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) {
field.serialize(&mut serializer, &value)?;
for (idx, value) in row.enumerate() {
self.field_at(idx).serialize(&mut serializer, &value)?;
}
Ok(())
}
Expand Down
Loading