From 8a98c39ce118fd3523e2d8150c6849877077c1fd Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Wed, 27 Dec 2023 21:46:54 +0800 Subject: [PATCH] build range index pipeline --- src/query/storages/fuse/src/pruning/mod.rs | 1 + .../storages/fuse/src/pruning/pipeline.rs | 65 +++++++++++++++++++ .../pruning/processors/block_prune_sink.rs | 12 +--- .../src/pruning/processors/segment_source.rs | 20 +++++- 4 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 src/query/storages/fuse/src/pruning/pipeline.rs diff --git a/src/query/storages/fuse/src/pruning/mod.rs b/src/query/storages/fuse/src/pruning/mod.rs index 72139e338c18..c63f73b29c94 100644 --- a/src/query/storages/fuse/src/pruning/mod.rs +++ b/src/query/storages/fuse/src/pruning/mod.rs @@ -15,6 +15,7 @@ mod block_pruner; mod bloom_pruner; mod fuse_pruner; +pub mod pipeline; pub mod processors; mod pruner_location; mod pruning_statistics; diff --git a/src/query/storages/fuse/src/pruning/pipeline.rs b/src/query/storages/fuse/src/pruning/pipeline.rs new file mode 100644 index 000000000000..6534cdb3d467 --- /dev/null +++ b/src/query/storages/fuse/src/pruning/pipeline.rs @@ -0,0 +1,65 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_pipeline_core::Pipeline; +use databend_storages_common_pruner::InternalColumnPruner; +use opendal::Operator; + +use super::processors::segment_source::SegmentSource; + +pub fn build_pruning_pipelines( + ctx: Arc, + push_down: &PushDownInfo, + dal: Operator, +) -> Result> { + let mut pipelines = vec![]; + let max_io_requests = ctx.get_settings().get_max_storage_io_requests()?; + let func_ctx = ctx.get_function_context()?; + let filter_expr = push_down + .filters + .as_ref() + .map(|f| f.filter.as_expr(&BUILTIN_FUNCTIONS)); + let internal_column_pruner = InternalColumnPruner::try_create(func_ctx, filter_expr.as_ref()); + + pipelines.push(build_range_index_pruning_pipeline( + ctx, + max_io_requests as usize, + dal, + internal_column_pruner, + )?); + + Ok(pipelines) +} + +fn build_range_index_pruning_pipeline( + ctx: Arc, + read_segment_concurrency: usize, + dal: Operator, + internal_column_pruner: Option>, +) -> Result { + let mut range_index_pruning_pipeline = Pipeline::create(); + range_index_pruning_pipeline.set_max_threads(ctx.get_settings().get_max_threads()? as usize); + + range_index_pruning_pipeline.add_source( + |output| SegmentSource::create(ctx, dal, internal_column_pruner, output), + read_segment_concurrency, + ); + Ok(range_index_pruning_pipeline) +} diff --git a/src/query/storages/fuse/src/pruning/processors/block_prune_sink.rs b/src/query/storages/fuse/src/pruning/processors/block_prune_sink.rs index 7f5616241f99..6d4f29e180be 100644 --- a/src/query/storages/fuse/src/pruning/processors/block_prune_sink.rs +++ b/src/query/storages/fuse/src/pruning/processors/block_prune_sink.rs @@ -26,7 +26,6 @@ use databend_common_expression::BLOCK_NAME_COL_NAME; use databend_common_pipeline_sinks::AsyncSink; use databend_storages_common_index::RangeIndex; use databend_storages_common_pruner::InternalColumnPruner; -use databend_storages_common_pruner::Limiter; use databend_storages_common_pruner::RangePruner; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::SegmentInfoVersion; @@ -38,10 +37,9 @@ use crate::operations::DeletedSegmentInfo; use crate::operations::SegmentIndex; struct BlockPruneSink { - block_meta_sender: Option>>, + block_meta_sender: Sender>, schema: TableSchemaRef, range_pruner: Arc, - limit_pruner: Arc, internal_column_pruner: Option>, inverse_range_index_context: Option, } @@ -58,7 +56,7 @@ impl AsyncSink for BlockPruneSink { #[async_backtrace::framed] async fn on_finish(&mut self) -> Result<()> { - drop(self.block_meta_sender.take()); + self.block_meta_sender.close(); drop(self.inverse_range_index_context.take()); Ok(()) } @@ -66,7 +64,7 @@ impl AsyncSink for BlockPruneSink { #[unboxed_simple] #[async_backtrace::framed] async fn consume(&mut self, mut data_block: DataBlock) -> Result { - let block_meta_sender = self.block_meta_sender.as_ref().unwrap(); + let block_meta_sender = &self.block_meta_sender; let segment_bytes = SegmentBytes::downcast_from(data_block.take_meta().unwrap()).unwrap(); let compact_segment = deserialize_segment_info( &SegmentInfoVersion::try_from(segment_bytes.segment_location.1)?, @@ -99,9 +97,6 @@ impl AsyncSink for BlockPruneSink { } let segment_block_metas = compact_segment.block_metas()?; for (block_index, block_meta) in segment_block_metas.into_iter().enumerate() { - if self.limit_pruner.exceeded() { - break; - } if let Some(p) = self.internal_column_pruner.as_ref() { if !p.should_keep(BLOCK_NAME_COL_NAME, &block_meta.location.0) { continue; @@ -130,7 +125,6 @@ impl AsyncSink for BlockPruneSink { continue; } } - self.limit_pruner.within_limit(block_meta.row_count); block_meta_sender .send(block_meta) .await diff --git a/src/query/storages/fuse/src/pruning/processors/segment_source.rs b/src/query/storages/fuse/src/pruning/processors/segment_source.rs index c93e2107b482..05d1ebf04569 100644 --- a/src/query/storages/fuse/src/pruning/processors/segment_source.rs +++ b/src/query/storages/fuse/src/pruning/processors/segment_source.rs @@ -20,7 +20,10 @@ use databend_common_exception::Result; use databend_common_expression::BlockMetaInfo; use databend_common_expression::DataBlock; use databend_common_expression::SEGMENT_NAME_COL_NAME; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; use databend_storages_common_pruner::InternalColumnPruner; use databend_storages_common_table_meta::meta::Location; use opendal::Operator; @@ -29,12 +32,27 @@ use serde::Serialize; use crate::FuseLazyPartInfo; -struct SegmentSource { +pub struct SegmentSource { ctx: Arc, dal: Operator, internal_column_pruner: Option>, } +impl SegmentSource { + pub fn create( + ctx: Arc, + dal: Operator, + internal_column_pruner: Option>, + output: Arc, + ) -> Result { + AsyncSourcer::create(ctx, output, Self { + ctx, + dal, + internal_column_pruner, + }) + } +} + #[async_trait::async_trait] impl AsyncSource for SegmentSource { const NAME: &'static str = "segment source";