Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallelize rows fetcher. #12590

Merged
merged 6 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl AggIndexReader {
);
let res = self
.reader
.sync_read_native_columns_data(part, &None)
.sync_read_native_columns_data(&part, &None)
.inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
.ok()?;
Some(res)
Expand Down Expand Up @@ -108,7 +108,7 @@ impl AggIndexReader {
);
let res = self
.reader
.async_read_native_columns_data(part, &None)
.async_read_native_columns_data(&part, &None)
.await
.inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
.ok()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl AggIndexReader {
);
let res = self
.reader
.sync_read_columns_data_by_merge_io(read_settings, part.clone(), &None)
.sync_read_columns_data_by_merge_io(read_settings, &part, &None)
.inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
.ok()?;
Some((part, res))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ impl BlockReader {
pub fn sync_read_columns_data_by_merge_io(
&self,
settings: &ReadSettings,
part: PartInfoPtr,
part: &PartInfoPtr,
ignore_column_ids: &Option<HashSet<ColumnId>>,
) -> Result<MergeIOReadResult> {
let part = FusePartInfo::from_part(&part)?;
let part = FusePartInfo::from_part(part)?;
let column_array_cache = CacheManager::instance().get_table_data_array_cache();

let mut ranges = vec![];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ impl BlockReader {
#[async_backtrace::framed]
pub async fn async_read_native_columns_data(
&self,
part: PartInfoPtr,
part: &PartInfoPtr,
ignore_column_ids: &Option<HashSet<ColumnId>>,
) -> Result<NativeSourceData> {
// Perf
{
metrics_inc_remote_io_read_parts(1);
}

let part = FusePartInfo::from_part(&part)?;
let part = FusePartInfo::from_part(part)?;
let mut join_handlers = Vec::with_capacity(self.project_column_nodes.len());

for (index, column_node) in self.project_column_nodes.iter().enumerate() {
Expand Down Expand Up @@ -148,10 +148,10 @@ impl BlockReader {

pub fn sync_read_native_columns_data(
&self,
part: PartInfoPtr,
part: &PartInfoPtr,
ignore_column_ids: &Option<HashSet<ColumnId>>,
) -> Result<NativeSourceData> {
let part = FusePartInfo::from_part(&part)?;
let part = FusePartInfo::from_part(part)?;

let mut results: BTreeMap<usize, Vec<NativeReader<Reader>>> = BTreeMap::new();
for (index, column_node) in self.project_column_nodes.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/io/read/read_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use common_catalog::table_context::TableContext;
use common_exception::Result;

#[derive(Clone)]
#[derive(Clone, Copy)]
pub struct ReadSettings {
pub storage_io_min_bytes_for_seek: u64,
pub storage_io_max_page_bytes_for_read: u64,
Expand Down
60 changes: 34 additions & 26 deletions src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,22 @@ pub fn build_row_fetcher_pipeline(
.to_owned();
let fuse_table = Arc::new(fuse_table);
let block_reader = fuse_table.create_block_reader(projection.clone(), false, ctx.clone())?;
let max_threads = ctx.get_settings().get_max_threads()? as usize;

pipeline.add_transform(|input, output| {
Ok(match &fuse_table.storage_format {
FuseStorageFormat::Native => {
let mut column_leaves = Vec::with_capacity(block_reader.project_column_nodes.len());
for column_node in &block_reader.project_column_nodes {
let leaves: Vec<ColumnDescriptor> = column_node
.leaf_indices
.iter()
.map(|i| block_reader.parquet_schema_descriptor.columns()[*i].clone())
.collect::<Vec<_>>();
column_leaves.push(leaves);
}
if block_reader.support_blocking_api() {
match &fuse_table.storage_format {
FuseStorageFormat::Native => {
let mut column_leaves = Vec::with_capacity(block_reader.project_column_nodes.len());
for column_node in &block_reader.project_column_nodes {
let leaves: Vec<ColumnDescriptor> = column_node
.leaf_indices
.iter()
.map(|i| block_reader.parquet_schema_descriptor.columns()[*i].clone())
.collect::<Vec<_>>();
column_leaves.push(leaves);
}
let column_leaves = Arc::new(column_leaves);
pipeline.add_transform(|input, output| {
Ok(if block_reader.support_blocking_api() {
TransformRowsFetcher::create(
input,
output,
Expand All @@ -77,7 +79,8 @@ pub fn build_row_fetcher_pipeline(
fuse_table.clone(),
projection.clone(),
block_reader.clone(),
column_leaves,
column_leaves.clone(),
max_threads,
),
)
} else {
Expand All @@ -89,16 +92,19 @@ pub fn build_row_fetcher_pipeline(
fuse_table.clone(),
projection.clone(),
block_reader.clone(),
column_leaves,
column_leaves.clone(),
max_threads,
),
)
}
}
FuseStorageFormat::Parquet => {
let buffer_size =
ctx.get_settings().get_parquet_uncompressed_buffer_size()? as usize;
let read_settings = ReadSettings::from_ctx(&ctx)?;
if block_reader.support_blocking_api() {
})
})
}
FuseStorageFormat::Parquet => {
let buffer_size: usize =
ctx.get_settings().get_parquet_uncompressed_buffer_size()? as usize;
let read_settings = ReadSettings::from_ctx(&ctx)?;
pipeline.add_transform(|input, output| {
Ok(if block_reader.support_blocking_api() {
TransformRowsFetcher::create(
input,
output,
Expand All @@ -109,6 +115,7 @@ pub fn build_row_fetcher_pipeline(
block_reader.clone(),
read_settings,
buffer_size,
max_threads,
),
)
} else {
Expand All @@ -122,12 +129,13 @@ pub fn build_row_fetcher_pipeline(
block_reader.clone(),
read_settings,
buffer_size,
max_threads,
),
)
}
}
})
})
})
})
}
}
}

#[async_trait::async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl SyncSource for ReadNativeDataSource<true> {
{
let mut source_data = self
.block_reader
.sync_read_native_columns_data(part.clone(), &ignore_column_ids)?;
.sync_read_native_columns_data(&part, &ignore_column_ids)?;
source_data.append(&mut virtual_source_data);
return Ok(Some(DataBlock::empty_with_meta(
NativeDataSourceMeta::create(vec![part.clone()], vec![
Expand All @@ -151,7 +151,7 @@ impl SyncSource for ReadNativeDataSource<true> {
Ok(Some(DataBlock::empty_with_meta(
NativeDataSourceMeta::create(vec![part.clone()], vec![DataSource::Normal(
self.block_reader
.sync_read_native_columns_data(part, &None)?,
.sync_read_native_columns_data(&part, &None)?,
)]),
)))
}
Expand Down Expand Up @@ -229,10 +229,7 @@ impl Processor for ReadNativeDataSource<false> {
virtual_reader.read_native_data(&loc).await
{
let mut source_data = block_reader
.async_read_native_columns_data(
part.clone(),
&ignore_column_ids,
)
.async_read_native_columns_data(&part, &ignore_column_ids)
.await?;
source_data.append(&mut virtual_source_data);
return Ok(DataSource::Normal(source_data));
Expand All @@ -241,7 +238,7 @@ impl Processor for ReadNativeDataSource<false> {

Ok(DataSource::Normal(
block_reader
.async_read_native_columns_data(part, &None)
.async_read_native_columns_data(&part, &None)
.await?,
))
}));
Expand Down
Loading
Loading