From 8ca9e014557fe3ca413a5e7ef0913620648b7729 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 14 Mar 2024 19:13:01 +0800 Subject: [PATCH] feat: Partition memtables by time if compaction window is provided (#3501) * feat: define time partitions * feat: adapt time partitions to version * feat: implement non write methods * feat: add write one to memtable * feat: implement write * chore: fix warning * fix: inner not set * refactor: add collect_iter_timestamps * test: test partitions * chore: debug log * chore: fix typos * chore: log memtable id * fix: empty check * chore: log total parts * chore: update comments --- src/mito2/src/memtable.rs | 7 +- src/mito2/src/memtable/key_values.rs | 2 +- src/mito2/src/memtable/merge_tree.rs | 80 +--- src/mito2/src/memtable/merge_tree/tree.rs | 48 ++ src/mito2/src/memtable/time_partition.rs | 551 ++++++++++++++++++++++ src/mito2/src/memtable/time_series.rs | 100 ++-- src/mito2/src/memtable/version.rs | 32 +- src/mito2/src/region/opener.rs | 17 +- src/mito2/src/region/options.rs | 8 + src/mito2/src/region/version.rs | 34 +- src/mito2/src/test_util/memtable_util.rs | 23 + src/mito2/src/test_util/version_util.rs | 9 +- 12 files changed, 784 insertions(+), 127 deletions(-) create mode 100644 src/mito2/src/memtable/time_partition.rs diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index aa3d7e2bed71..8c9cd0172a0c 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -26,6 +26,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; +use crate::memtable::key_values::KeyValue; pub use crate::memtable::key_values::KeyValues; use crate::memtable::merge_tree::MergeTreeConfig; use crate::metrics::WRITE_BUFFER_BYTES; @@ -33,6 +34,7 @@ use crate::read::Batch; pub mod key_values; pub mod merge_tree; +pub mod time_partition; pub mod time_series; pub(crate) mod version; @@ -82,9 +84,12 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Returns the id of this 
memtable. fn id(&self) -> MemtableId; - /// Write key values into the memtable. + /// Writes key values into the memtable. fn write(&self, kvs: &KeyValues) -> Result<()>; + /// Writes one key value pair into the memtable. + fn write_one(&self, key_value: KeyValue) -> Result<()>; + /// Scans the memtable. /// `projection` selects columns to read, `None` means reading all columns. /// `filters` are the predicates to be pushed down to memtable. diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 4986a81cb2c1..f1734e5a36e0 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -71,7 +71,7 @@ impl KeyValues { /// Primary key columns have the same order as region's primary key. Field /// columns are ordered by their position in the region schema (The same order /// as users defined while creating the region). -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct KeyValue<'a> { row: &'a Row, schema: &'a Vec, diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 1789959adfee..a916f4f9b496 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -36,6 +36,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; +use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::tree::MergeTree; use crate::memtable::{ @@ -127,6 +128,17 @@ impl Memtable for MergeTreeMemtable { res } + fn write_one(&self, key_value: KeyValue) -> Result<()> { + let mut metrics = WriteMetrics::default(); + let mut pk_buffer = Vec::new(); + // Ensures the memtable always updates stats. 
+ let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics); + + self.update_stats(&metrics); + + res + } + fn iter( &self, projection: Option<&[ColumnId]>, @@ -290,16 +302,14 @@ impl MemtableBuilder for MergeTreeMemtableBuilder { #[cfg(test)] mod tests { - use std::collections::BTreeSet; - use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::scalars::ScalarVector; - use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; + use datatypes::vectors::Int64Vector; use super::*; - use crate::test_util::memtable_util; + use crate::test_util::memtable_util::{self, collect_iter_timestamps}; #[test] fn test_memtable_sorted_input() { @@ -322,23 +332,10 @@ mod tests { let expected_ts = kvs .iter() .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) - .collect::>(); + .collect::>(); let iter = memtable.iter(None, None).unwrap(); - let read = iter - .flat_map(|batch| { - batch - .unwrap() - .timestamps() - .as_any() - .downcast_ref::() - .unwrap() - .iter_data() - .collect::>() - .into_iter() - }) - .map(|v| v.unwrap().0.value()) - .collect::>(); + let read = collect_iter_timestamps(iter); assert_eq!(expected_ts, read); let stats = memtable.stats(); @@ -386,20 +383,7 @@ mod tests { memtable.write(&kvs).unwrap(); let iter = memtable.iter(None, None).unwrap(); - let read = iter - .flat_map(|batch| { - batch - .unwrap() - .timestamps() - .as_any() - .downcast_ref::() - .unwrap() - .iter_data() - .collect::>() - .into_iter() - }) - .map(|v| v.unwrap().0.value()) - .collect::>(); + let read = collect_iter_timestamps(iter); assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read); let iter = memtable.iter(None, None).unwrap(); @@ -514,20 +498,7 @@ mod tests { let expect = data.into_iter().map(|x| x.2).collect::>(); let iter = memtable.iter(None, None).unwrap(); - let read = iter - .flat_map(|batch| { - batch - .unwrap() - .timestamps() - .as_any() - .downcast_ref::() - 
.unwrap() - .iter_data() - .collect::>() - .into_iter() - }) - .map(|v| v.unwrap().0.value()) - .collect::>(); + let read = collect_iter_timestamps(iter); assert_eq!(expect, read); } @@ -564,20 +535,7 @@ mod tests { let iter = memtable .iter(None, Some(Predicate::new(vec![expr.into()]))) .unwrap(); - let read = iter - .flat_map(|batch| { - batch - .unwrap() - .timestamps() - .as_any() - .downcast_ref::() - .unwrap() - .iter_data() - .collect::>() - .into_iter() - }) - .map(|v| v.unwrap().0.value()) - .collect::>(); + let read = collect_iter_timestamps(iter); assert_eq!(timestamps, read); } } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 0a42e13fdec3..a059643dd478 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -148,6 +148,54 @@ impl MergeTree { Ok(()) } + /// Write one key value pair into the tree. + /// + /// # Panics + /// Panics if the tree is immutable (frozen). + pub fn write_one( + &self, + kv: KeyValue, + pk_buffer: &mut Vec, + metrics: &mut WriteMetrics, + ) -> Result<()> { + let has_pk = !self.metadata.primary_key.is_empty(); + + ensure!( + kv.num_primary_keys() == self.row_codec.num_fields(), + PrimaryKeyLengthMismatchSnafu { + expect: self.row_codec.num_fields(), + actual: kv.num_primary_keys(), + } + ); + // Safety: timestamp of kv must be both present and a valid timestamp value. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + metrics.min_ts = metrics.min_ts.min(ts); + metrics.max_ts = metrics.max_ts.max(ts); + metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::(); + + if !has_pk { + // No primary key. + return self.write_no_key(kv); + } + + // Encode primary key. + pk_buffer.clear(); + if self.is_partitioned { + // Use sparse encoder for metric engine. 
+ self.sparse_encoder + .encode_to_vec(kv.primary_keys(), pk_buffer)?; + } else { + self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + } + + // Write rows with + self.write_with_key(pk_buffer, kv, metrics)?; + + metrics.value_bytes += std::mem::size_of::() + std::mem::size_of::(); + + Ok(()) + } + /// Scans the tree. pub fn read( &self, diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs new file mode 100644 index 000000000000..cba3ba3079c7 --- /dev/null +++ b/src/mito2/src/memtable/time_partition.rs @@ -0,0 +1,551 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Partitions memtables by time. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use common_telemetry::debug; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use common_time::Timestamp; +use smallvec::{smallvec, SmallVec}; +use snafu::OptionExt; +use store_api::metadata::RegionMetadataRef; + +use crate::error::{InvalidRequestSnafu, Result}; +use crate::memtable::key_values::KeyValue; +use crate::memtable::version::SmallMemtableVec; +use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef}; + +/// A partition holds rows with timestamps between `[min, max)`. +#[derive(Debug, Clone)] +pub struct TimePartition { + /// Memtable of the partition. 
+ memtable: MemtableRef, + /// Time range of the partition. `None` means there is no time range. The time + /// range is `None` if and only if the [TimePartitions::part_duration] is `None`. + time_range: Option, +} + +impl TimePartition { + /// Returns whether the `ts` belongs to the partition. + fn contains_timestamp(&self, ts: Timestamp) -> bool { + let Some(range) = self.time_range else { + return true; + }; + + range.contains_timestamp(ts) + } + + /// Write rows to the part. + fn write(&self, kvs: &KeyValues) -> Result<()> { + self.memtable.write(kvs) + } +} + +type PartitionVec = SmallVec<[TimePartition; 2]>; + +/// Partitions. +#[derive(Debug)] +pub struct TimePartitions { + /// Mutable data of partitions. + inner: Mutex, + /// Duration of a partition. + /// + /// `None` means there is only one partition and the [TimePartition::time_range] is + /// also `None`. + part_duration: Option, + /// Metadata of the region. + metadata: RegionMetadataRef, + /// Builder of memtables. + builder: MemtableBuilderRef, +} + +pub type TimePartitionsRef = Arc; + +impl TimePartitions { + /// Returns a new empty partition list with optional duration. + pub fn new( + metadata: RegionMetadataRef, + builder: MemtableBuilderRef, + next_memtable_id: MemtableId, + part_duration: Option, + ) -> Self { + let mut inner = PartitionsInner::new(next_memtable_id); + if part_duration.is_none() { + // If `part_duration` is None, then we create a partition with `None` time + // range so we will write all rows to that partition. + let memtable = builder.build(inner.alloc_memtable_id(), &metadata); + debug!( + "Creates a time partition for all timestamps, region: {}, memtable_id: {}", + metadata.region_id, + memtable.id(), + ); + let part = TimePartition { + memtable, + time_range: None, + }; + inner.parts.push(part); + } + + Self { + inner: Mutex::new(inner), + part_duration, + metadata, + builder, + } + } + + /// Write key values to memtables. + /// + /// It creates new partitions if necessary. 
+ pub fn write(&self, kvs: &KeyValues) -> Result<()> { + // Get all parts. + let parts = self.list_partitions(); + + // Checks whether all rows belongs to a single part. Checks in reverse order as we usually + // put to latest part. + for part in parts.iter().rev() { + let mut all_in_partition = true; + for kv in kvs.iter() { + // Safety: We checked the schema in the write request. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap(); + if !part.contains_timestamp(ts) { + all_in_partition = false; + break; + } + } + if !all_in_partition { + continue; + } + + // We can write all rows to this part. + return part.write(kvs); + } + + // Slow path: We have to split kvs by partitions. + self.write_multi_parts(kvs, &parts) + } + + /// Append memtables in partitions to `memtables`. + pub fn list_memtables(&self, memtables: &mut Vec) { + let inner = self.inner.lock().unwrap(); + memtables.extend(inner.parts.iter().map(|part| part.memtable.clone())); + } + + /// Returns the number of partitions. + pub fn num_partitions(&self) -> usize { + let inner = self.inner.lock().unwrap(); + inner.parts.len() + } + + /// Returns true if all memtables are empty. + pub fn is_empty(&self) -> bool { + let inner = self.inner.lock().unwrap(); + inner.parts.iter().all(|part| part.memtable.is_empty()) + } + + /// Freezes all memtables. + pub fn freeze(&self) -> Result<()> { + let inner = self.inner.lock().unwrap(); + for part in &*inner.parts { + part.memtable.freeze()?; + } + Ok(()) + } + + /// Forks latest partition. 
+ pub fn fork(&self, metadata: &RegionMetadataRef) -> Self { + let mut inner = self.inner.lock().unwrap(); + let latest_part = inner + .parts + .iter() + .max_by_key(|part| part.time_range.map(|range| range.min_timestamp)) + .cloned(); + + let Some(old_part) = latest_part else { + return Self::new( + metadata.clone(), + self.builder.clone(), + inner.next_memtable_id, + self.part_duration, + ); + }; + let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata); + let new_part = TimePartition { + memtable, + time_range: old_part.time_range, + }; + Self { + inner: Mutex::new(PartitionsInner::with_partition( + new_part, + inner.next_memtable_id, + )), + part_duration: self.part_duration, + metadata: metadata.clone(), + builder: self.builder.clone(), + } + } + + /// Returns partition duration. + pub(crate) fn part_duration(&self) -> Option { + self.part_duration + } + + /// Returns memory usage. + pub(crate) fn memory_usage(&self) -> usize { + let inner = self.inner.lock().unwrap(); + inner + .parts + .iter() + .map(|part| part.memtable.stats().estimated_bytes) + .sum() + } + + /// Append memtables in partitions to small vec. + pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) { + let inner = self.inner.lock().unwrap(); + memtables.extend(inner.parts.iter().map(|part| part.memtable.clone())); + } + + /// Returns the next memtable id. + pub(crate) fn next_memtable_id(&self) -> MemtableId { + let inner = self.inner.lock().unwrap(); + inner.next_memtable_id + } + + /// Returns all partitions. + fn list_partitions(&self) -> PartitionVec { + let inner = self.inner.lock().unwrap(); + inner.parts.clone() + } + + /// Write to multiple partitions. + fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> { + // If part duration is `None` then there is always one partition and all rows + // will be put in that partition before invoking this method. 
+ debug_assert!(self.part_duration.is_some()); + + let mut parts_to_write = HashMap::new(); + let mut missing_parts = HashMap::new(); + for kv in kvs.iter() { + let mut part_found = false; + // Safety: We used the timestamp before. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap(); + for part in parts { + if part.contains_timestamp(ts) { + // Safety: Since part duration is `Some` so all time range should be `Some`. + parts_to_write + .entry(part.time_range.unwrap().min_timestamp) + .or_insert_with(|| PartitionToWrite { + partition: part.clone(), + key_values: Vec::new(), + }) + .key_values + .push(kv); + part_found = true; + break; + } + } + + if !part_found { + // We need to write it to a new part. + // Safety: `new()` ensures duration is always Some if we do to this method. + let part_duration = self.part_duration.unwrap(); + let part_start = + partition_start_timestamp(ts, part_duration).with_context(|| { + InvalidRequestSnafu { + region_id: self.metadata.region_id, + reason: format!( + "timestamp {ts:?} and bucket {part_duration:?} are out of range" + ), + } + })?; + missing_parts + .entry(part_start) + .or_insert_with(Vec::new) + .push(kv); + } + } + + // Writes rows to existing parts. + for part_to_write in parts_to_write.into_values() { + for kv in part_to_write.key_values { + part_to_write.partition.memtable.write_one(kv)?; + } + } + + let part_duration = self.part_duration.unwrap(); + // Creates new parts and writes to them. Acquires the lock to avoid others create + // the same partition. 
+ let mut inner = self.inner.lock().unwrap(); + for (part_start, key_values) in missing_parts { + let part_pos = match inner + .parts + .iter() + .position(|part| part.time_range.unwrap().min_timestamp == part_start) + { + Some(pos) => pos, + None => { + let range = PartTimeRange::from_start_duration(part_start, part_duration) + .with_context(|| InvalidRequestSnafu { + region_id: self.metadata.region_id, + reason: format!( + "Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}", + ), + })?; + let memtable = self + .builder + .build(inner.alloc_memtable_id(), &self.metadata); + debug!( + "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}", + range, + self.metadata.region_id, + part_duration, + memtable.id(), + inner.parts.len() + 1 + ); + let pos = inner.parts.len(); + inner.parts.push(TimePartition { + memtable, + time_range: Some(range), + }); + pos + } + }; + + let memtable = &inner.parts[part_pos].memtable; + for kv in key_values { + memtable.write_one(kv)?; + } + } + + Ok(()) + } +} + +/// Computes the start timestamp of the partition for `ts`. +/// +/// It always use bucket size in seconds which should fit all timestamp resolution. +fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option { + // Safety: We convert it to seconds so it never returns `None`. + let ts_sec = ts.convert_to(TimeUnit::Second).unwrap(); + let bucket_sec: i64 = bucket.as_secs().try_into().ok()?; + let start_sec = ts_sec.align_by_bucket(bucket_sec)?; + start_sec.convert_to(ts.unit()) +} + +#[derive(Debug)] +struct PartitionsInner { + /// All partitions. + parts: PartitionVec, + /// Next memtable id. 
+ next_memtable_id: MemtableId, +} + +impl PartitionsInner { + fn new(next_memtable_id: MemtableId) -> Self { + Self { + parts: Default::default(), + next_memtable_id, + } + } + + fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self { + Self { + parts: smallvec![part], + next_memtable_id, + } + } + + fn alloc_memtable_id(&mut self) -> MemtableId { + let id = self.next_memtable_id; + self.next_memtable_id += 1; + id + } +} + +/// Time range of a partition. +#[derive(Debug, Clone, Copy)] +struct PartTimeRange { + /// Inclusive min timestamp of rows in the partition. + min_timestamp: Timestamp, + /// Exclusive max timestamp of rows in the partition. + max_timestamp: Timestamp, +} + +impl PartTimeRange { + fn from_start_duration(start: Timestamp, duration: Duration) -> Option { + let start_sec = start.convert_to(TimeUnit::Second)?; + let end_sec = start_sec.add_duration(duration).ok()?; + let min_timestamp = start_sec.convert_to(start.unit())?; + let max_timestamp = end_sec.convert_to(start.unit())?; + + Some(Self { + min_timestamp, + max_timestamp, + }) + } + + /// Returns whether the `ts` belongs to the partition. 
+ fn contains_timestamp(&self, ts: Timestamp) -> bool { + self.min_timestamp <= ts && ts < self.max_timestamp + } +} + +struct PartitionToWrite<'a> { + partition: TimePartition, + key_values: Vec<KeyValue<'a>>, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memtable::merge_tree::MergeTreeMemtableBuilder; + use crate::test_util::memtable_util::{self, collect_iter_timestamps}; + + #[test] + fn test_no_duration() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(MergeTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder, 0, None); + assert_eq!(1, partitions.num_partitions()); + assert!(partitions.is_empty()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[1000, 3000, 7000, 5000, 6000], + 0, // sequence 0, 1, 2, 3, 4 + ); + partitions.write(&kvs).unwrap(); + + assert_eq!(1, partitions.num_partitions()); + assert!(!partitions.is_empty()); + assert!(!partitions.is_empty()); + let mut memtables = Vec::new(); + partitions.list_memtables(&mut memtables); + + let iter = memtables[0].iter(None, None).unwrap(); + let timestamps = collect_iter_timestamps(iter); + assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]); + } + + #[test] + fn test_write_single_part() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(MergeTreeMemtableBuilder::default()); + let partitions = + TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10))); + assert_eq!(0, partitions.num_partitions()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[5000, 2000, 0], + 0, // sequence 0, 1, 2 + ); + // It should creates a new partition.
+ partitions.write(&kvs).unwrap(); + assert_eq!(1, partitions.num_partitions()); + assert!(!partitions.is_empty()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[3000, 7000, 4000], + 3, // sequence 3, 4, 5 + ); + // Still writes to the same partition. + partitions.write(&kvs).unwrap(); + assert_eq!(1, partitions.num_partitions()); + + let mut memtables = Vec::new(); + partitions.list_memtables(&mut memtables); + let iter = memtables[0].iter(None, None).unwrap(); + let timestamps = collect_iter_timestamps(iter); + assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]); + let parts = partitions.list_partitions(); + assert_eq!( + Timestamp::new_millisecond(0), + parts[0].time_range.unwrap().min_timestamp + ); + assert_eq!( + Timestamp::new_millisecond(10000), + parts[0].time_range.unwrap().max_timestamp + ); + } + + #[test] + fn test_write_multi_parts() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(MergeTreeMemtableBuilder::default()); + let partitions = + TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5))); + assert_eq!(0, partitions.num_partitions()); + + let kvs = memtable_util::build_key_values( + &metadata, + "hello".to_string(), + 0, + &[2000, 0], + 0, // sequence 0, 1 + ); + // It should creates a new partition.
+ partitions.write(&kvs).unwrap(); + assert_eq!(2, partitions.num_partitions()); + + let parts = partitions.list_partitions(); + let iter = parts[0].memtable.iter(None, None).unwrap(); + let timestamps = collect_iter_timestamps(iter); + assert_eq!( + Timestamp::new_millisecond(0), + parts[0].time_range.unwrap().min_timestamp + ); + assert_eq!( + Timestamp::new_millisecond(5000), + parts[0].time_range.unwrap().max_timestamp + ); + assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]); + let iter = parts[1].memtable.iter(None, None).unwrap(); + let timestamps = collect_iter_timestamps(iter); + assert_eq!(&[5000, 7000], &timestamps[..]); + assert_eq!( + Timestamp::new_millisecond(5000), + parts[1].time_range.unwrap().min_timestamp + ); + assert_eq!( + Timestamp::new_millisecond(10000), + parts[1].time_range.unwrap().max_timestamp + ); + } +} diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 38ad4f328a34..c31f9bea7bcf 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -38,6 +38,7 @@ use table::predicate::Predicate; use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::flush::WriteBufferManagerRef; +use crate::memtable::key_values::KeyValue; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, @@ -110,49 +111,75 @@ impl TimeSeriesMemtable { } /// Updates memtable stats.
- fn update_stats(&self, request_size: usize, min: i64, max: i64) { - self.alloc_tracker.on_allocation(request_size); + fn update_stats(&self, stats: LocalStats) { + self.alloc_tracker.on_allocation(stats.allocated); loop { let current_min = self.min_timestamp.load(Ordering::Relaxed); - if min >= current_min { + if stats.min_ts >= current_min { break; } let Err(updated) = self.min_timestamp.compare_exchange( current_min, - min, + stats.min_ts, Ordering::Relaxed, Ordering::Relaxed, ) else { break; }; - if updated == min { + if updated == stats.min_ts { break; } } loop { let current_max = self.max_timestamp.load(Ordering::Relaxed); - if max <= current_max { + if stats.max_ts <= current_max { break; } let Err(updated) = self.max_timestamp.compare_exchange( current_max, - max, + stats.max_ts, Ordering::Relaxed, Ordering::Relaxed, ) else { break; }; - if updated == max { + if updated == stats.max_ts { break; } } } + + fn write_key_value(&self, kv: KeyValue, stats: &mut LocalStats) -> Result<()> { + ensure!( + kv.num_primary_keys() == self.row_codec.num_fields(), + PrimaryKeyLengthMismatchSnafu { + expect: self.row_codec.num_fields(), + actual: kv.num_primary_keys() + } + ); + let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; + let fields = kv.fields().collect::>(); + + stats.allocated += fields.iter().map(|v| v.data_size()).sum::(); + let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded); + stats.allocated += series_allocated; + + // safety: timestamp of kv must be both present and a valid timestamp value. 
+ let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + stats.min_ts = stats.min_ts.min(ts); + stats.max_ts = stats.max_ts.max(ts); + + let mut guard = series.write().unwrap(); + guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields); + + Ok(()) + } } impl Debug for TimeSeriesMemtable { @@ -167,43 +194,30 @@ impl Memtable for TimeSeriesMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { - let mut allocated = 0; - let mut min_ts = i64::MAX; - let mut max_ts = i64::MIN; + let mut local_stats = LocalStats::default(); for kv in kvs.iter() { - ensure!( - kv.num_primary_keys() == self.row_codec.num_fields(), - PrimaryKeyLengthMismatchSnafu { - expect: self.row_codec.num_fields(), - actual: kv.num_primary_keys() - } - ); - let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; - let fields = kv.fields().collect::>(); - - allocated += fields.iter().map(|v| v.data_size()).sum::(); - let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded); - allocated += series_allocated; - - // safety: timestamp of kv must be both present and a valid timestamp value. - let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); - min_ts = min_ts.min(ts); - max_ts = max_ts.max(ts); - - let mut guard = series.write().unwrap(); - guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields); + self.write_key_value(kv, &mut local_stats)?; } - allocated += kvs.num_rows() * std::mem::size_of::(); - allocated += kvs.num_rows() * std::mem::size_of::(); + local_stats.allocated += kvs.num_rows() * std::mem::size_of::(); + local_stats.allocated += kvs.num_rows() * std::mem::size_of::(); // TODO(hl): this maybe inaccurate since for-iteration may return early. // We may lift the primary key length check out of Memtable::write // so that we can ensure writing to memtable will succeed. 
- self.update_stats(allocated, min_ts, max_ts); + self.update_stats(local_stats); Ok(()) } + fn write_one(&self, key_value: KeyValue) -> Result<()> { + let mut local_stats = LocalStats::default(); + let res = self.write_key_value(key_value, &mut local_stats); + local_stats.allocated += std::mem::size_of::() + std::mem::size_of::(); + + self.update_stats(local_stats); + res + } + fn iter( &self, projection: Option<&[ColumnId]>, @@ -267,6 +281,22 @@ impl Memtable for TimeSeriesMemtable { } } +struct LocalStats { + allocated: usize, + min_ts: i64, + max_ts: i64, +} + +impl Default for LocalStats { + fn default() -> Self { + LocalStats { + allocated: 0, + min_ts: i64::MAX, + max_ts: i64::MIN, + } + } +} + type SeriesRwLockMap = RwLock, Arc>>>; struct SeriesSet { diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index c12437052144..9e18edc67345 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -20,26 +20,29 @@ use smallvec::SmallVec; use store_api::metadata::RegionMetadataRef; use crate::error::Result; +use crate::memtable::time_partition::TimePartitionsRef; use crate::memtable::{MemtableId, MemtableRef}; +pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>; + /// A version of current memtables in a region. #[derive(Debug, Clone)] pub(crate) struct MemtableVersion { /// Mutable memtable. - pub(crate) mutable: MemtableRef, + pub(crate) mutable: TimePartitionsRef, /// Immutable memtables. /// /// We only allow one flush job per region but if a flush job failed, then we /// might need to store more than one immutable memtable on the next time we /// flush the region. - immutables: SmallVec<[MemtableRef; 2]>, + immutables: SmallMemtableVec, } pub(crate) type MemtableVersionRef = Arc; impl MemtableVersion { /// Returns a new [MemtableVersion] with specific mutable memtable. 
- pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion { + pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion { MemtableVersion { mutable, immutables: SmallVec::new(), @@ -53,8 +56,8 @@ impl MemtableVersion { /// Lists mutable and immutable memtables. pub(crate) fn list_memtables(&self) -> Vec { - let mut mems = Vec::with_capacity(self.immutables.len() + 1); - mems.push(self.mutable.clone()); + let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions()); + self.mutable.list_memtables(&mut mems); mems.extend_from_slice(&self.immutables); mems } @@ -76,15 +79,13 @@ impl MemtableVersion { // soft limit. self.mutable.freeze()?; // Fork the memtable. - let mutable = self.mutable.fork(self.next_memtable_id(), metadata); + let mutable = Arc::new(self.mutable.fork(metadata)); // Pushes the mutable memtable to immutable list. - let immutables = self - .immutables - .iter() - .cloned() - .chain([self.mutable.clone()]) - .collect(); + let mut immutables = + SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions()); + self.mutable.list_memtables_to_small_vec(&mut immutables); + immutables.extend(self.immutables.iter().cloned()); Ok(Some(MemtableVersion { mutable, immutables, @@ -103,7 +104,7 @@ impl MemtableVersion { /// Returns the memory usage of the mutable memtable. pub(crate) fn mutable_usage(&self) -> usize { - self.mutable.stats().estimated_bytes + self.mutable.memory_usage() } /// Returns the memory usage of the immutable memtables. @@ -121,9 +122,4 @@ impl MemtableVersion { pub(crate) fn is_empty(&self) -> bool { self.mutable.is_empty() && self.immutables.is_empty() } - - /// Returns the next memtable id. 
- pub(crate) fn next_memtable_id(&self) -> MemtableId { - self.mutable.id() + 1 - } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 75d0b9dcfb3f..5192c55469ff 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -37,6 +37,7 @@ use crate::error::{ }; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; +use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderRef; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; @@ -169,7 +170,13 @@ impl RegionOpener { RegionManifestManager::new(metadata.clone(), region_manifest_options).await?; // Initial memtable id is 0. - let mutable = self.memtable_builder.build(0, &metadata); + let part_duration = options.compaction.time_window(); + let mutable = Arc::new(TimePartitions::new( + metadata.clone(), + self.memtable_builder, + 0, + part_duration, + )); debug!("Create region {} with options: {:?}", region_id, options); @@ -265,7 +272,13 @@ impl RegionOpener { self.cache_manager.clone(), )); // Initial memtable id is 0. 
- let mutable = self.memtable_builder.build(0, &metadata); + let part_duration = region_options.compaction.time_window(); + let mutable = Arc::new(TimePartitions::new( + metadata.clone(), + self.memtable_builder.clone(), + 0, + part_duration, + )); let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index e69c7193ff50..1667b5757303 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -94,6 +94,14 @@ pub enum CompactionOptions { Twcs(TwcsOptions), } +impl CompactionOptions { + pub(crate) fn time_window(&self) -> Option { + match self { + CompactionOptions::Twcs(opts) => opts.time_window, + } + } +} + impl Default for CompactionOptions { fn default() -> Self { Self::Twcs(TwcsOptions::default()) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 261371640bc5..fa95255c1a5c 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -31,8 +31,9 @@ use store_api::storage::SequenceNumber; use crate::error::Result; use crate::manifest::action::RegionEdit; +use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef}; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; -use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::memtable::{MemtableBuilderRef, MemtableId}; use crate::region::options::RegionOptions; use crate::sst::file::FileMeta; use crate::sst::file_purger::FilePurgerRef; @@ -122,8 +123,14 @@ impl VersionControl { /// Mark all opened files as deleted and set the delete marker in [VersionControlData] pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) { let version = self.current().version; - let new_mutable = - memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata); + let part_duration 
= version.memtables.mutable.part_duration(); + let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let new_mutable = Arc::new(TimePartitions::new( + version.metadata.clone(), + memtable_builder.clone(), + next_memtable_id, + part_duration, + )); let mut data = self.data.write().unwrap(); data.is_dropped = true; @@ -140,7 +147,14 @@ impl VersionControl { /// new schema. Memtables of the version must be empty. pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) { let version = self.current().version; - let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata); + let part_duration = version.memtables.mutable.part_duration(); + let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let new_mutable = Arc::new(TimePartitions::new( + metadata.clone(), + builder.clone(), + next_memtable_id, + part_duration, + )); debug_assert!(version.memtables.mutable.is_empty()); debug_assert!(version.memtables.immutables().is_empty()); let new_version = Arc::new( @@ -163,8 +177,14 @@ impl VersionControl { ) { let version = self.current().version; - let new_mutable = - memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata); + let part_duration = version.memtables.mutable.part_duration(); + let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let new_mutable = Arc::new(TimePartitions::new( + version.metadata.clone(), + memtable_builder.clone(), + next_memtable_id, + part_duration, + )); let new_version = Arc::new( VersionBuilder::new(version.metadata.clone(), new_mutable) .flushed_entry_id(truncated_entry_id) @@ -242,7 +262,7 @@ pub(crate) struct VersionBuilder { impl VersionBuilder { /// Returns a new builder. 
- pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self { + pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self { VersionBuilder { metadata, memtables: Arc::new(MemtableVersion::new(mutable)), diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 38108dff3c12..3fe378b099a0 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -21,7 +21,9 @@ use api::v1::value::ValueData; use api::v1::{Row, Rows, SemanticType}; use datatypes::arrow::array::UInt64Array; use datatypes::data_type::ConcreteDataType; +use datatypes::scalars::ScalarVector; use datatypes::schema::ColumnSchema; +use datatypes::vectors::TimestampMillisecondVector; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use table::predicate::Predicate; @@ -58,6 +60,10 @@ impl Memtable for EmptyMemtable { Ok(()) } + fn write_one(&self, _key_value: KeyValue) -> Result<()> { + Ok(()) + } + fn iter( &self, _projection: Option<&[ColumnId]>, @@ -303,3 +309,20 @@ pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec { ]); row_codec.encode(key_value.primary_keys()).unwrap() } + +/// Collects timestamps from the batch iter. 
+pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec { + iter.flat_map(|batch| { + batch + .unwrap() + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .collect::>() + .into_iter() + }) + .map(|v| v.unwrap().0.value()) + .collect() +} diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index da06f2b21668..c6e2f45e0b71 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder} use store_api::storage::RegionId; use crate::manifest::action::RegionEdit; -use crate::memtable::MemtableBuilder; +use crate::memtable::time_partition::TimePartitions; use crate::region::version::{Version, VersionBuilder, VersionControl}; use crate::sst::file::{FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; @@ -101,7 +101,12 @@ impl VersionControlBuilder { pub(crate) fn build_version(&self) -> Version { let metadata = Arc::new(self.metadata.clone()); - let mutable = self.memtable_builder.build(0, &metadata); + let mutable = Arc::new(TimePartitions::new( + metadata.clone(), + self.memtable_builder.clone(), + 0, + None, + )); VersionBuilder::new(metadata, mutable) .add_files(self.file_purger.clone(), self.files.values().cloned()) .build()