Skip to content

Commit

Permalink
feat: Partition memtables by time if compaction window is provided (#…
Browse files Browse the repository at this point in the history
…3501)

* feat: define time partitions

* feat: adapt time partitions to version

* feat: implement non write methods

* feat: add write one to memtable

* feat: implement write

* chore: fix warning

* fix: inner not set

* refactor: add collect_iter_timestamps

* test: test partitions

* chore: debug log

* chore: fix typos

* chore: log memtable id

* fix: empty check

* chore: log total parts

* chore: update comments
  • Loading branch information
evenyag authored Mar 14, 2024
1 parent 3a32677 commit 8ca9e01
Show file tree
Hide file tree
Showing 12 changed files with 784 additions and 127 deletions.
7 changes: 6 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ use table::predicate::Predicate;

use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;

pub mod key_values;
pub mod merge_tree;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

Expand Down Expand Up @@ -82,9 +84,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
fn id(&self) -> MemtableId;

/// Write key values into the memtable.
/// Writes key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;

/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;

/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl KeyValues {
/// Primary key columns have the same order as region's primary key. Field
/// columns are ordered by their position in the region schema (The same order
/// as users defined while creating the region).
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
row: &'a Row,
schema: &'a Vec<ColumnSchema>,
Expand Down
80 changes: 19 additions & 61 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use table::predicate::Predicate;

use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::tree::MergeTree;
use crate::memtable::{
Expand Down Expand Up @@ -127,6 +128,17 @@ impl Memtable for MergeTreeMemtable {
res
}

/// Writes a single key-value pair into the underlying tree.
///
/// The memtable stats are refreshed from the collected write metrics whether
/// or not the tree write succeeded; the tree's result is returned afterwards.
fn write_one(&self, key_value: KeyValue) -> Result<()> {
    let mut write_metrics = WriteMetrics::default();
    let mut encoded_key = Vec::new();

    // Deliberately not using `?` here: the stats below must be updated even
    // when the write fails, so the result is held and returned at the end.
    let outcome = self
        .tree
        .write_one(key_value, &mut encoded_key, &mut write_metrics);
    self.update_stats(&write_metrics);

    outcome
}

fn iter(
&self,
projection: Option<&[ColumnId]>,
Expand Down Expand Up @@ -290,16 +302,14 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use datatypes::vectors::Int64Vector;

use super::*;
use crate::test_util::memtable_util;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};

#[test]
fn test_memtable_sorted_input() {
Expand All @@ -322,23 +332,10 @@ mod tests {
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();
.collect::<Vec<_>>();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(expected_ts, read);

let stats = memtable.stats();
Expand Down Expand Up @@ -386,20 +383,7 @@ mod tests {
memtable.write(&kvs).unwrap();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

let iter = memtable.iter(None, None).unwrap();
Expand Down Expand Up @@ -514,20 +498,7 @@ mod tests {

let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(expect, read);
}

Expand Down Expand Up @@ -564,20 +535,7 @@ mod tests {
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr.into()])))
.unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
let read = collect_iter_timestamps(iter);
assert_eq!(timestamps, read);
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/mito2/src/memtable/merge_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,54 @@ impl MergeTree {
Ok(())
}

/// Writes a single key-value pair into the tree.
///
/// `metrics` is updated with the row's timestamp range and the data size of
/// its fields before the row is written. `pk_buffer` is scratch space: it is
/// cleared and refilled with the encoded primary key when the region has one.
///
/// # Errors
/// Returns a primary key length mismatch error when the number of primary keys
/// in `kv` differs from the number of fields of the row codec. Errors from key
/// encoding and from the underlying write are propagated.
///
/// # Panics
/// Panics if the tree is immutable (frozen), or if the timestamp of `kv` is
/// absent or not a valid timestamp value.
pub fn write_one(
    &self,
    kv: KeyValue,
    pk_buffer: &mut Vec<u8>,
    metrics: &mut WriteMetrics,
) -> Result<()> {
    ensure!(
        kv.num_primary_keys() == self.row_codec.num_fields(),
        PrimaryKeyLengthMismatchSnafu {
            expect: self.row_codec.num_fields(),
            actual: kv.num_primary_keys(),
        }
    );

    // Safety: timestamp of kv must be both present and a valid timestamp value.
    let timestamp = kv.timestamp().as_timestamp().unwrap().unwrap().value();
    metrics.min_ts = metrics.min_ts.min(timestamp);
    metrics.max_ts = metrics.max_ts.max(timestamp);

    let field_bytes: usize = kv.fields().map(|field| field.data_size()).sum();
    metrics.value_bytes += field_bytes;

    if self.metadata.primary_key.is_empty() {
        // No primary key.
        // NOTE(review): this early return skips the fixed per-row overhead
        // (timestamp + op type) that the keyed path adds to `value_bytes`
        // below -- confirm whether that difference is intended.
        return self.write_no_key(kv);
    }

    // Encode the primary key into the reusable buffer.
    pk_buffer.clear();
    if !self.is_partitioned {
        self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
    } else {
        // Use sparse encoder for metric engine.
        self.sparse_encoder
            .encode_to_vec(kv.primary_keys(), pk_buffer)?;
    }

    // Write the row under its encoded primary key.
    self.write_with_key(pk_buffer, kv, metrics)?;

    // Account for the fixed-size timestamp and op type stored with the row.
    let row_overhead = std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
    metrics.value_bytes += row_overhead;

    Ok(())
}

/// Scans the tree.
pub fn read(
&self,
Expand Down
Loading

0 comments on commit 8ca9e01

Please sign in to comment.