Skip to content

Commit

Permalink
chore(query): reduce hash join memory usage (#16359)
Browse files Browse the repository at this point in the history
* feat(query): impl next_matched_ptr for hash join hash table

* chore(query): reduce hash join memory usage

* chore(query): refine code

* chore(query): fix left join with conjunct

* chore(query): improve the performance of hash join probe

* chore(code): refine code

* chore(code): fix ci check

* chore(code): update Cargo.lock

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
Dousir9 and BohuTANG authored Sep 2, 2024
1 parent 9164bf0 commit 5fff933
Show file tree
Hide file tree
Showing 22 changed files with 665 additions and 582 deletions.
14 changes: 14 additions & 0 deletions src/common/hashtable/src/hashjoin_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,18 @@ where
(0, 0)
}
}

/// Walks the entry chain beginning at `ptr` and returns the address of the
/// first entry whose stored key equals `key`.
///
/// A `ptr` (or `next` link) of `0` terminates the chain; `0` is returned when
/// no entry matches.
fn next_matched_ptr(&self, key: &Self::Key, mut ptr: u64) -> u64 {
    while ptr != 0 {
        // SAFETY: NOTE(review): relies on every non-zero `ptr`/`next` value being
        // the address of a live `RawEntry<K>` — confirm against the insert path.
        let entry = unsafe { &*(ptr as *const RawEntry<K>) };
        if key == &entry.key {
            return ptr;
        }
        // No match at this entry: advance to the next link in the chain.
        ptr = entry.next;
    }
    0
}
}
29 changes: 29 additions & 0 deletions src/common/hashtable/src/hashjoin_string_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,33 @@ where A: Allocator + Clone + 'static
(0, 0)
}
}

/// Follows the entry chain starting at `ptr` and returns the address of the
/// first `StringRawEntry` whose key bytes equal `key`.
///
/// A `ptr` (or `next` link) of `0` terminates the chain; `0` is returned when
/// no entry matches.
fn next_matched_ptr(&self, key: &Self::Key, mut ptr: u64) -> u64 {
    while ptr != 0 {
        // SAFETY: NOTE(review): relies on every non-zero `ptr`/`next` value being
        // the address of a live `StringRawEntry` — confirm against the insert path.
        let entry = unsafe { &*(ptr as *const StringRawEntry) };
        let entry_len = entry.length as usize;
        // Cheap rejections first: the lengths must agree, then the inline
        // `early` prefix (at most `STRING_EARLY_SIZE` bytes) must agree.
        if entry_len == key.len() {
            let prefix_len =
                std::cmp::min(STRING_EARLY_SIZE, std::cmp::min(key.len(), entry_len));
            if key[0..prefix_len] == entry.early[0..prefix_len] {
                // SAFETY: NOTE(review): assumes `entry.key` points at `entry.length`
                // readable bytes — confirm against the insert path.
                let stored =
                    unsafe { std::slice::from_raw_parts(entry.key as *const u8, entry_len) };
                if key == stored {
                    return ptr;
                }
            }
        }
        // No match at this entry: advance to the next link in the chain.
        ptr = entry.next;
    }
    0
}
}
3 changes: 3 additions & 0 deletions src/common/hashtable/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,4 +566,7 @@ pub trait HashJoinHashtableLike {
occupied: usize,
capacity: usize,
) -> (usize, u64);

/// Find the next matched ptr.
///
/// Takes a probe `key` and a starting entry pointer `ptr`, and returns an
/// entry pointer as `u64`.
///
/// NOTE(review): contract inferred from the hash-join implementors in this
/// change — they walk the entry chain from `ptr`, return the first entry
/// pointer whose key equals `key`, and return `0` when `ptr` is `0` or the
/// chain ends without a match. Confirm that all implementors follow this.
fn next_matched_ptr(&self, key: &Self::Key, ptr: u64) -> u64;
}
7 changes: 4 additions & 3 deletions src/query/expression/src/kernels/group_by_hash/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::HashMethodSerializer;
use crate::HashMethodSingleBinary;
use crate::InputColumns;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum KeysState {
Column(Column),
U128(Buffer<u128>),
Expand Down Expand Up @@ -76,11 +76,12 @@ pub trait HashMethod: Clone + Sync + Send + 'static {

fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result<Self::HashKeyIter<'a>>;

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>>;

fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>);
}

/// These methods are `generic` method to generate hash key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,26 @@ impl HashMethod for HashMethodDictionarySerializer {
}
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
match keys_state {
KeysState::Dictionary { dictionaries, .. } => {
hashes.extend(dictionaries.iter().map(|key| key.fast_hash()));
Ok(Box::new(DicKeyAccessor::new(dictionaries)))
}
_ => unreachable!(),
}
}

/// Extends `hashes` with the `fast_hash()` of every dictionary key held in
/// `keys_state`.
///
/// # Panics
///
/// Hits `unreachable!()` when `keys_state` is not `KeysState::Dictionary`.
fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
    if let KeysState::Dictionary { dictionaries, .. } = keys_state {
        hashes.extend(dictionaries.iter().map(|dict_key| dict_key.fast_hash()));
    } else {
        unreachable!()
    }
}
}

pub struct DicKeyAccessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,20 +272,28 @@ macro_rules! impl_hash_method_fixed_keys {
}
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
use crate::types::ArgType;
match keys_state {
KeysState::Column(Column::Number(NumberColumn::$dt(col))) => {
hashes.extend(col.iter().map(|key| key.fast_hash()));
Ok(Box::new(PrimitiveKeyAccessor::<$ty>::new(col)))
}
other => unreachable!("{:?} -> {}", other, NumberType::<$ty>::data_type()),
}
}

/// Extends `hashes` with the `fast_hash()` of every value in the number
/// column held in `keys_state`.
///
/// # Panics
///
/// Hits `unreachable!` (printing the offending state and the expected data
/// type) when `keys_state` is not the `NumberColumn::$dt` column variant.
fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
    use crate::types::ArgType;
    if let KeysState::Column(Column::Number(NumberColumn::$dt(values))) = keys_state {
        hashes.extend(values.iter().map(|value| value.fast_hash()));
    } else {
        unreachable!("{:?} -> {}", keys_state, NumberType::<$ty>::data_type())
    }
}
}
};
}
Expand Down Expand Up @@ -342,15 +350,20 @@ macro_rules! impl_hash_method_fixed_large_keys {
}
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
match keys_state {
KeysState::$name(v) => Ok(Box::new(PrimitiveKeyAccessor::<$ty>::new(v))),
_ => unreachable!(),
}
}

fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
match keys_state {
KeysState::$name(v) => {
hashes.extend(v.iter().map(|key| key.fast_hash()));
Ok(Box::new(PrimitiveKeyAccessor::<$ty>::new(v)))
}
_ => unreachable!(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,25 @@ impl HashMethod for HashMethodSerializer {
}
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
match keys_state {
KeysState::Column(Column::Binary(col)) => {
hashes.extend(col.iter().map(hash_join_fast_string_hash));
let (data, offsets) = col.into_buffer();
Ok(Box::new(BinaryKeyAccessor::new(data, offsets)))
}
_ => unreachable!(),
}
}

/// Extends `hashes` with `hash_join_fast_string_hash` applied to every item
/// of the binary column held in `keys_state`.
///
/// # Panics
///
/// Hits `unreachable!()` when `keys_state` is not a `Column::Binary` column.
fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
    if let KeysState::Column(Column::Binary(binary)) = keys_state {
        hashes.extend(binary.iter().map(hash_join_fast_string_hash));
    } else {
        unreachable!()
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,38 @@ impl HashMethod for HashMethodSingleBinary {
}
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
match keys_state {
KeysState::Column(Column::Binary(col))
| KeysState::Column(Column::Variant(col))
| KeysState::Column(Column::Bitmap(col)) => {
hashes.extend(col.iter().map(hash_join_fast_string_hash));
let (data, offsets) = col.into_buffer();
Ok(Box::new(BinaryKeyAccessor::new(data, offsets)))
}
KeysState::Column(Column::String(col)) => {
hashes.extend(col.iter_binary().map(hash_join_fast_string_hash));
let (data, offsets) = col.into_buffer();
Ok(Box::new(BinaryKeyAccessor::new(data, offsets)))
}
_ => unreachable!(),
}
}

/// Extends `hashes` with `hash_join_fast_string_hash` applied to every item
/// of the column held in `keys_state`.
///
/// `Binary`, `Variant` and `Bitmap` columns are iterated directly; `String`
/// columns are iterated through `iter_binary()`.
///
/// # Panics
///
/// Hits `unreachable!()` for any other `keys_state`.
fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
    match keys_state {
        KeysState::Column(Column::String(strings)) => {
            hashes.extend(strings.iter_binary().map(hash_join_fast_string_hash));
        }
        KeysState::Column(
            Column::Binary(bytes) | Column::Variant(bytes) | Column::Bitmap(bytes),
        ) => {
            hashes.extend(bytes.iter().map(hash_join_fast_string_hash));
        }
        _ => unreachable!(),
    }
}
}

pub struct BinaryKeyAccessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,13 +618,15 @@ impl<Method: HashMethodBounds> HashMethod for PartitionedHashMethod<Method> {
self.method.build_keys_iter(keys_state)
}

fn build_keys_accessor_and_hashes(
fn build_keys_accessor(
&self,
keys_state: KeysState,
hashes: &mut Vec<u64>,
) -> Result<Box<dyn KeyAccessor<Key = Self::HashKey>>> {
self.method
.build_keys_accessor_and_hashes(keys_state, hashes)
self.method.build_keys_accessor(keys_state)
}

/// Forwards `keys_state` and `hashes` unchanged to the wrapped `method`'s
/// `build_keys_hashes`; no partition-specific work is done here.
fn build_keys_hashes(&self, keys_state: &KeysState, hashes: &mut Vec<u64>) {
self.method.build_keys_hashes(keys_state, hashes)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use parking_lot::Mutex;
use parking_lot::RwLock;

use super::ProbeState;
use super::ProcessState;
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_NULL;
Expand Down Expand Up @@ -141,22 +142,29 @@ impl HashJoinProbeState {
/// Probe the hash table and retrieve matched rows as DataBlocks.
pub fn probe(&self, input: DataBlock, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>> {
match self.hash_join_state.hash_join_desc.join_type {
JoinType::Inner
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::Left
| JoinType::LeftMark
| JoinType::RightMark
| JoinType::LeftSingle
| JoinType::RightSingle
| JoinType::Right
| JoinType::Full => self.probe_join(input, probe_state),
JoinType::Cross => self.cross_join(input, probe_state),
_ => self.probe_join(input, probe_state),
}
}

/// Runs `result_blocks` using the `ProcessState` stored in
/// `probe_state.process_state`.
///
/// The key accessor is rebuilt from a clone of the stored `keys_state`.
///
/// # Errors
///
/// Returns `ErrorCode::AbortedQuery` when the hash table is
/// `HashJoinHashTable::Null`, and propagates errors from
/// `build_keys_accessor` and `result_blocks`.
///
/// # Panics
///
/// Panics if `probe_state.process_state` is `None`.
pub fn next_probe(&self, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>> {
// The stored state must already be set; `unwrap` panics otherwise.
let process_state = probe_state.process_state.as_ref().unwrap();
// NOTE(review): dereferences what `hash_table.get()` returns inside `unsafe`;
// presumably the table is fully built and not mutated while probing — confirm.
let hash_table = unsafe { &*self.hash_join_state.hash_table.get() };
with_join_hash_method!(|T| match hash_table {
HashJoinHashTable::T(table) => {
// Rebuild the `keys` accessor from the saved keys state; hashes are not
// recomputed in this function.
let keys = table
.hash_method
.build_keys_accessor(process_state.keys_state.clone())?;
// Continue to probe hash table and process data blocks.
self.result_blocks(probe_state, keys, &table.hash_table)
}
HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery(
"Aborted query, because the hash table is uninitialized.",
)),
})
}

pub fn probe_join(
&self,
mut input: DataBlock,
Expand Down Expand Up @@ -292,9 +300,16 @@ impl HashJoinProbeState {
let keys_state = table
.hash_method
.build_keys_state(probe_keys, input_num_rows)?;
let keys = table
table
.hash_method
.build_keys_accessor_and_hashes(keys_state, &mut probe_state.hashes)?;
.build_keys_hashes(&keys_state, &mut probe_state.hashes);
let keys = table.hash_method.build_keys_accessor(keys_state.clone())?;

probe_state.process_state = Some(ProcessState {
input,
keys_state,
next_idx: 0,
});

// Perform a round of hash table probe.
probe_state.probe_with_selection = prefer_early_filtering;
Expand Down Expand Up @@ -335,7 +350,7 @@ impl HashJoinProbeState {
probe_state.num_keys_hash_matched += probe_state.selection_count as u64;

// Continue to probe hash table and process data blocks.
self.result_blocks(&input, keys, &table.hash_table, probe_state)
self.result_blocks(probe_state, keys, &table.hash_table)
}
HashJoinHashTable::Null => Err(ErrorCode::AbortedQuery(
"Aborted query, because the hash table is uninitialized.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ pub use hash_join_probe_state::HashJoinProbeState;
pub use hash_join_spiller::HashJoinSpiller;
pub use hash_join_state::*;
pub use probe_state::ProbeState;
pub use probe_state::ProcessState;
pub use transform_hash_join_build::TransformHashJoinBuild;
pub use transform_hash_join_probe::TransformHashJoinProbe;
Loading

0 comments on commit 5fff933

Please sign in to comment.