Skip to content

Commit

Permalink
feat(metric-engine): introduce index options from metric engine (#5374)
Browse files Browse the repository at this point in the history
* feat(metric-engine): introduce index options from metric engine

* chore: fmt toml

* test: add sqlness test

* fix: ignore internal columns

* chore: remove unused dep

* chore: update sqlness result

* chore: ignore metric engine internal columns

* chore: refine code styling

* test: update sqlness test

* refactor: refactor `create_table_constraints`

* test: show index

* chore: apply suggestions from CR

* fix: set inverted index explicitly

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jan 20, 2025
1 parent 5287d46 commit 385b1bc
Show file tree
Hide file tree
Showing 25 changed files with 734 additions and 146 deletions.
2 changes: 1 addition & 1 deletion src/api/src/v1/column_def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ mod tests {
case_sensitive: false,
})
.unwrap();
schema.with_inverted_index(true);
schema.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
Expand Down
15 changes: 14 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ impl ColumnSchema {
self
}

pub fn with_inverted_index(&mut self, value: bool) {
/// Set the inverted index for the column.
/// Similar to [with_inverted_index], but takes `&mut self` instead of taking ownership.
///
/// [with_inverted_index]: Self::with_inverted_index
pub fn set_inverted_index(&mut self, value: bool) {
match value {
true => {
self.metadata
Expand All @@ -170,6 +174,15 @@ impl ColumnSchema {
}
}

/// Applies the inverted index setting `value` to the column, builder-style.
///
/// Similar to [set_inverted_index], but takes ownership of `self` and returns
/// the updated schema as an owned value, so the call can be chained.
///
/// [set_inverted_index]: Self::set_inverted_index
pub fn with_inverted_index(mut self, value: bool) -> Self {
self.set_inverted_index(value);
self
}

// Put a placeholder to invalidate schemas.all(!has_inverted_index_key).
pub fn insert_inverted_index_placeholder(&mut self) {
self.metadata
Expand Down
35 changes: 31 additions & 4 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
Expand All @@ -26,9 +27,10 @@ use store_api::region_request::{
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};

use crate::engine::IndexOptions;
use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::utils;
Expand Down Expand Up @@ -64,13 +66,16 @@ impl DataRegion {
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<()> {
let region_id = utils::to_data_region_id(region_id);

let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self.assemble_alter_request(region_id, columns).await?;
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;

let _timer = MITO_DDL_DURATION.start_timer();

Expand All @@ -97,6 +102,7 @@ impl DataRegion {
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<RegionRequest> {
// retrieve underlying version
let region_metadata = self
Expand Down Expand Up @@ -142,6 +148,19 @@ impl DataRegion {

c.column_id = new_column_id_start + delta as u32;
c.column_schema.set_nullable();
match index_options {
IndexOptions::Inverted => {
c.column_schema.set_inverted_index(true);
}
IndexOptions::Skipping { granularity } => {
c.column_schema
.set_skipping_options(&SkippingIndexOptions {
granularity,
index_type: SkippingIndexType::BloomFilter,
})
.context(SetSkippingIndexOptionSnafu)?;
}
}

Ok(AddColumn {
column_metadata: c.clone(),
Expand Down Expand Up @@ -256,7 +275,11 @@ mod test {
},
];
env.data_region()
.add_columns(env.default_physical_region_id(), &mut new_columns)
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
IndexOptions::Inverted,
)
.await
.unwrap();

Expand Down Expand Up @@ -295,7 +318,11 @@ mod test {
}];
let result = env
.data_region()
.add_columns(env.default_physical_region_id(), &mut new_columns)
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
IndexOptions::Inverted,
)
.await;
assert!(result.is_err());
}
Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
Expand Down
20 changes: 17 additions & 3 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu};
use crate::error::{
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::utils::{to_data_region_id, to_metadata_region_id};

impl MetricEngineInner {
Expand Down Expand Up @@ -64,16 +66,27 @@ impl MetricEngineInner {
logical_region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
let physical_region_id = {
let (physical_region_id, index_options) = {
let state = &self.state.read().unwrap();
state
let physical_region_id = state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?;

let index_options = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.options()
.index;

(physical_region_id, index_options)
};

// only handle adding column
Expand Down Expand Up @@ -122,6 +135,7 @@ impl MetricEngineInner {
data_region_id,
logical_region_id,
&mut columns_to_add,
index_options,
)
.await?;

Expand Down
22 changes: 18 additions & 4 deletions src/metric-engine/src/engine/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

use common_telemetry::debug;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::error::{
MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
};
use crate::utils;

impl MetricEngineInner {
Expand All @@ -34,6 +36,18 @@ impl MetricEngineInner {
}
.fail();
}
let data_region_id = utils::to_data_region_id(region_id);
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();

let metadata_region_id = utils::to_metadata_region_id(region_id);
// TODO(weny): improve the catchup, we can read the wal entries only once.
debug!("Catchup metadata region {metadata_region_id}");
Expand All @@ -49,7 +63,6 @@ impl MetricEngineInner {
.await
.context(MitoCatchupOperationSnafu)?;

let data_region_id = utils::to_data_region_id(region_id);
debug!("Catchup data region {data_region_id}");
self.mito
.handle_request(
Expand All @@ -64,7 +77,8 @@ impl MetricEngineInner {
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)?;

self.recover_states(region_id).await?;
self.recover_states(region_id, physical_region_options)
.await?;
Ok(0)
}
}
3 changes: 1 addition & 2 deletions src/metric-engine/src/engine/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ impl MetricEngineInner {
.state
.read()
.unwrap()
.physical_regions()
.contains_key(&data_region_id)
.exist_physical_region(data_region_id)
{
self.close_physical_region(data_region_id).await?;
self.state
Expand Down
Loading

0 comments on commit 385b1bc

Please sign in to comment.