From 35add260608cce7f9f3e8ac9a719df7f6b9f0eb4 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 21 Oct 2024 14:44:48 -0400 Subject: [PATCH 1/4] refactor: roll back addition of DatabaseSchemaProvider trait --- influxdb3/src/commands/serve.rs | 4 +- influxdb3_catalog/src/catalog.rs | 79 +++++++++---------- influxdb3_catalog/src/lib.rs | 32 -------- influxdb3_server/src/http.rs | 4 +- influxdb3_server/src/lib.rs | 4 +- influxdb3_server/src/query_executor.rs | 21 +++-- .../src/system_tables/parquet_files.rs | 2 +- influxdb3_write/src/last_cache/mod.rs | 20 +++-- .../src/last_cache/table_function.rs | 2 +- influxdb3_write/src/lib.rs | 4 +- influxdb3_write/src/write_buffer/mod.rs | 25 +++--- .../src/write_buffer/queryable_buffer.rs | 5 +- 12 files changed, 77 insertions(+), 125 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index f42dbe05cf9..047dce8b57b 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -436,7 +436,7 @@ pub async fn command(config: Config) -> Result<()> { .map_err(Error::InitializePersistedCatalog)?, ); - let last_cache = LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _) + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _) .map_err(Error::InitializeLastCache)?; info!(instance_id = ?catalog.instance_id(), "Catalog initialized with"); @@ -473,7 +473,7 @@ pub async fn command(config: Config) -> Result<()> { )?; let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs { - db_schema_provider: write_buffer.db_schema_provider(), + catalog: write_buffer.catalog(), write_buffer: Arc::clone(&write_buffer), exec: Arc::clone(&exec), metrics: Arc::clone(&metrics), diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 347012e8ee9..f56e29baa61 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -1,7 +1,6 @@ //! Implementation of the Catalog that sits entirely in memory. 
 use crate::catalog::Error::TableNotFound;
-use crate::DatabaseSchemaProvider;
 use arrow::datatypes::SchemaRef;
 use bimap::BiHashMap;
 use influxdb3_id::{ColumnId, DbId, TableId};
@@ -165,6 +164,44 @@ impl Catalog {
         Ok(db)
     }
 
+    pub fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
+        self.inner.read().db_map.get_by_right(db_name).copied()
+    }
+
+    pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
+        self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
+    }
+
+    pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
+        self.db_schema_and_id(db_name).map(|(_, schema)| schema)
+    }
+
+    pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
+        self.inner.read().databases.get(&db_id).cloned()
+    }
+
+    pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
+        let inner = self.inner.read();
+        let db_id = inner.db_map.get_by_right(db_name)?;
+        inner
+            .databases
+            .get(db_id)
+            .map(|db| (*db_id, Arc::clone(db)))
+    }
+
+    pub fn db_names(&self) -> Vec<String> {
+        self.inner
+            .read()
+            .databases
+            .values()
+            .map(|db| db.name.to_string())
+            .collect()
+    }
+
+    pub fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
+        self.inner.read().databases.values().cloned().collect()
+    }
+
     pub fn sequence_number(&self) -> SequenceNumber {
         self.inner.read().sequence
     }
@@ -243,46 +280,6 @@ impl Catalog {
     }
 }
 
-impl DatabaseSchemaProvider for Catalog {
-    fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
-        self.inner.read().db_map.get_by_right(db_name).copied()
-    }
-
-    fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
-        self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
-    }
-
-    fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
-        self.db_schema_and_id(db_name).map(|(_, schema)| schema)
-    }
-
-    fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
-        self.inner.read().databases.get(&db_id).cloned()
-    }
-
-    fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
-        let inner = self.inner.read();
-        let db_id = inner.db_map.get_by_right(db_name)?;
-        inner
-            .databases
-            .get(db_id)
-            .map(|db| (*db_id, Arc::clone(db)))
-    }
-
-    fn db_names(&self) -> Vec<String> {
-        self.inner
-            .read()
-            .databases
-            .values()
-            .map(|db| db.name.to_string())
-            .collect()
-    }
-
-    fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
-        self.inner.read().databases.values().cloned().collect()
-    }
-}
-
 #[serde_with::serde_as]
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
 pub struct InnerCatalog {
diff --git a/influxdb3_catalog/src/lib.rs b/influxdb3_catalog/src/lib.rs
index 17ea77518b6..a3e6442cbe8 100644
--- a/influxdb3_catalog/src/lib.rs
+++ b/influxdb3_catalog/src/lib.rs
@@ -1,34 +1,2 @@
-use std::sync::Arc;
-
-use catalog::DatabaseSchema;
-use influxdb3_id::DbId;
-
 pub mod catalog;
 pub(crate) mod serialize;
-
-/// Provide [`DatabaseSchema`] and there derivatives where needed.
-///
-/// This trait captures the read-only behaviour of the [`catalog::Catalog`] as where it is used to
-/// serve queries and provide schema-related info of the underlying database.
-pub trait DatabaseSchemaProvider: std::fmt::Debug + Send + Sync + 'static {
-    /// Convert a database name to its corresponding [`DbId`]
-    fn db_name_to_id(&self, db_name: &str) -> Option<DbId>;
-
-    /// Convert a [`DbId`] into its corresponding database name
-    fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>>;
-
-    /// Get the [`DatabaseSchema`] for the given name, or `None` otherwise
-    fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>>;
-
-    /// Get the [`DatabaseSchema`] for the given [`DbId`], or `None` otherwise
-    fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>>;
-
-    /// Get the [`DatabaseSchema`] as well as the corresponding [`DbId`] for the given name
-    fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)>;
-
-    /// Get a list of the database names in the underlying catalog'd instance
-    fn db_names(&self) -> Vec<String>;
-
-    /// List out all [`DatabaseSchema`] in the database
-    fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>>;
-}
diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs
index c95cbe78677..e4c1843a611 100644
--- a/influxdb3_server/src/http.rs
+++ b/influxdb3_server/src/http.rs
@@ -694,7 +694,7 @@ where
         let (db_id, db_schema) = self
             .write_buffer
-            .db_schema_provider()
+            .catalog()
             .db_schema_and_id(&db)
             .ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
         let table_id = db_schema
@@ -741,7 +741,7 @@ where
         let (db_id, db_schema) = self
             .write_buffer
-            .db_schema_provider()
+            .catalog()
             .db_schema_and_id(&db)
             .ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
         let table_id = db_schema
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index c214f9064ae..ba77cd7ca3e 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -779,7 +779,7 @@ mod tests {
             influxdb3_write::write_buffer::WriteBufferImpl::new(
                 Arc::clone(&persister),
                 Arc::clone(&catalog),
-                Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
+                Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
                 Arc::::clone(&time_provider),
                 Arc::clone(&exec),
                 WalConfig::test_config(),
@@ -802,7 +802,7 @@ mod tests {
         )
         .unwrap();
         let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
-            db_schema_provider: write_buffer.db_schema_provider(),
+            catalog: write_buffer.catalog(),
             write_buffer: Arc::clone(&write_buffer),
             exec: Arc::clone(&exec),
             metrics: Arc::clone(&metrics),
diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs
index 82536b933ea..1282ed3ce72 100644
--- a/influxdb3_server/src/query_executor.rs
+++ b/influxdb3_server/src/query_executor.rs
@@ -18,8 +18,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use datafusion_util::config::DEFAULT_SCHEMA;
 use datafusion_util::MemoryStream;
-use influxdb3_catalog::catalog::DatabaseSchema;
-use influxdb3_catalog::DatabaseSchemaProvider;
+use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
 use influxdb3_telemetry::store::TelemetryStore;
 use influxdb3_write::last_cache::LastCacheFunction;
 use influxdb3_write::WriteBuffer;
@@ -50,7 +49,7 @@ use tracker::{
 
 #[derive(Debug)]
 pub struct QueryExecutorImpl {
-    db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
+    catalog: Arc<Catalog>,
     write_buffer: Arc<dyn WriteBuffer>,
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
@@ -62,7 +61,7 @@ pub struct QueryExecutorImpl {
 /// Arguments for [`QueryExecutorImpl::new`]
 #[derive(Debug)]
 pub struct CreateQueryExecutorArgs {
-    pub db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
+    pub catalog: Arc<Catalog>,
     pub write_buffer: Arc<dyn WriteBuffer>,
     pub exec: Arc<Executor>,
     pub metrics: Arc<Registry>,
@@ -75,7 +74,7 @@ pub struct CreateQueryExecutorArgs {
 impl QueryExecutorImpl {
     pub fn new(
         CreateQueryExecutorArgs {
-            db_schema_provider,
+            catalog,
             write_buffer,
             exec,
             metrics,
@@ -96,7 +95,7 @@ impl QueryExecutorImpl {
             Arc::new(iox_time::SystemProvider::new()),
         ));
         Self {
-            db_schema_provider,
+            catalog,
             write_buffer,
             exec,
             datafusion_config,
@@ -181,7 +180,7 @@ impl QueryExecutor for QueryExecutorImpl {
     }
 
     fn show_databases(&self) -> Result {
-        let mut databases = self.db_schema_provider.db_names();
+        let mut databases = self.catalog.db_names();
         // sort them to ensure consistent order:
         databases.sort_unstable();
         let databases = StringArray::from(databases);
@@ -200,7 +199,7 @@ impl QueryExecutor for QueryExecutorImpl {
         let mut databases = if let Some(db) = database {
             vec![db.to_owned()]
         } else {
-            self.db_schema_provider.db_names()
+            self.catalog.db_names()
         };
         // sort them to ensure consistent order:
         databases.sort_unstable();
@@ -319,7 +318,7 @@ impl QueryDatabase for QueryExecutorImpl {
     ) -> Result>, DataFusionError> {
         let _span_recorder = SpanRecorder::new(span);
 
-        let db_schema = self.db_schema_provider.db_schema(name).ok_or_else(|| {
+        let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
             DataFusionError::External(Box::new(Error::DatabaseNotFound {
                 db_name: name.into(),
             }))
@@ -675,7 +674,7 @@ mod tests {
         WriteBufferImpl::new(
             Arc::clone(&persister),
             Arc::clone(&catalog),
-            Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
+            Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
             Arc::::clone(&time_provider),
             Arc::clone(&exec),
             WalConfig {
@@ -696,7 +695,7 @@ mod tests {
         let metrics = Arc::new(Registry::new());
         let datafusion_config = Arc::new(Default::default());
         let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
-            db_schema_provider: write_buffer.db_schema_provider(),
+            catalog: write_buffer.catalog(),
             write_buffer: Arc::clone(&write_buffer),
             exec,
             metrics,
diff --git a/influxdb3_server/src/system_tables/parquet_files.rs b/influxdb3_server/src/system_tables/parquet_files.rs
index acf14a4b980..df6b9d257b1 100644
--- a/influxdb3_server/src/system_tables/parquet_files.rs
+++ b/influxdb3_server/src/system_tables/parquet_files.rs
@@ -94,7 +94,7 @@ impl IoxSystemTable for ParquetFilesTable {
         let parquet_files: Vec<ParquetFile> = self.buffer.parquet_files(
             self.db_id,
             self.buffer
-                .db_schema_provider()
+                .catalog()
                 .db_schema_by_id(self.db_id)
                 .expect("db exists")
                 .table_name_to_id(table_name.as_str())
diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs
index bb8fcd86d11..bc310f4d7c6 100644
--- a/influxdb3_write/src/last_cache/mod.rs
+++ b/influxdb3_write/src/last_cache/mod.rs
@@ -23,7 +23,7 @@ use datafusion::{
 };
 use hashbrown::{HashMap, HashSet};
 use indexmap::{IndexMap, IndexSet};
-use influxdb3_catalog::DatabaseSchemaProvider;
+use influxdb3_catalog::catalog::Catalog;
 use influxdb3_id::DbId;
 use influxdb3_id::TableId;
 use influxdb3_wal::{
@@ -66,7 +66,7 @@ type CacheMap = RwLock
 /// Provides all last-N-value caches for the entire database
 pub struct LastCacheProvider {
-    db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
+    catalog: Arc<Catalog>,
     cache_map: CacheMap,
 }
 
@@ -116,15 +116,13 @@ pub struct CreateCacheArguments {
 }
 
 impl LastCacheProvider {
-    /// Initialize a [`LastCacheProvider`] from a [`DatabaseSchemaProvider`]
-    pub fn new_from_db_schema_provider(
-        db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
-    ) -> Result<Self> {
+    /// Initialize a [`LastCacheProvider`] from a [`Catalog`]
+    pub fn new_from_catalog(catalog: Arc<Catalog>) -> Result<Self> {
         let provider = LastCacheProvider {
-            db_schema_provider: Arc::clone(&db_schema_provider),
+            catalog: Arc::clone(&catalog),
             cache_map: Default::default(),
         };
-        for db_schema in db_schema_provider.list_db_schema() {
+        for db_schema in catalog.list_db_schema() {
             for table_def in db_schema.tables() {
                 for (cache_name, cache_def) in table_def.last_caches() {
                     assert!(
@@ -193,7 +191,7 @@ impl LastCacheProvider {
                     .iter()
                     .flat_map(|(table_id, table_map)| {
                         let table_name = self
-                            .db_schema_provider
+                            .catalog
                             .db_schema_by_id(db)
                             .expect("db exists")
                             .table_id_to_name(*table_id)
@@ -1620,7 +1618,7 @@ mod tests {
         WriteBufferImpl::new(
             persister,
             Arc::clone(&catalog),
-            Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
+            Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
             time_provider,
             crate::test_help::make_exec(),
             WalConfig::test_config(),
@@ -3205,7 +3203,7 @@ mod tests {
         catalog.insert_database(database);
         let catalog = Arc::new(catalog);
         // This is the function we are testing, which initializes the LastCacheProvider from the catalog:
-        let provider = LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _)
+        let provider = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
             .expect("create last cache provider from catalog");
         // There should be a total of 3 caches:
         assert_eq!(3, provider.size());
diff --git a/influxdb3_write/src/last_cache/table_function.rs b/influxdb3_write/src/last_cache/table_function.rs
index 34f088d4e50..72d5275fc6d 100644
--- a/influxdb3_write/src/last_cache/table_function.rs
+++ b/influxdb3_write/src/last_cache/table_function.rs
@@ -99,7 +99,7 @@ impl TableFunctionImpl for LastCacheFunction {
         };
         let table_id = self
             .provider
-            .db_schema_provider
+            .catalog
             .db_schema_by_id(self.db_id)
             .expect("db exists")
             .table_name_to_id(table_name.as_str())
diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index 7443711b194..37a4aff6909 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -16,8 +16,8 @@ use data_types::{NamespaceName, TimestampMinMax};
 use datafusion::catalog::Session;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::Expr;
+use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::catalog::{self, SequenceNumber};
-use influxdb3_catalog::DatabaseSchemaProvider;
 use influxdb3_id::DbId;
 use influxdb3_id::ParquetFileId;
 use influxdb3_id::TableId;
@@ -76,7 +76,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     ) -> write_buffer::Result;
 
     /// Returns the database schema provider
-    fn db_schema_provider(&self) -> Arc<dyn DatabaseSchemaProvider>;
+    fn catalog(&self) -> Arc<Catalog>;
 
     /// Returns the parquet files for a given database and table
     fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile>;
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index a62bb9bae85..19869a96fd0 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -22,7 +22,7 @@ use datafusion::catalog::Session;
 use datafusion::common::DataFusionError;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::logical_expr::Expr;
-use influxdb3_catalog::{catalog::Catalog, DatabaseSchemaProvider};
+use influxdb3_catalog::catalog::Catalog;
 use influxdb3_id::{DbId, TableId};
 use influxdb3_wal::object_store::WalObjectStore;
 use influxdb3_wal::CatalogOp::CreateLastCache;
@@ -407,7 +407,7 @@ impl Bufferer for WriteBufferImpl {
             .await
     }
 
-    fn db_schema_provider(&self) -> Arc<dyn DatabaseSchemaProvider> {
+    fn catalog(&self) -> Arc<Catalog> {
         self.catalog()
     }
 
@@ -587,8 +587,7 @@ mod tests
{ test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider)); let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let write_buffer = WriteBufferImpl::new( Arc::clone(&persister), catalog, @@ -662,8 +661,7 @@ mod tests { // now load a new buffer from object storage let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let write_buffer = WriteBufferImpl::new( Arc::clone(&persister), catalog, @@ -721,8 +719,7 @@ mod tests { // load a new write buffer to ensure its durable let catalog = Arc::new(wbuf.persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let wbuf = WriteBufferImpl::new( Arc::clone(&wbuf.persister), catalog, @@ -760,8 +757,7 @@ mod tests { // and do another replay and verification let catalog = Arc::new(wbuf.persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let wbuf = WriteBufferImpl::new( Arc::clone(&wbuf.persister), catalog, @@ -818,8 +814,7 @@ mod tests { // do another reload and verify it's gone let catalog = Arc::new(wbuf.persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let wbuf = WriteBufferImpl::new( Arc::clone(&wbuf.persister), catalog, @@ -975,8 +970,7 @@ mod tests { .await .unwrap(), ); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let write_buffer = WriteBufferImpl::new( Arc::clone(&write_buffer.persister), catalog, @@ -1984,8 +1978,7 @@ mod tests { }; let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap()); - let last_cache = - LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _).unwrap(); + let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap(); let wbuf = WriteBufferImpl::new( Arc::clone(&persister), catalog, diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index d70d0a64583..b7632b08f19 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -14,10 +14,7 @@ use datafusion::common::DataFusionError; use datafusion::logical_expr::Expr; use datafusion_util::stream_from_batches; use hashbrown::HashMap; -use influxdb3_catalog::{ - catalog::{Catalog, DatabaseSchema}, - DatabaseSchemaProvider, -}; +use 
influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
 use influxdb3_id::{DbId, TableId};
 use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch};
 use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};

From cc31481588e694c896ca1a7562c44fe4504c8c42 Mon Sep 17 00:00:00 2001
From: Trevor Hilton
Date: Mon, 21 Oct 2024 14:51:08 -0400
Subject: [PATCH 2/4] refactor: make parquet metrics optional in telemetry for pro

---
 influxdb3/src/commands/serve.rs  |  6 +++---
 influxdb3_telemetry/src/store.rs | 18 ++++++++++--------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs
index 047dce8b57b..e49c7b93aa5 100644
--- a/influxdb3/src/commands/serve.rs
+++ b/influxdb3/src/commands/serve.rs
@@ -458,7 +458,7 @@ pub async fn command(config: Config) -> Result<()> {
         &config.object_store_config,
         catalog.instance_id(),
         num_cpus,
-        Arc::clone(&write_buffer_impl.persisted_files()),
+        Some(Arc::clone(&write_buffer_impl.persisted_files())),
         config.telemetry_endpoint,
     )
     .await;
@@ -511,7 +511,7 @@ async fn setup_telemetry_store(
     object_store_config: &ObjectStoreConfig,
     instance_id: Arc<str>,
     num_cpus: usize,
-    persisted_files: Arc<PersistedFiles>,
+    persisted_files: Option<Arc<PersistedFiles>>,
     telemetry_endpoint: String,
 ) -> Arc<TelemetryStore> {
     let os = std::env::consts::OS;
@@ -530,7 +530,7 @@ async fn setup_telemetry_store(
         Arc::from(influx_version),
         Arc::from(storage_type),
         num_cpus,
-        persisted_files,
+        persisted_files.map(|p| p as _),
         telemetry_endpoint,
     )
     .await
diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs
index b8ec6fb3f58..6a6b6ffebc3 100644
--- a/influxdb3_telemetry/src/store.rs
+++ b/influxdb3_telemetry/src/store.rs
@@ -26,7 +26,7 @@ use crate::{
 #[derive(Debug)]
 pub struct TelemetryStore {
     inner: parking_lot::Mutex<TelemetryStoreInner>,
-    persisted_files: Arc,
+    persisted_files: Option>,
 }
 
 const SAMPLER_INTERVAL_SECS: u64 = 60;
@@ -39,7 +39,7 @@ impl TelemetryStore {
         influx_version: Arc<str>,
         storage_type: Arc<str>,
         cores: usize,
-        persisted_files: Arc,
+        persisted_files: Option>,
         telemetry_endpoint: String,
     ) -> Arc<Self> {
         debug!(
@@ -77,7 +77,7 @@ impl TelemetryStore {
         let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
         Arc::new(TelemetryStore {
             inner: parking_lot::Mutex::new(inner),
-            persisted_files,
+            persisted_files: Some(persisted_files),
         })
     }
 
@@ -117,11 +117,13 @@ impl TelemetryStore {
     pub(crate) fn snapshot(&self) -> TelemetryPayload {
         let inner_store = self.inner.lock();
-        let (file_count, size_mb, row_count) = self.persisted_files.get_metrics();
         let mut payload = inner_store.snapshot();
-        payload.parquet_file_count = file_count;
-        payload.parquet_file_size_mb = size_mb;
-        payload.parquet_row_count = row_count;
+        if let Some(persisted_files) = &self.persisted_files {
+            let (file_count, size_mb, row_count) = persisted_files.get_metrics();
+            payload.parquet_file_count = file_count;
+            payload.parquet_file_size_mb = size_mb;
+            payload.parquet_row_count = row_count;
+        }
         payload
     }
 }
@@ -305,7 +307,7 @@ mod tests {
             Arc::from("OSS-v3.0"),
             Arc::from("Memory"),
             10,
-            parqet_file_metrics,
+            Some(parqet_file_metrics),
             "http://localhost/telemetry".to_owned(),
         )
         .await;

From a13325658cc6d055fe73c132534fbc9288e285d8 Mon Sep 17 00:00:00 2001
From: Trevor Hilton
Date: Mon, 21 Oct 2024 14:53:32 -0400
Subject: [PATCH 3/4] refactor: make ParquetFileId Hash

---
 influxdb3_id/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/influxdb3_id/src/lib.rs
b/influxdb3_id/src/lib.rs index a0df4cd8ca1..aefa812d588 100644 --- a/influxdb3_id/src/lib.rs +++ b/influxdb3_id/src/lib.rs @@ -117,7 +117,7 @@ impl Display for ColumnId { /// The next file id to be used when persisting `ParquetFile`s pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0); -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord, Hash)] /// A newtype wrapper for ids used with `ParquetFile` pub struct ParquetFileId(u64); From 758fe623ce05f71e60c04845a21831b7eb77a534 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 21 Oct 2024 15:04:19 -0400 Subject: [PATCH 4/4] refactor: test harness logging --- influxdb3/tests/server/main.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index 15943550da1..718e0e1179d 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -93,11 +93,15 @@ impl ConfigProvider for TestConfig { /// A running instance of the `influxdb3 serve` process /// -/// Logs will be emitted to stdout/stderr if the TEST_LOG environment -/// variable is set, e.g., +/// Logs will be emitted to stdout/stderr if the TEST_LOG environment variable is set, e.g., /// ``` -/// TEST_LOG= cargo test +/// TEST_LOG= cargo nextest run -p influxdb3 --nocapture /// ``` +/// This will forward the value provided in `TEST_LOG` to the `LOG_FILTER` env var on the running +/// `influxdb` binary. By default, a log filter of `info` is used, which would provide similar +/// output to what is seen in production, however, per-crate filters can be provided via this +/// argument, e.g., `info,influxdb3_write=debug` would emit logs at `INFO` level for all crates +/// except for the `influxdb3_write` crate, which will emit logs at the `DEBUG` level. pub struct TestServer { auth_token: Option, bind_addr: SocketAddr, @@ -128,10 +132,12 @@ impl TestServer { .args(["--wal-flush-interval", "10ms"]) .args(config.as_args()); - // If TEST_LOG env var is not defined, discard stdout/stderr - if std::env::var("TEST_LOG").is_err() { - command = command.stdout(Stdio::null()).stderr(Stdio::null()); - } + // If TEST_LOG env var is not defined, discard stdout/stderr, otherwise, pass it to the + // inner binary in the "LOG_FILTER" env var: + command = match std::env::var("TEST_LOG") { + Ok(val) => command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val }), + Err(_) => command.stdout(Stdio::null()).stderr(Stdio::null()), + }; let server_process = command.spawn().expect("spawn the influxdb3 server process");
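// --------------------------------------------------------------------------
// Editor's note: a minimal, standalone Rust sketch of the Option-gating
// pattern that patch 2 above applies to the telemetry store. All names here
// (ParquetMetricsSource, TelemetrySnapshot, Store, FakeFiles) are illustrative
// stand-ins, not the actual types or APIs of the influxdb3_telemetry crate;
// this only shows the shape of the change under those assumptions.
use std::sync::Arc;

trait ParquetMetricsSource: Send + Sync {
    /// Returns (file_count, size_mb, row_count), mirroring the tuple used in patch 2.
    fn get_metrics(&self) -> (u64, f64, u64);
}

#[derive(Debug, Default)]
struct TelemetrySnapshot {
    parquet_file_count: u64,
    parquet_file_size_mb: f64,
    parquet_row_count: u64,
}

struct Store {
    // Keeping the metrics source optional lets a server that has no local
    // write buffer (the "pro" case named in the commit message) skip the
    // parquet gauges entirely instead of requiring a dummy implementation.
    persisted_files: Option<Arc<dyn ParquetMetricsSource>>,
}

impl Store {
    fn snapshot(&self) -> TelemetrySnapshot {
        let mut payload = TelemetrySnapshot::default();
        // Only fill the parquet fields when a metrics source was supplied;
        // otherwise the payload keeps its zeroed defaults.
        if let Some(persisted_files) = &self.persisted_files {
            let (file_count, size_mb, row_count) = persisted_files.get_metrics();
            payload.parquet_file_count = file_count;
            payload.parquet_file_size_mb = size_mb;
            payload.parquet_row_count = row_count;
        }
        payload
    }
}

struct FakeFiles;

impl ParquetMetricsSource for FakeFiles {
    fn get_metrics(&self) -> (u64, f64, u64) {
        (3, 12.5, 42_000)
    }
}

fn main() {
    // Usage: one store without a metrics source, one with a fake source.
    let without_metrics = Store { persisted_files: None };
    let source: Arc<dyn ParquetMetricsSource> = Arc::new(FakeFiles);
    let with_metrics = Store { persisted_files: Some(source) };
    println!("{:?}", without_metrics.snapshot());
    println!("{:?}", with_metrics.snapshot());
}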