Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a bloom command keyspace test #40

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions tests/test_bloom_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ def test_too_large_bloom_obj(self):
# is rejected.
assert client.execute_command('CONFIG SET bf.bloom-memory-usage-limit 1000') == b'OK'
cmds = [
'BF.INSERT filter items new_item1',
'BF.ADD filter new_item1',
'BF.MADD filter new_item1 new_item2',
'BF.INSERT filter ITEMS',
'BF.ADD filter',
'BF.MADD filter',
]
# Fill a filter to capacity.
assert client.execute_command('BF.RESERVE filter 0.001 100 EXPANSION 10') == b'OK'
Expand All @@ -94,12 +94,19 @@ def test_too_large_bloom_obj(self):
assert client.execute_command('BF.INFO filter FILTERS') == 1
assert client.execute_command('BF.INFO filter EXPANSION') == 10
# Validate that scale out is rejected with appropriate error.
new_item_idx = 0
for cmd in cmds:
if "BF.ADD" in cmd:
self.verify_error_response(self.client, cmd, obj_exceeds_size_err)
else:
response = client.execute_command(cmd)
assert obj_exceeds_size_err in str(response[0])
response = ""
while obj_exceeds_size_err not in response:
item = f"new_item{new_item_idx}"
new_item_idx += 1
if "BF.ADD" in cmd:
response = self.verify_error_response(self.client,f"{cmd} {item}", obj_exceeds_size_err)
else:
response = str(client.execute_command(f"{cmd} {item}"))
if "1" in response:
assert False, f"{cmd} returned a value of 1 when it should have thrown an {obj_exceeds_size_err}"
new_item_idx -= 1

def test_large_allocation_when_below_maxmemory(self):
two_megabytes = 2 * 1024 * 1024
Expand Down
70 changes: 70 additions & 0 deletions tests/test_bloom_keyspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging, time
from valkey_bloom_test_case import ValkeyBloomTestCaseBase
from valkeytests.conftest import resource_port_tracker

class TestKeyEventNotifications(ValkeyBloomTestCaseBase):
RESERVE_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.reserve'}
RESERVE_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.reserve', 'data': b'intermediate_val'}
ADD_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.add'}
ADD_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.add', 'data': b'intermediate_val'}

def create_expected_message_list(self, reserve_expected, add_expected, key_name):
expected_messages = []
self.RESERVE_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
self.RESERVE_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
self.ADD_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
self.ADD_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
if reserve_expected:
expected_messages.append(self.RESERVE_KEYEVENT_MESSAGE)
expected_messages.append(self.RESERVE_KEYSPACE_MESSAGE)
if add_expected:
expected_messages.append(self.ADD_KEYSPACE_MESSAGE)
expected_messages.append(self.ADD_KEYEVENT_MESSAGE)
return expected_messages

def check_response(self, result_messages, expected_messages):
extra_message = self.keyspace_client_subscribe.get_message()
if extra_message:
assert False, f"Unexpected extra message returned: {extra_message}"
for message in expected_messages:
assert message in result_messages, f"{message} was not found in messages received"

def get_subscribe_client_messages(self, client, cmd, expected_message_count):
client.execute_command(cmd)
count = 0
messages = []
timeout = time.time() + 5
while expected_message_count != count:
message = self.keyspace_client_subscribe.get_message()
if message:
# Only for the first time we get messages we should skip the first message gotten
if count > 0 or "BF.ADD" not in cmd:
messages.append(message)
count = count + 1
if timeout < time.time():
assert False, f"The number of expected messages failed tor eturn in time, messages received so far {messages}"
return messages

def test_keyspace_bloom_commands(self):
self.create_subscribe_clients()
# The first call to get messages will return message that shows we subscribed to messages so we expect one more message than we need to check for
# the first time we look at messages
bloom_commands = [
('BF.ADD add_test key', True, True, 5),
('BF.MADD madd_test key1 key2', True, True, 4),
('BF.EXISTS exists_test key', False, False, 0),
('BF.INSERT insert_test ITEMS key1 key2', True, True, 4),
('BF.RESERVE reserve_test 0.01 1000', True, False, 2)
]

for command, reserve_expected, add_expected, expected_message_count in bloom_commands:
expected_messages = self.create_expected_message_list(reserve_expected, add_expected, command.split()[1]) if reserve_expected else []
result_messages = self.get_subscribe_client_messages(self.keyspace_client, command, expected_message_count)
self.check_response(result_messages, expected_messages)

def create_subscribe_clients(self):
self.keyspace_client = self.server.get_new_client()
self.keyspace_client_subscribe = self.keyspace_client.pubsub()
self.keyspace_client_subscribe.psubscribe('__key*__:*')
self.keyspace_client.execute_command('CONFIG' ,'SET','notify-keyspace-events', 'KEA')

2 changes: 1 addition & 1 deletion tests/test_bloom_save_and_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_restore_failed_large_bloom_filter(self):
# Create a large bloom filter.
# When we try to restore this on a server with the default max allowed filter size of 128MB, start up should fail.
updated_max_size = 180 * 1024 * 1024
original_max_size = 64 * 1024 * 1024
original_max_size = int(client.execute_command('CONFIG GET bf.bloom-memory-usage-limit')[1])
bf_add_result_1 = client.execute_command('CONFIG SET bf.bloom-memory-usage-limit ' + str(updated_max_size))
client.execute_command('BF.RESERVE testSave 0.001 100000000')
assert int(client.execute_command('BF.INFO testSave size')) > original_max_size
Expand Down
1 change: 1 addition & 0 deletions tests/valkey_bloom_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def verify_error_response(self, client, cmd, expected_err_reply):
except ResponseError as e:
assert_error_msg = f"Actual error message: '{str(e)}' is different from expected error message '{expected_err_reply}'"
assert str(e) == expected_err_reply, assert_error_msg
return str(e)

def verify_command_success_reply(self, client, cmd, expected_result):
cmd_actual_result = client.execute_command(cmd)
Expand Down
Loading