From e481f073f5dbe052a42949cc7dd49e029df8ad8f Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Sun, 25 Feb 2024 21:06:01 +0800
Subject: [PATCH] feat: Implement dedup for the new memtable and expose the
 config (#3377)

* fix: KeyValues num_fields() is incorrect

* chore: fix warnings

* feat: support dedup

* feat: allow using the new memtable

* feat: serde default for config

* fix: resets pk index after finishing a dict
---
 src/mito2/src/config.rs                    |   5 +
 src/mito2/src/engine/basic_test.rs         |   4 +-
 src/mito2/src/memtable/key_values.rs       |  48 ++++++--
 src/mito2/src/memtable/merge_tree.rs       |  16 ++-
 src/mito2/src/memtable/merge_tree/data.rs  |  27 +----
 src/mito2/src/memtable/merge_tree/dedup.rs |  85 ++++++-------
 src/mito2/src/memtable/merge_tree/dict.rs  |  11 +-
 .../src/memtable/merge_tree/partition.rs   | 112 +++++++++++-------
 src/mito2/src/memtable/merge_tree/shard.rs |  40 ++++++-
 src/mito2/src/test_util/version_util.rs    |   6 +-
 src/mito2/src/worker.rs                    |  15 ++-
 11 files changed, 223 insertions(+), 146 deletions(-)

diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 32376aa802ed..2f97b5b7f697 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, NoneAsEmptyString};
 
 use crate::error::Result;
+use crate::memtable::merge_tree::MergeTreeConfig;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 
 /// Default max running background job.
@@ -102,6 +103,9 @@ pub struct MitoConfig {
 
     /// Inverted index configs.
     pub inverted_index: InvertedIndexConfig,
+
+    /// Experimental memtable.
+    pub experimental_memtable: Option<MergeTreeConfig>,
 }
 
 impl Default for MitoConfig {
@@ -127,6 +131,7 @@ impl Default for MitoConfig {
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             allow_stale_entries: false,
             inverted_index: InvertedIndexConfig::default(),
+            experimental_memtable: None,
         };
 
         // Adjust buffer and cache size according to system memory if we can.
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 7236c323c58e..ca6f0c173e82 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -550,11 +550,11 @@ async fn test_region_usage() {
     flush_region(&engine, region_id, None).await;
 
     let region_stat = region.region_usage().await;
-    assert_eq!(region_stat.wal_usage, 0);
     assert_eq!(region_stat.sst_usage, 2962);
 
     // region total usage
-    assert_eq!(region_stat.disk_usage(), 4028);
+    // Some memtables may share items.
+    assert!(region_stat.disk_usage() >= 4028);
 }
 
 #[tokio::test]
diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs
index 29e7b0beca12..4986a81cb2c1 100644
--- a/src/mito2/src/memtable/key_values.rs
+++ b/src/mito2/src/memtable/key_values.rs
@@ -124,7 +124,7 @@ impl<'a> KeyValue<'a> {
 
     /// Get number of field columns.
     pub fn num_fields(&self) -> usize {
-        self.row.values.len() - self.helper.num_primary_key_column - 1
+        self.helper.indices.len() - self.helper.num_primary_key_column - 1
    }
 
     /// Get sequence.
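// Annotation (illustration only, not part of the patch): the num_fields() fix
// above matters once rows can be sparse. `row.values.len()` counts only the
// values actually present in a row, while `helper.indices` is built from the
// region metadata and always spans the full schema, so the field count stays
// stable when trailing columns are omitted. The arithmetic, as a standalone
// sketch:
fn num_fields(num_schema_columns: usize, num_primary_key_columns: usize) -> usize {
    // Schema columns = primary key columns + field columns + 1 time index.
    num_schema_columns - num_primary_key_columns - 1
}
// With the 2-key, 2-field test schema (k0, k1, v0, v1, ts): num_fields(5, 2) == 2,
// no matter how many values a sparse row actually carries.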
@@ -261,7 +261,13 @@ mod tests {
         }
     }
 
-    fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
+    fn check_key_values(
+        kvs: &KeyValues,
+        num_rows: usize,
+        keys: &[Option<i64>],
+        ts: i64,
+        values: &[Option<i64>],
+    ) {
         assert_eq!(num_rows, kvs.num_rows());
         let mut expect_seq = START_SEQ;
         let expect_ts = ValueRef::Int64(ts);
@@ -273,10 +279,10 @@ mod tests {
             assert_eq!(values.len(), kv.num_fields());
             assert_eq!(expect_ts, kv.timestamp());
 
-            let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect();
+            let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
             let actual_keys: Vec<_> = kv.primary_keys().collect();
             assert_eq!(expect_keys, actual_keys);
-            let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect();
+            let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();
             let actual_values: Vec<_> = kv.fields().collect();
             assert_eq!(expect_values, actual_values);
         }
@@ -312,7 +318,7 @@ mod tests {
         // KeyValues
         // keys: [k0=2, k1=0]
         // ts: 1,
-        check_key_values(&kvs, 3, &[2, 0], 1, &[]);
+        check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
     }
 
     #[test]
@@ -325,7 +331,7 @@ mod tests {
         // KeyValues (note that v0 is in front of v1 in region schema)
         // ts: 2,
         // fields: [v0=1, v1=0]
-        check_key_values(&kvs, 3, &[], 2, &[1, 0]);
+        check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
     }
 
     #[test]
@@ -339,6 +345,34 @@ mod tests {
         // keys: [k0=0, k1=3]
         // ts: 2,
         // fields: [v0=1, v1=4]
-        check_key_values(&kvs, 3, &[0, 3], 2, &[1, 4]);
+        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
+    }
+
+    #[test]
+    fn test_sparse_field() {
+        let meta = new_region_metadata(2, 2);
+        // The value of each row:
+        // k0=0, v0=1, ts=2, k1=3, (v1 will be null)
+        let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
+        let kvs = KeyValues::new(&meta, mutation).unwrap();
+        // KeyValues
+        // keys: [k0=0, k1=3]
+        // ts: 2,
+        // fields: [v0=1, v1=null]
+        check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
+    }
+
+    #[test]
+    fn test_sparse_tag_field() {
+        let meta = new_region_metadata(2, 2);
+        // The value of each row:
+        // k0 = 0, v0=1, ts=2, (k1, v1 will be null)
+        let mutation = new_mutation(&["k0", "v0", "ts"], 3);
+        let kvs = KeyValues::new(&meta, mutation).unwrap();
+        // KeyValues
+        // keys: [k0=0, k1=null]
+        // ts: 2,
+        // fields: [v0=1, v1=null]
+        check_key_values(&kvs, 3, &[Some(0), None], 2, &[Some(1), None]);
+    }
 }
diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index 8244862a4256..a1e0b2ec9df9 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -28,6 +28,7 @@ use std::fmt;
 use std::sync::atomic::{AtomicI64, Ordering};
 use std::sync::Arc;
 
+use serde::{Deserialize, Serialize};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
@@ -54,7 +55,8 @@ struct PkId {
 }
 
 /// Config for the merge tree memtable.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
 pub struct MergeTreeConfig {
     /// Max keys in an index shard.
     pub index_max_keys_per_shard: usize,
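// Annotation (illustration only, not part of the patch): `#[serde(default)]`
// plus the existing `Default` impl means a partially specified config section
// deserializes cleanly; omitted keys fall back to their defaults. The same
// pattern on a self-contained, hypothetical mini config (field names and the
// `toml` crate are assumptions for the sketch):
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
#[serde(default)]
struct DemoConfig {
    index_max_keys_per_shard: usize,
    dedup: bool,
}

impl Default for DemoConfig {
    fn default() -> Self {
        Self { index_max_keys_per_shard: 8192, dedup: true }
    }
}

fn demo_partial_config() {
    // Only one key provided; `dedup` falls back to the default.
    let cfg: DemoConfig = toml::from_str("index_max_keys_per_shard = 1024").unwrap();
    assert_eq!(cfg, DemoConfig { index_max_keys_per_shard: 1024, dedup: true });
}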
@@ -248,16 +250,19 @@ impl MergeTreeMemtable {
 
 /// Builder to build a [MergeTreeMemtable].
 #[derive(Debug, Default)]
 pub struct MergeTreeMemtableBuilder {
-    write_buffer_manager: Option<WriteBufferManagerRef>,
     config: MergeTreeConfig,
+    write_buffer_manager: Option<WriteBufferManagerRef>,
 }
 
 impl MergeTreeMemtableBuilder {
     /// Creates a new builder with specific `write_buffer_manager`.
-    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
+    pub fn new(
+        config: MergeTreeConfig,
+        write_buffer_manager: Option<WriteBufferManagerRef>,
+    ) -> Self {
         Self {
+            config,
             write_buffer_manager,
-            config: MergeTreeConfig::default(),
         }
     }
 }
@@ -420,7 +425,8 @@ mod tests {
             memtable_util::metadata_with_primary_key(vec![], false)
         };
         // Try to build a memtable via the builder.
-        let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);
+        let memtable =
+            MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);
 
         let expect = (0..100).collect::<Vec<_>>();
         let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 418784fc71e0..71ee32bb1b16 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -883,29 +883,6 @@ impl DataPartsReader {
     }
 }
 
-#[cfg(test)]
-pub(crate) fn write_rows_to_buffer(
-    buffer: &mut DataBuffer,
-    schema: &RegionMetadataRef,
-    pk_index: u16,
-    ts: Vec<i64>,
-    v0: Vec<Option<f64>>,
-    sequence: u64,
-) {
-    let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
-        schema,
-        "whatever".to_string(),
-        1,
-        ts.into_iter(),
-        v0.into_iter(),
-        sequence,
-    );
-
-    for kv in kvs.iter() {
-        buffer.write_row(pk_index, kv);
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use datafusion::arrow::array::Float64Array;
@@ -914,7 +891,9 @@ mod tests {
     use parquet::data_type::AsBytes;
 
     use super::*;
-    use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
+    use crate::test_util::memtable_util::{
+        extract_data_batch, metadata_for_test, write_rows_to_buffer,
+    };
 
     #[test]
     fn test_lazy_mutable_vector_builder() {
diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/merge_tree/dedup.rs
index 889db134debd..b5155d31d5e2 100644
--- a/src/mito2/src/memtable/merge_tree/dedup.rs
+++ b/src/mito2/src/memtable/merge_tree/dedup.rs
@@ -16,39 +16,19 @@ use std::ops::Range;
 
 use crate::error::Result;
 use crate::memtable::merge_tree::data::DataBatch;
+use crate::memtable::merge_tree::shard::DataBatchSource;
 use crate::memtable::merge_tree::PkId;
 
-pub trait DedupSource {
-    /// Returns whether current source is still valid.
-    fn is_valid(&self) -> bool;
-
-    /// Advances source to next data batch.
-    fn next(&mut self) -> Result<()>;
-
-    /// Returns current pk id.
-    /// # Panics
-    /// If source is not valid.
-    fn current_pk_id(&self) -> PkId;
-
-    /// Returns the current primary key bytes.
-    /// # Panics
-    /// If source is not valid.
-    fn current_key(&self) -> &[u8];
-
-    /// Returns the data part.
-    /// # Panics
-    /// If source is not valid.
-    fn current_data_batch(&self) -> DataBatch;
-}
-
-struct DedupReader<T> {
+/// A reader that dedups sorted batches from a merger.
+pub struct DedupReader<T> {
     prev_batch_last_row: Option<(PkId, i64)>,
     current_batch_range: Option<Range<usize>>,
     inner: T,
 }
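// Annotation (illustration only, not part of the patch): DedupReader assumes
// its input is already sorted by (primary key, timestamp) with duplicates
// adjacent, so dedup reduces to dropping a row when it repeats the last
// (key, timestamp) pair already emitted. The same first-wins idea on a plain
// sorted vector:
fn dedup_sorted(rows: &[(u32, i64)]) -> Vec<(u32, i64)> {
    let mut out: Vec<(u32, i64)> = Vec::with_capacity(rows.len());
    for &row in rows {
        // Rows are adjacent-sorted; keep the first (highest priority) copy.
        if out.last() != Some(&row) {
            out.push(row);
        }
    }
    out
}
// dedup_sorted(&[(1, 10), (1, 10), (1, 11)]) => [(1, 10), (1, 11)]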
 
-impl<T: DedupSource> DedupReader<T> {
-    fn try_new(inner: T) -> Result<Self> {
+impl<T: DataBatchSource> DedupReader<T> {
+    /// Creates a new dedup reader.
+    pub fn try_new(inner: T) -> Result<Self> {
         let mut res = Self {
             prev_batch_last_row: None,
             current_batch_range: None,
             inner,
         };
@@ -57,24 +37,13 @@ impl<T: DedupSource> DedupReader<T> {
         res.next()?;
         Ok(res)
     }
+}
 
+impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
     fn is_valid(&self) -> bool {
         self.current_batch_range.is_some()
     }
 
-    /// Returns current encoded primary key.
-    /// # Panics
-    /// If inner reader is exhausted.
-    fn current_key(&self) -> &[u8] {
-        self.inner.current_key()
-    }
-
-    fn current_data_batch(&self) -> DataBatch {
-        let range = self.current_batch_range.as_ref().unwrap();
-        let data_batch = self.inner.current_data_batch();
-        data_batch.slice(range.start, range.len())
-    }
-
     fn next(&mut self) -> Result<()> {
         loop {
             match &mut self.prev_batch_last_row {
@@ -122,6 +91,20 @@ impl<T: DedupSource> DedupReader<T> {
         }
         Ok(())
     }
+
+    fn current_pk_id(&self) -> PkId {
+        self.inner.current_pk_id()
+    }
+
+    fn current_key(&self) -> Option<&[u8]> {
+        self.inner.current_key()
+    }
+
+    fn current_data_batch(&self) -> DataBatch {
+        let range = self.current_batch_range.as_ref().unwrap();
+        let data_batch = self.inner.current_data_batch();
+        data_batch.slice(range.start, range.len())
+    }
 }
 
 #[cfg(test)]
@@ -129,33 +112,35 @@ mod tests {
     use store_api::metadata::RegionMetadataRef;
 
     use super::*;
-    use crate::memtable::merge_tree::data::{
-        write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
+    use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
+    use crate::test_util::memtable_util::{
+        extract_data_batch, metadata_for_test, write_rows_to_buffer,
     };
-    use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
 
-    impl DedupSource for DataPartsReader {
+    struct MockSource(DataPartsReader);
+
+    impl DataBatchSource for MockSource {
         fn is_valid(&self) -> bool {
-            self.is_valid()
+            self.0.is_valid()
         }
 
         fn next(&mut self) -> Result<()> {
-            self.next()
+            self.0.next()
         }
 
         fn current_pk_id(&self) -> PkId {
             PkId {
                 shard_id: 0,
-                pk_index: self.current_data_batch().pk_index(),
+                pk_index: self.0.current_data_batch().pk_index(),
             }
         }
 
-        fn current_key(&self) -> &[u8] {
-            b"abcf"
+        fn current_key(&self) -> Option<&[u8]> {
+            None
         }
 
         fn current_data_batch(&self) -> DataBatch {
-            self.current_data_batch()
+            self.0.current_data_batch()
        }
     }
 
@@ -194,7 +179,7 @@ mod tests {
         let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);
 
         let mut res = Vec::with_capacity(expected.len());
-        let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
+        let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
         while reader.is_valid() {
             let batch = reader.current_data_batch();
             res.push(extract_data_batch(&batch));
diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs
index c2f5d170dc1e..543dca3d1c8e 100644
--- a/src/mito2/src/memtable/merge_tree/dict.rs
+++ b/src/mito2/src/memtable/merge_tree/dict.rs
@@ -80,7 +80,7 @@ impl KeyDictBuilder {
 
         if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
             // The write buffer is full. Freeze a dict block.
-            let dict_block = self.key_buffer.finish();
+            let dict_block = self.key_buffer.finish(false);
             self.dict_blocks.push(dict_block);
         }
 
@@ -113,8 +113,8 @@ impl KeyDictBuilder {
             return None;
         }
 
-        // Finishes current dict block.
-        let dict_block = self.key_buffer.finish();
+        // Finishes current dict block and resets the pk index.
+        let dict_block = self.key_buffer.finish(true);
         self.dict_blocks.push(dict_block);
 
         // Takes the pk to index map.
         let mut pk_to_index = std::mem::take(&mut self.pk_to_index);
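// Annotation (illustration only, not part of the patch): the `finish(true)`
// call above matters because pk indices are scoped to a dictionary. Once a
// dict is finished and handed to a new shard, the builder must start numbering
// keys from 0 again, or keys in the next dict would inherit stale,
// ever-growing indices. The counter pattern, as a hypothetical sketch:
struct PkIndexAlloc {
    next_pk_index: u16,
}

impl PkIndexAlloc {
    fn alloc(&mut self) -> u16 {
        let index = self.next_pk_index;
        self.next_pk_index += 1;
        index
    }

    // Mirrors `finish(reset_index: bool)`: reset only when a whole dict
    // is completed, not when a single block is frozen.
    fn finish(&mut self, reset_index: bool) {
        if reset_index {
            self.next_pk_index = 0;
        }
    }
}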
@@ -317,12 +317,15 @@ impl KeyBuffer {
             .unwrap_or(0)
     }
 
-    fn finish(&mut self) -> DictBlock {
+    fn finish(&mut self, reset_index: bool) -> DictBlock {
         let primary_key = self.key_builder.finish();
         // Reserve capacity for the new builder. `finish()` the builder will leave the builder
         // empty with capacity 0.
         // TODO(yingwen): Do we need to reserve capacity for data?
         self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
+        if reset_index {
+            self.next_pk_index = 0;
+        }
 
         DictBlock::new(primary_key)
     }
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index 428efa53e53f..6d124b3cfc09 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -28,8 +28,11 @@ use store_api::storage::ColumnId;
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
+use crate::memtable::merge_tree::dedup::DedupReader;
 use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::shard::{Shard, ShardMerger, ShardNode, ShardSource};
+use crate::memtable::merge_tree::shard::{
+    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
+};
 use crate::memtable::merge_tree::shard_builder::ShardBuilder;
 use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
 use crate::read::{Batch, BatchBuilder};
@@ -41,6 +44,8 @@ pub type PartitionKey = u32;
 /// A tree partition.
 pub struct Partition {
     inner: RwLock<Inner>,
+    /// Whether to dedup batches.
+    dedup: bool,
 }
 
 pub type PartitionRef = Arc<Partition>;
@@ -50,6 +55,7 @@ impl Partition {
     pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
         Partition {
             inner: RwLock::new(Inner::new(metadata, config)),
+            dedup: config.dedup,
         }
     }
 
@@ -113,16 +119,13 @@ impl Partition {
         };
 
         // Creating a shard merger will invoke next so we do it outside of the lock.
-        let shard_merger = ShardMerger::try_new(nodes)?;
-        Ok(PartitionReader {
-            metadata: context.metadata,
-            row_codec: context.row_codec,
-            projection: context.projection,
-            filters: context.filters,
-            pk_weights: context.pk_weights,
-            shard_merger,
-            last_yield_pk_id: None,
-        })
+        let merger = ShardMerger::try_new(nodes)?;
+        if self.dedup {
+            let source = DedupReader::try_new(merger)?;
+            PartitionReader::new(context, Box::new(source))
+        } else {
+            PartitionReader::new(context, Box::new(merger))
+        }
     }
 
     /// Freezes the partition.
@@ -156,8 +159,8 @@ impl Partition {
                 shard_builder,
                 shards,
                 num_rows: 0,
-                dedup: config.dedup,
             }),
+            dedup: self.dedup,
         }
     }
 
@@ -214,25 +217,74 @@ pub struct PartitionReader {
     projection: HashSet<ColumnId>,
     filters: Vec<SimpleFilterEvaluator>,
     pk_weights: Vec<u16>,
-    shard_merger: ShardMerger,
+    source: BoxedDataBatchSource,
     last_yield_pk_id: Option<PkId>,
 }
 
 impl PartitionReader {
+    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
+        let mut reader = Self {
+            metadata: context.metadata,
+            row_codec: context.row_codec,
+            projection: context.projection,
+            filters: context.filters,
+            pk_weights: context.pk_weights,
+            source,
+            last_yield_pk_id: None,
+        };
+        // Find next valid batch.
+        reader.prune_batch_by_key()?;
+
+        Ok(reader)
+    }
+
+    /// Returns true if the reader is valid.
     pub fn is_valid(&self) -> bool {
-        self.shard_merger.is_valid()
+        self.source.is_valid()
     }
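// Annotation (illustration only, not part of the patch): both ShardMerger and
// DedupReader implement DataBatchSource, so the dedup decision is made once,
// when the partition builds its reader, and everything downstream is written
// against a boxed trait object. The shape of that choice, on a toy source type:
trait Src {
    fn pop(&mut self) -> Option<i32>;
}

struct Raw(Vec<i32>);

impl Src for Raw {
    fn pop(&mut self) -> Option<i32> {
        self.0.pop()
    }
}

struct Dedup(Raw, Option<i32>);

impl Src for Dedup {
    fn pop(&mut self) -> Option<i32> {
        // Skip values that repeat the last returned one.
        while let Some(v) = self.0.pop() {
            if self.1 != Some(v) {
                self.1 = Some(v);
                return Some(v);
            }
        }
        None
    }
}

fn build_reader(dedup: bool, raw: Raw) -> Box<dyn Src> {
    if dedup { Box::new(Dedup(raw, None)) } else { Box::new(raw) }
}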
 
+    /// Advances the reader.
+    ///
+    /// # Panics
+    /// Panics if the reader is invalid.
     pub fn next(&mut self) -> Result<()> {
-        self.shard_merger.next()?;
+        self.source.next()?;
+
+        self.prune_batch_by_key()
+    }
+
+    /// Converts current data batch into a [Batch].
+    ///
+    /// # Panics
+    /// Panics if the reader is invalid.
+    pub fn convert_current_batch(&self) -> Result<Batch> {
+        let data_batch = self.source.current_data_batch();
+        data_batch_to_batch(
+            &self.metadata,
+            &self.projection,
+            self.source.current_key(),
+            data_batch,
+        )
+    }
+
+    pub(crate) fn into_context(self) -> ReadPartitionContext {
+        ReadPartitionContext {
+            metadata: self.metadata,
+            row_codec: self.row_codec,
+            projection: self.projection,
+            filters: self.filters,
+            pk_weights: self.pk_weights,
+        }
+    }
+
     fn prune_batch_by_key(&mut self) -> Result<()> {
         if self.metadata.primary_key.is_empty() {
             // Nothing to prune.
             return Ok(());
         }
 
-        while self.shard_merger.is_valid() {
-            let pk_id = self.shard_merger.current_pk_id();
+        while self.source.is_valid() {
+            let pk_id = self.source.current_pk_id();
             if let Some(yield_pk_id) = self.last_yield_pk_id {
                 if pk_id == yield_pk_id {
                     // If this batch has the same key as last returned batch.
                     break;
                 }
             }
-            let key = self.shard_merger.current_key().unwrap();
+            let key = self.source.current_key().unwrap();
             // Prune batch by primary key.
             if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
                 // We need this key.
                 self.last_yield_pk_id = Some(pk_id);
                 break;
             }
-            self.shard_merger.next()?;
+            self.source.next()?;
         }
 
         Ok(())
     }
-
-    pub fn convert_current_batch(&self) -> Result<Batch> {
-        let data_batch = self.shard_merger.current_data_batch();
-        data_batch_to_batch(
-            &self.metadata,
-            &self.projection,
-            self.shard_merger.current_key(),
-            data_batch,
-        )
-    }
-
-    pub(crate) fn into_context(self) -> ReadPartitionContext {
-        ReadPartitionContext {
-            metadata: self.metadata,
-            row_codec: self.row_codec,
-            projection: self.projection,
-            filters: self.filters,
-            pk_weights: self.pk_weights,
-        }
-    }
 }
 
 // TODO(yingwen): Improve performance of key pruning. Now we need to find index and
@@ -400,7 +432,6 @@ struct Inner {
     /// Shards with frozen dictionary.
     shards: Vec<Shard>,
     num_rows: usize,
-    dedup: bool,
 }
 
 impl Inner {
@@ -417,7 +448,6 @@ impl Inner {
             shard_builder,
             shards,
             num_rows: 0,
-            dedup: config.dedup,
         }
     }
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index 81ce4cb408dd..8b83b2ad1e61 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -101,6 +101,33 @@ impl Shard {
     }
 }
 
+/// Source that returns [DataBatch].
+pub trait DataBatchSource {
+    /// Returns whether current source is still valid.
+    fn is_valid(&self) -> bool;
+
+    /// Advances source to next data batch.
+    fn next(&mut self) -> Result<()>;
+
+    /// Returns current pk id.
+    /// # Panics
+    /// If source is not valid.
+    fn current_pk_id(&self) -> PkId;
+
+    /// Returns the current primary key bytes or None if it doesn't have a primary key.
+    ///
+    /// # Panics
+    /// If source is not valid.
+    fn current_key(&self) -> Option<&[u8]>;
+
+    /// Returns the current data batch.
+    /// # Panics
+    /// If source is not valid.
+    fn current_data_batch(&self) -> DataBatch;
+}
+
+pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
+
 /// Reader to read rows in a shard.
 pub struct ShardReader {
     shard_id: ShardId,
@@ -141,6 +168,7 @@ impl ShardReader {
     }
 }
 
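// Annotation (illustration only, not part of the patch): every implementation
// of the trait above, including the merger that follows, is driven by the same
// cursor-style loop; the `# Panics` sections encode that the `current_*`
// accessors are only legal while `is_valid()` holds. Assuming some
// `source: BoxedDataBatchSource`:
//
//     while source.is_valid() {
//         let pk_id = source.current_pk_id();
//         let batch = source.current_data_batch();
//         // ... consume pk_id / batch ...
//         source.next()?;
//     }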
+/// A merger that merges batches from multiple shards.
 pub(crate) struct ShardMerger {
     merger: Merger<ShardNode>,
 }
 
@@ -150,24 +178,26 @@ impl ShardMerger {
         let merger = Merger::try_new(nodes)?;
         Ok(ShardMerger { merger })
     }
+}
 
-    pub(crate) fn is_valid(&self) -> bool {
+impl DataBatchSource for ShardMerger {
+    fn is_valid(&self) -> bool {
         self.merger.is_valid()
     }
 
-    pub(crate) fn next(&mut self) -> Result<()> {
+    fn next(&mut self) -> Result<()> {
         self.merger.next()
     }
 
-    pub(crate) fn current_pk_id(&self) -> PkId {
+    fn current_pk_id(&self) -> PkId {
         self.merger.current_node().current_pk_id()
     }
 
-    pub(crate) fn current_key(&self) -> Option<&[u8]> {
+    fn current_key(&self) -> Option<&[u8]> {
         self.merger.current_node().current_key()
     }
 
-    pub(crate) fn current_data_batch(&self) -> DataBatch {
+    fn current_data_batch(&self) -> DataBatch {
         let batch = self.merger.current_node().current_data_batch();
         batch.slice(0, self.merger.current_rows())
     }
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 0653c1307d8d..da06f2b21668 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
 use store_api::storage::RegionId;
 
 use crate::manifest::action::RegionEdit;
-use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
+use crate::memtable::MemtableBuilder;
 use crate::region::version::{Version, VersionBuilder, VersionControl};
 use crate::sst::file::{FileId, FileMeta};
 use crate::sst::file_purger::FilePurgerRef;
@@ -79,10 +79,6 @@ impl VersionControlBuilder {
         self.file_purger.clone()
     }
 
-    pub(crate) fn memtable_builder(&self) -> MemtableBuilderRef {
-        self.memtable_builder.clone()
-    }
-
     pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self {
         let file_id = FileId::random();
         self.files.insert(
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 3f960f416a73..d579db35eacd 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -49,6 +49,7 @@ use crate::config::MitoConfig;
 use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::manifest::action::RegionEdit;
+use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
 use crate::memtable::MemtableBuilderRef;
 use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
@@ -323,6 +324,16 @@ impl WorkerStarter {
         let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
         let running = Arc::new(AtomicBool::new(true));
 
+        let memtable_builder = if let Some(config) = &self.config.experimental_memtable {
+            Arc::new(MergeTreeMemtableBuilder::new(
+                config.clone(),
+                Some(self.write_buffer_manager.clone()),
+            )) as _
+        } else {
+            Arc::new(TimeSeriesMemtableBuilder::new(Some(
+                self.write_buffer_manager.clone(),
+            ))) as _
+        };
         let mut worker_thread = RegionWorkerLoop {
             id: self.id,
             config: self.config,
@@ -333,9 +344,7 @@ impl WorkerStarter {
             wal: Wal::new(self.log_store),
             object_store_manager: self.object_store_manager.clone(),
             running: running.clone(),
-            memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
-                self.write_buffer_manager.clone(),
-            ))),
+            memtable_builder,
             scheduler: self.scheduler.clone(),
             write_buffer_manager: self.write_buffer_manager,
             flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
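
Usage note (illustration, not part of the patch): after this change the merge
tree memtable stays off unless `experimental_memtable` is set in the engine
config; the worker then picks `MergeTreeMemtableBuilder` instead of
`TimeSeriesMemtableBuilder`. A minimal sketch of enabling it programmatically,
assuming the crate-level paths below:

    use mito2::config::MitoConfig;
    use mito2::memtable::merge_tree::MergeTreeConfig;

    fn config_with_experimental_memtable() -> MitoConfig {
        MitoConfig {
            // `#[serde(default)]` means a TOML section that sets only some
            // keys of [experimental_memtable] behaves the same way.
            experimental_memtable: Some(MergeTreeConfig::default()),
            ..MitoConfig::default()
        }
    }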