Skip to content

Commit

Permalink
refactor: use SerdeVecMap in PersistedSnapshot (#25541)
Browse files Browse the repository at this point in the history
* refactor: use SerdeVecMap in PersistedSnapshot

This changes from the use of a HashMap to store the DB -> Table structure
in the PersistedSnapshot files to using a SerdeVecMap, which will have
the identifiers serialized as integers instead of strings.

* test: add a snapshot test for persisted snapshots
  • Loading branch information
hiltontj authored Nov 12, 2024
1 parent 814eb31 commit 2ac3df1
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 13 deletions.
23 changes: 19 additions & 4 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::ParquetFileId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, DbId};
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::QueryChunk;
use iox_time::Time;
use last_cache::LastCacheProvider;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -180,7 +180,7 @@ pub struct PersistedSnapshot {
pub max_time: i64,
/// The collection of databases that had tables persisted in this snapshot. The tables will then have their
/// name and the parquet file.
pub databases: HashMap<DbId, DatabaseTables>,
pub databases: SerdeVecMap<DbId, DatabaseTables>,
}

impl PersistedSnapshot {
Expand All @@ -203,7 +203,7 @@ impl PersistedSnapshot {
row_count: 0,
min_time: i64::MAX,
max_time: i64::MIN,
databases: HashMap::new(),
databases: SerdeVecMap::new(),
}
}

Expand Down Expand Up @@ -232,7 +232,7 @@ impl PersistedSnapshot {

#[derive(Debug, Serialize, Deserialize, Default, Eq, PartialEq, Clone)]
pub struct DatabaseTables {
pub tables: hashbrown::HashMap<TableId, Vec<ParquetFile>>,
pub tables: SerdeVecMap<TableId, Vec<ParquetFile>>,
}

/// The summary data for a persisted parquet file in a snapshot.
Expand All @@ -256,6 +256,21 @@ impl ParquetFile {
}
}

#[cfg(test)]
impl ParquetFile {
pub(crate) fn create_for_test(path: impl Into<String>) -> Self {
Self {
id: ParquetFileId::new(),
path: path.into(),
size_bytes: 1024,
row_count: 1,
chunk_time: 0,
min_time: 0,
max_time: 1,
}
}
}

/// The precision of the timestamp
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
Expand Down
85 changes: 76 additions & 9 deletions influxdb3_write/src/persister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
#[cfg(test)]
mod tests {
use super::*;
use crate::ParquetFileId;
use crate::{DatabaseTables, ParquetFile, ParquetFileId};
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
use object_store::memory::InMemory;
use observability_deps::tracing::info;
Expand All @@ -407,7 +407,7 @@ mod tests {
arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field,
arrow::datatypes::Schema, chrono::Utc,
datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder,
object_store::local::LocalFileSystem, std::collections::HashMap,
object_store::local::LocalFileSystem,
};

#[tokio::test]
Expand Down Expand Up @@ -466,7 +466,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::new(0),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
Expand All @@ -490,7 +490,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
Expand All @@ -505,7 +505,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
max_time: 1,
min_time: 0,
row_count: 0,
Expand All @@ -520,7 +520,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
Expand Down Expand Up @@ -556,7 +556,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
Expand Down Expand Up @@ -585,7 +585,7 @@ mod tests {
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: CatalogSequenceNumber::new(id as u32),
databases: HashMap::new(),
databases: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
Expand Down Expand Up @@ -654,6 +654,73 @@ mod tests {
assert!(snapshots.is_empty());
}

#[test]
fn persisted_snapshot_structure() {
let databases = [
(
DbId::new(),
DatabaseTables {
tables: [
(
TableId::new(),
vec![
ParquetFile::create_for_test("1.parquet"),
ParquetFile::create_for_test("2.parquet"),
],
),
(
TableId::new(),
vec![
ParquetFile::create_for_test("3.parquet"),
ParquetFile::create_for_test("4.parquet"),
],
),
]
.into(),
},
),
(
DbId::new(),
DatabaseTables {
tables: [
(
TableId::new(),
vec![
ParquetFile::create_for_test("5.parquet"),
ParquetFile::create_for_test("6.parquet"),
],
),
(
TableId::new(),
vec![
ParquetFile::create_for_test("7.parquet"),
ParquetFile::create_for_test("8.parquet"),
],
),
]
.into(),
},
),
]
.into();
let snapshot = PersistedSnapshot {
host_id: "host".to_string(),
next_file_id: ParquetFileId::new(),
next_db_id: DbId::new(),
next_table_id: TableId::new(),
next_column_id: ColumnId::new(),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::new(0),
parquet_size_bytes: 1_024,
row_count: 1,
min_time: 0,
max_time: 1,
databases,
};
insta::assert_json_snapshot!(snapshot);
}

#[tokio::test]
async fn get_parquet_bytes() {
let local_disk =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
---
source: influxdb3_write/src/persister.rs
expression: snapshot
---
{
"host_id": "host",
"next_file_id": 8,
"next_db_id": 2,
"next_table_id": 4,
"next_column_id": 0,
"snapshot_sequence_number": 0,
"wal_file_sequence_number": 0,
"catalog_sequence_number": 0,
"parquet_size_bytes": 1024,
"row_count": 1,
"min_time": 0,
"max_time": 1,
"databases": [
[
0,
{
"tables": [
[
0,
[
{
"id": 0,
"path": "1.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
},
{
"id": 1,
"path": "2.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
}
]
],
[
1,
[
{
"id": 2,
"path": "3.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
},
{
"id": 3,
"path": "4.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
}
]
]
]
}
],
[
1,
{
"tables": [
[
2,
[
{
"id": 4,
"path": "5.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
},
{
"id": 5,
"path": "6.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
}
]
],
[
3,
[
{
"id": 6,
"path": "7.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
},
{
"id": 7,
"path": "8.parquet",
"size_bytes": 1024,
"row_count": 1,
"chunk_time": 0,
"min_time": 0,
"max_time": 1
}
]
]
]
}
]
]
}

0 comments on commit 2ac3df1

Please sign in to comment.