Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large bloom object handling + Rename BloomFilterType to BloomObject #37

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand Down Expand Up @@ -56,7 +56,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand All @@ -75,7 +75,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand Down
42 changes: 21 additions & 21 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::bloom::data_type::BLOOM_FILTER_TYPE;
use crate::bloom::data_type::BLOOM_TYPE;
use crate::bloom::utils;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
Expand All @@ -18,7 +18,7 @@ fn handle_bloom_add(
args: &[ValkeyString],
argc: usize,
item_idx: usize,
bf: &mut BloomFilterType,
bf: &mut BloomObject,
multi: bool,
add_succeeded: &mut bool,
validate_size_limit: bool,
Expand Down Expand Up @@ -170,7 +170,7 @@ pub fn bloom_filter_add_value(
curr_cmd_idx += 1;
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -216,7 +216,7 @@ pub fn bloom_filter_add_value(
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -244,7 +244,7 @@ pub fn bloom_filter_add_value(
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(
ctx,
Expand All @@ -262,7 +262,7 @@ pub fn bloom_filter_add_value(
}

/// Helper function used to check whether an item (or multiple items) exists on a bloom object.
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue {
fn handle_item_exists(value: Option<&BloomObject>, item: &[u8]) -> ValkeyValue {
if let Some(val) = value {
if val.item_exists(item) {
return ValkeyValue::Integer(1);
Expand Down Expand Up @@ -290,7 +290,7 @@ pub fn bloom_filter_exists(
curr_cmd_idx += 1;
// Parse the value to be checked whether it exists in the filter
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -319,7 +319,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// Parse the filter name
let filter_name = &input_args[curr_cmd_idx];
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -389,7 +389,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand All @@ -408,7 +408,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let bloom = match BloomFilterType::new_reserved(
let bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand All @@ -427,7 +427,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
seed: bloom.seed(),
items: &[],
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
VALKEY_OK
Expand Down Expand Up @@ -498,10 +498,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if !(num > BLOOM_TIGHTENING_RATIO_MIN
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
{
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
return Err(ValkeyError::Str(utils::BAD_TIGHTENING_RATIO));
}
};
}
Expand Down Expand Up @@ -571,7 +571,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -606,7 +606,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -634,7 +634,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut add_succeeded,
!replicated_cmd,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(
ctx,
Expand Down Expand Up @@ -662,7 +662,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let filter_name = &input_args[curr_cmd_idx];
curr_cmd_idx += 1;
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -724,7 +724,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// find filter
let filter_key = ctx.open_key_writable(filter_name);

let filter = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let filter = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
// error
Expand All @@ -740,7 +740,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// If the filter does not exist, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) {
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
Expand All @@ -754,7 +754,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
seed: bloom.seed(),
items: &input_args[idx..],
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(_) => {
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
VALKEY_OK
Expand Down
41 changes: 25 additions & 16 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
Expand All @@ -8,15 +8,16 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite.
/// This value must increased when `BloomFilterType` struct change.
pub const BLOOM_TYPE_VERSION: u8 = 1;
/// Used for decoding and encoding `BloomObject`. Currently used in AOF Rewrite.
/// This value must be increased when the `BloomObject` struct changes.
pub const BLOOM_OBJECT_VERSION: u8 = 1;

const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;
/// Bloom Module data type RDB encoding version.
const BLOOM_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
pub static BLOOM_TYPE: ValkeyType = ValkeyType::new(
"bloomfltr",
BLOOM_FILTER_TYPE_ENCODING_VERSION,
BLOOM_TYPE_ENCODING_VERSION,
raw::RedisModuleTypeMethods {
version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64,
rdb_load: Some(bloom_callback::bloom_rdb_load),
Expand Down Expand Up @@ -48,15 +49,15 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
);

pub trait ValkeyDataType {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject>;
fn debug_digest(&self, dig: Digest);
}

impl ValkeyDataType for BloomFilterType {
impl ValkeyDataType for BloomObject {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject> {
if encver > BLOOM_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_TYPE_ENCODING_VERSION).as_str());
return None;
}
let Ok(num_filters) = raw::load_unsigned(rdb) else {
Expand All @@ -79,7 +80,8 @@ impl ValkeyDataType for BloomFilterType {
// We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom
// object creation and scaling as a result of BF.* operations.
let mut filters = Vec::with_capacity(1);

// Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized.
let mut filters_memory_usage = 0;
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand All @@ -97,10 +99,17 @@ impl ValkeyDataType for BloomFilterType {
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
logging::log_warning(
"Failed to restore bloom object: Object larger than the allowed memory limit.",
);
return None;
}
filters_memory_usage += curr_filter_size;
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Expand All @@ -118,7 +127,7 @@ impl ValkeyDataType for BloomFilterType {
}
filters.push(Box::new(filter));
}
let item = BloomFilterType::from_existing(
let item = BloomObject::from_existing(
expansion as u32,
fp_rate,
tightening_ratio,
Expand Down
Loading
Loading