build range index pipeline
SkyFan2002 committed Dec 27, 2023
1 parent 01e90ac commit 8a98c39
Showing 4 changed files with 88 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/pruning/mod.rs
@@ -15,6 +15,7 @@
mod block_pruner;
mod bloom_pruner;
mod fuse_pruner;
+pub mod pipeline;
pub mod processors;
mod pruner_location;
mod pruning_statistics;
65 changes: 65 additions & 0 deletions src/query/storages/fuse/src/pruning/pipeline.rs
@@ -0,0 +1,65 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::Pipeline;
use databend_storages_common_pruner::InternalColumnPruner;
use opendal::Operator;

use super::processors::segment_source::SegmentSource;

pub fn build_pruning_pipelines(
    ctx: Arc<dyn TableContext>,
    push_down: &PushDownInfo,
    dal: Operator,
) -> Result<Vec<Pipeline>> {
    let mut pipelines = vec![];
    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()?;
    let func_ctx = ctx.get_function_context()?;
    let filter_expr = push_down
        .filters
        .as_ref()
        .map(|f| f.filter.as_expr(&BUILTIN_FUNCTIONS));
    let internal_column_pruner = InternalColumnPruner::try_create(func_ctx, filter_expr.as_ref());

    pipelines.push(build_range_index_pruning_pipeline(
        ctx,
        max_io_requests as usize,
        dal,
        internal_column_pruner,
    )?);

    Ok(pipelines)
}

fn build_range_index_pruning_pipeline(
    ctx: Arc<dyn TableContext>,
    read_segment_concurrency: usize,
    dal: Operator,
    internal_column_pruner: Option<Arc<InternalColumnPruner>>,
) -> Result<Pipeline> {
    let mut range_index_pruning_pipeline = Pipeline::create();
    range_index_pruning_pipeline.set_max_threads(ctx.get_settings().get_max_threads()? as usize);

    // `add_source` invokes this closure once per source processor, so the
    // captured state must be cloned into each SegmentSource rather than moved.
    range_index_pruning_pipeline.add_source(
        |output| {
            SegmentSource::create(
                ctx.clone(),
                dal.clone(),
                internal_column_pruner.clone(),
                output,
            )
        },
        read_segment_concurrency,
    )?;
    Ok(range_index_pruning_pipeline)
}
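As a usage sketch only — this commit wires up no caller yet — the entry point would be driven from the surrounding read-planning code roughly as follows; `ctx`, `push_down`, and `dal` are assumed to be available there:

// Hypothetical call site, for illustration; `ctx`, `push_down`, and `dal`
// are assumptions, not names introduced by this commit.
let pruning_pipelines = build_pruning_pipelines(ctx.clone(), &push_down, dal.clone())?;

// Only the range-index stage is pushed so far; the Vec<Pipeline> return
// type leaves room for later stages (e.g. bloom pruning) to follow.
assert_eq!(pruning_pipelines.len(), 1);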
12 changes: 3 additions & 9 deletions
@@ -26,7 +26,6 @@ use databend_common_expression::BLOCK_NAME_COL_NAME;
use databend_common_pipeline_sinks::AsyncSink;
use databend_storages_common_index::RangeIndex;
use databend_storages_common_pruner::InternalColumnPruner;
-use databend_storages_common_pruner::Limiter;
use databend_storages_common_pruner::RangePruner;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::SegmentInfoVersion;
@@ -38,10 +37,9 @@ use crate::operations::DeletedSegmentInfo;
use crate::operations::SegmentIndex;

struct BlockPruneSink {
-    block_meta_sender: Option<Sender<Arc<BlockMeta>>>,
+    block_meta_sender: Sender<Arc<BlockMeta>>,
    schema: TableSchemaRef,
    range_pruner: Arc<dyn RangePruner + Send + Sync>,
-    limit_pruner: Arc<dyn Limiter + Send + Sync>,
    internal_column_pruner: Option<Arc<InternalColumnPruner>>,
    inverse_range_index_context: Option<InverseRangeIndexContext>,
}
@@ -58,15 +56,15 @@ impl AsyncSink for BlockPruneSink {

    #[async_backtrace::framed]
    async fn on_finish(&mut self) -> Result<()> {
-        drop(self.block_meta_sender.take());
+        self.block_meta_sender.close();
        drop(self.inverse_range_index_context.take());
        Ok(())
    }

    #[unboxed_simple]
    #[async_backtrace::framed]
    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
-        let block_meta_sender = self.block_meta_sender.as_ref().unwrap();
+        let block_meta_sender = &self.block_meta_sender;
        let segment_bytes = SegmentBytes::downcast_from(data_block.take_meta().unwrap()).unwrap();
        let compact_segment = deserialize_segment_info(
            &SegmentInfoVersion::try_from(segment_bytes.segment_location.1)?,
@@ -99,9 +97,6 @@
        }
        let segment_block_metas = compact_segment.block_metas()?;
        for (block_index, block_meta) in segment_block_metas.into_iter().enumerate() {
-            if self.limit_pruner.exceeded() {
-                break;
-            }
            if let Some(p) = self.internal_column_pruner.as_ref() {
                if !p.should_keep(BLOCK_NAME_COL_NAME, &block_meta.location.0) {
                    continue;
@@ -130,7 +125,6 @@
                    continue;
                }
            }
-            self.limit_pruner.within_limit(block_meta.row_count);
            block_meta_sender
                .send(block_meta)
                .await
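A note on the `on_finish` change above: the sink previously signalled completion by dropping an `Option`-wrapped sender; it now calls `close()` on the sender directly, which removes the `Option`/`unwrap` bookkeeping from `consume`. Assuming the `Sender` is `async_channel::Sender` (its import lies outside the shown hunks, but that is the channel Databend generally uses), closing still lets the receiver drain buffered block metas before it observes end-of-stream. A minimal standalone sketch of those semantics:

// Standalone sketch (assumes async_channel; not code from this commit).
use async_channel::bounded;

fn main() {
    futures::executor::block_on(async {
        let (tx, rx) = bounded::<u32>(4);
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        tx.close(); // same effect as dropping the last sender

        // Messages buffered before the close are still delivered...
        assert_eq!(rx.recv().await.unwrap(), 1);
        assert_eq!(rx.recv().await.unwrap(), 2);
        // ...and only then does the receiver see end-of-stream.
        assert!(rx.recv().await.is_err());
    });
}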
20 changes: 19 additions & 1 deletion src/query/storages/fuse/src/pruning/processors/segment_source.rs
@@ -20,7 +20,10 @@ use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::DataBlock;
use databend_common_expression::SEGMENT_NAME_COL_NAME;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
use databend_storages_common_pruner::InternalColumnPruner;
use databend_storages_common_table_meta::meta::Location;
use opendal::Operator;
@@ -29,12 +32,27 @@ use serde::Serialize;

use crate::FuseLazyPartInfo;

-struct SegmentSource {
+pub struct SegmentSource {
    ctx: Arc<dyn TableContext>,
    dal: Operator,
    internal_column_pruner: Option<Arc<InternalColumnPruner>>,
}

+impl SegmentSource {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        dal: Operator,
+        internal_column_pruner: Option<Arc<InternalColumnPruner>>,
+        output: Arc<OutputPort>,
+    ) -> Result<ProcessorPtr> {
+        // The context is needed by both the AsyncSourcer wrapper and the
+        // source itself, so it is cloned rather than moved twice.
+        AsyncSourcer::create(ctx.clone(), output, Self {
+            ctx,
+            dal,
+            internal_column_pruner,
+        })
+    }
+}

#[async_trait::async_trait]
impl AsyncSource for SegmentSource {
const NAME: &'static str = "segment source";
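The `generate` body of `SegmentSource` sits outside the shown hunks. For orientation, the `AsyncSource` contract it now implements looks roughly like the sketch below (paraphrased; treat the exact signature as an assumption rather than the definitive databend_common_pipeline_sources definition):

// Paraphrased sketch of the AsyncSource trait; details may differ from
// the real databend_common_pipeline_sources definition.
#[async_trait::async_trait]
pub trait AsyncSource: Send {
    const NAME: &'static str;

    // Called repeatedly by the AsyncSourcer wrapper that `create` builds;
    // returning Ok(None) marks the source as exhausted. SegmentSource's
    // (unshown) implementation presumably pulls FuseLazyPartInfo parts
    // from the table context and emits segment locations as block meta.
    async fn generate(&mut self) -> Result<Option<DataBlock>>;
}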
