Skip to content

Commit

Permalink
Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY func…
Browse files Browse the repository at this point in the history
…tionality for info

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Jan 29, 2025
1 parent 2be839e commit 1ecb04b
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 56 deletions.
59 changes: 47 additions & 12 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut wanted_capacity = -1;
let mut validate_scale_to = -1;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
Expand Down Expand Up @@ -554,12 +554,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ATLEASTCAPACITY" => {
"VALIDATESCALETO" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
validate_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
Expand All @@ -584,25 +584,25 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
// When the `ITEMS` argument is provided, we expect additional item arg/s to be provided.
return Err(ValkeyError::WrongArity);
}
// Check if we have a wanted capacity and calculate if we can reach that capacity
if wanted_capacity > 0 {
// Check if we have a wanted capacity and calculate if we can reach that capacity. Using VALIDATESCALETO and NONSCALING options together is invalid.
if validate_scale_to > 0 {
if expansion == 0 {
return Err(ValkeyError::Str(
utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID,
utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID,
));
}
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
match utils::BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
wanted_capacity,
validate_scale_to,
tightening_ratio,
expansion,
) {
Ok(result) => result,
Err(e) => {
return Err(e);
Ok(_) => (),
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
}
};
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
Expand Down Expand Up @@ -714,12 +714,29 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"ERROR" => Ok(ValkeyValue::Float(val.fp_rate())),
"EXPANSION" => {
if val.expansion() == 0 {
return Ok(ValkeyValue::Null);
}
Ok(ValkeyValue::Integer(val.expansion() as i64))
}
// Only calculate and expose MAXSCALEDCAPACITY for scaling bloom objects.
"MAXSCALEDCAPACITY" if val.expansion() > 0 => {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
Ok(ValkeyValue::Integer(max_capacity))
}
_ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)),
}
}
Expand All @@ -733,13 +750,31 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::Integer(val.num_filters() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
ValkeyValue::Integer(val.cardinality()),
ValkeyValue::SimpleStringStatic("Error rate"),
ValkeyValue::Float(val.fp_rate()),
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion() == 0 {
result.push(ValkeyValue::Null);
} else {
result.push(ValkeyValue::Integer(val.expansion() as i64));
}
if val.expansion() != 0 {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity"));
result.push(ValkeyValue::Integer(max_capacity));
}
Ok(ValkeyValue::Array(result))
}
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),
Expand Down
19 changes: 9 additions & 10 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ impl ValkeyDataType for BloomObject {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, i as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
Expand Down
Loading

0 comments on commit 1ecb04b

Please sign in to comment.