refactor: changes needed for IDs in pro (#25479)
* refactor: roll back addition of DatabaseSchemaProvider trait

* refactor: make parquet metrics optional in telemetry for pro

* refactor: make ParquetFileId Hash

* refactor: test harness logging
hiltontj authored Oct 21, 2024
1 parent 5473a94 commit ce9276d
Showing 15 changed files with 104 additions and 144 deletions.
10 changes: 5 additions & 5 deletions influxdb3/src/commands/serve.rs
@@ -436,7 +436,7 @@ pub async fn command(config: Config) -> Result<()> {
.map_err(Error::InitializePersistedCatalog)?,
);

let last_cache = LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _)
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
.map_err(Error::InitializeLastCache)?;
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

@@ -458,7 +458,7 @@ pub async fn command(config: Config) -> Result<()> {
&config.object_store_config,
catalog.instance_id(),
num_cpus,
Arc::clone(&write_buffer_impl.persisted_files()),
Some(Arc::clone(&write_buffer_impl.persisted_files())),
config.telemetry_endpoint,
)
.await;
@@ -473,7 +473,7 @@ pub async fn command(config: Config) -> Result<()> {
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
db_schema_provider: write_buffer.db_schema_provider(),
catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
metrics: Arc::clone(&metrics),
@@ -511,7 +511,7 @@ async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
num_cpus: usize,
persisted_files: Arc<PersistedFiles>,
persisted_files: Option<Arc<PersistedFiles>>,
telemetry_endpoint: String,
) -> Arc<TelemetryStore> {
let os = std::env::consts::OS;
@@ -530,7 +530,7 @@
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
persisted_files,
persisted_files.map(|p| p as _),
telemetry_endpoint,
)
.await
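
The telemetry change above amounts to threading an `Option` through `setup_telemetry_store` and coercing the concrete `Arc` to a trait object inside `Option::map`. Below is a minimal, self-contained sketch of that pattern; the trait and struct names are illustrative stand-ins, not the real `influxdb3` types.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for PersistedFiles and the metrics source the
// telemetry store consumes; only the shape of the conversion matters here.
trait ParquetMetrics: Send + Sync {
    fn parquet_file_count(&self) -> u64;
}

struct PersistedFilesStub;

impl ParquetMetrics for PersistedFilesStub {
    fn parquet_file_count(&self) -> u64 {
        0
    }
}

// Mirrors `persisted_files.map(|p| p as _)` above: when a caller (e.g. the pro
// build) passes `None`, no parquet metrics source is wired up at all.
fn as_dyn_metrics(
    persisted_files: Option<Arc<PersistedFilesStub>>,
) -> Option<Arc<dyn ParquetMetrics>> {
    persisted_files.map(|p| p as _)
}

fn main() {
    assert!(as_dyn_metrics(Some(Arc::new(PersistedFilesStub))).is_some());
    assert!(as_dyn_metrics(None).is_none());
}
```
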
20 changes: 13 additions & 7 deletions influxdb3/tests/server/main.rs
@@ -93,11 +93,15 @@ impl ConfigProvider for TestConfig {

/// A running instance of the `influxdb3 serve` process
///
/// Logs will be emitted to stdout/stderr if the TEST_LOG environment
/// variable is set, e.g.,
/// Logs will be emitted to stdout/stderr if the TEST_LOG environment variable is set, e.g.,
/// ```
/// TEST_LOG= cargo test
/// TEST_LOG= cargo nextest run -p influxdb3 --nocapture
/// ```
/// This will forward the value provided in `TEST_LOG` to the `LOG_FILTER` env var on the running
/// `influxdb3` binary. By default, a log filter of `info` is used, which provides output similar
/// to what is seen in production; however, per-crate filters can be provided via this variable,
/// e.g., `info,influxdb3_write=debug` emits logs at the `INFO` level for all crates except
/// `influxdb3_write`, which emits logs at the `DEBUG` level.
pub struct TestServer {
auth_token: Option<String>,
bind_addr: SocketAddr,
@@ -128,10 +132,12 @@ impl TestServer {
.args(["--wal-flush-interval", "10ms"])
.args(config.as_args());

// If TEST_LOG env var is not defined, discard stdout/stderr
if std::env::var("TEST_LOG").is_err() {
command = command.stdout(Stdio::null()).stderr(Stdio::null());
}
// If TEST_LOG env var is not defined, discard stdout/stderr, otherwise, pass it to the
// inner binary in the "LOG_FILTER" env var:
command = match std::env::var("TEST_LOG") {
Ok(val) => command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val }),
Err(_) => command.stdout(Stdio::null()).stderr(Stdio::null()),
};

let server_process = command.spawn().expect("spawn the influxdb3 server process");

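
Read the harness change as: with `TEST_LOG` unset, the child's output is discarded; with `TEST_LOG` set (even to an empty string), its value is forwarded to the server as `LOG_FILTER`, defaulting to `info`. A standalone sketch of that decision against `std::process::Command` follows; the `configure_test_logging` helper is made up for illustration and is not part of the harness.

```rust
use std::process::{Command, Stdio};

// Illustrative helper mirroring the match added above.
fn configure_test_logging(cmd: &mut Command) {
    match std::env::var("TEST_LOG") {
        // An empty TEST_LOG falls back to the default `info` filter; a
        // non-empty value (e.g. `info,influxdb3_write=debug`) is forwarded
        // verbatim to the child process as LOG_FILTER.
        Ok(val) => {
            let filter = if val.is_empty() { "info".to_string() } else { val };
            cmd.env("LOG_FILTER", filter);
        }
        // With TEST_LOG unset, silence the server's stdout/stderr entirely.
        Err(_) => {
            cmd.stdout(Stdio::null()).stderr(Stdio::null());
        }
    }
}

fn main() {
    let mut cmd = Command::new("influxdb3");
    configure_test_logging(&mut cmd);
    // The harness would then call `.spawn()` on `cmd`, as shown above.
}
```
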
79 changes: 38 additions & 41 deletions influxdb3_catalog/src/catalog.rs
@@ -1,7 +1,6 @@
//! Implementation of the Catalog that sits entirely in memory.

use crate::catalog::Error::TableNotFound;
use crate::DatabaseSchemaProvider;
use arrow::datatypes::SchemaRef;
use bimap::BiHashMap;
use influxdb3_id::{ColumnId, DbId, TableId};
@@ -165,6 +164,44 @@ impl Catalog {
Ok(db)
}

pub fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
self.inner.read().db_map.get_by_right(db_name).copied()
}

pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
}

pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
self.db_schema_and_id(db_name).map(|(_, schema)| schema)
}

pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
self.inner.read().databases.get(&db_id).cloned()
}

pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
let inner = self.inner.read();
let db_id = inner.db_map.get_by_right(db_name)?;
inner
.databases
.get(db_id)
.map(|db| (*db_id, Arc::clone(db)))
}

pub fn db_names(&self) -> Vec<String> {
self.inner
.read()
.databases
.values()
.map(|db| db.name.to_string())
.collect()
}

pub fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
self.inner.read().databases.values().cloned().collect()
}

pub fn sequence_number(&self) -> SequenceNumber {
self.inner.read().sequence
}
@@ -243,46 +280,6 @@ impl Catalog {
}
}

impl DatabaseSchemaProvider for Catalog {
fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
self.inner.read().db_map.get_by_right(db_name).copied()
}

fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
}

fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
self.db_schema_and_id(db_name).map(|(_, schema)| schema)
}

fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
self.inner.read().databases.get(&db_id).cloned()
}

fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
let inner = self.inner.read();
let db_id = inner.db_map.get_by_right(db_name)?;
inner
.databases
.get(db_id)
.map(|db| (*db_id, Arc::clone(db)))
}

fn db_names(&self) -> Vec<String> {
self.inner
.read()
.databases
.values()
.map(|db| db.name.to_string())
.collect()
}

fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
self.inner.read().databases.values().cloned().collect()
}
}

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
pub struct InnerCatalog {
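
With the lookup methods now inherent on `Catalog` rather than behind the removed `DatabaseSchemaProvider` trait, call sites only need the concrete type. A small usage sketch against the methods shown above; the `print_databases` helper is illustrative and not part of this commit.

```rust
use std::sync::Arc;

use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};

// Walk the catalog using the inherent methods added above; no trait import
// from influxdb3_catalog is required anymore.
fn print_databases(catalog: &Catalog) {
    for name in catalog.db_names() {
        if let Some((db_id, _schema)) = catalog.db_schema_and_id(&name) {
            // Round-trip through the id-based lookups shown in the diff.
            let by_id: Option<Arc<DatabaseSchema>> = catalog.db_schema_by_id(db_id);
            let back: Option<Arc<str>> = catalog.db_id_to_name(db_id);
            println!("{name}: schema found = {}", by_id.is_some());
            debug_assert_eq!(back.as_deref(), Some(name.as_str()));
        }
    }
}
```
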
32 changes: 0 additions & 32 deletions influxdb3_catalog/src/lib.rs
@@ -1,34 +1,2 @@
use std::sync::Arc;

use catalog::DatabaseSchema;
use influxdb3_id::DbId;

pub mod catalog;
pub(crate) mod serialize;

/// Provide [`DatabaseSchema`] and their derivatives where needed.
///
/// This trait captures the read-only behaviour of the [`catalog::Catalog`] where it is used to
/// serve queries and provide schema-related info about the underlying database.
pub trait DatabaseSchemaProvider: std::fmt::Debug + Send + Sync + 'static {
/// Convert a database name to its corresponding [`DbId`]
fn db_name_to_id(&self, db_name: &str) -> Option<DbId>;

/// Convert a [`DbId`] into its corresponding database name
fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>>;

/// Get the [`DatabaseSchema`] for the given name, or `None` otherwise
fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>>;

/// Get the [`DatabaseSchema`] for the given [`DbId`], or `None` otherwise
fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>>;

/// Get the [`DatabaseSchema`] as well as the corresponding [`DbId`] for the given name
fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)>;

/// Get a list of the database names in the underlying catalog'd instance
fn db_names(&self) -> Vec<String>;

/// List out all [`DatabaseSchema`] in the database
fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>>;
}
2 changes: 1 addition & 1 deletion influxdb3_id/src/lib.rs
@@ -117,7 +117,7 @@ impl Display for ColumnId {
/// The next file id to be used when persisting `ParquetFile`s
pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord, Hash)]
/// A newtype wrapper for ids used with `ParquetFile`
pub struct ParquetFileId(u64);

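
Adding `Hash` to the derive list lets `ParquetFileId` key hashed collections, which is presumably what the pro-side file tracking needs. A short illustrative sketch; the `unique_ids` helper is hypothetical and not code from this commit.

```rust
use std::collections::HashSet;

use influxdb3_id::ParquetFileId;

// With `Hash` (alongside the existing `Eq` and `Copy` derives), ids can be
// deduplicated or used as HashMap/HashSet keys directly.
fn unique_ids(ids: &[ParquetFileId]) -> HashSet<ParquetFileId> {
    ids.iter().copied().collect()
}
```
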
4 changes: 2 additions & 2 deletions influxdb3_server/src/http.rs
@@ -694,7 +694,7 @@ where

let (db_id, db_schema) = self
.write_buffer
.db_schema_provider()
.catalog()
.db_schema_and_id(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
@@ -741,7 +741,7 @@ where

let (db_id, db_schema) = self
.write_buffer
.db_schema_provider()
.catalog()
.db_schema_and_id(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
4 changes: 2 additions & 2 deletions influxdb3_server/src/lib.rs
@@ -779,7 +779,7 @@ mod tests {
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
@@ -802,7 +802,7 @@
)
.unwrap();
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
db_schema_provider: write_buffer.db_schema_provider(),
catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
metrics: Arc::clone(&metrics),
21 changes: 10 additions & 11 deletions influxdb3_server/src/query_executor.rs
@@ -18,8 +18,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_catalog::DatabaseSchemaProvider;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::WriteBuffer;
@@ -50,7 +49,7 @@ use tracker::{

#[derive(Debug)]
pub struct QueryExecutorImpl {
db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
catalog: Arc<Catalog>,
write_buffer: Arc<dyn WriteBuffer>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
@@ -62,7 +61,7 @@ pub struct QueryExecutorImpl {
/// Arguments for [`QueryExecutorImpl::new`]
#[derive(Debug)]
pub struct CreateQueryExecutorArgs {
pub db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
pub catalog: Arc<Catalog>,
pub write_buffer: Arc<dyn WriteBuffer>,
pub exec: Arc<Executor>,
pub metrics: Arc<Registry>,
@@ -75,7 +74,7 @@ pub struct CreateQueryExecutorArgs {
impl QueryExecutorImpl {
pub fn new(
CreateQueryExecutorArgs {
db_schema_provider,
catalog,
write_buffer,
exec,
metrics,
@@ -96,7 +95,7 @@ impl QueryExecutorImpl {
Arc::new(iox_time::SystemProvider::new()),
));
Self {
db_schema_provider,
catalog,
write_buffer,
exec,
datafusion_config,
@@ -181,7 +180,7 @@ impl QueryExecutor for QueryExecutorImpl {
}

fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error> {
let mut databases = self.db_schema_provider.db_names();
let mut databases = self.catalog.db_names();
// sort them to ensure consistent order:
databases.sort_unstable();
let databases = StringArray::from(databases);
@@ -200,7 +199,7 @@ impl QueryExecutor for QueryExecutorImpl {
let mut databases = if let Some(db) = database {
vec![db.to_owned()]
} else {
self.db_schema_provider.db_names()
self.catalog.db_names()
};
// sort them to ensure consistent order:
databases.sort_unstable();
@@ -319,7 +318,7 @@ impl QueryDatabase for QueryExecutorImpl {
) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
let _span_recorder = SpanRecorder::new(span);

let db_schema = self.db_schema_provider.db_schema(name).ok_or_else(|| {
let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
DataFusionError::External(Box::new(Error::DatabaseNotFound {
db_name: name.into(),
}))
@@ -675,7 +674,7 @@ mod tests {
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig {
@@ -696,7 +695,7 @@
let metrics = Arc::new(Registry::new());
let datafusion_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
db_schema_provider: write_buffer.db_schema_provider(),
catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec,
metrics,
2 changes: 1 addition & 1 deletion influxdb3_server/src/system_tables/parquet_files.rs
@@ -94,7 +94,7 @@ impl IoxSystemTable for ParquetFilesTable {
let parquet_files: Vec<ParquetFile> = self.buffer.parquet_files(
self.db_id,
self.buffer
.db_schema_provider()
.catalog()
.db_schema_by_id(self.db_id)
.expect("db exists")
.table_name_to_id(table_name.as_str())