Skip to content

Commit

Permalink
feat: introduce PrimaryKeyEncoding (#5312)
Browse files Browse the repository at this point in the history
* feat: introduce `PrimaryKeyEncoding`

* fix: fix unit tests

* chore: add empty line

* test: add unit tests

* chore: fmt code

* refactor: introduce new codec trait to support various encoding

* fix: fix unit tests

* chore: update sqlness result

* chore: apply suggestions from CR

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jan 15, 2025
1 parent 57f8afc commit b64c075
Show file tree
Hide file tree
Showing 31 changed files with 1,042 additions and 775 deletions.
17 changes: 13 additions & 4 deletions src/mito2/benches/memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use rand::rngs::ThreadRng;
use rand::seq::SliceRandom;
Expand All @@ -43,8 +44,14 @@ fn write_rows(c: &mut Criterion) {
// Note that this test only generates one time series.
let mut group = c.benchmark_group("write");
group.bench_function("partition_tree", |b| {
let memtable =
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
b.iter(|| {
Expand All @@ -71,7 +78,8 @@ fn full_scan(c: &mut Criterion) {
let mut group = c.benchmark_group("full_scan");
group.sample_size(10);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
Expand Down Expand Up @@ -108,7 +116,8 @@ fn filter_1_host(c: &mut Criterion) {
let mut group = c.benchmark_group("filter_1_host");
group.sample_size(10);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ async fn test_region_usage() {
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_statistic();
assert_eq!(region_stat.manifest_size, 686);
assert_eq!(region_stat.manifest_size, 717);

// put some rows
let rows = Rows {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,6 @@ mod test {

// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1173);
assert_eq!(manifest_size, 1204);
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn manager_with_checkpoint_distance_1() {
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json =
"{\"size\":848,\"version\":10,\"checksum\":4186457347,\"extend_metadata\":{}}";
"{\"size\":879,\"version\":10,\"checksum\":2245967096,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);

// reopen the manager
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/bulk/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::row_converter::McmpRowCodec;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
Expand All @@ -41,7 +41,7 @@ impl BulkIterContext {
projection: &Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Self {
let codec = McmpRowCodec::new_with_primary_keys(&region_metadata);
let codec = DensePrimaryKeyCodec::new(&region_metadata);

let simple_filters = predicate
.as_ref()
Expand Down
12 changes: 6 additions & 6 deletions src/mito2/src/memtable/bulk/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
Expand Down Expand Up @@ -103,7 +103,7 @@ pub struct BulkPartMeta {

pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
pk_encoder: McmpRowCodec,
pk_encoder: DensePrimaryKeyCodec,
row_group_size: usize,
dedup: bool,
writer_props: Option<WriterProperties>,
Expand All @@ -115,7 +115,7 @@ impl BulkPartEncoder {
dedup: bool,
row_group_size: usize,
) -> BulkPartEncoder {
let codec = McmpRowCodec::new_with_primary_keys(&metadata);
let codec = DensePrimaryKeyCodec::new(&metadata);
let writer_props = Some(
WriterProperties::builder()
.set_write_batch_size(row_group_size)
Expand Down Expand Up @@ -174,7 +174,7 @@ impl BulkPartEncoder {
fn mutations_to_record_batch(
mutations: &[Mutation],
metadata: &RegionMetadataRef,
pk_encoder: &McmpRowCodec,
pk_encoder: &DensePrimaryKeyCodec,
dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
let total_rows: usize = mutations
Expand Down Expand Up @@ -538,7 +538,7 @@ mod tests {
.map(|r| r.rows.len())
.sum();

let pk_encoder = McmpRowCodec::new_with_primary_keys(&metadata);
let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
.unwrap()
Expand All @@ -557,7 +557,7 @@ mod tests {
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
let pk_values = pk_encoder.decode_dense(b.primary_key()).unwrap();
let timestamps = b
.timestamps()
.as_any()
Expand Down
59 changes: 41 additions & 18 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
pub(crate) use partition::DensePrimaryKeyFilter;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -43,6 +45,7 @@ use crate::memtable::{
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
Expand Down Expand Up @@ -263,13 +266,14 @@ impl PartitionTreeMemtable {
/// Returns a new memtable.
pub fn new(
id: MemtableId,
row_codec: Arc<dyn PrimaryKeyCodec>,
metadata: RegionMetadataRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &PartitionTreeConfig,
) -> Self {
Self::with_tree(
id,
PartitionTree::new(metadata, config, write_buffer_manager.clone()),
PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
)
}

Expand Down Expand Up @@ -320,12 +324,22 @@ impl PartitionTreeMemtableBuilder {

impl MemtableBuilder for PartitionTreeMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(PartitionTreeMemtable::new(
id,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
match metadata.primary_key_encoding {
PrimaryKeyEncoding::Dense => {
let codec = Arc::new(DensePrimaryKeyCodec::new(metadata));
Arc::new(PartitionTreeMemtable::new(
id,
codec,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
}
PrimaryKeyEncoding::Sparse => {
// TODO(weny): Implement sparse primary key encoding.
todo!()
}
}
}
}

Expand Down Expand Up @@ -358,7 +372,7 @@ mod tests {
use store_api::storage::RegionId;

use super::*;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};
Expand All @@ -378,8 +392,14 @@ mod tests {
let timestamps = (0..100).collect::<Vec<_>>();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
let memtable =
PartitionTreeMemtable::new(1, metadata, None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
memtable.write(&kvs).unwrap();

let expected_ts = kvs
Expand Down Expand Up @@ -414,8 +434,14 @@ mod tests {
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
let memtable =
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);

let kvs = memtable_util::build_key_values(
&metadata,
Expand Down Expand Up @@ -510,8 +536,10 @@ mod tests {

fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig {
Expand Down Expand Up @@ -719,12 +747,7 @@ mod tests {
)
.build(1, &metadata);

let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let codec = DensePrimaryKeyCodec::new(&metadata);

memtable
.write(&build_key_values(
Expand Down
Loading

0 comments on commit b64c075

Please sign in to comment.