From b64c075cdb6d1caf3c1c22b19ecf006963ca9979 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Wed, 15 Jan 2025 14:16:53 +0800
Subject: [PATCH] feat: introduce `PrimaryKeyEncoding` (#5312)

* feat: introduce `PrimaryKeyEncoding`

* fix: fix unit tests

* chore: add empty line

* test: add unit tests

* chore: fmt code

* refactor: introduce new codec trait to support various encoding

* fix: fix unit tests

* chore: update sqlness result

* chore: apply suggestions from CR

* chore: apply suggestions from CR
---
 src/mito2/benches/memtable_bench.rs           |  17 +-
 src/mito2/src/engine/basic_test.rs            |   2 +-
 src/mito2/src/manifest/manager.rs             |   2 +-
 src/mito2/src/manifest/tests/checkpoint.rs    |   2 +-
 src/mito2/src/memtable/bulk/context.rs        |   4 +-
 src/mito2/src/memtable/bulk/part.rs           |  12 +-
 src/mito2/src/memtable/partition_tree.rs      |  59 +-
 .../src/memtable/partition_tree/partition.rs  |  96 ++-
 .../src/memtable/partition_tree/shard.rs      |  11 +-
 .../memtable/partition_tree/shard_builder.rs  |   8 +-
 src/mito2/src/memtable/partition_tree/tree.rs |  30 +-
 src/mito2/src/memtable/time_series.rs         |  24 +-
 src/mito2/src/read/compat.rs                  |  14 +-
 src/mito2/src/read/projection.rs              |  16 +-
 src/mito2/src/region/opener.rs                |  46 +-
 src/mito2/src/region/options.rs               |  28 +-
 src/mito2/src/row_converter.rs                | 639 ++--------------
 src/mito2/src/row_converter/dense.rs          | 679 ++++++++++++++++++
 .../src/sst/index/bloom_filter/creator.rs     |   4 +-
 src/mito2/src/sst/index/codec.rs              |  11 +-
 .../src/sst/index/inverted_index/creator.rs   |   4 +-
 src/mito2/src/sst/parquet/file_range.rs       |   6 +-
 src/mito2/src/sst/parquet/format.rs           |  10 +-
 src/mito2/src/sst/parquet/reader.rs           |  10 +-
 src/mito2/src/test_util/memtable_util.rs      |  11 +-
 src/mito2/src/test_util/sst_util.rs           |   4 +-
 src/mito2/src/worker/handle_create.rs         |   7 +-
 src/store-api/src/codec.rs                    |  26 +
 src/store-api/src/lib.rs                      |   1 +
 src/store-api/src/metadata.rs                 |  32 +
 .../region_statistics.result                  |   2 +-
 31 files changed, 1042 insertions(+), 775 deletions(-)
 create mode 100644 src/mito2/src/row_converter/dense.rs
 create mode 100644 src/store-api/src/codec.rs

diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index 4309520cdd0c..74ff58a8ec1f 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -25,6 +25,7 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
 use mito2::memtable::time_series::TimeSeriesMemtable;
 use mito2::memtable::{KeyValues, Memtable};
 use mito2::region::options::MergeMode;
+use mito2::row_converter::DensePrimaryKeyCodec;
 use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
 use rand::rngs::ThreadRng;
 use rand::seq::SliceRandom;
@@ -43,8 +44,14 @@ fn write_rows(c: &mut Criterion) {
     // Note that this test only generates one time series.
let mut group = c.benchmark_group("write"); group.bench_function("partition_tree", |b| { - let memtable = - PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default()); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new( + 1, + codec, + metadata.clone(), + None, + &PartitionTreeConfig::default(), + ); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); b.iter(|| { @@ -71,7 +78,8 @@ fn full_scan(c: &mut Criterion) { let mut group = c.benchmark_group("full_scan"); group.sample_size(10); group.bench_function("partition_tree", |b| { - let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } @@ -108,7 +116,8 @@ fn filter_1_host(c: &mut Criterion) { let mut group = c.benchmark_group("filter_1_host"); group.sample_size(10); group.bench_function("partition_tree", |b| { - let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 534c3bea3012..10be1d269fff 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -553,7 +553,7 @@ async fn test_region_usage() { // region is empty now, check manifest size let region = engine.get_region(region_id).unwrap(); let region_stat = region.region_statistic(); - assert_eq!(region_stat.manifest_size, 686); + assert_eq!(region_stat.manifest_size, 717); // put some rows let rows = Rows { diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 1ef69805419a..b77111d570a0 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -592,6 +592,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1173); + assert_eq!(manifest_size, 1204); } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 7e20bc2ced98..332d94be12d3 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -154,7 +154,7 @@ async fn manager_with_checkpoint_distance_1() { .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); let expected_json = - "{\"size\":848,\"version\":10,\"checksum\":4186457347,\"extend_metadata\":{}}"; + "{\"size\":879,\"version\":10,\"checksum\":2245967096,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 8929ae4383b2..a3c019ebec40 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::row_converter::McmpRowCodec; +use crate::row_converter::DensePrimaryKeyCodec; use crate::sst::parquet::file_range::RangeBase; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::SimpleFilterContext; @@ -41,7 +41,7 @@ 
impl BulkIterContext { projection: &Option<&[ColumnId]>, predicate: Option, ) -> Self { - let codec = McmpRowCodec::new_with_primary_keys(®ion_metadata); + let codec = DensePrimaryKeyCodec::new(®ion_metadata); let simple_filters = predicate .as_ref() diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 89147abf1e8d..2de5f841af1f 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -48,7 +48,7 @@ use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::part_reader::BulkPartIter; use crate::memtable::key_values::KeyValuesRef; use crate::memtable::BoxedBatchIterator; -use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::to_sst_arrow_schema; @@ -103,7 +103,7 @@ pub struct BulkPartMeta { pub struct BulkPartEncoder { metadata: RegionMetadataRef, - pk_encoder: McmpRowCodec, + pk_encoder: DensePrimaryKeyCodec, row_group_size: usize, dedup: bool, writer_props: Option, @@ -115,7 +115,7 @@ impl BulkPartEncoder { dedup: bool, row_group_size: usize, ) -> BulkPartEncoder { - let codec = McmpRowCodec::new_with_primary_keys(&metadata); + let codec = DensePrimaryKeyCodec::new(&metadata); let writer_props = Some( WriterProperties::builder() .set_write_batch_size(row_group_size) @@ -174,7 +174,7 @@ impl BulkPartEncoder { fn mutations_to_record_batch( mutations: &[Mutation], metadata: &RegionMetadataRef, - pk_encoder: &McmpRowCodec, + pk_encoder: &DensePrimaryKeyCodec, dedup: bool, ) -> Result> { let total_rows: usize = mutations @@ -538,7 +538,7 @@ mod tests { .map(|r| r.rows.len()) .sum(); - let pk_encoder = McmpRowCodec::new_with_primary_keys(&metadata); + let pk_encoder = DensePrimaryKeyCodec::new(&metadata); let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup) .unwrap() @@ -557,7 +557,7 @@ mod tests { let batch_values = batches .into_iter() .map(|b| { - let pk_values = pk_encoder.decode(b.primary_key()).unwrap(); + let pk_values = pk_encoder.decode_dense(b.primary_key()).unwrap(); let timestamps = b .timestamps() .as_any() diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index d38623208300..458d6a6d69c5 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -28,7 +28,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use common_base::readable_size::ReadableSize; +pub(crate) use partition::DensePrimaryKeyFilter; use serde::{Deserialize, Serialize}; +use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -43,6 +45,7 @@ use crate::memtable::{ MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, }; use crate::region::options::MergeMode; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec}; /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8; @@ -263,13 +266,14 @@ impl PartitionTreeMemtable { /// Returns a new memtable. 
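     ///
     /// The caller supplies `row_codec` (see `MemtableBuilder::build` below), so the
     /// memtable itself stays agnostic to the region's `PrimaryKeyEncoding`.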
pub fn new( id: MemtableId, + row_codec: Arc, metadata: RegionMetadataRef, write_buffer_manager: Option, config: &PartitionTreeConfig, ) -> Self { Self::with_tree( id, - PartitionTree::new(metadata, config, write_buffer_manager.clone()), + PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()), ) } @@ -320,12 +324,22 @@ impl PartitionTreeMemtableBuilder { impl MemtableBuilder for PartitionTreeMemtableBuilder { fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(PartitionTreeMemtable::new( - id, - metadata.clone(), - self.write_buffer_manager.clone(), - &self.config, - )) + match metadata.primary_key_encoding { + PrimaryKeyEncoding::Dense => { + let codec = Arc::new(DensePrimaryKeyCodec::new(metadata)); + Arc::new(PartitionTreeMemtable::new( + id, + codec, + metadata.clone(), + self.write_buffer_manager.clone(), + &self.config, + )) + } + PrimaryKeyEncoding::Sparse => { + //TODO(weny): Implement sparse primary key encoding. + todo!() + } + } } } @@ -358,7 +372,7 @@ mod tests { use store_api::storage::RegionId; use super::*; - use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::test_util::memtable_util::{ self, collect_iter_timestamps, region_metadata_to_row_schema, }; @@ -378,8 +392,14 @@ mod tests { let timestamps = (0..100).collect::>(); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); - let memtable = - PartitionTreeMemtable::new(1, metadata, None, &PartitionTreeConfig::default()); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new( + 1, + codec, + metadata.clone(), + None, + &PartitionTreeConfig::default(), + ); memtable.write(&kvs).unwrap(); let expected_ts = kvs @@ -414,8 +434,14 @@ mod tests { } else { memtable_util::metadata_with_primary_key(vec![], false) }; - let memtable = - PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default()); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + let memtable = PartitionTreeMemtable::new( + 1, + codec, + metadata.clone(), + None, + &PartitionTreeConfig::default(), + ); let kvs = memtable_util::build_key_values( &metadata, @@ -510,8 +536,10 @@ mod tests { fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) { let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); let memtable = PartitionTreeMemtable::new( 1, + codec, metadata.clone(), None, &PartitionTreeConfig { @@ -719,12 +747,7 @@ mod tests { ) .build(1, &metadata); - let codec = McmpRowCodec::new( - metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - ); + let codec = DensePrimaryKeyCodec::new(&metadata); memtable .write(&build_key_values( diff --git a/src/mito2/src/memtable/partition_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs index 9df076c2a8b8..d527f581f9fe 100644 --- a/src/mito2/src/memtable/partition_tree/partition.rs +++ b/src/mito2/src/memtable/partition_tree/partition.rs @@ -22,6 +22,7 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; +use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::storage::ColumnId; @@ -38,7 +39,7 @@ use 
crate::memtable::partition_tree::{PartitionTreeConfig, PkId}; use crate::memtable::stats::WriteMetrics; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; use crate::read::{Batch, BatchBuilder}; -use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyFilter}; /// Key of a partition. pub type PartitionKey = u32; @@ -65,7 +66,7 @@ impl Partition { pub fn write_with_key( &self, primary_key: &mut Vec, - row_codec: &McmpRowCodec, + row_codec: &dyn PrimaryKeyCodec, key_value: KeyValue, re_encode: bool, metrics: &mut WriteMetrics, @@ -85,17 +86,25 @@ impl Partition { // Key does not yet exist in shard or builder, encode and insert the full primary key. if re_encode { - // `primary_key` is sparse, re-encode the full primary key. - let sparse_key = primary_key.clone(); - primary_key.clear(); - row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?; - let pk_id = inner.shard_builder.write_with_key( - primary_key, - Some(&sparse_key), - &key_value, - metrics, - ); - inner.pk_to_pk_id.insert(sparse_key, pk_id); + match row_codec.encoding() { + PrimaryKeyEncoding::Dense => { + // `primary_key` is sparse, re-encode the full primary key. + let sparse_key = primary_key.clone(); + primary_key.clear(); + row_codec.encode_key_value(&key_value, primary_key)?; + let pk_id = inner.shard_builder.write_with_key( + primary_key, + Some(&sparse_key), + &key_value, + metrics, + ); + inner.pk_to_pk_id.insert(sparse_key, pk_id); + } + PrimaryKeyEncoding::Sparse => { + // TODO(weny): support sparse primary key. + todo!() + } + } } else { // `primary_key` is already the full primary key. let pk_id = inner @@ -126,18 +135,23 @@ impl Partition { Ok(()) } + fn build_primary_key_filter( + need_prune_key: bool, + metadata: &RegionMetadataRef, + row_codec: &dyn PrimaryKeyCodec, + filters: &Arc>, + ) -> Option> { + if need_prune_key { + let filter = row_codec.primary_key_filter(metadata, filters.clone()); + Some(filter) + } else { + None + } + } + /// Scans data in the partition. pub fn read(&self, mut context: ReadPartitionContext) -> Result { let start = Instant::now(); - let key_filter = if context.need_prune_key { - Some(PrimaryKeyFilter::new( - context.metadata.clone(), - context.filters.clone(), - context.row_codec.clone(), - )) - } else { - None - }; let (builder_source, shard_reader_builders) = { let inner = self.inner.read().unwrap(); let mut shard_source = Vec::with_capacity(inner.shards.len() + 1); @@ -157,20 +171,33 @@ impl Partition { }; context.metrics.num_shards += shard_reader_builders.len(); + let mut nodes = shard_reader_builders .into_iter() .map(|builder| { + let primary_key_filter = Self::build_primary_key_filter( + context.need_prune_key, + &context.metadata, + context.row_codec.as_ref(), + &context.filters, + ); Ok(ShardNode::new(ShardSource::Shard( - builder.build(key_filter.clone())?, + builder.build(primary_key_filter)?, ))) }) .collect::>>()?; if let Some(builder) = builder_source { context.metrics.num_builder += 1; + let primary_key_filter = Self::build_primary_key_filter( + context.need_prune_key, + &context.metadata, + context.row_codec.as_ref(), + &context.filters, + ); // Move the initialization of ShardBuilderReader out of read lock. 
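             // Each reader builds its own filter rather than cloning a shared one:
             // `prune_primary_key` takes `&mut self` and keeps per-reader scratch
             // state (e.g. the `offsets_buf` in `DensePrimaryKeyFilter`).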
let shard_builder_reader = - builder.build(Some(&context.pk_weights), key_filter.clone())?; + builder.build(Some(&context.pk_weights), primary_key_filter)?; nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader))); } @@ -354,19 +381,20 @@ impl PartitionReader { } } +/// Dense primary key filter. #[derive(Clone)] -pub(crate) struct PrimaryKeyFilter { +pub struct DensePrimaryKeyFilter { metadata: RegionMetadataRef, filters: Arc>, - codec: Arc, + codec: DensePrimaryKeyCodec, offsets_buf: Vec, } -impl PrimaryKeyFilter { +impl DensePrimaryKeyFilter { pub(crate) fn new( metadata: RegionMetadataRef, filters: Arc>, - codec: Arc, + codec: DensePrimaryKeyCodec, ) -> Self { Self { metadata, @@ -375,8 +403,10 @@ impl PrimaryKeyFilter { offsets_buf: Vec::new(), } } +} - pub(crate) fn prune_primary_key(&mut self, pk: &[u8]) -> bool { +impl PrimaryKeyFilter for DensePrimaryKeyFilter { + fn prune_primary_key(&mut self, pk: &[u8]) -> bool { if self.filters.is_empty() { return true; } @@ -428,7 +458,7 @@ impl PrimaryKeyFilter { /// Structs to reuse across readers to avoid allocating for each reader. pub(crate) struct ReadPartitionContext { metadata: RegionMetadataRef, - row_codec: Arc, + row_codec: Arc, projection: HashSet, filters: Arc>, /// Buffer to store pk weights. @@ -467,16 +497,16 @@ impl Drop for ReadPartitionContext { impl ReadPartitionContext { pub(crate) fn new( metadata: RegionMetadataRef, - row_codec: Arc, + row_codec: Arc, projection: HashSet, - filters: Vec, + filters: Arc>, ) -> ReadPartitionContext { let need_prune_key = Self::need_prune_key(&metadata, &filters); ReadPartitionContext { metadata, row_codec, projection, - filters: Arc::new(filters), + filters, pk_weights: Vec::new(), need_prune_key, metrics: Default::default(), diff --git a/src/mito2/src/memtable/partition_tree/shard.rs b/src/mito2/src/memtable/partition_tree/shard.rs index cabd7e0f1222..5154bf93516c 100644 --- a/src/mito2/src/memtable/partition_tree/shard.rs +++ b/src/mito2/src/memtable/partition_tree/shard.rs @@ -26,10 +26,10 @@ use crate::memtable::partition_tree::data::{ }; use crate::memtable::partition_tree::dict::KeyDictRef; use crate::memtable::partition_tree::merger::{Merger, Node}; -use crate::memtable::partition_tree::partition::PrimaryKeyFilter; use crate::memtable::partition_tree::shard_builder::ShardBuilderReader; use crate::memtable::partition_tree::{PkId, PkIndex, ShardId}; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; +use crate::row_converter::PrimaryKeyFilter; /// Shard stores data related to the same key dictionary. 
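 ///
 /// Readers prune primary keys through the `PrimaryKeyFilter` trait object,
 /// keeping the shard independent of any concrete primary key codec.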
pub struct Shard { @@ -146,7 +146,10 @@ pub struct ShardReaderBuilder { } impl ShardReaderBuilder { - pub(crate) fn build(self, key_filter: Option) -> Result { + pub(crate) fn build( + self, + key_filter: Option>, + ) -> Result { let ShardReaderBuilder { shard_id, key_dict, @@ -163,7 +166,7 @@ pub struct ShardReader { shard_id: ShardId, key_dict: Option, parts_reader: DataPartsReader, - key_filter: Option, + key_filter: Option>, last_yield_pk_index: Option, keys_before_pruning: usize, keys_after_pruning: usize, @@ -176,7 +179,7 @@ impl ShardReader { shard_id: ShardId, key_dict: Option, parts_reader: DataPartsReader, - key_filter: Option, + key_filter: Option>, data_build_cost: Duration, ) -> Result { let has_pk = key_dict.is_some(); diff --git a/src/mito2/src/memtable/partition_tree/shard_builder.rs b/src/mito2/src/memtable/partition_tree/shard_builder.rs index c3bd642eb0ed..f3a00f746977 100644 --- a/src/mito2/src/memtable/partition_tree/shard_builder.rs +++ b/src/mito2/src/memtable/partition_tree/shard_builder.rs @@ -26,11 +26,11 @@ use crate::memtable::partition_tree::data::{ DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP, }; use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder}; -use crate::memtable::partition_tree::partition::PrimaryKeyFilter; use crate::memtable::partition_tree::shard::Shard; use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId}; use crate::memtable::stats::WriteMetrics; use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED; +use crate::row_converter::PrimaryKeyFilter; /// Builder to write keys and data to a shard that the key dictionary /// is still active. @@ -189,7 +189,7 @@ impl ShardBuilderReaderBuilder { pub(crate) fn build( self, pk_weights: Option<&[u16]>, - key_filter: Option, + key_filter: Option>, ) -> Result { let now = Instant::now(); let data_reader = self.data_reader.build(pk_weights)?; @@ -208,7 +208,7 @@ pub struct ShardBuilderReader { shard_id: ShardId, dict_reader: DictBuilderReader, data_reader: DataBufferReader, - key_filter: Option, + key_filter: Option>, last_yield_pk_index: Option, keys_before_pruning: usize, keys_after_pruning: usize, @@ -221,7 +221,7 @@ impl ShardBuilderReader { shard_id: ShardId, dict_reader: DictBuilderReader, data_reader: DataBufferReader, - key_filter: Option, + key_filter: Option>, data_build_cost: Duration, ) -> Result { let mut reader = ShardBuilderReader { diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 6ed5eae68bd9..81e281080415 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -43,7 +43,7 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST use crate::read::dedup::LastNonNullIter; use crate::read::Batch; use crate::region::options::MergeMode; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{PrimaryKeyCodec, SortField}; /// The partition tree. pub struct PartitionTree { @@ -52,7 +52,7 @@ pub struct PartitionTree { /// Metadata of the region. pub(crate) metadata: RegionMetadataRef, /// Primary key codec. - row_codec: Arc, + row_codec: Arc, /// Partitions in the tree. partitions: RwLock>, /// Whether the tree has multiple partitions. @@ -65,16 +65,11 @@ pub struct PartitionTree { impl PartitionTree { /// Creates a new partition tree. 
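     ///
     /// The codec is injected by the caller, so the tree itself does not care
     /// which `PrimaryKeyEncoding` produced it.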
pub fn new( + row_codec: Arc, metadata: RegionMetadataRef, config: &PartitionTreeConfig, write_buffer_manager: Option, - ) -> PartitionTree { - let row_codec = McmpRowCodec::new( - metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - ); + ) -> Self { let sparse_encoder = SparseEncoder { fields: metadata .primary_key_columns() @@ -93,7 +88,7 @@ impl PartitionTree { PartitionTree { config, metadata, - row_codec: Arc::new(row_codec), + row_codec, partitions: Default::default(), is_partitioned, write_buffer_manager, @@ -141,7 +136,7 @@ impl PartitionTree { self.sparse_encoder .encode_to_vec(kv.primary_keys(), pk_buffer)?; } else { - self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + self.row_codec.encode_key_value(&kv, pk_buffer)?; } // Write rows with @@ -191,7 +186,7 @@ impl PartitionTree { self.sparse_encoder .encode_to_vec(kv.primary_keys(), pk_buffer)?; } else { - self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + self.row_codec.encode_key_value(&kv, pk_buffer)?; } // Write rows with @@ -238,7 +233,7 @@ impl PartitionTree { self.metadata.clone(), self.row_codec.clone(), projection, - filters, + Arc::new(filters), ); iter.fetch_next_partition(context)?; @@ -278,7 +273,12 @@ impl PartitionTree { || self.metadata.column_metadatas != metadata.column_metadatas { // The schema has changed, we can't reuse the tree. - return PartitionTree::new(metadata, &self.config, self.write_buffer_manager.clone()); + return PartitionTree::new( + self.row_codec.clone(), + metadata, + &self.config, + self.write_buffer_manager.clone(), + ); } let mut total_shared_size = 0; @@ -353,7 +353,7 @@ impl PartitionTree { partition.write_with_key( primary_key, - &self.row_codec, + self.row_codec.as_ref(), key_value, self.is_partitioned, // If tree is partitioned, re-encode is required to get the full primary key. metrics, diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 23452d783a6b..8331a2f58220 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -51,7 +51,7 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::dedup::LastNonNullIter; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::region::options::MergeMode; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; /// Initial vector builder capacity. 
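 /// (Zero defers the first allocation until a row is actually written.)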
const INITIAL_BUILDER_CAPACITY: usize = 0; @@ -95,7 +95,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { pub struct TimeSeriesMemtable { id: MemtableId, region_metadata: RegionMetadataRef, - row_codec: Arc, + row_codec: Arc, series_set: SeriesSet, alloc_tracker: AllocTracker, max_timestamp: AtomicI64, @@ -115,12 +115,7 @@ impl TimeSeriesMemtable { dedup: bool, merge_mode: MergeMode, ) -> Self { - let row_codec = Arc::new(McmpRowCodec::new( - region_metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - )); + let row_codec = Arc::new(DensePrimaryKeyCodec::new(®ion_metadata)); let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone()); let dedup = if merge_mode == MergeMode::LastNonNull { false @@ -350,11 +345,11 @@ type SeriesRwLockMap = RwLock, Arc>>>; struct SeriesSet { region_metadata: RegionMetadataRef, series: Arc, - codec: Arc, + codec: Arc, } impl SeriesSet { - fn new(region_metadata: RegionMetadataRef, codec: Arc) -> Self { + fn new(region_metadata: RegionMetadataRef, codec: Arc) -> Self { Self { region_metadata, series: Default::default(), @@ -451,7 +446,7 @@ struct Iter { predicate: Vec, pk_schema: arrow::datatypes::SchemaRef, pk_datatypes: Vec, - codec: Arc, + codec: Arc, dedup: bool, metrics: Metrics, } @@ -465,7 +460,7 @@ impl Iter { predicate: Option, pk_schema: arrow::datatypes::SchemaRef, pk_datatypes: Vec, - codec: Arc, + codec: Arc, dedup: bool, ) -> Result { let predicate = predicate @@ -560,7 +555,7 @@ impl Iterator for Iter { } fn prune_primary_key( - codec: &Arc, + codec: &Arc, pk: &[u8], series: &mut Series, datatypes: &[ConcreteDataType], @@ -896,6 +891,7 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::row_converter::SortField; fn schema_for_test() -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); @@ -1160,7 +1156,7 @@ mod tests { #[test] fn test_series_set_concurrency() { let schema = schema_for_test(); - let row_codec = Arc::new(McmpRowCodec::new( + let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields( schema .primary_key_columns() .map(|c| SortField::new(c.column_schema.data_type.clone())) diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 11fb62e78854..1de5d624210c 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -26,7 +26,7 @@ use store_api::storage::ColumnId; use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result}; use crate::read::projection::ProjectionMapper; use crate::read::{Batch, BatchColumn, BatchReader}; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField}; /// Reader to adapt schema of underlying reader to expected schema. pub struct CompatReader { @@ -127,7 +127,7 @@ pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> #[derive(Debug)] struct CompatPrimaryKey { /// Row converter to append values to primary keys. - converter: McmpRowCodec, + converter: DensePrimaryKeyCodec, /// Default values to append. 
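     /// (One default value for each primary key column added by the schema change.)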
values: Vec, } @@ -138,10 +138,7 @@ impl CompatPrimaryKey { let mut buffer = Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size()); buffer.extend_from_slice(batch.primary_key()); - self.converter.encode_to_vec( - self.values.iter().map(|value| value.as_value_ref()), - &mut buffer, - )?; + self.converter.encode_values(&self.values, &mut buffer)?; batch.set_primary_key(buffer); @@ -268,7 +265,7 @@ fn may_compat_primary_key( })?; values.push(default_value); } - let converter = McmpRowCodec::new(fields); + let converter = DensePrimaryKeyCodec::with_fields(fields); Ok(Some(CompatPrimaryKey { converter, values })) } @@ -366,6 +363,7 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::row_converter::PrimaryKeyCodecExt; use crate::test_util::{check_reader_result, VecBatchReader}; /// Creates a new [RegionMetadata]. @@ -400,7 +398,7 @@ mod tests { let fields = (0..keys.len()) .map(|_| SortField::new(ConcreteDataType::string_datatype())) .collect(); - let converter = McmpRowCodec::new(fields); + let converter = DensePrimaryKeyCodec::with_fields(fields); let row = keys.iter().map(|str_opt| match str_opt { Some(v) => ValueRef::String(v), None => ValueRef::Null, diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index e091cf076853..3a4ade9c193f 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -33,7 +33,7 @@ use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, Result}; use crate::read::Batch; -use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec}; /// Only cache vector when its length `<=` this value. const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; @@ -47,7 +47,7 @@ pub struct ProjectionMapper { /// Output record batch contains tags. has_tags: bool, /// Decoder for primary key. - codec: McmpRowCodec, + codec: DensePrimaryKeyCodec, /// Schema for converted [RecordBatch]. output_schema: SchemaRef, /// Ids of columns to project. It keeps ids in the same order as the `projection` @@ -80,12 +80,7 @@ impl ProjectionMapper { // Safety: idx is valid. column_schemas.push(metadata.schema.column_schemas()[*idx].clone()); } - let codec = McmpRowCodec::new( - metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - ); + let codec = DensePrimaryKeyCodec::new(metadata); // Safety: Columns come from existing schema. let output_schema = Arc::new(Schema::new(column_schemas)); // Get fields in each batch. 
@@ -186,7 +181,7 @@ impl ProjectionMapper { Some(v) => v.to_vec(), None => self .codec - .decode(batch.primary_key()) + .decode_dense(batch.primary_key()) .map_err(BoxedError::new) .context(ExternalSnafu)?, } @@ -291,6 +286,7 @@ mod tests { use super::*; use crate::cache::CacheManager; use crate::read::BatchBuilder; + use crate::row_converter::{PrimaryKeyCodecExt, SortField}; use crate::test_util::meta_util::TestRegionMetadataBuilder; fn new_batch( @@ -299,7 +295,7 @@ mod tests { fields: &[(ColumnId, i64)], num_rows: usize, ) -> Batch { - let converter = McmpRowCodec::new( + let converter = DensePrimaryKeyCodec::with_fields( (0..tags.len()) .map(|_| SortField::new(ConcreteDataType::int64_datatype())) .collect(), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index b2a76490cc27..085dd5ba08c5 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -24,10 +24,10 @@ use futures::future::BoxFuture; use futures::StreamExt; use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; -use snafu::{ensure, OptionExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; -use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_engine::RegionRole; use store_api::storage::{ColumnId, RegionId}; @@ -35,7 +35,8 @@ use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ - EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu, + EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, + Result, StaleLogEntrySnafu, }; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; @@ -59,7 +60,7 @@ use crate::wal::{EntryId, Wal}; /// Builder to create a new [MitoRegion] or open an existing one. pub(crate) struct RegionOpener { region_id: RegionId, - metadata: Option, + metadata_builder: Option, memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, region_dir: String, @@ -90,7 +91,7 @@ impl RegionOpener { ) -> RegionOpener { RegionOpener { region_id, - metadata: None, + metadata_builder: None, memtable_builder_provider, object_store_manager, region_dir: normalize_dir(region_dir), @@ -106,16 +107,27 @@ impl RegionOpener { } } - /// Sets metadata of the region to create. - pub(crate) fn metadata(mut self, metadata: RegionMetadata) -> Self { - self.metadata = Some(metadata); + /// Sets metadata builder of the region to create. + pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self { + self.metadata_builder = Some(builder); self } + /// Builds the region metadata. + /// + /// # Panics + /// - Panics if `options` is not set. + /// - Panics if `metadata_builder` is not set. + fn build_metadata(&mut self) -> Result { + let options = self.options.as_ref().unwrap(); + let mut metadata_builder = self.metadata_builder.take().unwrap(); + metadata_builder.primary_key_encoding(options.primary_key_encoding()); + metadata_builder.build().context(InvalidMetadataSnafu) + } + /// Parses and sets options for the region. 
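     ///
     /// Parses `options` into [`RegionOptions`] and applies them via
     /// [`RegionOpener::options`].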
- pub(crate) fn parse_options(mut self, options: HashMap) -> Result { - self.options = Some(RegionOptions::try_from(&options)?); - Ok(self) + pub(crate) fn parse_options(self, options: HashMap) -> Result { + self.options(RegionOptions::try_from(&options)?) } /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of @@ -151,21 +163,21 @@ impl RegionOpener { /// Opens the region if it already exists. /// /// # Panics - /// - Panics if metadata is not set. - /// - Panics if options is not set. + /// - Panics if `metadata_builder` is not set. + /// - Panics if `options` is not set. pub(crate) async fn create_or_open( mut self, config: &MitoConfig, wal: &Wal, ) -> Result { let region_id = self.region_id; - + let metadata = self.build_metadata()?; // Tries to open the region. match self.maybe_open(config, wal).await { Ok(Some(region)) => { let recovered = region.metadata(); // Checks the schema of the region. - let expect = self.metadata.as_ref().unwrap(); + let expect = &metadata; check_recovered_region( &recovered, expect.region_id, @@ -189,13 +201,13 @@ impl RegionOpener { ); } } + // Safety: must be set before calling this method. let options = self.options.take().unwrap(); let object_store = self.object_store(&options.storage)?.clone(); let provider = self.provider(&options.wal_options); - + let metadata = Arc::new(metadata); // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = self.manifest_options(config, &options)?; - let metadata = Arc::new(self.metadata.unwrap()); let manifest_manager = RegionManifestManager::new( metadata.clone(), region_manifest_options, diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 4514137cc335..f33c537017d4 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -27,6 +27,7 @@ use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString}; use snafu::{ensure, ResultExt}; +use store_api::codec::PrimaryKeyEncoding; use store_api::storage::ColumnId; use strum::EnumString; @@ -93,10 +94,19 @@ impl RegionOptions { !self.append_mode } - /// Returns the `merge_mode` if it is set, otherwise returns the default `MergeMode`. + /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`]. pub fn merge_mode(&self) -> MergeMode { self.merge_mode.unwrap_or_default() } + + /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`]. + pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding { + self.memtable + .as_ref() + .map_or(PrimaryKeyEncoding::default(), |memtable| { + memtable.primary_key_encoding() + }) + } } impl TryFrom<&HashMap> for RegionOptions { @@ -319,6 +329,16 @@ pub enum MemtableOptions { with_prefix!(prefix_partition_tree "memtable.partition_tree."); +impl MemtableOptions { + /// Returns the primary key encoding mode. + pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding { + match self { + MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding, + _ => PrimaryKeyEncoding::Dense, + } + } +} + /// Partition tree memtable options. #[serde_as] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -332,6 +352,8 @@ pub struct PartitionTreeOptions { pub data_freeze_threshold: usize, /// Total bytes of dictionary to keep in fork. pub fork_dictionary_bytes: ReadableSize, + /// Primary key encoding mode. 
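+    ///
+    /// Defaults to [`PrimaryKeyEncoding::Dense`]; the partition tree memtable is
+    /// currently the only memtable that exposes this option.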
+    pub primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl Default for PartitionTreeOptions {
@@ -350,6 +372,7 @@ impl Default for PartitionTreeOptions {
             index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
             data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
             fork_dictionary_bytes,
+            primary_key_encoding: PrimaryKeyEncoding::Dense,
         }
     }
 }
@@ -644,6 +667,7 @@ mod tests {
                 index_max_keys_per_shard: 2048,
                 data_freeze_threshold: 2048,
                 fork_dictionary_bytes: ReadableSize::mb(128),
+                primary_key_encoding: PrimaryKeyEncoding::Dense,
             })),
             merge_mode: Some(MergeMode::LastNonNull),
         };
@@ -679,6 +703,7 @@
                 index_max_keys_per_shard: 2048,
                 data_freeze_threshold: 2048,
                 fork_dictionary_bytes: ReadableSize::mb(128),
+                primary_key_encoding: PrimaryKeyEncoding::Dense,
             })),
             merge_mode: Some(MergeMode::LastNonNull),
         };
@@ -747,6 +772,7 @@
                 index_max_keys_per_shard: 2048,
                 data_freeze_threshold: 2048,
                 fork_dictionary_bytes: ReadableSize::mb(128),
+                primary_key_encoding: PrimaryKeyEncoding::Dense,
             })),
             merge_mode: Some(MergeMode::LastNonNull),
         };
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index d1545148fcaf..f05c1ce0bc34 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -12,32 +12,32 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use bytes::Buf;
-use common_base::bytes::Bytes;
-use common_decimal::Decimal128;
-use common_time::time::Time;
-use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
-use datatypes::data_type::ConcreteDataType;
-use datatypes::prelude::Value;
-use datatypes::types::IntervalType;
-use datatypes::value::ValueRef;
-use memcomparable::{Deserializer, Serializer};
-use paste::paste;
-use serde::{Deserialize, Serialize};
-use snafu::ResultExt;
-use store_api::metadata::RegionMetadata;
+mod dense;
 
-use crate::error;
-use crate::error::{FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu};
+use std::sync::Arc;
+
+use common_recordbatch::filter::SimpleFilterEvaluator;
+use datatypes::value::{Value, ValueRef};
+pub use dense::{DensePrimaryKeyCodec, SortField};
+use store_api::codec::PrimaryKeyEncoding;
+use store_api::metadata::RegionMetadataRef;
+
+use crate::error::Result;
+use crate::memtable::key_values::KeyValue;
 
 /// Row value encoder/decoder.
-pub trait RowCodec {
+pub trait PrimaryKeyCodecExt {
     /// Encodes rows to bytes.
     /// # Note
     /// Ensure the length of row iterator matches the length of fields.
     fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
     where
-        I: Iterator<Item = ValueRef<'a>>;
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        let mut buffer = Vec::new();
+        self.encode_to_vec(row, &mut buffer)?;
+        Ok(buffer)
+    }
 
     /// Encodes rows to specific vec.
     /// # Note
@@ -50,594 +50,41 @@
     fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
 }
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct SortField {
-    pub(crate) data_type: ConcreteDataType,
+pub trait PrimaryKeyFilter: Send + Sync {
+    /// Returns `true` if the primary key matches the filters and should be kept.
+    fn prune_primary_key(&mut self, pk: &[u8]) -> bool;
 }
 
-impl SortField {
-    pub fn new(data_type: ConcreteDataType) -> Self {
-        Self { data_type }
-    }
+pub trait PrimaryKeyCodec: Send + Sync {
+    /// Encodes a key value to bytes.
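+    ///
+    /// The partition tree write path calls this to build the stored key for a
+    /// [`KeyValue`].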
+ fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec) -> Result<()>; - pub fn estimated_size(&self) -> usize { - match &self.data_type { - ConcreteDataType::Boolean(_) => 2, - ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, - ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, - ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5, - ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, - ConcreteDataType::Float32(_) => 5, - ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) - | ConcreteDataType::Json(_) - | ConcreteDataType::Vector(_) => 11, - ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes. - ConcreteDataType::Date(_) => 5, - ConcreteDataType::DateTime(_) => 9, - ConcreteDataType::Timestamp(_) => 10, - ConcreteDataType::Time(_) => 10, - ConcreteDataType::Duration(_) => 10, - ConcreteDataType::Interval(_) => 18, - ConcreteDataType::Decimal128(_) => 19, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => 0, - } - } -} + /// Encodes values to bytes. + fn encode_values(&self, values: &[Value], buffer: &mut Vec) -> Result<()>; -impl SortField { - pub(crate) fn serialize( - &self, - serializer: &mut Serializer<&mut Vec>, - value: &ValueRef, - ) -> Result<()> { - macro_rules! cast_value_and_serialize { - ( - $self: ident; - $serializer: ident; - $( - $ty: ident, $f: ident - ),* - ) => { - match &$self.data_type { - $( - ConcreteDataType::$ty(_) => { - paste!{ - value - .[]() - .context(FieldTypeMismatchSnafu)? - .serialize($serializer) - .context(SerializeFieldSnafu)?; - } - } - )* - ConcreteDataType::Timestamp(_) => { - let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?; - timestamp - .map(|t|t.value()) - .serialize($serializer) - .context(SerializeFieldSnafu)?; - } - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => { - let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?; - interval.map(|i| i.to_i32()) - .serialize($serializer) - .context(SerializeFieldSnafu)?; - } - ConcreteDataType::Interval(IntervalType::DayTime(_)) => { - let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?; - interval.map(|i| i.to_i64()) - .serialize($serializer) - .context(SerializeFieldSnafu)?; - } - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => { - let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?; - interval.map(|i| i.to_i128()) - .serialize($serializer) - .context(SerializeFieldSnafu)?; - } - ConcreteDataType::List(_) | - ConcreteDataType::Dictionary(_) | - ConcreteDataType::Null(_) => { - return error::NotSupportedFieldSnafu { - data_type: $self.data_type.clone() - }.fail() - } - } - }; - } - cast_value_and_serialize!(self; serializer; - Boolean, boolean, - Binary, binary, - Int8, i8, - UInt8, u8, - Int16, i16, - UInt16, u16, - Int32, i32, - UInt32, u32, - Int64, i64, - UInt64, u64, - Float32, f32, - Float64, f64, - String, string, - Date, date, - DateTime, datetime, - Time, time, - Duration, duration, - Decimal128, decimal128, - Json, binary, - Vector, binary - ); - - Ok(()) - } + /// Returns the number of fields in the primary key. + fn num_fields(&self) -> usize; - fn deserialize(&self, deserializer: &mut Deserializer) -> Result { - use common_time::DateTime; - macro_rules! 
deserialize_and_build_value { - ( - $self: ident; - $serializer: ident; - $( - $ty: ident, $f: ident - ),* - ) => { - - match &$self.data_type { - $( - ConcreteDataType::$ty(_) => { - Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?)) - } - )* - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from( - Option::>::deserialize(deserializer) - .context(error::DeserializeFieldSnafu)? - .map(Bytes::from), - )), - ConcreteDataType::Timestamp(ty) => { - let timestamp = Option::::deserialize(deserializer) - .context(error::DeserializeFieldSnafu)? - .map(|t|ty.create_timestamp(t)); - Ok(Value::from(timestamp)) - } - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => { - let interval = Option::::deserialize(deserializer) - .context(error::DeserializeFieldSnafu)? - .map(IntervalYearMonth::from_i32); - Ok(Value::from(interval)) - } - ConcreteDataType::Interval(IntervalType::DayTime(_)) => { - let interval = Option::::deserialize(deserializer) - .context(error::DeserializeFieldSnafu)? - .map(IntervalDayTime::from_i64); - Ok(Value::from(interval)) - } - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => { - let interval = Option::::deserialize(deserializer) - .context(error::DeserializeFieldSnafu)? - .map(IntervalMonthDayNano::from_i128); - Ok(Value::from(interval)) - } - ConcreteDataType::List(l) => NotSupportedFieldSnafu { - data_type: ConcreteDataType::List(l.clone()), - } - .fail(), - ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu { - data_type: ConcreteDataType::Dictionary(d.clone()), - } - .fail(), - ConcreteDataType::Null(n) => NotSupportedFieldSnafu { - data_type: ConcreteDataType::Null(n.clone()), - } - .fail(), - } - }; - } - deserialize_and_build_value!(self; deserializer; - Boolean, bool, - Int8, i8, - Int16, i16, - Int32, i32, - Int64, i64, - UInt8, u8, - UInt16, u16, - UInt32, u32, - UInt64, u64, - Float32, f32, - Float64, f64, - String, String, - Date, Date, - Time, Time, - DateTime, DateTime, - Duration, Duration, - Decimal128, Decimal128 - ) - } - - /// Skip deserializing this field, returns the length of it. - fn skip_deserialize( + /// Returns a primary key filter factory. + fn primary_key_filter( &self, - bytes: &[u8], - deserializer: &mut Deserializer<&[u8]>, - ) -> Result { - let pos = deserializer.position(); - if bytes[pos] == 0 { - deserializer.advance(1); - return Ok(1); - } - - let to_skip = match &self.data_type { - ConcreteDataType::Boolean(_) => 2, - ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2, - ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3, - ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5, - ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, - ConcreteDataType::Float32(_) => 5, - ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) - | ConcreteDataType::Json(_) - | ConcreteDataType::Vector(_) => { - // Now the encoder encode binary as a list of bytes so we can't use - // skip bytes. 
- let pos_before = deserializer.position(); - let mut current = pos_before + 1; - while bytes[current] == 1 { - current += 2; - } - let to_skip = current - pos_before + 1; - deserializer.advance(to_skip); - return Ok(to_skip); - } - ConcreteDataType::String(_) => { - let pos_before = deserializer.position(); - deserializer.advance(1); - deserializer - .skip_bytes() - .context(error::DeserializeFieldSnafu)?; - return Ok(deserializer.position() - pos_before); - } - ConcreteDataType::Date(_) => 5, - ConcreteDataType::DateTime(_) => 9, - ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option - ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit - ConcreteDataType::Duration(_) => 10, - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5, - ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9, - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17, - ConcreteDataType::Decimal128(_) => 19, - ConcreteDataType::Null(_) - | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => 0, - }; - deserializer.advance(to_skip); - Ok(to_skip) - } -} - -/// A memory-comparable row [Value] encoder/decoder. -#[derive(Debug)] -pub struct McmpRowCodec { - fields: Vec, -} - -impl McmpRowCodec { - /// Creates [McmpRowCodec] instance with all primary keys in given `metadata`. - pub fn new_with_primary_keys(metadata: &RegionMetadata) -> Self { - Self::new( - metadata - .primary_key_columns() - .map(|c| SortField::new(c.column_schema.data_type.clone())) - .collect(), - ) - } + metadata: &RegionMetadataRef, + filters: Arc>, + ) -> Box; - pub fn new(fields: Vec) -> Self { - Self { fields } + /// Returns the estimated size of the primary key. + fn estimated_size(&self) -> Option { + None } - pub fn num_fields(&self) -> usize { - self.fields.len() - } + /// Returns the encoding type of the primary key. + fn encoding(&self) -> PrimaryKeyEncoding; - /// Estimated length for encoded bytes. - pub fn estimated_size(&self) -> usize { - self.fields.iter().map(|f| f.estimated_size()).sum() - } - - /// Decode value at `pos` in `bytes`. + /// Decodes the primary key from the given bytes. /// - /// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`. - pub fn decode_value_at( - &self, - bytes: &[u8], - pos: usize, - offsets_buf: &mut Vec, - ) -> Result { - let mut deserializer = Deserializer::new(bytes); - if pos < offsets_buf.len() { - // We computed the offset before. - let to_skip = offsets_buf[pos]; - deserializer.advance(to_skip); - return self.fields[pos].deserialize(&mut deserializer); - } - - if offsets_buf.is_empty() { - let mut offset = 0; - // Skip values before `pos`. - for i in 0..pos { - // Offset to skip before reading value i. - offsets_buf.push(offset); - let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; - offset += skip; - } - // Offset to skip before reading this value. - offsets_buf.push(offset); - } else { - // Offsets are not enough. - let value_start = offsets_buf.len() - 1; - // Advances to decode value at `value_start`. - let mut offset = offsets_buf[value_start]; - deserializer.advance(offset); - for i in value_start..pos { - // Skip value i. - let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?; - // Offset for the value at i + 1. 
- offset += skip; - offsets_buf.push(offset); - } - } - - self.fields[pos].deserialize(&mut deserializer) - } -} - -impl RowCodec for McmpRowCodec { - fn encode<'a, I>(&self, row: I) -> Result> - where - I: Iterator>, - { - let mut buffer = Vec::new(); - self.encode_to_vec(row, &mut buffer)?; - Ok(buffer) - } - - fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> - where - I: Iterator>, - { - buffer.reserve(self.estimated_size()); - let mut serializer = Serializer::new(buffer); - for (value, field) in row.zip(self.fields.iter()) { - field.serialize(&mut serializer, &value)?; - } - Ok(()) - } - - fn decode(&self, bytes: &[u8]) -> Result> { - let mut deserializer = Deserializer::new(bytes); - let mut values = Vec::with_capacity(self.fields.len()); - for f in &self.fields { - let value = f.deserialize(&mut deserializer)?; - values.push(value); - } - Ok(values) - } -} - -#[cfg(test)] -mod tests { - use common_base::bytes::StringBytes; - use common_time::{ - DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, - }; - use datatypes::value::Value; - - use super::*; - - fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec) { - let encoder = McmpRowCodec::new( - data_types - .iter() - .map(|t| SortField::new(t.clone())) - .collect::>(), - ); - - let value_ref = row.iter().map(|v| v.as_value_ref()).collect::>(); - - let result = encoder.encode(value_ref.iter().cloned()).unwrap(); - let decoded = encoder.decode(&result).unwrap(); - assert_eq!(decoded, row); - let mut decoded = Vec::new(); - let mut offsets = Vec::new(); - // Iter two times to test offsets buffer. - for _ in 0..2 { - decoded.clear(); - for i in 0..data_types.len() { - let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap(); - decoded.push(value); - } - assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets); - assert_eq!(decoded, row); - } - } + /// Returns a [`Vec`] that follows the primary key ordering. 
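+    ///
+    /// Each element corresponds to one primary key column, in key order.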
+ fn decode_dense(&self, bytes: &[u8]) -> Result>; - #[test] - fn test_memcmp() { - let encoder = McmpRowCodec::new(vec![ - SortField::new(ConcreteDataType::string_datatype()), - SortField::new(ConcreteDataType::int64_datatype()), - ]); - let values = [Value::String("abcdefgh".into()), Value::Int64(128)]; - let value_ref = values.iter().map(|v| v.as_value_ref()).collect::>(); - let result = encoder.encode(value_ref.iter().cloned()).unwrap(); - - let decoded = encoder.decode(&result).unwrap(); - assert_eq!(&values, &decoded as &[Value]); - } - - #[test] - fn test_memcmp_timestamp() { - check_encode_and_decode( - &[ - ConcreteDataType::timestamp_millisecond_datatype(), - ConcreteDataType::int64_datatype(), - ], - vec![ - Value::Timestamp(Timestamp::new_millisecond(42)), - Value::Int64(43), - ], - ); - } - - #[test] - fn test_memcmp_duration() { - check_encode_and_decode( - &[ - ConcreteDataType::duration_millisecond_datatype(), - ConcreteDataType::int64_datatype(), - ], - vec![ - Value::Duration(Duration::new_millisecond(44)), - Value::Int64(45), - ], - ) - } - - #[test] - fn test_memcmp_binary() { - check_encode_and_decode( - &[ - ConcreteDataType::binary_datatype(), - ConcreteDataType::int64_datatype(), - ], - vec![ - Value::Binary(Bytes::from("hello".as_bytes())), - Value::Int64(43), - ], - ); - } - - #[test] - fn test_memcmp_string() { - check_encode_and_decode( - &[ConcreteDataType::string_datatype()], - vec![Value::String(StringBytes::from("hello"))], - ); - - check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]); - - check_encode_and_decode( - &[ConcreteDataType::string_datatype()], - vec![Value::String("".into())], - ); - check_encode_and_decode( - &[ConcreteDataType::string_datatype()], - vec![Value::String("world".into())], - ); - } - - #[test] - fn test_encode_null() { - check_encode_and_decode( - &[ - ConcreteDataType::string_datatype(), - ConcreteDataType::int32_datatype(), - ], - vec![Value::String(StringBytes::from("abcd")), Value::Null], - ) - } - - #[test] - fn test_encode_multiple_rows() { - check_encode_and_decode( - &[ - ConcreteDataType::string_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::boolean_datatype(), - ], - vec![ - Value::String("hello".into()), - Value::Int64(42), - Value::Boolean(false), - ], - ); - - check_encode_and_decode( - &[ - ConcreteDataType::string_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::boolean_datatype(), - ], - vec![ - Value::String("world".into()), - Value::Int64(43), - Value::Boolean(true), - ], - ); - - check_encode_and_decode( - &[ - ConcreteDataType::string_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::boolean_datatype(), - ], - vec![Value::Null, Value::Int64(43), Value::Boolean(true)], - ); - - // All types. 
- check_encode_and_decode( - &[ - ConcreteDataType::boolean_datatype(), - ConcreteDataType::int8_datatype(), - ConcreteDataType::uint8_datatype(), - ConcreteDataType::int16_datatype(), - ConcreteDataType::uint16_datatype(), - ConcreteDataType::int32_datatype(), - ConcreteDataType::uint32_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::uint64_datatype(), - ConcreteDataType::float32_datatype(), - ConcreteDataType::float64_datatype(), - ConcreteDataType::binary_datatype(), - ConcreteDataType::string_datatype(), - ConcreteDataType::date_datatype(), - ConcreteDataType::datetime_datatype(), - ConcreteDataType::timestamp_millisecond_datatype(), - ConcreteDataType::time_millisecond_datatype(), - ConcreteDataType::duration_millisecond_datatype(), - ConcreteDataType::interval_year_month_datatype(), - ConcreteDataType::interval_day_time_datatype(), - ConcreteDataType::interval_month_day_nano_datatype(), - ConcreteDataType::decimal128_default_datatype(), - ConcreteDataType::vector_datatype(3), - ], - vec![ - Value::Boolean(true), - Value::Int8(8), - Value::UInt8(8), - Value::Int16(16), - Value::UInt16(16), - Value::Int32(32), - Value::UInt32(32), - Value::Int64(64), - Value::UInt64(64), - Value::Float32(1.0.into()), - Value::Float64(1.0.into()), - Value::Binary(b"hello"[..].into()), - Value::String("world".into()), - Value::Date(Date::new(10)), - Value::DateTime(DateTime::new(11)), - Value::Timestamp(Timestamp::new_millisecond(12)), - Value::Time(Time::new_millisecond(13)), - Value::Duration(Duration::new_millisecond(14)), - Value::IntervalYearMonth(IntervalYearMonth::new(1)), - Value::IntervalDayTime(IntervalDayTime::new(1, 15)), - Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)), - Value::Decimal128(Decimal128::from(16)), - Value::Binary(Bytes::from(vec![0; 12])), - ], - ); - } + /// Decode the leftmost value from bytes. + fn decode_leftmost(&self, bytes: &[u8]) -> Result>; } diff --git a/src/mito2/src/row_converter/dense.rs b/src/mito2/src/row_converter/dense.rs new file mode 100644 index 000000000000..f84b5905e855 --- /dev/null +++ b/src/mito2/src/row_converter/dense.rs @@ -0,0 +1,679 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
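+// This file hosts the dense (memcomparable) primary key codec moved out of
+// `row_converter.rs`: each key column is serialized in order with the
+// `memcomparable` crate, so encoded keys compare the same way as the values.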
+
+use std::sync::Arc;
+
+use bytes::Buf;
+use common_base::bytes::Bytes;
+use common_decimal::Decimal128;
+use common_recordbatch::filter::SimpleFilterEvaluator;
+use common_time::time::Time;
+use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
+use datatypes::data_type::ConcreteDataType;
+use datatypes::prelude::Value;
+use datatypes::types::IntervalType;
+use datatypes::value::ValueRef;
+use memcomparable::{Deserializer, Serializer};
+use paste::paste;
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+use store_api::codec::PrimaryKeyEncoding;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+
+use super::PrimaryKeyFilter;
+use crate::error::{
+    self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
+};
+use crate::memtable::key_values::KeyValue;
+use crate::memtable::partition_tree::DensePrimaryKeyFilter;
+use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyCodecExt};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SortField {
+    pub(crate) data_type: ConcreteDataType,
+}
+
+impl SortField {
+    pub fn new(data_type: ConcreteDataType) -> Self {
+        Self { data_type }
+    }
+
+    pub fn estimated_size(&self) -> usize {
+        match &self.data_type {
+            ConcreteDataType::Boolean(_) => 2,
+            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
+            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
+            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
+            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
+            ConcreteDataType::Float32(_) => 5,
+            ConcreteDataType::Float64(_) => 9,
+            ConcreteDataType::Binary(_)
+            | ConcreteDataType::Json(_)
+            | ConcreteDataType::Vector(_) => 11,
+            ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
+            ConcreteDataType::Date(_) => 5,
+            ConcreteDataType::DateTime(_) => 9,
+            ConcreteDataType::Timestamp(_) => 10,
+            ConcreteDataType::Time(_) => 10,
+            ConcreteDataType::Duration(_) => 10,
+            ConcreteDataType::Interval(_) => 18,
+            ConcreteDataType::Decimal128(_) => 19,
+            ConcreteDataType::Null(_)
+            | ConcreteDataType::List(_)
+            | ConcreteDataType::Dictionary(_) => 0,
+        }
+    }
+}
+
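These estimates include the memcomparable framing: every nullable field starts with a one-byte Some/None tag, so a fixed-width value costs its payload size plus one. A tiny sketch of that arithmetic for u64, assuming the `memcomparable` crate's `Serializer` behaves as this file's usage implies:

    use memcomparable::Serializer;
    use serde::Serialize;

    fn main() {
        let mut buf = Vec::new();
        let mut serializer = Serializer::new(&mut buf);
        // One tag byte for Some(_) plus eight payload bytes.
        Some(42u64).serialize(&mut serializer).unwrap();
        assert_eq!(buf.len(), 9); // matches the UInt64 arm above
    }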
+impl SortField {
+    pub(crate) fn serialize(
+        &self,
+        serializer: &mut Serializer<&mut Vec<u8>>,
+        value: &ValueRef,
+    ) -> Result<()> {
+        macro_rules! cast_value_and_serialize {
+            (
+                $self: ident;
+                $serializer: ident;
+                $(
+                    $ty: ident, $f: ident
+                ),*
+            ) => {
+                match &$self.data_type {
+                    $(
+                        ConcreteDataType::$ty(_) => {
+                            paste!{
+                                value
+                                    .[<as_ $f>]()
+                                    .context(FieldTypeMismatchSnafu)?
+                                    .serialize($serializer)
+                                    .context(SerializeFieldSnafu)?;
+                            }
+                        }
+                    )*
+                    ConcreteDataType::Timestamp(_) => {
+                        let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
+                        timestamp
+                            .map(|t| t.value())
+                            .serialize($serializer)
+                            .context(SerializeFieldSnafu)?;
+                    }
+                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
+                        let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
+                        interval.map(|i| i.to_i32())
+                            .serialize($serializer)
+                            .context(SerializeFieldSnafu)?;
+                    }
+                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
+                        let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
+                        interval.map(|i| i.to_i64())
+                            .serialize($serializer)
+                            .context(SerializeFieldSnafu)?;
+                    }
+                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
+                        let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
+                        interval.map(|i| i.to_i128())
+                            .serialize($serializer)
+                            .context(SerializeFieldSnafu)?;
+                    }
+                    ConcreteDataType::List(_) |
+                    ConcreteDataType::Dictionary(_) |
+                    ConcreteDataType::Null(_) => {
+                        return error::NotSupportedFieldSnafu {
+                            data_type: $self.data_type.clone()
+                        }.fail()
+                    }
+                }
+            };
+        }
+        cast_value_and_serialize!(self; serializer;
+            Boolean, boolean,
+            Binary, binary,
+            Int8, i8,
+            UInt8, u8,
+            Int16, i16,
+            UInt16, u16,
+            Int32, i32,
+            UInt32, u32,
+            Int64, i64,
+            UInt64, u64,
+            Float32, f32,
+            Float64, f64,
+            String, string,
+            Date, date,
+            DateTime, datetime,
+            Time, time,
+            Duration, duration,
+            Decimal128, decimal128,
+            Json, binary,
+            Vector, binary
+        );
+
+        Ok(())
+    }
+
+    fn deserialize(&self, deserializer: &mut Deserializer<&[u8]>) -> Result<Value> {
+        use common_time::DateTime;
+        macro_rules! deserialize_and_build_value {
+            (
+                $self: ident;
+                $serializer: ident;
+                $(
+                    $ty: ident, $f: ident
+                ),*
+            ) => {
+
+                match &$self.data_type {
+                    $(
+                        ConcreteDataType::$ty(_) => {
+                            Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
+                        }
+                    )*
+                    ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
+                        Option::<Vec<u8>>::deserialize(deserializer)
+                            .context(error::DeserializeFieldSnafu)?
+                            .map(Bytes::from),
+                    )),
+                    ConcreteDataType::Timestamp(ty) => {
+                        let timestamp = Option::<i64>::deserialize(deserializer)
+                            .context(error::DeserializeFieldSnafu)?
+                            .map(|t| ty.create_timestamp(t));
+                        Ok(Value::from(timestamp))
+                    }
+                    ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
+                        let interval = Option::<i32>::deserialize(deserializer)
+                            .context(error::DeserializeFieldSnafu)?
+                            .map(IntervalYearMonth::from_i32);
+                        Ok(Value::from(interval))
+                    }
+                    ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
+                        let interval = Option::<i64>::deserialize(deserializer)
+                            .context(error::DeserializeFieldSnafu)?
+                            .map(IntervalDayTime::from_i64);
+                        Ok(Value::from(interval))
+                    }
+                    ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
+                        let interval = Option::<i128>::deserialize(deserializer)
+                            .context(error::DeserializeFieldSnafu)?
+                            .map(IntervalMonthDayNano::from_i128);
+                        Ok(Value::from(interval))
+                    }
+                    ConcreteDataType::List(l) => NotSupportedFieldSnafu {
+                        data_type: ConcreteDataType::List(l.clone()),
+                    }
+                    .fail(),
+                    ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
+                        data_type: ConcreteDataType::Dictionary(d.clone()),
+                    }
+                    .fail(),
+                    ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
+                        data_type: ConcreteDataType::Null(n.clone()),
+                    }
+                    .fail(),
+                }
+            };
+        }
+        deserialize_and_build_value!(self; deserializer;
+            Boolean, bool,
+            Int8, i8,
+            Int16, i16,
+            Int32, i32,
+            Int64, i64,
+            UInt8, u8,
+            UInt16, u16,
+            UInt32, u32,
+            UInt64, u64,
+            Float32, f32,
+            Float64, f64,
+            String, String,
+            Date, Date,
+            Time, Time,
+            DateTime, DateTime,
+            Duration, Duration,
+            Decimal128, Decimal128
+        )
+    }
+
+    /// Skips deserializing this field and returns its encoded length.
+    fn skip_deserialize(
+        &self,
+        bytes: &[u8],
+        deserializer: &mut Deserializer<&[u8]>,
+    ) -> Result<usize> {
+        let pos = deserializer.position();
+        if bytes[pos] == 0 {
+            deserializer.advance(1);
+            return Ok(1);
+        }
+
+        let to_skip = match &self.data_type {
+            ConcreteDataType::Boolean(_) => 2,
+            ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
+            ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
+            ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
+            ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
+            ConcreteDataType::Float32(_) => 5,
+            ConcreteDataType::Float64(_) => 9,
+            ConcreteDataType::Binary(_)
+            | ConcreteDataType::Json(_)
+            | ConcreteDataType::Vector(_) => {
+                // The encoder encodes binary as a list of bytes, so we can't use
+                // `skip_bytes` here.
+                let pos_before = deserializer.position();
+                let mut current = pos_before + 1;
+                while bytes[current] == 1 {
+                    current += 2;
+                }
+                let to_skip = current - pos_before + 1;
+                deserializer.advance(to_skip);
+                return Ok(to_skip);
+            }
+            ConcreteDataType::String(_) => {
+                let pos_before = deserializer.position();
+                deserializer.advance(1);
+                deserializer
+                    .skip_bytes()
+                    .context(error::DeserializeFieldSnafu)?;
+                return Ok(deserializer.position() - pos_before);
+            }
+            ConcreteDataType::Date(_) => 5,
+            ConcreteDataType::DateTime(_) => 9,
+            ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
+            ConcreteDataType::Time(_) => 10,     // i64 and 1 byte time unit
+            ConcreteDataType::Duration(_) => 10,
+            ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
+            ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
+            ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
+            ConcreteDataType::Decimal128(_) => 19,
+            ConcreteDataType::Null(_)
+            | ConcreteDataType::List(_)
+            | ConcreteDataType::Dictionary(_) => 0,
+        };
+        deserializer.advance(to_skip);
+        Ok(to_skip)
+    }
+}
+
+impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
+    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        self.encode_dense(row, buffer)
+    }
+
+    fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
+        self.decode_dense(bytes)
+    }
+}
+
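With the `PrimaryKeyCodecExt` impl in place, encoding and decoding a dense key is a two-call round trip. A hypothetical example using only APIs defined in this file (the tag values are made up):

    let codec = DensePrimaryKeyCodec::with_fields(vec![
        SortField::new(ConcreteDataType::string_datatype()),
        SortField::new(ConcreteDataType::uint32_datatype()),
    ]);
    let mut key = Vec::new();
    codec
        .encode_to_vec(
            [ValueRef::String("host-1"), ValueRef::UInt32(7)].into_iter(),
            &mut key,
        )
        .unwrap();
    let values = codec.decode(&key).unwrap(); // [String("host-1"), UInt32(7)]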
+/// A memory-comparable row [`Value`] encoder/decoder.
+#[derive(Clone, Debug)]
+pub struct DensePrimaryKeyCodec {
+    /// Primary key fields.
+    ordered_primary_key_columns: Arc<Vec<SortField>>,
+}
+
+impl DensePrimaryKeyCodec {
+    pub fn new(metadata: &RegionMetadata) -> Self {
+        let ordered_primary_key_columns = Arc::new(
+            metadata
+                .primary_key_columns()
+                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .collect::<Vec<_>>(),
+        );
+
+        Self {
+            ordered_primary_key_columns,
+        }
+    }
+
+    pub fn with_fields(fields: Vec<SortField>) -> Self {
+        Self {
+            ordered_primary_key_columns: Arc::new(fields),
+        }
+    }
+
+    fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        let mut serializer = Serializer::new(buffer);
+        for (value, field) in row.zip(self.ordered_primary_key_columns.iter()) {
+            field.serialize(&mut serializer, &value)?;
+        }
+        Ok(())
+    }
+
+    /// Decodes the value at `pos` in `bytes`.
+    ///
+    /// The i-th element in the offsets buffer is the number of bytes to skip
+    /// before reading the value at index i.
+    pub fn decode_value_at(
+        &self,
+        bytes: &[u8],
+        pos: usize,
+        offsets_buf: &mut Vec<usize>,
+    ) -> Result<Value> {
+        let mut deserializer = Deserializer::new(bytes);
+        if pos < offsets_buf.len() {
+            // We computed the offset before.
+            let to_skip = offsets_buf[pos];
+            deserializer.advance(to_skip);
+            return self.ordered_primary_key_columns[pos].deserialize(&mut deserializer);
+        }
+
+        if offsets_buf.is_empty() {
+            let mut offset = 0;
+            // Skip values before `pos`.
+            for i in 0..pos {
+                // Offset to skip before reading value i.
+                offsets_buf.push(offset);
+                let skip = self.ordered_primary_key_columns[i]
+                    .skip_deserialize(bytes, &mut deserializer)?;
+                offset += skip;
+            }
+            // Offset to skip before reading this value.
+            offsets_buf.push(offset);
+        } else {
+            // The offsets buffer doesn't cover `pos` yet.
+            let value_start = offsets_buf.len() - 1;
+            // Advances to decode the value at `value_start`.
+            let mut offset = offsets_buf[value_start];
+            deserializer.advance(offset);
+            for i in value_start..pos {
+                // Skip value i.
+                let skip = self.ordered_primary_key_columns[i]
+                    .skip_deserialize(bytes, &mut deserializer)?;
+                // Offset for the value at i + 1.
+                offset += skip;
+                offsets_buf.push(offset);
+            }
+        }
+
+        self.ordered_primary_key_columns[pos].deserialize(&mut deserializer)
+    }
+
+    pub fn estimated_size(&self) -> usize {
+        self.ordered_primary_key_columns
+            .iter()
+            .map(|f| f.estimated_size())
+            .sum()
+    }
+}
+
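Continuing the sketch above: `decode_value_at` amortizes point lookups by caching in `offsets_buf` the byte offset of every field it has already walked past, so repeated probes into the same encoded key never re-skip a field.

    let mut offsets = Vec::new();
    // First probe walks field 0 via `skip_deserialize` and records both offsets.
    let uint32_tag = codec.decode_value_at(&key, 1, &mut offsets).unwrap();
    // Second probe hits the cached offset for field 0 directly.
    let string_tag = codec.decode_value_at(&key, 0, &mut offsets).unwrap();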
+impl PrimaryKeyCodec for DensePrimaryKeyCodec {
+    fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
+        self.encode_dense(key_value.primary_keys(), buffer)
+    }
+
+    fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()> {
+        self.encode_dense(values.iter().map(|v| v.as_value_ref()), buffer)
+    }
+
+    fn estimated_size(&self) -> Option<usize> {
+        Some(self.estimated_size())
+    }
+
+    fn num_fields(&self) -> usize {
+        self.ordered_primary_key_columns.len()
+    }
+
+    fn encoding(&self) -> PrimaryKeyEncoding {
+        PrimaryKeyEncoding::Dense
+    }
+
+    fn primary_key_filter(
+        &self,
+        metadata: &RegionMetadataRef,
+        filters: Arc<Vec<SimpleFilterEvaluator>>,
+    ) -> Box<dyn PrimaryKeyFilter> {
+        Box::new(DensePrimaryKeyFilter::new(
+            metadata.clone(),
+            filters,
+            self.clone(),
+        ))
+    }
+
+    fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>> {
+        let mut deserializer = Deserializer::new(bytes);
+        let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
+        for f in self.ordered_primary_key_columns.iter() {
+            let value = f.deserialize(&mut deserializer)?;
+            values.push(value);
+        }
+        Ok(values)
+    }
+
+    fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
+        // TODO(weny, yinwen): avoid decoding the whole primary key.
+        let mut values = self.decode_dense(bytes)?;
+        Ok(values.pop())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_base::bytes::StringBytes;
+    use common_time::{
+        DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
+    };
+    use datatypes::value::Value;
+
+    use super::*;
+
+    fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
+        let encoder = DensePrimaryKeyCodec::with_fields(
+            data_types
+                .iter()
+                .map(|t| SortField::new(t.clone()))
+                .collect::<Vec<_>>(),
+        );
+
+        let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
+
+        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
+        let decoded = encoder.decode(&result).unwrap();
+        assert_eq!(decoded, row);
+        let mut decoded = Vec::new();
+        let mut offsets = Vec::new();
+        // Iterate twice to exercise the offsets buffer.
+        for _ in 0..2 {
+            decoded.clear();
+            for i in 0..data_types.len() {
+                let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
+                decoded.push(value);
+            }
+            assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
+            assert_eq!(decoded, row);
+        }
+    }
+
+    #[test]
+    fn test_memcmp() {
+        let encoder = DensePrimaryKeyCodec::with_fields(vec![
+            SortField::new(ConcreteDataType::string_datatype()),
+            SortField::new(ConcreteDataType::int64_datatype()),
+        ]);
+        let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
+        let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
+        let result = encoder.encode(value_ref.iter().cloned()).unwrap();
+
+        let decoded = encoder.decode(&result).unwrap();
+        assert_eq!(&values, &decoded as &[Value]);
+    }
+
+    #[test]
+    fn test_memcmp_timestamp() {
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                ConcreteDataType::int64_datatype(),
+            ],
+            vec![
+                Value::Timestamp(Timestamp::new_millisecond(42)),
+                Value::Int64(43),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_memcmp_duration() {
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::duration_millisecond_datatype(),
+                ConcreteDataType::int64_datatype(),
+            ],
+            vec![
+                Value::Duration(Duration::new_millisecond(44)),
+                Value::Int64(45),
+            ],
+        )
+    }
+
+    #[test]
+    fn test_memcmp_binary() {
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::binary_datatype(),
+                ConcreteDataType::int64_datatype(),
+            ],
+            vec![
+                Value::Binary(Bytes::from("hello".as_bytes())),
+                Value::Int64(43),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_memcmp_string() {
+        check_encode_and_decode(
+            &[ConcreteDataType::string_datatype()],
+            vec![Value::String(StringBytes::from("hello"))],
+        );
+
+        check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);
+
+        check_encode_and_decode(
+            &[ConcreteDataType::string_datatype()],
+            vec![Value::String("".into())],
+        );
+        check_encode_and_decode(
+            &[ConcreteDataType::string_datatype()],
+            vec![Value::String("world".into())],
+        );
+    }
+
+    #[test]
+    fn test_encode_null() {
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::int32_datatype(),
+            ],
+            vec![Value::String(StringBytes::from("abcd")), Value::Null],
+        )
+    }
+
+    #[test]
+    fn test_encode_multiple_rows() {
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::int64_datatype(),
+                ConcreteDataType::boolean_datatype(),
+            ],
+            vec![
+                Value::String("hello".into()),
+                Value::Int64(42),
+                Value::Boolean(false),
+            ],
+        );
+
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::int64_datatype(),
+                ConcreteDataType::boolean_datatype(),
+            ],
+            vec![
+                Value::String("world".into()),
+                Value::Int64(43),
+                Value::Boolean(true),
+            ],
+        );
+
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::int64_datatype(),
+                ConcreteDataType::boolean_datatype(),
+            ],
+            vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
+        );
+
+        // All types.
+        check_encode_and_decode(
+            &[
+                ConcreteDataType::boolean_datatype(),
+                ConcreteDataType::int8_datatype(),
+                ConcreteDataType::uint8_datatype(),
+                ConcreteDataType::int16_datatype(),
+                ConcreteDataType::uint16_datatype(),
+                ConcreteDataType::int32_datatype(),
+                ConcreteDataType::uint32_datatype(),
+                ConcreteDataType::int64_datatype(),
+                ConcreteDataType::uint64_datatype(),
+                ConcreteDataType::float32_datatype(),
+                ConcreteDataType::float64_datatype(),
+                ConcreteDataType::binary_datatype(),
+                ConcreteDataType::string_datatype(),
+                ConcreteDataType::date_datatype(),
+                ConcreteDataType::datetime_datatype(),
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                ConcreteDataType::time_millisecond_datatype(),
+                ConcreteDataType::duration_millisecond_datatype(),
+                ConcreteDataType::interval_year_month_datatype(),
+                ConcreteDataType::interval_day_time_datatype(),
+                ConcreteDataType::interval_month_day_nano_datatype(),
+                ConcreteDataType::decimal128_default_datatype(),
+                ConcreteDataType::vector_datatype(3),
+            ],
+            vec![
+                Value::Boolean(true),
+                Value::Int8(8),
+                Value::UInt8(8),
+                Value::Int16(16),
+                Value::UInt16(16),
+                Value::Int32(32),
+                Value::UInt32(32),
+                Value::Int64(64),
+                Value::UInt64(64),
+                Value::Float32(1.0.into()),
+                Value::Float64(1.0.into()),
+                Value::Binary(b"hello"[..].into()),
+                Value::String("world".into()),
+                Value::Date(Date::new(10)),
+                Value::DateTime(DateTime::new(11)),
+                Value::Timestamp(Timestamp::new_millisecond(12)),
+                Value::Time(Time::new_millisecond(13)),
+                Value::Duration(Duration::new_millisecond(14)),
+                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
+                Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
+                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
+                Value::Decimal128(Decimal128::from(16)),
+                Value::Binary(Bytes::from(vec![0; 12])),
+            ],
+        );
+    }
+}
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs
index f8701b95c1cd..99ff2c3547e6 100644
--- a/src/mito2/src/sst/index/bloom_filter/creator.rs
+++ b/src/mito2/src/sst/index/bloom_filter/creator.rs
@@ -338,7 +338,7 @@ pub(crate) mod tests {
 
     use super::*;
     use crate::read::BatchColumn;
-    use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
     use crate::sst::index::puffin_manager::PuffinManagerFactory;
 
     pub fn mock_object_store() -> ObjectStore {
@@ -412,7 +412,7 @@ pub(crate) mod tests {
 
     pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
         let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
-        let codec = McmpRowCodec::new(fields);
+        let codec = DensePrimaryKeyCodec::with_fields(fields);
 
         let row: [ValueRef; 1] = [str_tag.as_ref().into()];
         let primary_key = codec.encode(row.into_iter()).unwrap();
diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs
index f2d0bbaf4a7d..23702ba41448 100644
--- a/src/mito2/src/sst/index/codec.rs
+++ b/src/mito2/src/sst/index/codec.rs
@@ -20,7 +20,7 @@ use store_api::metadata::ColumnMetadata;
 use store_api::storage::ColumnId;
 
 use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
-use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
 
 /// Encodes index values according to their data types for sorting and storage use.
 pub struct IndexValueCodec;
@@ -66,7 +66,7 @@ pub struct IndexValuesCodec {
     /// The data types of tag columns.
     fields: Vec<SortField>,
     /// The decoder for the primary key.
-    decoder: McmpRowCodec,
+    decoder: DensePrimaryKeyCodec,
 }
 
 impl IndexValuesCodec {
@@ -81,7 +81,7 @@ impl IndexValuesCodec {
             })
             .unzip();
 
-        let decoder = McmpRowCodec::new(fields.clone());
+        let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
         Self {
             column_ids,
             fields,
@@ -94,7 +94,7 @@ impl IndexValuesCodec {
         &self,
         primary_key: &[u8],
     ) -> Result<impl Iterator<Item = (&ColumnId, &SortField, Option<Value>)>> {
-        let values = self.decoder.decode(primary_key)?;
+        let values = self.decoder.decode_dense(primary_key)?;
 
         let iter = values
             .into_iter()
@@ -119,6 +119,7 @@ mod tests {
 
     use super::*;
     use crate::error::Error;
+    use crate::row_converter::{PrimaryKeyCodecExt, SortField};
 
     #[test]
     fn test_encode_value_basic() {
@@ -165,7 +166,7 @@ mod tests {
             },
         ];
 
-        let primary_key = McmpRowCodec::new(vec![
+        let primary_key = DensePrimaryKeyCodec::with_fields(vec![
            SortField::new(ConcreteDataType::string_datatype()),
            SortField::new(ConcreteDataType::int64_datatype()),
        ])
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 138035d554a1..54c671788357 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -318,7 +318,7 @@ mod tests {
     use crate::cache::index::inverted_index::InvertedIndexCache;
     use crate::metrics::CACHE_BYTES;
     use crate::read::BatchColumn;
-    use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
     use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
     use crate::sst::index::puffin_manager::PuffinManagerFactory;
     use crate::sst::location;
@@ -384,7 +384,7 @@ mod tests {
            SortField::new(ConcreteDataType::string_datatype()),
            SortField::new(ConcreteDataType::int32_datatype()),
        ];
-        let codec = McmpRowCodec::new(fields);
+        let codec = DensePrimaryKeyCodec::with_fields(fields);
 
         let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
         let primary_key = codec.encode(row.into_iter()).unwrap();
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index bc333a6df544..607a15ac883b 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -33,7 +33,7 @@ use crate::read::compat::CompatBatch;
 use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::prune::PruneReader;
 use crate::read::Batch;
-use crate::row_converter::{McmpRowCodec, RowCodec};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
 use crate::sst::file::FileHandle;
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
@@ -156,7 +156,7 @@ impl FileRangeContext {
         reader_builder: RowGroupReaderBuilder,
         filters: Vec<SimpleFilterContext>,
         read_format: ReadFormat,
-        codec: McmpRowCodec,
+        codec: DensePrimaryKeyCodec,
     ) -> Self {
         Self {
             reader_builder,
@@ -241,7 +241,7 @@ pub(crate) struct RangeBase {
     /// Helper to read the SST.
     pub(crate) read_format: ReadFormat,
     /// Decoder for primary keys
-    pub(crate) codec: McmpRowCodec,
+    pub(crate) codec: DensePrimaryKeyCodec,
     /// Optional helper to compat batches.
     pub(crate) compat_batch: Option<CompatBatch>,
 }
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 125621044511..e10d29d7acb4 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -48,7 +48,7 @@ use crate::error::{
     ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
 };
 use crate::read::{Batch, BatchBuilder, BatchColumn};
-use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
 use crate::sst::file::{FileMeta, FileTimeRange};
 use crate::sst::to_sst_arrow_schema;
@@ -402,8 +402,9 @@ impl ReadFormat {
             return None;
         }
 
-        let converter =
-            McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]);
+        let converter = DensePrimaryKeyCodec::with_fields(vec![SortField::new(
+            column.column_schema.data_type.clone(),
+        )]);
         let values = row_groups.iter().map(|meta| {
             let stats = meta
                 .borrow()
@@ -421,8 +422,7 @@ impl ReadFormat {
                 Statistics::Double(_) => None,
                 Statistics::ByteArray(s) => {
                     let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
-                    let mut values = converter.decode(bytes).ok()?;
-                    values.pop()
+                    converter.decode_leftmost(bytes).ok()?
                 }
                 Statistics::FixedLenByteArray(_) => None,
             }
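The statistics path above works because a primary-key column's min/max statistic is itself a dense-encoded single-field key, so only the leftmost value ever needs decoding; `decode_leftmost` replaces the old decode-then-pop pattern. A sketch of the pattern, with `stat_bytes` standing in for the raw parquet statistic:

    let converter = DensePrimaryKeyCodec::with_fields(vec![SortField::new(
        ConcreteDataType::string_datatype(),
    )]);
    let min_value = converter.decode_leftmost(stat_bytes).unwrap(); // Option<Value>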
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index f15f2759df0e..6854c072a1a3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -49,7 +49,7 @@ use crate::metrics::{
 };
 use crate::read::prune::{PruneReader, Source};
 use crate::read::{Batch, BatchReader};
-use crate::row_converter::{McmpRowCodec, SortField};
+use crate::row_converter::DensePrimaryKeyCodec;
 use crate::sst::file::FileHandle;
 use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
@@ -253,13 +253,7 @@ impl ParquetReaderBuilder {
             vec![]
         };
 
-        let codec = McmpRowCodec::new(
-            read_format
-                .metadata()
-                .primary_key_columns()
-                .map(|c| SortField::new(c.column_schema.data_type.clone()))
-                .collect(),
-        );
+        let codec = DensePrimaryKeyCodec::new(read_format.metadata());
 
         let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 72e32c3f0ae5..373218e91f4c 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -36,7 +36,7 @@ use crate::memtable::{
     BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
     MemtableRef, MemtableStats,
 };
-use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
 
 /// Empty memtable for test.
 #[derive(Debug, Default)]
@@ -313,12 +313,7 @@ pub(crate) fn encode_keys(
     key_values: &KeyValues,
     keys: &mut Vec<Vec<u8>>,
 ) {
-    let row_codec = McmpRowCodec::new(
-        metadata
-            .primary_key_columns()
-            .map(|c| SortField::new(c.column_schema.data_type.clone()))
-            .collect(),
-    );
+    let row_codec = DensePrimaryKeyCodec::new(metadata);
     for kv in key_values.iter() {
         let key = row_codec.encode(kv.primary_keys()).unwrap();
         keys.push(key);
@@ -327,7 +322,7 @@ pub(crate) fn encode_keys(
 
 /// Encode one key.
 pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
-    let row_codec = McmpRowCodec::new(vec![
+    let row_codec = DensePrimaryKeyCodec::with_fields(vec![
         SortField::new(ConcreteDataType::string_datatype()),
         SortField::new(ConcreteDataType::uint32_datatype()),
     ]);
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index ecce870d851e..cb0b3d12fb84 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -29,7 +29,7 @@ use store_api::metadata::{
 use store_api::storage::RegionId;
 
 use crate::read::{Batch, BatchBuilder, Source};
-use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
 use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
 
@@ -87,7 +87,7 @@ pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
     let fields = (0..tags.len())
         .map(|_| SortField::new(ConcreteDataType::string_datatype()))
         .collect();
-    let converter = McmpRowCodec::new(fields);
+    let converter = DensePrimaryKeyCodec::with_fields(fields);
     converter
         .encode(tags.iter().map(|tag| ValueRef::String(tag)))
         .unwrap()
diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs
index 200ed1913fa6..0b5ee62aadaa 100644
--- a/src/mito2/src/worker/handle_create.rs
+++ b/src/mito2/src/worker/handle_create.rs
@@ -17,13 +17,12 @@
 use std::sync::Arc;
 
 use common_telemetry::info;
-use snafu::ResultExt;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadataBuilder;
 use store_api::region_request::{AffectedRows, RegionCreateRequest};
 use store_api::storage::RegionId;
 
-use crate::error::{InvalidMetadataSnafu, Result};
+use crate::error::Result;
 use crate::region::opener::{check_recovered_region, RegionOpener};
 use crate::worker::RegionWorkerLoop;
 
@@ -52,7 +51,7 @@ impl RegionWorkerLoop {
             builder.push_column_metadata(column);
         }
         builder.primary_key(request.primary_key);
-        let metadata = builder.build().context(InvalidMetadataSnafu)?;
+
         // Create a MitoRegion from the RegionMetadata.
         let region = RegionOpener::new(
             region_id,
@@ -64,7 +63,7 @@ impl RegionWorkerLoop {
             self.intermediate_manager.clone(),
             self.time_provider.clone(),
         )
-        .metadata(metadata)
+        .metadata_builder(builder)
         .parse_options(request.options)?
         .cache(Some(self.cache_manager.clone()))
         .create_or_open(&self.config, &self.wal)
diff --git a/src/store-api/src/codec.rs b/src/store-api/src/codec.rs
new file mode 100644
index 000000000000..6ddcb6d17829
--- /dev/null
+++ b/src/store-api/src/codec.rs
@@ -0,0 +1,26 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+/// Primary key encoding mode.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum PrimaryKeyEncoding {
+    #[default]
+    /// Dense primary key encoding.
+    Dense,
+    /// Sparse primary key encoding.
+    Sparse,
+}
diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs
index 95ea196f7139..cd0416dc29e7 100644
--- a/src/store-api/src/lib.rs
+++ b/src/store-api/src/lib.rs
@@ -15,6 +15,7 @@
 
 //! Storage related APIs
 
+pub mod codec;
 pub mod data_source;
 pub mod logstore;
 pub mod manifest;
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index 57351e19ca99..0c1b55a95a6c 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -33,6 +33,7 @@ use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize};
 use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
 
+use crate::codec::PrimaryKeyEncoding;
 use crate::region_request::{AddColumn, AddColumnLocation, AlterKind, ModifyColumnType};
 use crate::storage::consts::is_internal_column;
 use crate::storage::{ColumnId, RegionId};
@@ -145,6 +146,9 @@ pub struct RegionMetadata {
     ///
     /// The version starts from 0. Altering the schema bumps the version.
     pub schema_version: u64,
+
+    /// Primary key encoding mode.
+    pub primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl fmt::Debug for RegionMetadata {
@@ -173,6 +177,8 @@ impl<'de> Deserialize<'de> for RegionMetadata {
             primary_key: Vec<ColumnId>,
             region_id: RegionId,
             schema_version: u64,
+            #[serde(default)]
+            primary_key_encoding: PrimaryKeyEncoding,
         }
 
         let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
@@ -187,6 +193,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
             primary_key: without_schema.primary_key,
             region_id: without_schema.region_id,
             schema_version: without_schema.schema_version,
+            primary_key_encoding: without_schema.primary_key_encoding,
         })
     }
 }
@@ -320,6 +327,7 @@ impl RegionMetadata {
             primary_key: projected_primary_key,
             region_id: self.region_id,
             schema_version: self.schema_version,
+            primary_key_encoding: self.primary_key_encoding,
         })
     }
 
@@ -504,6 +512,7 @@ pub struct RegionMetadataBuilder {
     column_metadatas: Vec<ColumnMetadata>,
     primary_key: Vec<ColumnId>,
     schema_version: u64,
+    primary_key_encoding: PrimaryKeyEncoding,
 }
 
 impl RegionMetadataBuilder {
@@ -514,6 +523,7 @@ impl RegionMetadataBuilder {
             column_metadatas: vec![],
             primary_key: vec![],
             schema_version: 0,
+            primary_key_encoding: PrimaryKeyEncoding::Dense,
         }
     }
 
@@ -524,9 +534,16 @@ impl RegionMetadataBuilder {
             primary_key: existing.primary_key,
             region_id: existing.region_id,
             schema_version: existing.schema_version,
+            primary_key_encoding: existing.primary_key_encoding,
         }
     }
 
+    /// Sets the primary key encoding mode.
+    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
+        self.primary_key_encoding = encoding;
+        self
+    }
+
     /// Pushes a new column metadata to this region's metadata.
     pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
         self.column_metadatas.push(column_metadata);
@@ -582,6 +599,7 @@ impl RegionMetadataBuilder {
             primary_key: self.primary_key,
             region_id: self.region_id,
             schema_version: self.schema_version,
+            primary_key_encoding: self.primary_key_encoding,
         };
 
         meta.validate()?;
@@ -1515,4 +1533,18 @@ mod test {
         let formatted = format!("{:?}", region_metadata);
         assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
     }
+
+    #[test]
+    fn test_region_metadata_deserialize_default_primary_key_encoding() {
+        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
+        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
+        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
+
+        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
+        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
+        assert_eq!(
+            deserialized.primary_key_encoding,
+            PrimaryKeyEncoding::Sparse
+        );
+    }
 }
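The `#[serde(default)]` attribute plus the `snake_case` renaming give old manifests a painless upgrade path: metadata serialized before this patch deserializes with `Dense`, and new metadata spells the variant in lowercase. A minimal sketch of both behaviors, assuming `serde_json` as in the tests above:

    use store_api::codec::PrimaryKeyEncoding;

    // Variants round-trip in snake_case...
    let json = serde_json::to_string(&PrimaryKeyEncoding::Sparse).unwrap();
    assert_eq!(json, "\"sparse\"");
    // ...and an absent field falls back to the declared default.
    assert_eq!(PrimaryKeyEncoding::default(), PrimaryKeyEncoding::Dense);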
diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result
index b4f931300a12..9c7addf316d2 100644
--- a/tests/cases/standalone/common/information_schema/region_statistics.result
+++ b/tests/cases/standalone/common/information_schema/region_statistics.result
@@ -29,7 +29,7 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
 +-------------------------------------------------------+------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+
 | SUM(information_schema.region_statistics.region_rows) | SUM(information_schema.region_statistics.disk_size) | SUM(information_schema.region_statistics.sst_size) | SUM(information_schema.region_statistics.index_size) |
 +-------------------------------------------------------+------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+
-| 3                                                      | 2145                                                 | 0                                                   | 0                                                     |
+| 3                                                      | 2238                                                 | 0                                                   | 0                                                     |
 +-------------------------------------------------------+------------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------+
 
 SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';