chore(query): limit max concurrency of spill io requests (databendlab…
zhang2014 authored Sep 12, 2024
1 parent 30101f3 commit d444db2
Showing 8 changed files with 83 additions and 6 deletions.
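The change threads one shared `tokio::sync::Semaphore` through every spill-reading transform, so at most `max_spill_io_requests` reads are in flight at any moment across the whole pipeline, not per processor. A minimal sketch of that pattern, assuming a tokio runtime and a hypothetical `read_spill_file` helper (neither taken from the commit):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical stand-in for an actual spill read; only the
// concurrency-limiting pattern mirrors the commit.
async fn read_spill_file(_location: String) -> Vec<u8> {
    // ... fetch the spilled block from the spill store ...
    Vec::new()
}

#[tokio::main]
async fn main() {
    // One semaphore shared by every reader caps in-flight IO.
    let max_spill_io_requests = 8; // in the commit this comes from settings
    let semaphore = Arc::new(Semaphore::new(max_spill_io_requests));

    let mut tasks = Vec::new();
    for i in 0..64 {
        let semaphore = semaphore.clone();
        tasks.push(tokio::spawn(async move {
            // At most `max_spill_io_requests` tasks pass this point
            // concurrently; the permit is released when `_guard` drops.
            let _guard = semaphore.acquire().await.unwrap();
            read_spill_file(format!("spill/{i}")).await
        }));
    }
    for task in tasks {
        task.await.unwrap();
    }
}
```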
7 changes: 7 additions & 0 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -98,6 +98,7 @@ impl PipelineBuilder {

let max_block_size = self.settings.get_max_block_size()?;
let max_threads = self.settings.get_max_threads()?;
let max_spill_io_requests = self.settings.get_max_spill_io_requests()?;

let enable_experimental_aggregate_hashtable = self
.settings
@@ -111,6 +112,7 @@ impl PipelineBuilder {
self.is_exchange_neighbor,
max_block_size as usize,
None,
max_spill_io_requests as usize,
)?;

if params.group_columns.is_empty() {
@@ -214,6 +216,8 @@ impl PipelineBuilder {
let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?;
let max_spill_io_requests = self.settings.get_max_spill_io_requests()?;

let params = Self::build_aggregator_params(
aggregate.before_group_by_schema.clone(),
&aggregate.group_by,
@@ -222,6 +226,7 @@
self.is_exchange_neighbor,
max_block_size as usize,
aggregate.limit,
max_spill_io_requests as usize,
)?;

if params.group_columns.is_empty() {
@@ -288,6 +293,7 @@ impl PipelineBuilder {
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
max_spill_io_requests: usize,
) -> Result<Arc<AggregatorParams>> {
let mut agg_args = Vec::with_capacity(agg_funcs.len());
let (group_by, group_data_types) = group_by
@@ -330,6 +336,7 @@
cluster_aggregator,
max_block_size,
limit,
max_spill_io_requests,
)?;

Ok(params)
12 changes: 11 additions & 1 deletion src/query/service/src/pipelines/builders/builder_window.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
@@ -26,6 +28,7 @@ use databend_common_pipeline_core::PipeItem;
use databend_common_sql::executor::physical_plans::Window;
use databend_common_sql::executor::physical_plans::WindowPartition;
use databend_common_storage::DataOperator;
use tokio::sync::Semaphore;

use crate::pipelines::processors::transforms::FrameBound;
use crate::pipelines::processors::transforms::TransformWindowPartitionBucket;
@@ -205,8 +208,15 @@ impl PipelineBuilder {

self.main_pipeline.try_resize(input_nums)?;

let max_spill_io_requests = self.settings.get_max_spill_io_requests()? as usize;
let semaphore = Arc::new(Semaphore::new(max_spill_io_requests));
self.main_pipeline.add_transform(|input, output| {
- TransformWindowPartitionSpillReader::create(input, output, operator.clone())
+ TransformWindowPartitionSpillReader::create(
+ input,
+ output,
+ operator.clone(),
+ semaphore.clone(),
+ )
})?;

let block_size = self.settings.get_max_block_size()? as usize;
@@ -46,6 +46,7 @@ pub struct AggregatorParams {
pub max_block_size: usize,
// Limit is push down to AggregatorTransform
pub limit: Option<usize>,
pub max_spill_io_requests: usize,
}

impl AggregatorParams {
@@ -59,6 +60,7 @@ impl AggregatorParams {
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
max_spill_io_requests: usize,
) -> Result<Arc<AggregatorParams>> {
let mut states_offsets: Vec<usize> = Vec::with_capacity(agg_funcs.len());
let mut states_layout = None;
@@ -79,6 +81,7 @@
cluster_aggregator,
max_block_size,
limit,
max_spill_io_requests,
}))
}

@@ -34,6 +34,7 @@ use databend_common_pipeline_core::processors::ProcessorPtr;
use itertools::Itertools;
use log::info;
use opendal::Operator;
use tokio::sync::Semaphore;

use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload;
@@ -47,6 +48,7 @@ pub struct TransformSpillReader<Method: HashMethodBounds, V: Send + Sync + 'stat
output: Arc<OutputPort>,

operator: Operator,
semaphore: Arc<Semaphore>,
deserialized_meta: Option<BlockMetaInfoPtr>,
reading_meta: Option<AggregateMeta<Method, V>>,
deserializing_meta: Option<DeserializingMeta<Method, V>>,
@@ -183,6 +185,7 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Processor
AggregateMeta::AggregateSpilling(_) => unreachable!(),
AggregateMeta::Serialized(_) => unreachable!(),
AggregateMeta::BucketSpilled(payload) => {
let _guard = self.semaphore.acquire().await;
let instant = Instant::now();
let data = self
.operator
@@ -211,7 +214,9 @@
let location = payload.location.clone();
let operator = self.operator.clone();
let data_range = payload.data_range.clone();
let semaphore = self.semaphore.clone();
read_data.push(databend_common_base::runtime::spawn(async move {
let _guard = semaphore.acquire().await;
let instant = Instant::now();
let data = operator
.read_with(&location)
@@ -282,6 +287,7 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> TransformSpillReader<Me
input: Arc<InputPort>,
output: Arc<OutputPort>,
operator: Operator,
semaphore: Arc<Semaphore>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Box::new(TransformSpillReader::<
Method,
@@ -290,6 +296,7 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> TransformSpillReader<Me
input,
output,
operator,
semaphore,
deserialized_meta: None,
reading_meta: None,
deserializing_meta: None,
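`Semaphore::acquire` returns a permit guard that is released on drop, which is why binding it to `_guard` for the scope of the read above is enough to bound concurrency; no explicit release call is needed. A sketch of that borrowed form, plus tokio's `acquire_owned` alternative for permits that must be moved into a spawned task (an illustration, not code from the commit):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2));

    // Borrowed form, as in the spill readers above: the permit is
    // released when `_guard` drops at the end of the scope.
    {
        let _guard = semaphore.acquire().await.unwrap();
        // ... one bounded spill read would run here ...
    }

    // Owned form: `acquire_owned` takes the Arc by value, so the
    // permit carries its own reference and can move into a task
    // without borrowing `semaphore`.
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let task = tokio::spawn(async move {
        let _permit = permit;
        // ... bounded IO here; permit released when the task ends ...
    });
    task.await.unwrap();
}
```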
@@ -34,6 +34,7 @@ use databend_common_pipeline_core::Pipe;
use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_core::Pipeline;
use databend_common_storage::DataOperator;
use tokio::sync::Semaphore;

use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
use crate::pipelines::processors::transforms::aggregator::aggregate_meta::HashTablePayload;
@@ -459,12 +460,23 @@ pub fn build_partition_bucket<Method: HashMethodBounds, V: Copy + Send + Sync +

pipeline.try_resize(input_nums)?;

let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests));
let operator = DataOperator::instance().operator();
pipeline.add_transform(|input, output| {
let operator = operator.clone();
match params.aggregate_functions.is_empty() {
- true => TransformGroupBySpillReader::<Method>::create(input, output, operator),
- false => TransformAggregateSpillReader::<Method>::create(input, output, operator),
+ true => TransformGroupBySpillReader::<Method>::create(
+ input,
+ output,
+ operator,
+ semaphore.clone(),
+ ),
+ false => TransformAggregateSpillReader::<Method>::create(
+ input,
+ output,
+ operator,
+ semaphore.clone(),
+ ),
}
})?;

@@ -501,12 +513,23 @@ pub fn build_partition_bucket<Method: HashMethodBounds, V: Copy + Send + Sync +

pipeline.try_resize(input_nums)?;

let semaphore = Arc::new(Semaphore::new(128));
let operator = DataOperator::instance().operator();
pipeline.add_transform(|input, output| {
let operator = operator.clone();
match params.aggregate_functions.is_empty() {
- true => TransformGroupBySpillReader::<Method>::create(input, output, operator),
- false => TransformAggregateSpillReader::<Method>::create(input, output, operator),
+ true => TransformGroupBySpillReader::<Method>::create(
+ input,
+ output,
+ operator,
+ semaphore.clone(),
+ ),
+ false => TransformAggregateSpillReader::<Method>::create(
+ input,
+ output,
+ operator,
+ semaphore.clone(),
+ ),
}
})?;

@@ -34,6 +34,7 @@ use databend_common_pipeline_core::processors::ProcessorPtr;
use itertools::Itertools;
use log::info;
use opendal::Operator;
use tokio::sync::Semaphore;

use super::WindowPartitionMeta;
use super::WindowPayload;
@@ -46,6 +47,7 @@ pub struct TransformWindowPartitionSpillReader {
output: Arc<OutputPort>,

operator: Operator,
semaphore: Arc<Semaphore>,
deserialized_meta: Option<BlockMetaInfoPtr>,
reading_meta: Option<WindowPartitionMeta>,
deserializing_meta: Option<DeserializingMeta>,
@@ -170,7 +172,9 @@ impl Processor for TransformWindowPartitionSpillReader {
let location = p.location.clone();
let operator = self.operator.clone();
let data_range = p.data_range.clone();
let semaphore = self.semaphore.clone();
read_data.push(databend_common_base::runtime::spawn(async move {
let _guard = semaphore.acquire().await;
let instant = Instant::now();
let data = operator
.read_with(&location)
@@ -243,12 +247,14 @@ impl TransformWindowPartitionSpillReader {
input: Arc<InputPort>,
output: Arc<OutputPort>,
operator: Operator,
semaphore: Arc<Semaphore>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Box::new(
TransformWindowPartitionSpillReader {
input,
output,
operator,
semaphore,
deserialized_meta: None,
reading_meta: None,
deserializing_meta: None,
19 changes: 18 additions & 1 deletion src/query/settings/src/settings_default.rs
@@ -116,6 +116,7 @@ impl DefaultSettings {
let num_cpus = Self::num_cpus();
let max_memory_usage = Self::max_memory_usage()?;
let recluster_block_size = Self::recluster_block_size()?;
let default_max_spill_io_requests = Self::spill_io_requests(num_cpus);
let default_max_storage_io_requests = Self::storage_io_requests(num_cpus);
let data_retention_time_in_days_max = Self::data_retention_time_in_days_max();
let global_conf = GlobalConfig::try_get_instance();
@@ -159,9 +160,15 @@
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)),
}),
("max_spill_io_requests", DefaultSettingValue {
value: UserSettingValue::UInt64(default_max_spill_io_requests),
desc: "Sets the maximum number of concurrent spill I/O requests.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(1..=1024)),
}),
("max_storage_io_requests", DefaultSettingValue {
value: UserSettingValue::UInt64(default_max_storage_io_requests),
- desc: "Sets the maximum number of concurrent I/O requests.",
+ desc: "Sets the maximum number of concurrent storage I/O requests.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(1..=1024)),
}),
@@ -880,6 +887,16 @@
}
}

fn spill_io_requests(num_cpus: u64) -> u64 {
match GlobalConfig::try_get_instance() {
None => std::cmp::min(num_cpus, 64),
Some(conf) => match conf.storage.params.is_fs() {
true => 48,
false => std::cmp::min(num_cpus, 64),
},
}
}
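The default is host-derived rather than fixed: `min(num_cpus, 64)` in general, with a flat 48 when the configured storage backend is a local filesystem. A standalone sketch of that selection logic (a reimplementation for illustration, folding the missing-config case into `storage_is_fs = false`):

```rust
// Mirrors the default selection above, not the actual
// DefaultSettings code.
fn default_spill_io_requests(num_cpus: u64, storage_is_fs: bool) -> u64 {
    if storage_is_fs { 48 } else { num_cpus.min(64) }
}

fn main() {
    assert_eq!(default_spill_io_requests(16, false), 16); // object storage, 16 cores
    assert_eq!(default_spill_io_requests(96, false), 64); // capped at 64
    assert_eq!(default_spill_io_requests(16, true), 48);  // local fs backend
}
```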

/// The maximum number of days that data can be retained.
/// The max is read from the global config:data_retention_time_in_days_max
/// If the global config is not set, the default value is 90 days.
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -709,4 +709,8 @@ impl Settings {
pub fn get_dynamic_sample_time_budget_ms(&self) -> Result<u64> {
self.try_get_u64("dynamic_sample_time_budget_ms")
}

pub fn get_max_spill_io_requests(&self) -> Result<u64> {
self.try_get_u64("max_spill_io_requests")
}
}
