Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
advancedxy committed Mar 13, 2024
1 parent ea80c97 commit de678d3
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 13 deletions.
11 changes: 11 additions & 0 deletions core/src/common/bit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ pub fn read_num_bytes_u32(size: usize, src: &[u8]) -> u32 {
trailing_bits(v as u64, size * 8) as u32
}

/// Reads a numeric value of type `$ty` from the first `$size` bytes of `$src`,
/// interpreting those bytes in big-endian order — the counterpart of
/// `read_num_bytes`, which reads little-endian. Big-endian decoding is needed
/// here because the data was written by Java's `OutputStream` side, which
/// serializes numbers in big-endian (network) byte order.
///
/// NOTE: `$size` and `$src` are macro arguments and are expanded (evaluated)
/// more than once below — callers should pass simple expressions without
/// side effects.
macro_rules! read_num_be_bytes {
    ($ty:ty, $size:expr, $src:expr) => {{
        // Precondition (checked only in debug builds): the source slice must
        // hold at least `$size` readable bytes.
        debug_assert!($size <= $src.len());
        // `Buffer` is the fixed-size byte array associated with `$ty` via the
        // project's `FromBytes` trait; presumably `Default` zero-fills it, so
        // when `$size` is smaller than the buffer the trailing bytes stay
        // zero — confirm against the `FromBytes` impls in this module.
        let mut buffer = <$ty as $crate::common::bit::FromBytes>::Buffer::default();
        buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]);
        // Reinterpret the filled buffer as `$ty` using big-endian semantics.
        <$ty>::from_be_bytes(buffer)
    }};
}

/// Converts value `val` of type `T` to a byte vector, by reading `num_bytes` from `val`.
/// NOTE: if `val` is less than the size of `T` then it can be truncated.
#[inline]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use std::{
sync::Arc,
};

/// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
/// Spark's `BloomFilterMightContain` expression.
#[derive(Debug)]
pub struct BloomFilterMightContain {
pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
Expand Down
3 changes: 3 additions & 0 deletions core/src/execution/datafusion/util/spark_bit_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

/// A simple bit array implementation that simulates the behavior of Spark's BitArray which is
/// used in the BloomFilter implementation. Some methods are not implemented as they are not
/// required for the current use case.
#[derive(Debug, Hash)]
pub struct SparkBitArray {
data: Vec<u64>,
Expand Down
10 changes: 0 additions & 10 deletions core/src/execution/datafusion/util/spark_bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ pub struct SparkBloomFilter {
num_hashes: u32,
}

/// Reads a numeric value of type `$ty` from the first `$size` bytes of `$src`
/// in big-endian byte order (the big-endian counterpart of `read_num_bytes`
/// in `bit.rs`).
///
/// NOTE(review): this duplicates the `read_num_be_bytes` macro defined in
/// `common/bit.rs` — prefer consolidating on that shared definition. Also
/// note `$size` and `$src` are expanded more than once below, so pass only
/// side-effect-free expressions.
macro_rules! read_num_be_bytes {
    ($ty:ty, $size:expr, $src:expr) => {{
        // Precondition (debug builds only): source must supply `$size` bytes.
        debug_assert!($size <= $src.len());
        // Fixed-size staging buffer for `$ty`; presumably zero-filled by
        // `Default`, so unwritten trailing bytes remain zero — confirm in the
        // `FromBytes` trait impls.
        let mut buffer = <$ty as $crate::common::bit::FromBytes>::Buffer::default();
        buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]);
        <$ty>::from_be_bytes(buffer)
    }};
}

impl SparkBloomFilter {
pub fn new_from_buf(buf: &[u8]) -> Self {
let mut offset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CometExpressionPlusSuite extends CometTestBase with AdaptiveSparkPlanHelpe
}
}

test("test NULl inputs for BloomFilterMightContain") {
test("test NULL inputs for BloomFilterMightContain") {
val table = "test"
withTable(table) {
sql(s"create table $table(col1 long, col2 int) using parquet")
Expand All @@ -77,8 +77,8 @@ class CometExpressionPlusSuite extends CometTestBase with AdaptiveSparkPlanHelpe
}

test("test BloomFilterMightContain from random input") {
val bf = BloomFilter.create(1000, 100)
val longs = (0 until 100).map(_ => Random.nextLong())
val bf = BloomFilter.create(100000, 10000)
val longs = (0 until 10000).map(_ => Random.nextLong())
longs.foreach(bf.put)
val os = new ByteArrayOutputStream()
bf.writeTo(os)
Expand Down

0 comments on commit de678d3

Please sign in to comment.