From 730ca2318f599dc67f280c21c1a23ae22b397364 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Fri, 25 Aug 2023 17:34:23 +0800 Subject: [PATCH 1/5] Parallelize parquet rows fetcher. --- .../fuse/src/io/read/read_settings.rs | 2 +- .../src/operations/read/fuse_rows_fetcher.rs | 60 ++++++---- .../operations/read/native_rows_fetcher.rs | 52 +++++--- .../operations/read/parquet_rows_fetcher.rs | 112 +++++++++++++++--- 4 files changed, 162 insertions(+), 64 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/read_settings.rs b/src/query/storages/fuse/src/io/read/read_settings.rs index e109405691fa..156a5369e810 100644 --- a/src/query/storages/fuse/src/io/read/read_settings.rs +++ b/src/query/storages/fuse/src/io/read/read_settings.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_catalog::table_context::TableContext; use common_exception::Result; -#[derive(Clone)] +#[derive(Clone, Copy)] pub struct ReadSettings { pub storage_io_min_bytes_for_seek: u64, pub storage_io_max_page_bytes_for_read: u64, diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs index c580dd081206..8ef6a51d4f0d 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs @@ -55,20 +55,22 @@ pub fn build_row_fetcher_pipeline( .to_owned(); let fuse_table = Arc::new(fuse_table); let block_reader = fuse_table.create_block_reader(projection.clone(), false, ctx.clone())?; + let max_threads = ctx.get_settings().get_max_threads()? as usize; - pipeline.add_transform(|input, output| { - Ok(match &fuse_table.storage_format { - FuseStorageFormat::Native => { - let mut column_leaves = Vec::with_capacity(block_reader.project_column_nodes.len()); - for column_node in &block_reader.project_column_nodes { - let leaves: Vec = column_node - .leaf_indices - .iter() - .map(|i| block_reader.parquet_schema_descriptor.columns()[*i].clone()) - .collect::>(); - column_leaves.push(leaves); - } - if block_reader.support_blocking_api() { + match &fuse_table.storage_format { + FuseStorageFormat::Native => { + let mut column_leaves = Vec::with_capacity(block_reader.project_column_nodes.len()); + for column_node in &block_reader.project_column_nodes { + let leaves: Vec = column_node + .leaf_indices + .iter() + .map(|i| block_reader.parquet_schema_descriptor.columns()[*i].clone()) + .collect::>(); + column_leaves.push(leaves); + } + let column_leaves = Arc::new(column_leaves); + pipeline.add_transform(|input, output| { + Ok(if block_reader.support_blocking_api() { TransformRowsFetcher::create( input, output, @@ -77,7 +79,8 @@ pub fn build_row_fetcher_pipeline( fuse_table.clone(), projection.clone(), block_reader.clone(), - column_leaves, + column_leaves.clone(), + max_threads, ), ) } else { @@ -89,16 +92,19 @@ pub fn build_row_fetcher_pipeline( fuse_table.clone(), projection.clone(), block_reader.clone(), - column_leaves, + column_leaves.clone(), + max_threads, ), ) - } - } - FuseStorageFormat::Parquet => { - let buffer_size = - ctx.get_settings().get_parquet_uncompressed_buffer_size()? as usize; - let read_settings = ReadSettings::from_ctx(&ctx)?; - if block_reader.support_blocking_api() { + }) + }) + } + FuseStorageFormat::Parquet => { + let buffer_size: usize = + ctx.get_settings().get_parquet_uncompressed_buffer_size()? 
as usize; + let read_settings = ReadSettings::from_ctx(&ctx)?; + pipeline.add_transform(|input, output| { + Ok(if block_reader.support_blocking_api() { TransformRowsFetcher::create( input, output, @@ -109,6 +115,7 @@ pub fn build_row_fetcher_pipeline( block_reader.clone(), read_settings, buffer_size, + max_threads, ), ) } else { @@ -122,12 +129,13 @@ pub fn build_row_fetcher_pipeline( block_reader.clone(), read_settings, buffer_size, + max_threads, ), ) - } - } - }) - }) + }) + }) + } + } } #[async_trait::async_trait] diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs index e7f5de3800a9..bec74c300cbd 100644 --- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs @@ -46,10 +46,13 @@ pub(super) struct NativeRowsFetcher { projection: Projection, schema: TableSchemaRef, reader: Arc, - column_leaves: Vec>, + column_leaves: Arc>>, // The value contains part info and the page size of the corresponding block file. - part_map: HashMap, + part_map: Arc>, + + // To control the parallelism of fetching blocks. + _max_threads: usize, } #[async_trait::async_trait] @@ -89,7 +92,13 @@ impl RowsFetcher for NativeRowsFetcher { }) .collect(); - let (blocks, idx_map) = self.fetch_blocks(part_set).await?; + let (blocks, idx_map) = Self::fetch_blocks( + self.reader.clone(), + part_set, + self.part_map.clone(), + self.column_leaves.clone(), + ) + .await?; let indices = row_set .iter() .map(|(prefix, page_idx, idx)| { @@ -111,7 +120,8 @@ impl NativeRowsFetcher { table: Arc, projection: Projection, reader: Arc, - column_leaves: Vec>, + column_leaves: Arc>>, + max_threads: usize, ) -> Self { let schema = table.schema(); let segment_reader = @@ -125,7 +135,8 @@ impl NativeRowsFetcher { schema, reader, column_leaves, - part_map: HashMap::new(), + part_map: Arc::new(HashMap::new()), + _max_threads: max_threads, } } @@ -135,6 +146,7 @@ impl NativeRowsFetcher { let arrow_schema = self.schema.to_arrow(); let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); + let mut part_map = HashMap::new(); for row_id in row_ids { let (prefix, _) = split_row_id(*row_id); @@ -166,23 +178,26 @@ impl NativeRowsFetcher { &self.projection, ); - self.part_map.insert(prefix, (part_info, page_size)); + part_map.insert(prefix, (part_info, page_size)); } + self.part_map = Arc::new(part_map); + Ok(()) } fn build_blocks( - &self, + reader: &BlockReader, mut chunks: NativeSourceData, needed_pages: &[u64], + column_leaves: &Vec>, ) -> Result> { let mut array_iters = BTreeMap::new(); - for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() { + for (index, column_node) in reader.project_column_nodes.iter().enumerate() { let readers = chunks.remove(&index).unwrap(); if !readers.is_empty() { - let leaves = self.column_leaves.get(index).unwrap().clone(); + let leaves = column_leaves.get(index).unwrap().clone(); let array_iter = BlockReader::build_array_iter(column_node, leaves, readers)?; array_iters.insert(index, array_iter); } @@ -204,7 +219,7 @@ impl NativeRowsFetcher { arrays.push((*index, array)); } offset = *page + 1; - let block = self.reader.build_block(arrays, None)?; + let block = reader.build_block(arrays, None)?; blocks.push(block); } @@ -214,23 +229,22 @@ impl NativeRowsFetcher { #[allow(clippy::type_complexity)] #[async_backtrace::framed] async fn fetch_blocks( - &self, + reader: Arc, part_set: 
HashMap>, + part_map: Arc>, + column_leaves: Arc>>, ) -> Result<(Vec, HashMap<(u64, u64), usize>)> { let mut chunks = Vec::with_capacity(part_set.len()); if BLOCKING_IO { for (prefix, needed_pages) in part_set.into_iter() { - let part = self.part_map[&prefix].0.clone(); - let chunk = self.reader.sync_read_native_columns_data(part, &None)?; + let part = part_map[&prefix].0.clone(); + let chunk = reader.sync_read_native_columns_data(part, &None)?; chunks.push((prefix, chunk, needed_pages)); } } else { for (prefix, needed_pages) in part_set.into_iter() { - let part = self.part_map[&prefix].0.clone(); - let chunk = self - .reader - .async_read_native_columns_data(part, &None) - .await?; + let part = part_map[&prefix].0.clone(); + let chunk = reader.async_read_native_columns_data(part, &None).await?; chunks.push((prefix, chunk, needed_pages)); } } @@ -243,7 +257,7 @@ impl NativeRowsFetcher { let mut offset = 0_usize; for (prefix, chunk, needed_pages) in chunks.into_iter() { - let fetched_blocks = self.build_blocks(chunk, &needed_pages)?; + let fetched_blocks = Self::build_blocks(&reader, chunk, &needed_pages, &column_leaves)?; for (block, page) in fetched_blocks.into_iter().zip(needed_pages) { idx_map.insert((prefix, page), offset); offset += 1; diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index 1167deca500f..e6ec32842898 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use common_base::runtime::execute_futures_in_parallel; use common_catalog::plan::block_idx_in_segment; use common_catalog::plan::split_prefix; use common_catalog::plan::split_row_id; @@ -27,6 +28,7 @@ use common_expression::DataBlock; use common_expression::DataSchema; use common_expression::TableSchemaRef; use common_storage::ColumnNodes; +use itertools::Itertools; use storages_common_cache::LoadParams; use storages_common_table_meta::meta::TableSnapshot; @@ -50,7 +52,10 @@ pub(super) struct ParquetRowsFetcher { settings: ReadSettings, reader: Arc, uncompressed_buffer: Arc, - part_map: HashMap, + part_map: Arc>, + + // To control the parallelism of fetching blocks. + max_threads: usize, } #[async_trait::async_trait] @@ -74,7 +79,63 @@ impl RowsFetcher for ParquetRowsFetcher { row_set.push((prefix, idx)); } - let (blocks, idx_map) = self.fetch_blocks(part_set).await?; + // part_set.len() = parts_per_thread * max_threads + remain + // task distribution: + // Part number of each task | Task number + // ------------------------------------------------------ + // parts_per_thread + 1 | remain + // parts_per_thread | max_threads - remain + let part_set = part_set.into_iter().sorted().collect::>(); + let num_parts = part_set.len(); + let parts_per_thread = num_parts / self.max_threads; + let remain = num_parts % self.max_threads as usize; + let mut tasks = Vec::with_capacity(self.max_threads); + // Fetch blocks in parallel. 
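+        // The first `remain` tasks each take `parts_per_thread + 1` parts, covering
+        // `part_set[0..remain * (parts_per_thread + 1)]`; the second loop below
+        // continues from that offset with `parts_per_thread` parts per task.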
+ for i in 0..remain { + let parts = + part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)].to_vec(); + tasks.push(Self::fetch_blocks( + self.reader.clone(), + parts, + self.part_map.clone(), + self.uncompressed_buffer.clone(), + self.settings, + )) + } + let offset = remain * (parts_per_thread + 1); + for i in 0..(self.max_threads - remain) { + let parts = part_set + [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] + .to_vec(); + tasks.push(Self::fetch_blocks( + self.reader.clone(), + parts, + self.part_map.clone(), + self.uncompressed_buffer.clone(), + self.settings, + )) + } + + let result = execute_futures_in_parallel( + tasks, + self.max_threads, + self.max_threads * 2, + "parqeut rows fetch".to_string(), + ) + .await? + .into_iter() + .collect::>>()?; + // Merge blocks and idx_map. + let mut blocks = Vec::with_capacity(num_parts); + let mut idx_map = HashMap::with_capacity(num_parts); + for (bs, m) in result { + let offset = blocks.len(); + blocks.extend(bs); + for (k, v) in m { + idx_map.insert(k, v + offset); + } + } + // Take result rows from blocks. let indices = row_set .iter() .map(|(prefix, row_idx)| { @@ -98,6 +159,7 @@ impl ParquetRowsFetcher { reader: Arc, settings: ReadSettings, buffer_size: usize, + max_threads: usize, ) -> Self { let uncompressed_buffer = UncompressedBuffer::new(buffer_size); let schema = table.schema(); @@ -112,7 +174,8 @@ impl ParquetRowsFetcher { reader, settings, uncompressed_buffer, - part_map: HashMap::new(), + part_map: Arc::new(HashMap::new()), + max_threads, } } @@ -122,6 +185,7 @@ impl ParquetRowsFetcher { let arrow_schema = self.schema.to_arrow(); let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); + let mut part_map = HashMap::new(); for row_id in row_ids { let (prefix, _) = split_row_id(*row_id); @@ -152,34 +216,36 @@ impl ParquetRowsFetcher { &self.projection, ); - self.part_map.insert(prefix, part_info); + part_map.insert(prefix, part_info); } + self.part_map = Arc::new(part_map); + Ok(()) } #[async_backtrace::framed] async fn fetch_blocks( - &self, - part_set: HashSet, + reader: Arc, + part_set: Vec, + part_map: Arc>, + uncompressed_buffer: Arc, + settings: ReadSettings, ) -> Result<(Vec, HashMap)> { let mut chunks = Vec::with_capacity(part_set.len()); if BLOCKING_IO { for prefix in part_set.into_iter() { - let part = self.part_map[&prefix].clone(); - let chunk = - self.reader - .sync_read_columns_data_by_merge_io(&self.settings, part, &None)?; + let part = part_map[&prefix].clone(); + let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?; chunks.push((prefix, chunk)); } } else { for prefix in part_set.into_iter() { - let part = self.part_map[&prefix].clone(); + let part = part_map[&prefix].clone(); let part = FusePartInfo::from_part(&part)?; - let chunk = self - .reader + let chunk = reader .read_columns_data_by_merge_io( - &self.settings, + &settings, &part.location, &part.columns_meta, &None, @@ -194,23 +260,33 @@ impl ParquetRowsFetcher { .enumerate() .map(|(idx, (part, chunk))| { idx_map.insert(part, idx); - self.build_block(&self.part_map[&part], chunk) + Self::build_block( + &reader, + &part_map[&part], + chunk, + uncompressed_buffer.clone(), + ) }) .collect::>>()?; Ok((fetched_blocks, idx_map)) } - fn build_block(&self, part: &PartInfoPtr, chunk: MergeIOReadResult) -> Result { + fn build_block( + reader: &BlockReader, + part: &PartInfoPtr, + chunk: MergeIOReadResult, + uncompressed_buffer: Arc, + ) -> Result { let columns_chunks = 
chunk.columns_chunks()?; let part = FusePartInfo::from_part(part)?; - self.reader.deserialize_parquet_chunks_with_buffer( + reader.deserialize_parquet_chunks_with_buffer( &part.location, part.nums_rows, &part.compression, &part.columns_meta, columns_chunks, - Some(self.uncompressed_buffer.clone()), + Some(uncompressed_buffer), ) } } From f0e1bdf3131bdbf9ec72dae95c59e1a48ba0f688 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Fri, 25 Aug 2023 21:01:36 +0800 Subject: [PATCH 2/5] Each threads hold one uncompressed buffer. --- .../operations/read/parquet_rows_fetcher.rs | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index e6ec32842898..8beb8f557bb6 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -51,7 +51,9 @@ pub(super) struct ParquetRowsFetcher { settings: ReadSettings, reader: Arc, - uncompressed_buffer: Arc, + // [`UncompressedBuffer`] cannot be shared between multiple threads at the same time. + // So we create a [`UncompressedBuffer`] for each thread. + uncompressed_buffers: Vec>, part_map: Arc>, // To control the parallelism of fetching blocks. @@ -98,28 +100,31 @@ impl RowsFetcher for ParquetRowsFetcher { self.reader.clone(), parts, self.part_map.clone(), - self.uncompressed_buffer.clone(), + self.uncompressed_buffers[i].clone(), self.settings, )) } - let offset = remain * (parts_per_thread + 1); - for i in 0..(self.max_threads - remain) { - let parts = part_set - [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] - .to_vec(); - tasks.push(Self::fetch_blocks( - self.reader.clone(), - parts, - self.part_map.clone(), - self.uncompressed_buffer.clone(), - self.settings, - )) + if parts_per_thread > 0 { + let offset = remain * (parts_per_thread + 1); + for i in 0..(self.max_threads - remain) { + let parts = part_set + [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] + .to_vec(); + tasks.push(Self::fetch_blocks( + self.reader.clone(), + parts, + self.part_map.clone(), + self.uncompressed_buffers[i].clone(), + self.settings, + )) + } } + let num_task = tasks.len(); let result = execute_futures_in_parallel( tasks, - self.max_threads, - self.max_threads * 2, + num_task, + num_task * 2, "parqeut rows fetch".to_string(), ) .await? @@ -161,7 +166,10 @@ impl ParquetRowsFetcher { buffer_size: usize, max_threads: usize, ) -> Self { - let uncompressed_buffer = UncompressedBuffer::new(buffer_size); + let mut uncompressed_buffers = Vec::with_capacity(max_threads); + for _ in 0..max_threads { + uncompressed_buffers.push(UncompressedBuffer::new(buffer_size)); + } let schema = table.schema(); let segment_reader = MetaReaders::segment_info_reader(table.operator.clone(), schema.clone()); @@ -173,7 +181,7 @@ impl ParquetRowsFetcher { schema, reader, settings, - uncompressed_buffer, + uncompressed_buffers, part_map: Arc::new(HashMap::new()), max_threads, } From 5017275fdd3e21229c25cd0c9d65a76f8a7c2402 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Fri, 25 Aug 2023 21:28:24 +0800 Subject: [PATCH 3/5] Refactor codes. 
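The precomputed `idx_map` in the parquet fetcher relies on one invariant: the part set is sorted, each task takes a contiguous slice of it, and the per-task block vectors are flattened in task order, so a block's position in the flattened result equals its prefix's position in the sorted set. A minimal standalone sketch of that invariant (illustrative values only, not code from this patch):

    use std::collections::HashMap;

    fn main() {
        // Sorted prefixes, as in `part_set` after `.sorted()`.
        let parts: Vec<u64> = vec![10, 20, 30, 40, 50];
        // The index map is computed before any task runs.
        let idx_map: HashMap<u64, usize> =
            parts.iter().enumerate().map(|(i, p)| (*p, i)).collect();
        // Each task handles a contiguous slice and preserves its order;
        // flattening the per-task outputs reproduces the sorted order.
        let per_task = vec![parts[0..2].to_vec(), parts[2..4].to_vec(), parts[4..].to_vec()];
        let flat: Vec<u64> = per_task.into_iter().flatten().collect();
        for p in &parts {
            assert_eq!(flat[idx_map[p]], *p);
        }
    }
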
--- .../agg_index/agg_index_reader_parquet.rs | 2 +- .../read/block/block_reader_merge_io_sync.rs | 4 +- .../operations/read/native_rows_fetcher.rs | 2 +- .../read/parquet_data_source_reader.rs | 2 +- .../operations/read/parquet_rows_fetcher.rs | 79 ++++++++----------- .../storages/fuse/src/operations/replace.rs | 2 +- 6 files changed, 39 insertions(+), 52 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs index f0525e9c9238..ce5825ca2a1e 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs @@ -54,7 +54,7 @@ impl AggIndexReader { ); let res = self .reader - .sync_read_columns_data_by_merge_io(read_settings, part.clone(), &None) + .sync_read_columns_data_by_merge_io(read_settings, &part, &None) .inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}")) .ok()?; Some((part, res)) diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs index cc3af7020f13..15a03ecb4f3e 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs @@ -97,10 +97,10 @@ impl BlockReader { pub fn sync_read_columns_data_by_merge_io( &self, settings: &ReadSettings, - part: PartInfoPtr, + part: &PartInfoPtr, ignore_column_ids: &Option>, ) -> Result { - let part = FusePartInfo::from_part(&part)?; + let part = FusePartInfo::from_part(part)?; let column_array_cache = CacheManager::instance().get_table_data_array_cache(); let mut ranges = vec![]; diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs index bec74c300cbd..12b72794e436 100644 --- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs @@ -190,7 +190,7 @@ impl NativeRowsFetcher { reader: &BlockReader, mut chunks: NativeSourceData, needed_pages: &[u64], - column_leaves: &Vec>, + column_leaves: &[Vec], ) -> Result> { let mut array_iters = BTreeMap::new(); diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs index c9c26e178964..6782b39eb2bd 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs @@ -142,7 +142,7 @@ impl SyncSource for ReadParquetDataSource { let source = self.block_reader.sync_read_columns_data_by_merge_io( &ReadSettings::from_ctx(&self.partitions.ctx)?, - part.clone(), + &part, ignore_column_ids, )?; diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index 8beb8f557bb6..be02c1022a6e 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -54,7 +54,7 @@ pub(super) struct ParquetRowsFetcher { // [`UncompressedBuffer`] cannot be shared between multiple threads at the same time. // So we create a [`UncompressedBuffer`] for each thread. 
uncompressed_buffers: Vec>, - part_map: Arc>, + part_map: HashMap, // To control the parallelism of fetching blocks. max_threads: usize, @@ -88,18 +88,24 @@ impl RowsFetcher for ParquetRowsFetcher { // parts_per_thread + 1 | remain // parts_per_thread | max_threads - remain let part_set = part_set.into_iter().sorted().collect::>(); + let idx_map = part_set + .iter() + .enumerate() + .map(|(i, p)| (*p, i)) + .collect::>(); let num_parts = part_set.len(); let parts_per_thread = num_parts / self.max_threads; - let remain = num_parts % self.max_threads as usize; + let remain = num_parts % self.max_threads; let mut tasks = Vec::with_capacity(self.max_threads); // Fetch blocks in parallel. for i in 0..remain { - let parts = - part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)].to_vec(); + let parts = part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)] + .iter() + .map(|idx| self.part_map[idx].clone()) + .collect::>(); tasks.push(Self::fetch_blocks( self.reader.clone(), parts, - self.part_map.clone(), self.uncompressed_buffers[i].clone(), self.settings, )) @@ -109,11 +115,12 @@ impl RowsFetcher for ParquetRowsFetcher { for i in 0..(self.max_threads - remain) { let parts = part_set [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] - .to_vec(); + .iter() + .map(|idx| self.part_map[idx].clone()) + .collect::>(); tasks.push(Self::fetch_blocks( self.reader.clone(), parts, - self.part_map.clone(), self.uncompressed_buffers[i].clone(), self.settings, )) @@ -121,7 +128,7 @@ impl RowsFetcher for ParquetRowsFetcher { } let num_task = tasks.len(); - let result = execute_futures_in_parallel( + let blocks = execute_futures_in_parallel( tasks, num_task, num_task * 2, @@ -129,17 +136,10 @@ impl RowsFetcher for ParquetRowsFetcher { ) .await? .into_iter() - .collect::>>()?; - // Merge blocks and idx_map. - let mut blocks = Vec::with_capacity(num_parts); - let mut idx_map = HashMap::with_capacity(num_parts); - for (bs, m) in result { - let offset = blocks.len(); - blocks.extend(bs); - for (k, v) in m { - idx_map.insert(k, v + offset); - } - } + .collect::>>()? + .into_iter() + .flatten() + .collect::>(); // Take result rows from blocks. 
let indices = row_set .iter() @@ -182,7 +182,7 @@ impl ParquetRowsFetcher { reader, settings, uncompressed_buffers, - part_map: Arc::new(HashMap::new()), + part_map: HashMap::new(), max_threads, } } @@ -193,7 +193,6 @@ impl ParquetRowsFetcher { let arrow_schema = self.schema.to_arrow(); let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); - let mut part_map = HashMap::new(); for row_id in row_ids { let (prefix, _) = split_row_id(*row_id); @@ -224,33 +223,28 @@ impl ParquetRowsFetcher { &self.projection, ); - part_map.insert(prefix, part_info); + self.part_map.insert(prefix, part_info); } - self.part_map = Arc::new(part_map); - Ok(()) } #[async_backtrace::framed] async fn fetch_blocks( reader: Arc, - part_set: Vec, - part_map: Arc>, + parts: Vec, uncompressed_buffer: Arc, settings: ReadSettings, - ) -> Result<(Vec, HashMap)> { - let mut chunks = Vec::with_capacity(part_set.len()); + ) -> Result> { + let mut chunks = Vec::with_capacity(parts.len()); if BLOCKING_IO { - for prefix in part_set.into_iter() { - let part = part_map[&prefix].clone(); + for part in parts.iter() { let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?; - chunks.push((prefix, chunk)); + chunks.push(chunk); } } else { - for prefix in part_set.into_iter() { - let part = part_map[&prefix].clone(); - let part = FusePartInfo::from_part(&part)?; + for part in parts.iter() { + let part = FusePartInfo::from_part(part)?; let chunk = reader .read_columns_data_by_merge_io( &settings, @@ -259,25 +253,18 @@ impl ParquetRowsFetcher { &None, ) .await?; - chunks.push((prefix, chunk)); + chunks.push(chunk); } } - let mut idx_map = HashMap::with_capacity(chunks.len()); let fetched_blocks = chunks .into_iter() - .enumerate() - .map(|(idx, (part, chunk))| { - idx_map.insert(part, idx); - Self::build_block( - &reader, - &part_map[&part], - chunk, - uncompressed_buffer.clone(), - ) + .zip(parts.iter()) + .map(|(chunk, part)| { + Self::build_block(&reader, part, chunk, uncompressed_buffer.clone()) }) .collect::>>()?; - Ok((fetched_blocks, idx_map)) + Ok(fetched_blocks) } fn build_block( diff --git a/src/query/storages/fuse/src/operations/replace.rs b/src/query/storages/fuse/src/operations/replace.rs index 438448e354a8..c5a07d3985c6 100644 --- a/src/query/storages/fuse/src/operations/replace.rs +++ b/src/query/storages/fuse/src/operations/replace.rs @@ -119,7 +119,7 @@ impl FuseTable { self.operator.clone(), self.table_info.schema(), self.get_write_settings(), - read_settings.clone(), + read_settings, block_builder.clone(), io_request_semaphore.clone(), )?; From aed05493c4eae4861578df3925803ba22dbe3dcf Mon Sep 17 00:00:00 2001 From: hezheyu Date: Fri, 25 Aug 2023 22:54:06 +0800 Subject: [PATCH 4/5] Parallelize native rows fetcher. 
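The native fetcher uses the same split as the parquet one: `num_parts` sorted parts are divided over `max_threads` tasks, and the first `num_parts % max_threads` tasks take one extra part, matching the task-distribution table in the code comments. A small self-contained sketch of the slice bounds this produces (hypothetical numbers, not code from this patch):

    fn main() {
        let (num_parts, max_threads) = (10usize, 4usize);
        let parts_per_thread = num_parts / max_threads;
        let remain = num_parts % max_threads;
        let mut bounds = Vec::new();
        let mut begin = 0;
        for i in 0..max_threads {
            // The first `remain` tasks take `parts_per_thread + 1` parts each.
            let len = parts_per_thread + usize::from(i < remain);
            if len > 0 {
                bounds.push((begin, begin + len));
            }
            begin += len;
        }
        // 10 parts over 4 tasks -> sizes 3, 3, 2, 2.
        assert_eq!(bounds, vec![(0, 3), (3, 6), (6, 8), (8, 10)]);
    }
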
--- .../read/agg_index/agg_index_reader_native.rs | 4 +- .../src/io/read/block/block_reader_native.rs | 8 +- .../read/native_data_source_reader.rs | 11 +- .../operations/read/native_rows_fetcher.rs | 118 ++++++++++++------ .../operations/read/parquet_rows_fetcher.rs | 13 +- 5 files changed, 95 insertions(+), 59 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs index 318f2cb24aa7..383b66ae1637 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs @@ -57,7 +57,7 @@ impl AggIndexReader { ); let res = self .reader - .sync_read_native_columns_data(part, &None) + .sync_read_native_columns_data(&part, &None) .inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}")) .ok()?; Some(res) @@ -108,7 +108,7 @@ impl AggIndexReader { ); let res = self .reader - .async_read_native_columns_data(part, &None) + .async_read_native_columns_data(&part, &None) .await .inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}")) .ok()?; diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs index 33459a0b78a8..e2decfc50d13 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs @@ -52,7 +52,7 @@ impl BlockReader { #[async_backtrace::framed] pub async fn async_read_native_columns_data( &self, - part: PartInfoPtr, + part: &PartInfoPtr, ignore_column_ids: &Option>, ) -> Result { // Perf @@ -60,7 +60,7 @@ impl BlockReader { metrics_inc_remote_io_read_parts(1); } - let part = FusePartInfo::from_part(&part)?; + let part = FusePartInfo::from_part(part)?; let mut join_handlers = Vec::with_capacity(self.project_column_nodes.len()); for (index, column_node) in self.project_column_nodes.iter().enumerate() { @@ -148,10 +148,10 @@ impl BlockReader { pub fn sync_read_native_columns_data( &self, - part: PartInfoPtr, + part: &PartInfoPtr, ignore_column_ids: &Option>, ) -> Result { - let part = FusePartInfo::from_part(&part)?; + let part = FusePartInfo::from_part(part)?; let mut results: BTreeMap>> = BTreeMap::new(); for (index, column_node) in self.project_column_nodes.iter().enumerate() { diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs index 022cf598470c..7cdd2526b6c4 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs @@ -138,7 +138,7 @@ impl SyncSource for ReadNativeDataSource { { let mut source_data = self .block_reader - .sync_read_native_columns_data(part.clone(), &ignore_column_ids)?; + .sync_read_native_columns_data(&part, &ignore_column_ids)?; source_data.append(&mut virtual_source_data); return Ok(Some(DataBlock::empty_with_meta( NativeDataSourceMeta::create(vec![part.clone()], vec![ @@ -151,7 +151,7 @@ impl SyncSource for ReadNativeDataSource { Ok(Some(DataBlock::empty_with_meta( NativeDataSourceMeta::create(vec![part.clone()], vec![DataSource::Normal( self.block_reader - .sync_read_native_columns_data(part, &None)?, + .sync_read_native_columns_data(&part, &None)?, )]), ))) } @@ -229,10 +229,7 @@ impl Processor for ReadNativeDataSource { 
virtual_reader.read_native_data(&loc).await { let mut source_data = block_reader - .async_read_native_columns_data( - part.clone(), - &ignore_column_ids, - ) + .async_read_native_columns_data(&part, &ignore_column_ids) .await?; source_data.append(&mut virtual_source_data); return Ok(DataSource::Normal(source_data)); @@ -241,7 +238,7 @@ impl Processor for ReadNativeDataSource { Ok(DataSource::Normal( block_reader - .async_read_native_columns_data(part, &None) + .async_read_native_columns_data(&part, &None) .await?, )) })); diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs index 12b72794e436..852e88fcd043 100644 --- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs @@ -18,6 +18,7 @@ use std::collections::HashSet; use std::sync::Arc; use common_arrow::parquet::metadata::ColumnDescriptor; +use common_base::runtime::execute_futures_in_parallel; use common_catalog::plan::block_idx_in_segment; use common_catalog::plan::split_prefix; use common_catalog::plan::split_row_id; @@ -29,6 +30,7 @@ use common_expression::DataBlock; use common_expression::DataSchema; use common_expression::TableSchemaRef; use common_storage::ColumnNodes; +use itertools::Itertools; use storages_common_cache::LoadParams; use storages_common_table_meta::meta::TableSnapshot; @@ -49,10 +51,10 @@ pub(super) struct NativeRowsFetcher { column_leaves: Arc>>, // The value contains part info and the page size of the corresponding block file. - part_map: Arc>, + part_map: HashMap, // To control the parallelism of fetching blocks. - _max_threads: usize, + max_threads: usize, } #[async_trait::async_trait] @@ -83,22 +85,74 @@ impl RowsFetcher for NativeRowsFetcher { .or_insert(HashSet::from([page_idx])); row_set.push((prefix, page_idx, idx_within_page)); } + + // Read blocks in `prefix` order. let part_set = part_set .into_iter() + .sorted_by_key(|(k, _)| *k) .map(|(k, v)| { let mut v = v.into_iter().collect::>(); v.sort(); (k, v) }) - .collect(); + .collect::>(); + let mut idx_map = HashMap::with_capacity(part_set.len()); + for (p, pages) in part_set.iter() { + for page in pages { + idx_map.insert((*p, *page), idx_map.len()); + } + } + // part_set.len() = parts_per_thread * max_threads + remain + // task distribution: + // Part number of each task | Task number + // ------------------------------------------------------ + // parts_per_thread + 1 | remain + // parts_per_thread | max_threads - remain + let num_parts = part_set.len(); + let parts_per_thread = num_parts / self.max_threads; + let remain = num_parts % self.max_threads; + let mut tasks = Vec::with_capacity(self.max_threads); + // Fetch blocks in parallel. 
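+        // Each task returns one block per needed page, in (prefix, page) order,
+        // so flattening the per-task results lines up with the `idx_map` filled above.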
+ for i in 0..remain { + let parts = part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)] + .iter() + .map(|(idx, pages)| (self.part_map[idx].0.clone(), pages.clone())) + .collect::>(); + tasks.push(Self::fetch_blocks( + self.reader.clone(), + parts, + self.column_leaves.clone(), + )) + } + if parts_per_thread > 0 { + let offset = remain * (parts_per_thread + 1); + for i in 0..(self.max_threads - remain) { + let parts = part_set + [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] + .iter() + .map(|(idx, pages)| (self.part_map[idx].0.clone(), pages.clone())) + .collect::>(); + tasks.push(Self::fetch_blocks( + self.reader.clone(), + parts, + self.column_leaves.clone(), + )) + } + } - let (blocks, idx_map) = Self::fetch_blocks( - self.reader.clone(), - part_set, - self.part_map.clone(), - self.column_leaves.clone(), + let num_task = tasks.len(); + let blocks = execute_futures_in_parallel( + tasks, + num_task, + num_task * 2, + "parqeut rows fetch".to_string(), ) - .await?; + .await? + .into_iter() + .collect::>>()? + .into_iter() + .flatten() + .collect::>(); let indices = row_set .iter() .map(|(prefix, page_idx, idx)| { @@ -135,8 +189,8 @@ impl NativeRowsFetcher { schema, reader, column_leaves, - part_map: Arc::new(HashMap::new()), - _max_threads: max_threads, + part_map: HashMap::new(), + max_threads, } } @@ -146,7 +200,6 @@ impl NativeRowsFetcher { let arrow_schema = self.schema.to_arrow(); let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); - let mut part_map = HashMap::new(); for row_id in row_ids { let (prefix, _) = split_row_id(*row_id); @@ -178,11 +231,9 @@ impl NativeRowsFetcher { &self.projection, ); - part_map.insert(prefix, (part_info, page_size)); + self.part_map.insert(prefix, (part_info, page_size)); } - self.part_map = Arc::new(part_map); - Ok(()) } @@ -230,41 +281,28 @@ impl NativeRowsFetcher { #[async_backtrace::framed] async fn fetch_blocks( reader: Arc, - part_set: HashMap>, - part_map: Arc>, + parts: Vec<(PartInfoPtr, Vec)>, column_leaves: Arc>>, - ) -> Result<(Vec, HashMap<(u64, u64), usize>)> { - let mut chunks = Vec::with_capacity(part_set.len()); + ) -> Result> { + let mut chunks = Vec::with_capacity(parts.len()); if BLOCKING_IO { - for (prefix, needed_pages) in part_set.into_iter() { - let part = part_map[&prefix].0.clone(); + for (part, _) in parts.iter() { let chunk = reader.sync_read_native_columns_data(part, &None)?; - chunks.push((prefix, chunk, needed_pages)); + chunks.push(chunk); } } else { - for (prefix, needed_pages) in part_set.into_iter() { - let part = part_map[&prefix].0.clone(); + for (part, _) in parts.iter() { let chunk = reader.async_read_native_columns_data(part, &None).await?; - chunks.push((prefix, chunk, needed_pages)); + chunks.push(chunk); } } - let num_blocks = chunks - .iter() - .map(|(_, _, pages)| pages.len()) - .sum::(); - let mut idx_map = HashMap::with_capacity(num_blocks); + let num_blocks = parts.iter().map(|(_, p)| p.len()).sum::(); let mut blocks = Vec::with_capacity(num_blocks); - - let mut offset = 0_usize; - for (prefix, chunk, needed_pages) in chunks.into_iter() { - let fetched_blocks = Self::build_blocks(&reader, chunk, &needed_pages, &column_leaves)?; - for (block, page) in fetched_blocks.into_iter().zip(needed_pages) { - idx_map.insert((prefix, page), offset); - offset += 1; - blocks.push(block); - } + for (chunk, (_, needed_pages)) in chunks.into_iter().zip(parts.iter()) { + let fetched_blocks = Self::build_blocks(&reader, chunk, needed_pages, 
&column_leaves)?; + blocks.extend(fetched_blocks); } - Ok((blocks, idx_map)) + Ok(blocks) } } diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index be02c1022a6e..58065c7dede5 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -81,18 +81,19 @@ impl RowsFetcher for ParquetRowsFetcher { row_set.push((prefix, idx)); } - // part_set.len() = parts_per_thread * max_threads + remain - // task distribution: - // Part number of each task | Task number - // ------------------------------------------------------ - // parts_per_thread + 1 | remain - // parts_per_thread | max_threads - remain + // Read blocks in `prefix` order. let part_set = part_set.into_iter().sorted().collect::>(); let idx_map = part_set .iter() .enumerate() .map(|(i, p)| (*p, i)) .collect::>(); + // part_set.len() = parts_per_thread * max_threads + remain + // task distribution: + // Part number of each task | Task number + // ------------------------------------------------------ + // parts_per_thread + 1 | remain + // parts_per_thread | max_threads - remain let num_parts = part_set.len(); let parts_per_thread = num_parts / self.max_threads; let remain = num_parts % self.max_threads; From 30d45aa8a0ab79e4410b6544d88728fd08147147 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Sat, 26 Aug 2023 10:57:59 +0800 Subject: [PATCH 5/5] Refactor codes. --- .../operations/read/native_rows_fetcher.rs | 31 ++++++------------- .../operations/read/parquet_rows_fetcher.rs | 30 ++++++------------ 2 files changed, 19 insertions(+), 42 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs index 852e88fcd043..2603ba2a1adf 100644 --- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs @@ -102,19 +102,23 @@ impl RowsFetcher for NativeRowsFetcher { idx_map.insert((*p, *page), idx_map.len()); } } - // part_set.len() = parts_per_thread * max_threads + remain + // parts_per_thread = num_parts / max_threads + // remain = num_parts % max_threads // task distribution: // Part number of each task | Task number // ------------------------------------------------------ // parts_per_thread + 1 | remain // parts_per_thread | max_threads - remain let num_parts = part_set.len(); - let parts_per_thread = num_parts / self.max_threads; - let remain = num_parts % self.max_threads; let mut tasks = Vec::with_capacity(self.max_threads); // Fetch blocks in parallel. 
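+        // `num_parts * (i + 1) / max_threads - num_parts * i / max_threads` is always
+        // `parts_per_thread` or `parts_per_thread + 1`, so this single loop realizes the
+        // distribution described above; empty ranges (num_parts < max_threads) are skipped.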
- for i in 0..remain { - let parts = part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)] + for i in 0..self.max_threads { + let begin = num_parts * i / self.max_threads; + let end = num_parts * (i + 1) / self.max_threads; + if begin == end { + continue; + } + let parts = part_set[begin..end] .iter() .map(|(idx, pages)| (self.part_map[idx].0.clone(), pages.clone())) .collect::>(); @@ -124,28 +128,13 @@ impl RowsFetcher for NativeRowsFetcher { self.column_leaves.clone(), )) } - if parts_per_thread > 0 { - let offset = remain * (parts_per_thread + 1); - for i in 0..(self.max_threads - remain) { - let parts = part_set - [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] - .iter() - .map(|(idx, pages)| (self.part_map[idx].0.clone(), pages.clone())) - .collect::>(); - tasks.push(Self::fetch_blocks( - self.reader.clone(), - parts, - self.column_leaves.clone(), - )) - } - } let num_task = tasks.len(); let blocks = execute_futures_in_parallel( tasks, num_task, num_task * 2, - "parqeut rows fetch".to_string(), + "native rows fetch".to_string(), ) .await? .into_iter() diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index 58065c7dede5..4418f4f97dce 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -88,19 +88,23 @@ impl RowsFetcher for ParquetRowsFetcher { .enumerate() .map(|(i, p)| (*p, i)) .collect::>(); - // part_set.len() = parts_per_thread * max_threads + remain + // parts_per_thread = num_parts / max_threads + // remain = num_parts % max_threads // task distribution: // Part number of each task | Task number // ------------------------------------------------------ // parts_per_thread + 1 | remain // parts_per_thread | max_threads - remain let num_parts = part_set.len(); - let parts_per_thread = num_parts / self.max_threads; - let remain = num_parts % self.max_threads; let mut tasks = Vec::with_capacity(self.max_threads); // Fetch blocks in parallel. - for i in 0..remain { - let parts = part_set[i * (parts_per_thread + 1)..(i + 1) * (parts_per_thread + 1)] + for i in 0..self.max_threads { + let begin = num_parts * i / self.max_threads; + let end = num_parts * (i + 1) / self.max_threads; + if begin == end { + continue; + } + let parts = part_set[begin..end] .iter() .map(|idx| self.part_map[idx].clone()) .collect::>(); @@ -111,22 +115,6 @@ impl RowsFetcher for ParquetRowsFetcher { self.settings, )) } - if parts_per_thread > 0 { - let offset = remain * (parts_per_thread + 1); - for i in 0..(self.max_threads - remain) { - let parts = part_set - [offset + i * parts_per_thread..offset + (i + 1) * parts_per_thread] - .iter() - .map(|idx| self.part_map[idx].clone()) - .collect::>(); - tasks.push(Self::fetch_blocks( - self.reader.clone(), - parts, - self.uncompressed_buffers[i].clone(), - self.settings, - )) - } - } let num_task = tasks.len(); let blocks = execute_futures_in_parallel(