Commit ffba39b
rename to spilling_file_format
Signed-off-by: coldWater <[email protected]>
forsaken628 committed Oct 16, 2024
1 parent 5b9c532 commit ffba39b
Showing 10 changed files with 68 additions and 35 deletions.
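This commit renames the boolean `spilling_use_parquet` setting to a string-valued `spilling_file_format` setting backed by a small `SpillFileFormat` enum, so every call site changes the same way. A minimal sketch of that recurring call-site change (names taken from the diff below; `settings` is assumed to be a `Settings` handle already in scope):

```rust
// Before: a bare boolean toggle read straight from the settings table.
let use_parquet: bool = settings.get_spilling_use_parquet()?;

// After: an explicit file-format enum; callers derive the boolean they need.
let use_parquet: bool = settings.get_spilling_file_format()?.is_parquet();
```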
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/builders/builder_sort.rs
@@ -289,7 +289,7 @@ impl SortPipelineBuilder {
&self.ctx.get_id(),
),
disk_spill: None,
- use_parquet: settings.get_spilling_use_parquet()?,
+ use_parquet: settings.get_spilling_file_format()?.is_parquet(),
};
pipeline.add_transform(|input, output| {
let op = DataOperator::instance().operator();
@@ -71,7 +71,7 @@ impl HashJoinSpiller {
spiller_type,
location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
disk_spill: None,
- use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+ use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
};
let operator = DataOperator::instance().operator();
let spiller = Spiller::create(ctx.clone(), operator, spill_config)?;
@@ -491,7 +491,7 @@ mod tests {
spiller_type: SpillerType::OrderBy,
location_prefix: "_spill_test".to_string(),
disk_spill: None,
- use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+ use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
};

let spiller = Spiller::create(ctx.clone(), op, spill_config)?;
@@ -119,7 +119,7 @@ impl TransformWindowPartitionCollect {
spiller_type: SpillerType::Window,
location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
disk_spill,
- use_parquet: settings.get_spilling_use_parquet()?,
+ use_parquet: settings.get_spilling_file_format()?.is_parquet(),
};

// Create an inner `Spiller` to spill data.
@@ -146,7 +146,7 @@ impl WindowPartitionBuffer {
.spill_with_merged_partitions(partitions_to_spill)
.await?;
let index = self.spilled_merged_partitions.len();
- for (id, _, _) in &spilled.partitions {
+ for (id, _) in &spilled.partitions {
self.spilled_small_partitions[*id].push(index);
}
self.spilled_merged_partitions.push((spilled, false, false));
@@ -176,12 +176,10 @@
partitions,
} = merged_partitions;
if out_of_memory_limit || *partial_restored {
- if let Some(pos) = partitions.iter().position(|p| p.0 == partition_id) {
-     let data_range = &partitions[pos].1;
-     let columns_layout = &partitions[pos].2;
+ if let Some(pos) = partitions.iter().position(|(id, _)| *id == partition_id) {
let data_block = self
.spiller
- .read_range(location, data_range.clone(), columns_layout)
+ .read_chunk(location, &partitions[pos].1)
.await?;
self.restored_partition_buffer
.add_data_block(partition_id, data_block);
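The restore path above stops threading the byte range and column layout as separate tuple fields and instead passes the new `Chunk` that bundles them. A hedged before/after sketch of just the call, using the names from this hunk:

```rust
// Before: the caller unpacked partitions[pos] into a range and a layout
// and handed both to the spiller separately.
let data_block = self
    .spiller
    .read_range(location, data_range.clone(), columns_layout)
    .await?;

// After: partitions[pos].1 is a Chunk carrying both range and layout.
let data_block = self
    .spiller
    .read_chunk(location, &partitions[pos].1)
    .await?;
```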
2 changes: 1 addition & 1 deletion src/query/service/src/spillers/serialize.rs
@@ -67,7 +67,7 @@ impl BlocksEncoder {
// Currently we splice multiple complete parquet files into one,
// so that the file contains duplicate headers/footers and metadata,
// which can lead to file bloat. A better approach would be for the entire file to be ONE parquet,
- // with each group of blocks (i.e., a unit of the upstream read range) corresponding to one or more row groups
+ // with each group of blocks (i.e. Chunk) corresponding to one or more row groups
bare_blocks_to_parquet(blocks, &mut self.buf).unwrap();
Layout::Parquet
} else {
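The comment in this hunk sketches the intended follow-up: rather than splicing several complete Parquet files (each with its own header, footer, and metadata), write a single Parquet file in which each spilled chunk becomes one or more row groups. A hypothetical illustration of that layout using the `parquet` crate's `ArrowWriter` — not code from this repository; the file name and schema are made up:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn spill_as_one_file() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let mut writer = ArrowWriter::try_new(File::create("spill.parquet")?, schema.clone(), None)?;
    for chunk in 0..3i32 {
        let batch = RecordBatch::try_new(schema.clone(), vec![
            Arc::new(Int32Array::from(vec![chunk; 4])) as ArrayRef,
        ])?;
        writer.write(&batch)?;
        // flush() closes the current row group, so each spilled chunk
        // maps onto its own row group inside the one file.
        writer.flush()?;
    }
    // A single footer and one copy of the metadata for the whole file.
    writer.close()?;
    Ok(())
}
```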
32 changes: 14 additions & 18 deletions src/query/service/src/spillers/spiller.rs
@@ -236,7 +236,7 @@ impl Spiller {
.map(|x| x[0]..x[1])
.zip(columns_layout.into_iter()),
)
- .map(|(id, (range, layout))| (id, range, layout))
+ .map(|(id, (range, layout))| (id, Chunk { range, layout }))
.collect();

// Spill data to storage.
@@ -322,7 +322,7 @@
let file_size = path.size();
debug_assert_eq!(
file_size,
- if let Some((_, range, _)) = partitions.last() {
+ if let Some((_, Chunk { range, .. })) = partitions.last() {
range.end
} else {
0
@@ -346,24 +346,20 @@
// Deserialize partitioned data block.
let partitioned_data = partitions
.iter()
- .map(|(partition_id, range, columns_layout)| {
-     let block = deserialize_block(columns_layout, data.slice(range.clone()));
+ .map(|(partition_id, Chunk { range, layout })| {
+     let block = deserialize_block(layout, data.slice(range.clone()));
(*partition_id, block)
})
.collect();

Ok(partitioned_data)
}

- pub async fn read_range(
-     &self,
-     location: &Location,
-     data_range: Range<usize>,
-     columns_layout: &Layout,
- ) -> Result<DataBlock> {
+ pub async fn read_chunk(&self, location: &Location, chunk: &Chunk) -> Result<DataBlock> {
// Read spilled data from storage.
let instant = Instant::now();
- let data_range = data_range.start as u64..data_range.end as u64;
+ let Chunk { range, layout } = chunk;
+ let data_range = range.start as u64..range.end as u64;

let data = match location {
Location::Local(path) => match &self.local_operator {
@@ -383,7 +379,7 @@

record_read_profile(location, &instant, data.len());

- Ok(deserialize_block(columns_layout, data))
+ Ok(deserialize_block(layout, data))
}

async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result<Location> {
@@ -438,14 +434,14 @@
}
}

- pub enum SpilledData {
-     Partition(Location),
-     MergedPartition(MergedPartition),
- }

pub struct MergedPartition {
pub location: Location,
- pub partitions: Vec<(usize, Range<usize>, Layout)>,
+ pub partitions: Vec<(usize, Chunk)>,
}

+ pub struct Chunk {
+     pub range: Range<usize>,
+     pub layout: Layout,
+ }

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
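The new `Chunk` struct bundles the byte range and column layout that the old `Vec<(usize, Range<usize>, Layout)>` carried as separate tuple fields. A standalone sketch of the addressing idea (the `Layout` stub here is reduced to two bare variants; the real enum in this crate carries per-format details):

```rust
use std::ops::Range;

// Stub standing in for the crate's Layout type.
pub enum Layout {
    ArrowIpc,
    Parquet,
}

pub struct Chunk {
    pub range: Range<usize>,
    pub layout: Layout,
}

// A merged spill file is one contiguous byte buffer; chunks are laid out
// back to back, which is why the debug_assert above can check that the
// last chunk's range.end equals the file size.
fn slice_chunk<'a>(file: &'a [u8], chunk: &Chunk) -> &'a [u8] {
    &file[chunk.range.clone()]
}
```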
2 changes: 1 addition & 1 deletion src/query/service/tests/it/spillers/spiller.rs
@@ -42,7 +42,7 @@ async fn test_spill_with_partition() -> Result<()> {
spiller_type: SpillerType::HashJoinBuild,
location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()),
disk_spill: None,
- use_parquet: ctx.get_settings().get_spilling_use_parquet()?,
+ use_parquet: ctx.get_settings().get_spilling_file_format()?.is_parquet(),
};
let operator = DataOperator::instance().operator();

10 changes: 6 additions & 4 deletions src/query/settings/src/settings_default.rs
@@ -23,6 +23,8 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::UserSettingValue;
use once_cell::sync::OnceCell;

+ use super::settings_getter_setter::SpillFileFormat;

static DEFAULT_SETTINGS: OnceCell<Arc<DefaultSettings>> = OnceCell::new();

// Default value of cost factor settings
@@ -309,11 +311,11 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("spilling_use_parquet", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Set whether to use Parquet or Arrow IPC for spilling.",
("spilling_file_format", DefaultSettingValue {
value: UserSettingValue::String("parquet".to_string()),
desc: "Set the storage file format for spilling.",
mode: SettingMode::Both,
- range: Some(SettingRange::Numeric(0..=1)),
+ range: Some(SettingRange::String(SpillFileFormat::range())),
}),
("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue {
value: UserSettingValue::UInt64(u64::MAX),
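With the numeric `0..=1` range gone, validation now checks membership in the list returned by `SpillFileFormat::range()`. A small sketch of that contract, assuming `SpillFileFormat` is imported as in the hunk above and that string ranges validate by membership:

```rust
fn check() {
    let allowed = SpillFileFormat::range(); // ["arrow", "parquet"]
    assert!(allowed.contains(&"parquet".to_string()));
    assert!(!allowed.contains(&"orc".to_string())); // rejected when the setting is set
}
```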
41 changes: 39 additions & 2 deletions src/query/settings/src/settings_getter_setter.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+ use std::str::FromStr;

use databend_common_ast::parser::Dialect;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -31,6 +33,41 @@ pub enum FlightCompression {
Zstd,
}

+ #[derive(Clone, Copy)]
+ pub enum SpillFileFormat {
+     Arrow,
+     Parquet,
+ }
+
+ impl SpillFileFormat {
+     pub fn range() -> Vec<String> {
+         ["arrow", "parquet"]
+             .iter()
+             .copied()
+             .map(String::from)
+             .collect()
+     }
+
+     pub fn is_parquet(&self) -> bool {
+         matches!(self, SpillFileFormat::Parquet)
+     }
+ }
+
+ impl FromStr for SpillFileFormat {
+     type Err = ErrorCode;
+
+     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+         match s {
+             "arrow" => Ok(SpillFileFormat::Arrow),
+             "parquet" => Ok(Self::Parquet),
+             _ => Err(ErrorCode::InvalidConfig(format!(
+                 "invalid SpillFileFormat: {:?}",
+                 s
+             ))),
+         }
+     }
+ }

impl Settings {
// Get u64 value, we don't get from the metasrv.
fn try_get_u64(&self, key: &str) -> Result<u64> {
@@ -290,8 +327,8 @@ impl Settings {
Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize)
}

- pub fn get_spilling_use_parquet(&self) -> Result<bool> {
-     Ok(self.try_get_u64("spilling_use_parquet")? != 0)
+ pub fn get_spilling_file_format(&self) -> Result<SpillFileFormat> {
+     self.try_get_string("spilling_file_format")?.parse()
}

pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result<usize> {
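A quick round-trip of the new parsing path, as a sketch (`demo` is a hypothetical helper; the behavior mirrors the `FromStr` impl added above):

```rust
fn demo() {
    let fmt: SpillFileFormat = "parquet".parse().unwrap();
    assert!(fmt.is_parquet());

    let fmt: SpillFileFormat = "arrow".parse().unwrap();
    assert!(!fmt.is_parquet());

    // Anything else surfaces ErrorCode::InvalidConfig.
    assert!("orc".parse::<SpillFileFormat>().is_err());
}
```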
