
feat: add column names to last cache sys table
closes: #25511
praveen-influx committed Nov 8, 2024
1 parent 391b67f commit 5c0838c
Showing 7 changed files with 113 additions and 52 deletions.
42 changes: 21 additions & 21 deletions influxdb3/tests/server/system_tables.rs
@@ -185,12 +185,12 @@ async fn last_caches_table() {
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+---------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+---------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_host_last_cache | [1] | | 1 | 14400 |",
"| mem | mem_last_cache | [6, 5] | [7, 8] | 1 | 60 |",
"+-------+---------------------+-------------+---------------+-------+-------+",
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_host_last_cache | [1] | [host] | | | 1 | 14400 |",
"| mem | mem_last_cache | [6, 5] | [host, region] | [7, 8] | [usage, time] | 1 | 60 |",
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
],
&batches
);
@@ -204,11 +204,11 @@ async fn last_caches_table() {
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | | 5 | 14400 |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | [cpu, host, region] | | | 5 | 14400 |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
],
&batches
);
@@ -237,11 +237,11 @@ async fn last_caches_table() {
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+----------------+-------------+---------------+-------+-----+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+----------------+-------------+---------------+-------+-----+",
"| mem | mem_last_cache | [6, 5] | [7, 8] | 1 | 60 |",
"+-------+----------------+-------------+---------------+-------+-----+",
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
"| mem | mem_last_cache | [6, 5] | [host, region] | [7, 8] | [usage, time] | 1 | 60 |",
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
],
&batches
);
@@ -268,11 +268,11 @@ async fn last_caches_table() {
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | | 5 | 14400 |",
"+-------+--------------------------------+-------------+---------------+-------+-------+",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [11, 10, 9] | [cpu, host, region] | | | 5 | 14400 |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
],
&batches
);
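With both id and name lists exposed, the cache layout can now be read straight from SQL. A minimal sketch of inspecting the table against a local instance — the `/api/v3/query_sql` endpoint, default port 8181, `format=pretty`, and the `system.last_caches` name are assumptions based on the test setup above, not verified here:

```rust
// Sketch only: endpoint path, port, params, and table name are assumptions.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = reqwest::Client::new()
        .get("http://localhost:8181/api/v3/query_sql")
        .query(&[
            ("db", "foo"),
            ("q", "SELECT table, name, key_column_names, value_column_names FROM system.last_caches"),
            ("format", "pretty"),
        ])
        .send()
        .await?
        .text()
        .await?;
    // Expect the name columns shown in the assertions above, e.g. [host, region].
    println!("{body}");
    Ok(())
}
```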
4 changes: 2 additions & 2 deletions influxdb3_catalog/src/catalog.rs
@@ -948,8 +948,8 @@ impl TableDefinition {
self.column_map.get_by_right(&name.into()).copied()
}

-pub fn column_id_to_name(&self, id: ColumnId) -> Option<Arc<str>> {
-self.column_map.get_by_left(&id).cloned()
+pub fn column_id_to_name(&self, id: &ColumnId) -> Option<Arc<str>> {
+self.column_map.get_by_left(id).cloned()
}

pub fn column_name_to_id_unchecked(&self, name: Arc<str>) -> ColumnId {
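The `catalog.rs` change flips `column_id_to_name` to take `&ColumnId`, which matches the borrowed ids produced by iterators at the call sites further down. A self-contained sketch of the idea, with hypothetical stand-in types (the real `TableDefinition` uses a bimap, not a plain `HashMap`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the catalog's ColumnId and column map,
// shown only to illustrate the by-reference lookup.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ColumnId(u32);

struct TableDefinition {
    id_to_name: HashMap<ColumnId, Arc<str>>,
}

impl TableDefinition {
    // Taking `&ColumnId` lets callers that iterate over borrowed ids
    // (as in last_cache/mod.rs below) avoid dereferencing at each call site.
    fn column_id_to_name(&self, id: &ColumnId) -> Option<Arc<str>> {
        self.id_to_name.get(id).cloned()
    }
}

fn main() {
    let table_def = TableDefinition {
        id_to_name: HashMap::from([(ColumnId(1), Arc::from("host"))]),
    };
    let ids = [ColumnId(1)];
    // Iterating yields `&ColumnId`, which now matches the parameter type.
    for id in ids.iter() {
        assert_eq!(table_def.column_id_to_name(id).as_deref(), Some("host"));
    }
}
```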
2 changes: 1 addition & 1 deletion influxdb3_server/src/query_executor.rs
@@ -363,7 +363,7 @@ impl Database {
query_log: Arc<QueryLog>,
) -> Self {
let system_schema_provider = Arc::new(SystemSchemaProvider::new(
-db_schema.id,
+Arc::clone(&db_schema),
Arc::clone(&query_log),
Arc::clone(&write_buffer),
));
96 changes: 76 additions & 20 deletions influxdb3_server/src/system_tables/last_caches.rs
@@ -1,24 +1,24 @@
use std::sync::Arc;

-use arrow::array::{GenericListBuilder, UInt32Builder};
+use arrow::array::{GenericListBuilder, StringViewBuilder, UInt32Builder};
use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{error::DataFusionError, logical_expr::Expr};
-use influxdb3_id::DbId;
+use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use influxdb3_write::last_cache::LastCacheProvider;
use iox_system_tables::IoxSystemTable;

pub(super) struct LastCachesTable {
-db_id: DbId,
+db_schema: Arc<DatabaseSchema>,
schema: SchemaRef,
provider: Arc<LastCacheProvider>,
}

impl LastCachesTable {
-pub(super) fn new(db_id: DbId, provider: Arc<LastCacheProvider>) -> Self {
+pub(super) fn new(db_schema: Arc<DatabaseSchema>, provider: Arc<LastCacheProvider>) -> Self {
Self {
-db_id,
+db_schema,
schema: last_caches_schema(),
provider,
}
@@ -30,15 +30,25 @@ fn last_caches_schema() -> SchemaRef {
Field::new("table", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
Field::new(
"key_columns",
"key_column_ids",
DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
false,
),
Field::new(
"value_columns",
"key_column_names",
DataType::List(Arc::new(Field::new("item", DataType::Utf8View, true))),
false,
),
Field::new(
"value_column_ids",
DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
true,
),
Field::new(
"value_column_names",
DataType::List(Arc::new(Field::new("item", DataType::Utf8View, true))),
true,
),
Field::new("count", DataType::UInt64, false),
Field::new("ttl", DataType::UInt64, false),
];
@@ -56,27 +66,28 @@ impl IoxSystemTable for LastCachesTable {
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch, DataFusionError> {
-let caches = self.provider.get_last_caches_for_db(self.db_id);
-from_last_cache_definitions(self.schema(), &caches)
+let caches = self.provider.get_last_caches_for_db(self.db_schema.id);
+from_last_cache_definitions(&self.db_schema, self.schema(), &caches)
}
}

fn from_last_cache_definitions(
-schema: SchemaRef,
-caches: &[LastCacheDefinition],
+db_schema: &DatabaseSchema,
+sys_table_schema: SchemaRef,
+cache_defns: &[LastCacheDefinition],
) -> Result<RecordBatch, DataFusionError> {
let mut columns: Vec<ArrayRef> = vec![];

// Table Name
columns.push(Arc::new(
-caches
+cache_defns
.iter()
.map(|c| Some(c.table.to_string()))
.collect::<StringArray>(),
));
// Cache Name
columns.push(Arc::new(
-caches
+cache_defns
.iter()
.map(|c| Some(c.name.to_string()))
.collect::<StringArray>(),
@@ -86,21 +97,40 @@ fn from_last_cache_definitions(
let values_builder = UInt32Builder::new();
let mut builder = GenericListBuilder::<i32, _>::new(values_builder);

-for c in caches {
+for c in cache_defns {
c.key_columns
.iter()
.for_each(|k| builder.values().append_value(k.as_u32()));
builder.append(true);
}
Arc::new(builder.finish())
});
-// Value Columns
+// Key Column names
+columns.push({
+let values_builder = StringViewBuilder::new();
+let mut builder = GenericListBuilder::<i32, StringViewBuilder>::new(values_builder);
+
+for cache_defn in cache_defns {
+cache_defn.key_columns.iter().for_each(|k| {
+let table_defn = db_schema
+.table_definition_by_id(cache_defn.table_id)
+.expect("table should exist for last cache");
+let col_name = table_defn
+.column_id_to_name(k)
+.expect("column id should have name associated to it");
+builder.values().append_value(col_name);
+});
+builder.append(true);
+}
+Arc::new(builder.finish())
+});
+// Value Column ids
columns.push({
let values_builder = UInt32Builder::new();
let mut builder = GenericListBuilder::<i32, _>::new(values_builder);

-for c in caches {
-match &c.value_columns {
+for cached_defn in cache_defns {
+match &cached_defn.value_columns {
LastCacheValueColumnsDef::Explicit { columns } => {
columns
.iter()
Expand All @@ -114,15 +144,41 @@ fn from_last_cache_definitions(
}
Arc::new(builder.finish())
});
+// Value Column names
+columns.push({
+let values_builder = StringViewBuilder::new();
+let mut builder = GenericListBuilder::<i32, _>::new(values_builder);
+
+for c in cache_defns {
+match &c.value_columns {
+LastCacheValueColumnsDef::Explicit { columns } => {
+columns.iter().for_each(|v| {
+let table_defn = db_schema
+.table_definition_by_id(c.table_id)
+.expect("table should exist for last cache");
+let col_name = table_defn
+.column_id_to_name(v)
+.expect("column id should have name associated to it");
+builder.values().append_value(col_name);
+});
+builder.append(true);
+}
+LastCacheValueColumnsDef::AllNonKeyColumns => {
+builder.append_null();
+}
+}
+}
+Arc::new(builder.finish())
+});
columns.push(Arc::new(
-caches
+cache_defns
.iter()
.map(|e| Some(e.count.into()))
.collect::<UInt64Array>(),
));
columns.push(Arc::new(
-caches.iter().map(|e| Some(e.ttl)).collect::<UInt64Array>(),
+cache_defns.iter().map(|e| Some(e.ttl)).collect::<UInt64Array>(),
));

-Ok(RecordBatch::try_new(schema, columns)?)
+Ok(RecordBatch::try_new(sys_table_schema, columns)?)
}
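The new name columns reuse the list-builder pattern of the id columns, swapping `UInt32Builder` for `StringViewBuilder` so each row holds a `List<Utf8View>` entry, with a NULL list for the `AllNonKeyColumns` case. A standalone sketch of that arrow pattern (an arrow crate version that ships `StringViewBuilder` is assumed):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, GenericListBuilder, StringViewBuilder};

fn main() {
    // One list entry per cache definition.
    let values_builder = StringViewBuilder::new();
    let mut builder = GenericListBuilder::<i32, _>::new(values_builder);

    // A cache with explicit value columns yields a list of names...
    for name in ["usage", "time"] {
        builder.values().append_value(name);
    }
    builder.append(true);

    // ...while the all-non-key-columns case yields a NULL entry, matching
    // the blank cells in the expected test output above.
    builder.append_null();

    let col: ArrayRef = Arc::new(builder.finish());
    assert_eq!(col.len(), 2);
    assert!(col.is_null(1));
}
```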
13 changes: 9 additions & 4 deletions influxdb3_server/src/system_tables/mod.rs
@@ -1,7 +1,7 @@
use std::{any::Any, collections::HashMap, sync::Arc};

use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
-use influxdb3_id::DbId;
+use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
@@ -38,19 +38,24 @@ impl std::fmt::Debug for SystemSchemaProvider {
}

impl SystemSchemaProvider {
-pub(crate) fn new(db_id: DbId, query_log: Arc<QueryLog>, buffer: Arc<dyn WriteBuffer>) -> Self {
+pub(crate) fn new(
+db_schema: Arc<DatabaseSchema>,
+query_log: Arc<QueryLog>,
+buffer: Arc<dyn WriteBuffer>,
+) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
query_log,
))));
tables.insert(QUERIES_TABLE_NAME, queries);
let last_caches = Arc::new(SystemTableProvider::new(Arc::new(LastCachesTable::new(
-db_id,
+Arc::clone(&db_schema),
buffer.last_cache_provider(),
))));
tables.insert(LAST_CACHES_TABLE_NAME, last_caches);
let parquet_files = Arc::new(SystemTableProvider::new(Arc::new(ParquetFilesTable::new(
-db_id, buffer,
+db_schema.id,
+buffer,
))));
tables.insert(PARQUET_FILES_TABLE_NAME, parquet_files);
Self { tables }
6 changes: 3 additions & 3 deletions influxdb3_write/src/last_cache/mod.rs
@@ -133,7 +133,7 @@ impl LastCacheProvider {
.iter()
.map(|id| {
table_def
-.column_id_to_name(*id)
+.column_id_to_name(id)
.map(|name| (*id, name))
.ok_or(Error::KeyColumnDoesNotExist { column_id: *id })
})
@@ -144,7 +144,7 @@ impl LastCacheProvider {
.iter()
.map(|id| {
table_def
-.column_id_to_name(*id)
+.column_id_to_name(id)
.map(|name| (*id, name))
.ok_or(Error::ValueColumnDoesNotExist { column_id: *id })
})
@@ -408,7 +408,7 @@ impl LastCacheProvider {
.iter()
.map(|id| {
table_def
-.column_id_to_name(*id)
+.column_id_to_name(id)
.map(|name| (*id, name))
.expect("a valid column id for key column")
})
2 changes: 1 addition & 1 deletion influxdb3_write/src/write_buffer/table_buffer.rs
@@ -460,7 +460,7 @@ impl MutableTableChunk {
let (col_type, col) = builder.into_influxcol_and_arrow();
schema_builder.influx_column(
table_def
-.column_id_to_name(col_id)
+.column_id_to_name(&col_id)
.expect("valid column id")
.as_ref(),
col_type,
Expand Down
