
Commit

Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY functionality for info

Signed-off-by: zackcam <[email protected]>
zackcam committed Jan 28, 2025
1 parent 2be839e commit 2b79825
Showing 5 changed files with 152 additions and 41 deletions.
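
As context for the diff below: this commit renames the BF.INSERT option ATLEASTCAPACITY to VALIDATESCALETO and adds a MAXSCALEDCAPACITY field to BF.INFO. The following is a rough usage sketch against a valkey-py/redis-py style client; the key name, capacity values, and connection details are illustrative assumptions, not taken from this commit.

import valkey  # assumption: valkey-py client; redis-py's execute_command works the same way

client = valkey.Valkey(host="localhost", port=6379)

# Fail up front if the filter could never scale out to 10 million items,
# instead of discovering the limit later while inserting.
client.execute_command("BF.INSERT demo_filter VALIDATESCALETO 10000000 ITEMS a b c")

# For a scaling filter, report the largest capacity it could reach before
# hitting the memory limit or a false-positive rate that degrades to 0.
print(client.execute_command("BF.INFO demo_filter MAXSCALEDCAPACITY"))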
56 changes: 48 additions & 8 deletions src/bloom/command_handler.rs
@@ -462,7 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut wanted_capacity = -1;
let mut can_scale_to = -1;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
@@ -554,12 +554,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ATLEASTCAPACITY" => {
"VALIDATESCALETO" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
can_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
@@ -585,24 +585,24 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::WrongArity);
}
// Check if we have a wanted capacity and calculate if we can reach that capacity
if wanted_capacity > 0 {
if can_scale_to > 0 {
if expansion == 0 {
return Err(ValkeyError::Str(
utils::NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID,
utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID,
));
}
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
match utils::BloomObject::calculate_if_can_scale_to_is_valid(
capacity,
fp_rate,
wanted_capacity,
can_scale_to,
tightening_ratio,
expansion,
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
}
};
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
@@ -714,12 +714,31 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"ERROR" => Ok(ValkeyValue::Float(val.fp_rate())),
"EXPANSION" => {
if val.expansion() == 0 {
return Ok(ValkeyValue::Null);
}
Ok(ValkeyValue::Integer(val.expansion() as i64))
}
"MAXSCALEDCAPACITY" if val.expansion() > 0 => {
let max_capacity = match utils::BloomObject::calculate_if_can_scale_to_is_valid(
val.filters()
.first()
.expect("Filter will be populated")
.capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
};
Ok(ValkeyValue::Integer(max_capacity))
}
_ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)),
}
}
@@ -733,13 +733,34 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::Integer(val.num_filters() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
ValkeyValue::Integer(val.cardinality()),
ValkeyValue::SimpleStringStatic("Error rate"),
ValkeyValue::Float(val.fp_rate()),
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion() == 0 {
result.push(ValkeyValue::Null);
} else {
result.push(ValkeyValue::Integer(val.expansion() as i64));
}
if val.expansion() != 0 {
let max_capacity = match utils::BloomObject::calculate_if_can_scale_to_is_valid(
val.filters()
.first()
.expect("Filter will be populated")
.capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
};
result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity"));
result.push(ValkeyValue::Integer(max_capacity));
}
Ok(ValkeyValue::Array(result))
}
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),
93 changes: 69 additions & 24 deletions src/bloom/utils.rs
@@ -30,19 +30,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio";
pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str =
"ERR Wanted capacity would go beyond bloom object memory limit";
pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str =
"ERR False positive degrades too much to reach wanted capacity";
pub const VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: &str =
"ERR provided VALIDATESCALETO causes bloom object to exceed memory limit";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: &str =
"ERR provided VALIDATESCALETO causes false positive to degrades to 0";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
pub const DECODE_UNSUPPORTED_VERSION: &str =
"ERR bloom object decoding failed. Unsupported version";
pub const NON_SCALING_AND_WANTED_CAPACITY_IS_INVALID: &str =
"ERR Specifying NONSCALING and ATLEASTCAPCITY is not allowed";
pub const NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID: &str =
"ERR cannot use NONSCALING and VALIDATESCALETO options together";
/// Logging Error messages
pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object.";

@@ -56,6 +56,8 @@ pub enum BloomError {
DecodeUnsupportedVersion,
ErrorRateRange,
BadExpansion,
FalsePositiveReachesZero,
BadCapacity,
}

impl BloomError {
@@ -69,6 +72,8 @@ impl BloomError {
BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION,
BloomError::ErrorRateRange => ERROR_RATE_RANGE,
BloomError::BadExpansion => BAD_EXPANSION,
BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O,
BloomError::BadCapacity => BAD_CAPACITY,
}
}
}
@@ -319,7 +324,7 @@ impl BloomObject {
Some(new_capacity) => new_capacity,
None => {
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
return Err(BloomError::MaxNumScalingFilters);
return Err(BloomError::BadCapacity);
}
};
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
@@ -373,7 +378,7 @@ impl BloomObject {
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
_ => Err(BloomError::FalsePositiveReachesZero),
}
}

@@ -463,42 +468,78 @@ impl BloomObject {
}
}

pub fn calculate_if_wanted_capacity_is_valid(
/// This method is called from two different bloom commands: BF.INFO and BF.INSERT. The functionality varies slightly depending on which
/// command it is called from. When called from BF.INFO, this method is used to find the maximum possible capacity that the bloom object
/// could scale to without throwing an error. When called from BF.INSERT, this method is used to determine whether it is possible to reach
/// the capacity that has been provided.
///
/// # Arguments
///
/// * `capacity` - The size of the initial filter in the bloom object.
/// * `fp_rate` - The false positive rate of the bloom object.
/// * `can_scale_to` - The capacity we check whether the object can scale to. When this method is called from BF.INFO this is set to -1,
/// as we want to find the maximum size the object could scale up to.
/// * `tightening_ratio` - The tightening ratio of the object.
/// * `expansion` - The expansion rate of the object.
///
/// # Returns
/// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT, the size reached after going past max capacity.
/// * ValkeyError - Can return two different errors:
/// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would exceed the bloom object memory limit
/// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0
pub fn calculate_if_can_scale_to_is_valid(
capacity: i64,
fp_rate: f64,
wanted_capacity: i64,
can_scale_to: i64,
tightening_ratio: f64,
expansion: u32,
) -> Result<(), ValkeyError> {
let mut curr_capacity = capacity;
let mut curr_num_filters: u64 = 1;
let mut curr_fp_rate = fp_rate;
) -> Result<i64, ValkeyError> {
let mut curr_filter_capacity = capacity;
let mut curr_total_capacity = capacity;
let mut curr_num_filters: u64 = 0;
let mut filters_memory_usage = 0;
while curr_capacity < wanted_capacity {
curr_fp_rate = match BloomObject::calculate_fp_rate(
curr_fp_rate,
while curr_total_capacity < can_scale_to || can_scale_to == -1 {
curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
return Err(ValkeyError::Str(BAD_CAPACITY));
}
};
curr_total_capacity += curr_filter_capacity;
curr_num_filters += 1;

// Check to see if scaling to the next filter will cause a degradation in FP to 0
let curr_fp_rate = match BloomObject::calculate_fp_rate(
fp_rate,
curr_num_filters as i32,
tightening_ratio,
) {
Ok(rate) => rate,
Err(_) => {
return Err(ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID));
if can_scale_to == -1 {
return Ok(curr_total_capacity - curr_filter_capacity);
}
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID));
}
};
let curr_filter_size = BloomFilter::compute_size(curr_capacity, curr_fp_rate);
// Check that, if it scales to this number of filters, the object won't exceed the memory limit
let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate);
// For vectors of size < 4 the capacity of the vector is 4. After that, the capacity is always a power of two greater than or equal to the size
let curr_object_size = BloomObject::compute_size(
std::cmp::max(4, curr_num_filters).next_power_of_two() as usize,
) + filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE));
if can_scale_to == -1 {
return Ok(curr_total_capacity - curr_filter_capacity);
}
return Err(ValkeyError::Str(VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE));
}
// Update overall memory usage
filters_memory_usage += curr_filter_size;
curr_capacity *= expansion as i64;
curr_num_filters += 1;
}
Ok(())
Ok(curr_total_capacity / expansion as i64)
}
}

@@ -1006,6 +1047,10 @@ mod tests {
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed2 = test_bloom_filter2.seed();
assert_ne!(test_seed2, configs::FIXED_SEED);
// Check that the random seed changes for each BloomFilter
let test_bloom_filter3 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed3 = test_bloom_filter3.seed();
assert_ne!(test_seed2, test_seed3);
}

#[test]
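
For readers skimming the new calculate_if_can_scale_to_is_valid above, the following Python sketch mirrors its BF.INFO path (can_scale_to == -1): keep scaling filter by filter, tightening the false-positive rate and accumulating memory, and stop at the capacity reached just before the rate would degrade to 0 or the object would exceed the memory limit. The estimate_filter_bytes helper, the 64 MiB limit, and the parameter values are stand-in assumptions for BloomFilter::compute_size / BloomObject::validate_size, which are not shown in this diff, so the output is illustrative rather than byte-accurate.

import math
import sys

MEMORY_LIMIT_BYTES = 64 * 1024 * 1024  # assumption: stand-in for the bloom object memory limit

def estimate_filter_bytes(capacity, fp_rate):
    # Rough bloom sizing (bits = -n * ln(p) / ln(2)^2); stand-in for BloomFilter::compute_size.
    bits = -capacity * math.log(fp_rate) / (math.log(2) ** 2)
    return int(bits // 8)

def max_scaled_capacity(capacity, fp_rate, tightening_ratio, expansion):
    filter_capacity = capacity
    total_capacity = capacity
    used_bytes = estimate_filter_bytes(capacity, fp_rate)
    num_filters = 1
    while True:
        filter_capacity *= expansion
        next_fp_rate = fp_rate * (tightening_ratio ** num_filters)
        if next_fp_rate <= sys.float_info.min:
            return total_capacity  # next scale-out would degrade the fp rate to 0
        next_bytes = used_bytes + estimate_filter_bytes(filter_capacity, next_fp_rate)
        if next_bytes > MEMORY_LIMIT_BYTES:
            return total_capacity  # next scale-out would exceed the memory limit
        total_capacity += filter_capacity
        used_bytes = next_bytes
        num_filters += 1

print(max_scaled_capacity(capacity=100, fp_rate=0.01, tightening_ratio=0.5, expansion=2))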
7 changes: 4 additions & 3 deletions tests/test_bloom_command.py
@@ -44,9 +44,9 @@ def test_bloom_command_error(self):
('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'),
('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'),
('BF.INSERT KEY HELLO', 'unknown argument received'),
('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 ATLEASTCAPACITY 10000000 EXPANSION 1', 'False positive degrades too much to reach wanted capacity'),
('BF.INSERT KEY ATLEASTCAPACITY 1000000000000', 'Wanted capacity would go beyond bloom object memory limit'),
('BF.INSERT KEY ATLEASTCAPACITY 1000000000000 NONSCALING', 'Specifying NONSCALING and ATLEASTCAPCITY is not allowed'),
('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 VALIDATESCALETO 10000000 EXPANSION 1', 'provided VALIDATESCALETO causes false positive to degrades to 0'),
('BF.INSERT KEY VALIDATESCALETO 1000000000000', 'provided VALIDATESCALETO causes bloom object to exceed memory limit'),
('BF.INSERT KEY VALIDATESCALETO 1000000000000 NONSCALING', 'cannot use NONSCALING and VALIDATESCALETO options together'),
('BF.RESERVE KEY String 100', 'bad error rate'),
('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'),
('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'),
@@ -120,6 +120,7 @@ def test_bloom_command_behavior(self):
('bf.info TEST expansion', 2),
('BF.INFO TEST_EXPANSION EXPANSION', 9),
('BF.INFO TEST_CAPACITY CAPACITY', 2000),
('BF.INFO TEST MAXSCALEDCAPACITY', 26214300),
('BF.CARD key', 3),
('BF.CARD hello', 5),
('BF.CARD TEST', 5),
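
The 26214300 expected from BF.INFO TEST MAXSCALEDCAPACITY above is consistent with a filter that starts at capacity 100 with expansion 2 (an assumption, since TEST's creation parameters are not visible in this hunk): 18 filters of capacity 100 * 2**i sum to 100 * (2**18 - 1) = 26,214,300.

# Hedged check (assumes TEST was created with capacity 100 and expansion 2):
assert sum(100 * 2**i for i in range(18)) == 26214300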
31 changes: 31 additions & 0 deletions tests/test_bloom_correctness.py
@@ -25,6 +25,7 @@ def test_non_scaling_filter(self):
assert info_dict[b'Number of filters'] == 1
assert info_dict[b'Size'] > 0
assert info_dict[b'Expansion rate'] == None
assert "Maximum Capacity" not in info_dict
# Use a margin on the expected_fp_rate when asserting for correctness.
fp_margin = 0.002
# Validate that item "add" operations on bloom filters are ensuring correctness.
@@ -74,6 +75,7 @@ def test_scaling_filter(self):
assert info_dict[b'Number of filters'] == 1
assert info_dict[b'Size'] > 0
assert info_dict[b'Expansion rate'] == expansion
assert info_dict[b'Max scaled capacity'] == 20470000

# Scale out by adding items.
total_error_count = 0
@@ -92,6 +94,7 @@ def test_scaling_filter(self):
assert info_dict[b'Number of filters'] == filter_idx
assert info_dict[b'Size'] > 0
assert info_dict[b'Expansion rate'] == expansion
assert info_dict[b'Max scaled capacity'] == 20470000

# Use a margin on the expected_fp_rate when asserting for correctness.
fp_margin = 0.002
@@ -127,3 +130,31 @@ def test_scaling_filter(self):
info_dict = dict(zip(it, it))
# Validate correctness on a copy of a scaling bloom filter.
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)

def test_max_and_can_scale_to_correctness(self):
can_scale_to_commands = [
('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrades to 0" )
]
for cmd in can_scale_to_commands:
try:
self.client.execute_command(cmd[0])
assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error"
except Exception as e:
print(cmd[0])
assert cmd[1] == str(e), f"Unexpected error message: {e}"
self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100')
self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1')
FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY')
MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY')
self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item")
key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")]
for key in key_names:
try:
self.add_items_till_capacity(self.client, key[0], key[1]+1, 1, "new_item")
assert False, "Expect adding to an item after reaching max capacity should fail"
except Exception as e:
assert key[2] in str(e)
# Check that max capacity doesn't change even after adding items.
assert self.client.execute_command(f'BF.INFO {key[0]} MAXSCALEDCAPACITY') == key[1]
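
Similarly, the Max scaled capacity of 20470000 asserted in test_scaling_filter above is consistent with a starting capacity of 10000 and expansion 2 (an assumption; that filter's creation parameters are not shown in these hunks), i.e. 11 filters summing to 10000 * (2**11 - 1).

# Hedged check (assumes the scaling filter starts at capacity 10000 with expansion 2):
assert sum(10000 * 2**i for i in range(11)) == 20470000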
6 changes: 0 additions & 6 deletions tests/valkeytests/valkey_test_case.py
@@ -1,17 +1,11 @@
import subprocess
import time
import random
import os
import pytest
import re
import struct
import threading
import io
import socket
from contextlib import contextmanager
from functools import wraps
from valkey import *
from valkey.client import Pipeline
from util.waiters import *

from enum import Enum

