refactor: changes needed for IDs in pro #25479

Merged · 4 commits · Oct 21, 2024
10 changes: 5 additions & 5 deletions influxdb3/src/commands/serve.rs
@@ -436,7 +436,7 @@ pub async fn command(config: Config) -> Result<()> {
.map_err(Error::InitializePersistedCatalog)?,
);

- let last_cache = LastCacheProvider::new_from_db_schema_provider(Arc::clone(&catalog) as _)
+ let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _)
.map_err(Error::InitializeLastCache)?;
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

@@ -458,7 +458,7 @@ pub async fn command(config: Config) -> Result<()> {
&config.object_store_config,
catalog.instance_id(),
num_cpus,
- Arc::clone(&write_buffer_impl.persisted_files()),
+ Some(Arc::clone(&write_buffer_impl.persisted_files())),
config.telemetry_endpoint,
)
.await;
@@ -473,7 +473,7 @@ pub async fn command(config: Config) -> Result<()> {
)?;

let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
- db_schema_provider: write_buffer.db_schema_provider(),
+ catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
metrics: Arc::clone(&metrics),
@@ -511,7 +511,7 @@ async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
num_cpus: usize,
- persisted_files: Arc<PersistedFiles>,
+ persisted_files: Option<Arc<PersistedFiles>>,
telemetry_endpoint: String,
) -> Arc<TelemetryStore> {
let os = std::env::consts::OS;
@@ -530,7 +530,7 @@ async fn setup_telemetry_store(
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
- persisted_files,
+ persisted_files.map(|p| p as _),
telemetry_endpoint,
)
.await
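Note on the `persisted_files.map(|p| p as _)` change: the telemetry store now takes an `Option` of a trait object, and the `as _` cast asks the compiler to infer an unsized coercion inside the `map`. A minimal sketch of the same pattern, using hypothetical `Files`/`FileStore` types in place of `PersistedFiles` and whatever trait the telemetry store actually consumes:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for PersistedFiles and the trait object the
// telemetry store consumes; the real types live in the influxdb3 crates.
trait FileStore: Send + Sync {
    fn file_count(&self) -> usize;
}

#[derive(Default)]
struct Files(Vec<String>);

impl FileStore for Files {
    fn file_count(&self) -> usize {
        self.0.len()
    }
}

// `None` models a caller (e.g. a pro build) with no local persisted files.
fn register(files: Option<Arc<dyn FileStore>>) {
    if let Some(f) = files {
        println!("tracking {} files", f.file_count());
    }
}

fn main() {
    let files: Option<Arc<Files>> = Some(Arc::new(Files::default()));
    // `as _` performs the unsized coercion Arc<Files> -> Arc<dyn FileStore>,
    // mirroring `persisted_files.map(|p| p as _)` in the diff above.
    register(files.map(|p| p as _));
}
```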
20 changes: 13 additions & 7 deletions influxdb3/tests/server/main.rs
@@ -93,11 +93,15 @@ impl ConfigProvider for TestConfig {

/// A running instance of the `influxdb3 serve` process
///
- /// Logs will be emitted to stdout/stderr if the TEST_LOG environment
- /// variable is set, e.g.,
+ /// Logs will be emitted to stdout/stderr if the TEST_LOG environment variable is set, e.g.,
/// ```
- /// TEST_LOG= cargo test
+ /// TEST_LOG= cargo nextest run -p influxdb3 --nocapture
/// ```
+ /// This will forward the value provided in `TEST_LOG` to the `LOG_FILTER` env var on the running
+ /// `influxdb3` binary. By default, a log filter of `info` is used, which provides output similar
+ /// to what is seen in production; however, per-crate filters can be provided via this variable,
+ /// e.g., `info,influxdb3_write=debug` emits logs at the `INFO` level for all crates except
+ /// `influxdb3_write`, which emits logs at the `DEBUG` level.
pub struct TestServer {
auth_token: Option<String>,
bind_addr: SocketAddr,
@@ -128,10 +132,12 @@ impl TestServer {
.args(["--wal-flush-interval", "10ms"])
.args(config.as_args());

- // If TEST_LOG env var is not defined, discard stdout/stderr
- if std::env::var("TEST_LOG").is_err() {
-     command = command.stdout(Stdio::null()).stderr(Stdio::null());
- }
+ // If the TEST_LOG env var is not defined, discard stdout/stderr; otherwise, pass its
+ // value to the inner binary via the "LOG_FILTER" env var:
+ command = match std::env::var("TEST_LOG") {
+     Ok(val) => command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val }),
+     Err(_) => command.stdout(Stdio::null()).stderr(Stdio::null()),
+ };

let server_process = command.spawn().expect("spawn the influxdb3 server process");

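For reference, a self-contained sketch of the forwarding logic above; the `server` binary name is hypothetical, since the test harness actually spawns `influxdb3 serve`:

```rust
use std::process::{Command, Stdio};

fn main() {
    // Hypothetical binary name standing in for the influxdb3 server.
    let mut command = Command::new("server");

    // Same pattern as the diff above: forward TEST_LOG as LOG_FILTER,
    // defaulting an empty value to "info"; silence the child when unset.
    match std::env::var("TEST_LOG") {
        Ok(val) => {
            let filter = if val.is_empty() { "info".to_string() } else { val };
            command.env("LOG_FILTER", filter);
        }
        Err(_) => {
            command.stdout(Stdio::null()).stderr(Stdio::null());
        }
    }

    let _child = command.spawn().expect("spawn the server process");
}
```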
79 changes: 38 additions & 41 deletions influxdb3_catalog/src/catalog.rs
@@ -1,7 +1,6 @@
//! Implementation of the Catalog that sits entirely in memory.

use crate::catalog::Error::TableNotFound;
- use crate::DatabaseSchemaProvider;
use arrow::datatypes::SchemaRef;
use bimap::BiHashMap;
use influxdb3_id::{ColumnId, DbId, TableId};
@@ -165,6 +164,44 @@ impl Catalog {
Ok(db)
}

+ pub fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
+     self.inner.read().db_map.get_by_right(db_name).copied()
+ }
+
+ pub fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
+     self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
+ }
+
+ pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
+     self.db_schema_and_id(db_name).map(|(_, schema)| schema)
+ }
+
+ pub fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
+     self.inner.read().databases.get(&db_id).cloned()
+ }
+
+ pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
+     let inner = self.inner.read();
+     let db_id = inner.db_map.get_by_right(db_name)?;
+     inner
+         .databases
+         .get(db_id)
+         .map(|db| (*db_id, Arc::clone(db)))
+ }
+
+ pub fn db_names(&self) -> Vec<String> {
+     self.inner
+         .read()
+         .databases
+         .values()
+         .map(|db| db.name.to_string())
+         .collect()
+ }
+
+ pub fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
+     self.inner.read().databases.values().cloned().collect()
+ }

pub fn sequence_number(&self) -> SequenceNumber {
self.inner.read().sequence
}
@@ -243,46 +280,6 @@ impl Catalog {
}
}

- impl DatabaseSchemaProvider for Catalog {
-     fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
-         self.inner.read().db_map.get_by_right(db_name).copied()
-     }
-
-     fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>> {
-         self.inner.read().db_map.get_by_left(&db_id).map(Arc::clone)
-     }
-
-     fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
-         self.db_schema_and_id(db_name).map(|(_, schema)| schema)
-     }
-
-     fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>> {
-         self.inner.read().databases.get(&db_id).cloned()
-     }
-
-     fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
-         let inner = self.inner.read();
-         let db_id = inner.db_map.get_by_right(db_name)?;
-         inner
-             .databases
-             .get(db_id)
-             .map(|db| (*db_id, Arc::clone(db)))
-     }
-
-     fn db_names(&self) -> Vec<String> {
-         self.inner
-             .read()
-             .databases
-             .values()
-             .map(|db| db.name.to_string())
-             .collect()
-     }
-
-     fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>> {
-         self.inner.read().databases.values().cloned().collect()
-     }
- }

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
pub struct InnerCatalog {
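The name/id lookups moved into `Catalog` above lean on `bimap::BiHashMap` (already imported in this file), which indexes entries by both key directions. A rough sketch of that pattern, with a simplified `DbId` and a bare map standing in for the catalog's locked internals:

```rust
use std::sync::Arc;
use bimap::BiHashMap; // assumes the bimap crate, as used in catalog.rs

// Simplified stand-in for influxdb3_id::DbId.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct DbId(u32);

fn main() {
    // Mirrors the catalog's `db_map`: DbId on the left, name on the right.
    let mut db_map: BiHashMap<DbId, Arc<str>> = BiHashMap::new();
    db_map.insert(DbId(0), Arc::from("foo"));

    // db_name_to_id: right-to-left lookup.
    let id = db_map.get_by_right("foo").copied();
    assert_eq!(id, Some(DbId(0)));

    // db_id_to_name: left-to-right lookup.
    let name = db_map.get_by_left(&DbId(0)).map(Arc::clone);
    assert_eq!(name.as_deref(), Some("foo"));
}
```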
32 changes: 0 additions & 32 deletions influxdb3_catalog/src/lib.rs
@@ -1,34 +1,2 @@
- use std::sync::Arc;
-
- use catalog::DatabaseSchema;
- use influxdb3_id::DbId;
-
pub mod catalog;
pub(crate) mod serialize;

- /// Provide [`DatabaseSchema`] and their derivatives where needed.
- ///
- /// This trait captures the read-only behaviour of the [`catalog::Catalog`] where it is used to
- /// serve queries and provide schema-related info of the underlying database.
- pub trait DatabaseSchemaProvider: std::fmt::Debug + Send + Sync + 'static {
-     /// Convert a database name to its corresponding [`DbId`]
-     fn db_name_to_id(&self, db_name: &str) -> Option<DbId>;
-
-     /// Convert a [`DbId`] into its corresponding database name
-     fn db_id_to_name(&self, db_id: DbId) -> Option<Arc<str>>;
-
-     /// Get the [`DatabaseSchema`] for the given name, or `None` otherwise
-     fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>>;
-
-     /// Get the [`DatabaseSchema`] for the given [`DbId`], or `None` otherwise
-     fn db_schema_by_id(&self, db_id: DbId) -> Option<Arc<DatabaseSchema>>;
-
-     /// Get the [`DatabaseSchema`] as well as the corresponding [`DbId`] for the given name
-     fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)>;
-
-     /// Get a list of the database names in the underlying catalog'd instance
-     fn db_names(&self) -> Vec<String>;
-
-     /// List out all [`DatabaseSchema`] in the database
-     fn list_db_schema(&self) -> Vec<Arc<DatabaseSchema>>;
- }
2 changes: 1 addition & 1 deletion influxdb3_id/src/lib.rs
@@ -117,7 +117,7 @@ impl Display for ColumnId {
/// The next file id to be used when persisting `ParquetFile`s
pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);

- #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord)]
+ #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone, PartialOrd, Ord, Hash)]
/// A newtype wrapper for ids used with `ParquetFile`
pub struct ParquetFileId(u64);

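Deriving `Hash` lets `ParquetFileId` key hashed collections, which ID tracking on the pro side presumably needs. A small sketch with a simplified newtype (serde derives omitted):

```rust
use std::collections::HashSet;

// Simplified stand-in for influxdb3_id::ParquetFileId.
#[derive(Debug, Eq, PartialEq, Copy, Clone, PartialOrd, Ord, Hash)]
struct ParquetFileId(u64);

fn main() {
    // With Hash derived, the id works as a HashSet/HashMap key.
    let mut seen: HashSet<ParquetFileId> = HashSet::new();
    assert!(seen.insert(ParquetFileId(0)));
    assert!(!seen.insert(ParquetFileId(0))); // duplicate ids are detected
}
```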
4 changes: 2 additions & 2 deletions influxdb3_server/src/http.rs
@@ -694,7 +694,7 @@ where

let (db_id, db_schema) = self
.write_buffer
- .db_schema_provider()
+ .catalog()
.db_schema_and_id(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
@@ -741,7 +741,7 @@ where

let (db_id, db_schema) = self
.write_buffer
- .db_schema_provider()
+ .catalog()
.db_schema_and_id(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
4 changes: 2 additions & 2 deletions influxdb3_server/src/lib.rs
@@ -779,7 +779,7 @@ mod tests {
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
- Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
+ Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
@@ -802,7 +802,7 @@ mod tests {
)
.unwrap();
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
- db_schema_provider: write_buffer.db_schema_provider(),
+ catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
metrics: Arc::clone(&metrics),
21 changes: 10 additions & 11 deletions influxdb3_server/src/query_executor.rs
@@ -18,8 +18,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
- use influxdb3_catalog::catalog::DatabaseSchema;
- use influxdb3_catalog::DatabaseSchemaProvider;
+ use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::WriteBuffer;
@@ -50,7 +49,7 @@ use tracker::{

#[derive(Debug)]
pub struct QueryExecutorImpl {
- db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
+ catalog: Arc<Catalog>,
write_buffer: Arc<dyn WriteBuffer>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
@@ -62,7 +61,7 @@ pub struct QueryExecutorImpl {
/// Arguments for [`QueryExecutorImpl::new`]
#[derive(Debug)]
pub struct CreateQueryExecutorArgs {
- pub db_schema_provider: Arc<dyn DatabaseSchemaProvider>,
+ pub catalog: Arc<Catalog>,
pub write_buffer: Arc<dyn WriteBuffer>,
pub exec: Arc<Executor>,
pub metrics: Arc<Registry>,
@@ -75,7 +74,7 @@ pub struct CreateQueryExecutorArgs {
impl QueryExecutorImpl {
pub fn new(
CreateQueryExecutorArgs {
- db_schema_provider,
+ catalog,
write_buffer,
exec,
metrics,
@@ -96,7 +95,7 @@ impl QueryExecutorImpl {
Arc::new(iox_time::SystemProvider::new()),
));
Self {
- db_schema_provider,
+ catalog,
write_buffer,
exec,
datafusion_config,
@@ -181,7 +180,7 @@ impl QueryExecutor for QueryExecutorImpl {
}

fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error> {
- let mut databases = self.db_schema_provider.db_names();
+ let mut databases = self.catalog.db_names();
// sort them to ensure consistent order:
databases.sort_unstable();
let databases = StringArray::from(databases);
@@ -200,7 +199,7 @@
let mut databases = if let Some(db) = database {
vec![db.to_owned()]
} else {
- self.db_schema_provider.db_names()
+ self.catalog.db_names()
};
// sort them to ensure consistent order:
databases.sort_unstable();
@@ -319,7 +318,7 @@ impl QueryDatabase for QueryExecutorImpl {
) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
let _span_recorder = SpanRecorder::new(span);

- let db_schema = self.db_schema_provider.db_schema(name).ok_or_else(|| {
+ let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
DataFusionError::External(Box::new(Error::DatabaseNotFound {
db_name: name.into(),
}))
@@ -675,7 +674,7 @@ mod tests {
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
- Arc::new(LastCacheProvider::new_from_db_schema_provider(catalog as _).unwrap()),
+ Arc::new(LastCacheProvider::new_from_catalog(catalog as _).unwrap()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig {
@@ -696,7 +695,7 @@ mod tests {
let metrics = Arc::new(Registry::new());
let datafusion_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
- db_schema_provider: write_buffer.db_schema_provider(),
+ catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec,
metrics,
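The `show_databases` path above feeds the sorted names into an Arrow `StringArray`. A rough sketch of how such a single-column record batch is assembled; the `iox::database` column name is an assumption for illustration, not taken from this diff:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Mirrors show_databases(): collect names, sort for a deterministic
    // order, then emit a single-column record batch.
    let mut databases = vec!["bar".to_string(), "foo".to_string()];
    databases.sort_unstable();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "iox::database", // illustrative column name
        DataType::Utf8,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(databases)) as ArrayRef],
    )?;
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```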
2 changes: 1 addition & 1 deletion influxdb3_server/src/system_tables/parquet_files.rs
@@ -94,7 +94,7 @@ impl IoxSystemTable for ParquetFilesTable {
let parquet_files: Vec<ParquetFile> = self.buffer.parquet_files(
self.db_id,
self.buffer
- .db_schema_provider()
+ .catalog()
.db_schema_by_id(self.db_id)
.expect("db exists")
.table_name_to_id(table_name.as_str())