From 1df99e9135db8d67271c4c87abc1c0f6c580f452 Mon Sep 17 00:00:00 2001 From: John Baublitz Date: Tue, 28 Jan 2025 14:18:55 -0500 Subject: [PATCH] Fix Python tests for r8 --- tests/client-dbus/tests/udev/_utils.py | 57 ++++-- tests/client-dbus/tests/udev/test_bind.py | 42 ++-- tests/client-dbus/tests/udev/test_predict.py | 2 +- tests/client-dbus/tests/udev/test_revert.py | 4 +- tests/client-dbus/tests/udev/test_udev.py | 201 +++++++++++++------ 5 files changed, 214 insertions(+), 92 deletions(-) diff --git a/tests/client-dbus/tests/udev/_utils.py b/tests/client-dbus/tests/udev/_utils.py index 9ea1784835..fd1a00815d 100644 --- a/tests/client-dbus/tests/udev/_utils.py +++ b/tests/client-dbus/tests/udev/_utils.py @@ -83,10 +83,10 @@ def create_pool( Creates a stratis pool. :param name: Name of pool :param devices: Devices to use for pool - :param key_description: optional key description - :type key_description: str or NoneType + :param key_description: optional key descriptions and token slots + :type key_description: list of tuples :param clevis_info: clevis information, pin and config - :type clevis_info: pair of str * (bool, str) + :type clevis_info: list of tuples :return: result of pool create if operation succeeds :rtype: bool * str * list of str :raises RuntimeError: if pool is not created @@ -98,9 +98,18 @@ def create_legacy_pool(): if len(get_pools(name)) == 0: cmdline = [_LEGACY_POOL, name] + devices if key_description is not None: - cmdline.extend(["--key-desc", key_description]) + if len(key_description) > 1: + raise RuntimeError( + "Can only provide one key description to legacy pools" + ) + (kd, _) = key_description[0] + cmdline.extend(["--key-desc", kd]) if clevis_info is not None: - (pin, (tang_url, thp)) = clevis_info + if len(clevis_info) > 1: + raise RuntimeError( + "Can only provide one Clevis info to legacy pools" + ) + (pin, (tang_url, thp), _) = clevis_info[0] cmdline.extend(["--clevis", pin]) if pin == "tang": cmdline.extend(["--tang-url", tang_url]) @@ -131,12 +140,26 @@ def create_legacy_pool(): return (newly_created, (pool_object_path, bd_object_paths)) def create_v2_pool(): - if clevis_info is None: - clevis_arg = None - else: - (pin, (tang_url, thp)) = clevis_info + dbus_key_descriptions = [] + for kd, slot in key_description if key_description is not None else []: + if slot is None: + dbus_slot = (False, 0) + else: + dbus_slot = (True, slot) + + dbus_key_descriptions.append((dbus_slot, kd)) + + dbus_clevis_infos = [] + for pin, (tang_url, thp), slot in ( + clevis_info if clevis_info is not None else [] + ): + if slot is None: + dbus_slot = (False, 0) + else: + dbus_slot = (True, slot) + if pin == "tang": - clevis_arg = ( + (pin, config) = ( "tang", json.dumps( {"url": tang_url, "stratis:tang:trust_url": True} @@ -145,19 +168,19 @@ def create_v2_pool(): ), ) else: - clevis_arg = None + raise RuntimeError( + "Currently only Tang is supported for Clevis in the test infrastructure" + ) + + dbus_clevis_infos.append((dbus_slot, pin, config)) (result, exit_code, error_str) = Manager.Methods.CreatePool( get_object(TOP_OBJECT), { "name": name, "devices": devices, - "key_desc": ( - (False, "") if key_description is None else (True, key_description) - ), - "clevis_info": ( - (False, ("", "")) if clevis_arg is None else (True, clevis_arg) - ), + "key_desc": dbus_key_descriptions, + "clevis_info": dbus_clevis_infos, "journal_size": (False, 0), "tag_spec": (False, ""), "allocate_superblock": (False, False), diff --git a/tests/client-dbus/tests/udev/test_bind.py b/tests/client-dbus/tests/udev/test_bind.py index c35e47747b..2d38608457 100644 --- a/tests/client-dbus/tests/udev/test_bind.py +++ b/tests/client-dbus/tests/udev/test_bind.py @@ -57,13 +57,17 @@ def test_binding_and_adding(self): with OptionalKeyServiceContextManager(key_spec=[(key_description, key)]): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, key_description=key_description + pool_name, initial_devnodes, key_description=[(key_description, None)] ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.BindClevis( get_object(pool_object_path), - {"pin": "tang", "json": self._CLEVIS_CONFIG_STR}, + { + "pin": "tang", + "json": self._CLEVIS_CONFIG_STR, + "token_slot": (False, 0), + }, ) self.assertEqual(exit_code, StratisdErrors.OK, message) @@ -88,13 +92,17 @@ def test_binding_unbinding_adding(self): with OptionalKeyServiceContextManager(key_spec=[(key_description, key)]): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, key_description=key_description + pool_name, initial_devnodes, key_description=[(key_description, None)] ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.BindClevis( get_object(pool_object_path), - {"pin": "tang", "json": self._CLEVIS_CONFIG_STR}, + { + "pin": "tang", + "json": self._CLEVIS_CONFIG_STR, + "token_slot": (False, 0), + }, ) self.assertEqual(exit_code, StratisdErrors.OK, message) @@ -126,13 +134,17 @@ def test_swap_binding(self): with OptionalKeyServiceContextManager(key_spec=[(key_description, key)]): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, key_description=key_description + pool_name, initial_devnodes, key_description=[(key_description, None)] ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.BindClevis( get_object(pool_object_path), - {"pin": "tang", "json": self._CLEVIS_CONFIG_STR}, + { + "pin": "tang", + "json": self._CLEVIS_CONFIG_STR, + "token_slot": (False, 0), + }, ) self.assertEqual(exit_code, StratisdErrors.OK, message) @@ -164,12 +176,15 @@ def test_swap_binding_2(self): with OptionalKeyServiceContextManager(key_spec=[(key_description, key)]): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, clevis_info=("tang", (_TANG_URL, None)) + pool_name, + initial_devnodes, + clevis_info=[("tang", (_TANG_URL, None), None)], ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.BindKeyring( - get_object(pool_object_path), {"key_desc": key_description} + get_object(pool_object_path), + {"key_desc": key_description, "token_slot": (False, 0)}, ) self.assertEqual(exit_code, StratisdErrors.OK, message) @@ -201,12 +216,14 @@ def test_rebind_with_clevis(self): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, clevis_info=("tang", (_TANG_URL, None)) + pool_name, + initial_devnodes, + clevis_info=[("tang", (_TANG_URL, None), None)], ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.RebindClevis( - get_object(pool_object_path), {} + get_object(pool_object_path), {"token_slot": (False, 0)} ) self.assertEqual(exit_code, StratisdErrors.OK, message) @@ -239,12 +256,13 @@ def test_rebind_with_new_key_description(self): with OptionalKeyServiceContextManager(key_spec=keys): pool_name = random_string(5) (_, (pool_object_path, _)) = create_pool( - pool_name, initial_devnodes, key_description=keys[0][0] + pool_name, initial_devnodes, key_description=[(keys[0][0], None)] ) self.wait_for_pools(1) (_, exit_code, message) = Pool.Methods.RebindKeyring( - get_object(pool_object_path), {"key_desc": keys[1][0]} + get_object(pool_object_path), + {"key_desc": keys[1][0], "token_slot": (False, 0)}, ) self.assertEqual(exit_code, StratisdErrors.OK, message) diff --git a/tests/client-dbus/tests/udev/test_predict.py b/tests/client-dbus/tests/udev/test_predict.py index 214c64eeab..ec38e78a1b 100644 --- a/tests/client-dbus/tests/udev/test_predict.py +++ b/tests/client-dbus/tests/udev/test_predict.py @@ -302,7 +302,7 @@ def test_prediction_encrypted(self): (key_description, key) = ("key_spec", "data") with OptionalKeyServiceContextManager(key_spec=[(key_description, key)]): pool_name = random_string(5) - create_pool(pool_name, devnodes, key_description=key_description) + create_pool(pool_name, devnodes, key_description=[(key_description, None)]) self.wait_for_pools(1) self._test_prediction(pool_name) diff --git a/tests/client-dbus/tests/udev/test_revert.py b/tests/client-dbus/tests/udev/test_revert.py index 99bfe00106..df8c7bd533 100644 --- a/tests/client-dbus/tests/udev/test_revert.py +++ b/tests/client-dbus/tests/udev/test_revert.py @@ -158,7 +158,7 @@ def test_revert(self): # pylint: disable=too-many-locals { "id": pool_name, "id_type": "name", - "unlock_method": (False, ""), + "unlock_method": (False, (False, 0)), "key_fd": (False, 0), }, ) @@ -276,7 +276,7 @@ def test_revert_snapshot_chain(self): # pylint: disable=too-many-locals { "id": pool_name, "id_type": "name", - "unlock_method": (False, ""), + "unlock_method": (False, (False, 0)), "key_fd": (False, 0), }, ) diff --git a/tests/client-dbus/tests/udev/test_udev.py b/tests/client-dbus/tests/udev/test_udev.py index eaa3047d66..a6b3c9812a 100644 --- a/tests/client-dbus/tests/udev/test_udev.py +++ b/tests/client-dbus/tests/udev/test_udev.py @@ -252,7 +252,7 @@ def _simple_initial_discovery_test( self.wait_for_pools(0) (_, (pool_object_path, device_object_paths)) = create_pool( - random_string(5), devnodes, key_description=key_description + random_string(5), devnodes, key_description=[(key_description, None)] ) wait_for_udev(STRATIS_FS_TYPE, get_devnodes(device_object_paths)) self.wait_for_pools(1) @@ -262,15 +262,30 @@ def _simple_initial_discovery_test( remove_stratis_setup() with OptionalKeyServiceContextManager(key_spec=key_spec): - ((changed, _), exit_code, _) = Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": pool_uuid, - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "uuid", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, 1)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + else: + kds = Pool.Properties.KeyDescriptions.Get(get_object(pool_object_path)) + self.assertEqual(len(kds), 1) + (slot, _) = kds[0] + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, slot)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + if key_spec is None: self.assertNotEqual(exit_code, StratisdErrors.OK) self.assertEqual(changed, False) @@ -344,7 +359,7 @@ def _simple_event_test(self, *, key_spec=None): # pylint: disable=too-many-loca self.wait_for_pools(0) (_, (pool_object_path, _)) = create_pool( - random_string(5), devnodes, key_description=key_description + random_string(5), devnodes, key_description=[(key_description, None)] ) self.wait_for_pools(1) pool_uuid = Pool.Properties.Uuid.Get(get_object(pool_object_path)) @@ -367,15 +382,30 @@ def _simple_event_test(self, *, key_spec=None): # pylint: disable=too-many-loca wait_for_udev(udev_wait_type, self._lb_mgr.device_files(tokens_up)) self.wait_for_pools(0) - ((changed, _), exit_code, _) = Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": pool_uuid, - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "uuid", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, 1)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + else: + kds = Pool.Properties.KeyDescriptions.Get(get_object(pool_object_path)) + self.assertEqual(len(kds), 1) + (slot, _) = kds[0] + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, slot)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + # This should always fail because a pool cannot be successfully # started without all devices present. self.assertNotEqual(exit_code, StratisdErrors.OK) @@ -388,15 +418,29 @@ def _simple_event_test(self, *, key_spec=None): # pylint: disable=too-many-loca wait_for_udev(udev_wait_type, self._lb_mgr.device_files(tokens_up)) - ((changed, _), exit_code, _) = Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": pool_uuid, - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "uuid", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, 1)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + else: + kds = Pool.Properties.KeyDescriptions.Get(get_object(pool_object_path)) + self.assertEqual(len(kds), 1) + (slot, _) = kds[0] + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, slot)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) if key_spec is None or _LEGACY_POOL is None: self.assertNotEqual(exit_code, StratisdErrors.OK) @@ -469,7 +513,9 @@ def test_duplicate_pool_name( if random.choice([True, False]) else None ) - create_pool(pool_name, devnodes, key_description=key_description) + create_pool( + pool_name, devnodes, key_description=[(key_description, None)] + ) if key_description is None: unencrypted_indices.append(i) else: @@ -523,15 +569,26 @@ def test_duplicate_pool_name( ) for pool_uuid in variant_pool_uuids: - Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": pool_uuid, - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "uuid", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (True, 1)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) + else: + Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": pool_uuid, + "unlock_method": (True, (False, 0)), + "id_type": "uuid", + "key_fd": (False, 0), + }, + ) wait_for_udev_count(len(non_luks_tokens) + len(luks_tokens)) @@ -703,7 +760,9 @@ def _simple_start_by_name_test(self): ) as key_desc: self.wait_for_pools(0) - create_pool("encrypted", devnodes[:2], key_description=key_desc[0]) + create_pool( + "encrypted", devnodes[:2], key_description=[(key_desc[0], None)] + ) create_pool("unencrypted", devnodes[2:]) self.wait_for_pools(2) @@ -733,15 +792,26 @@ def _simple_start_by_name_test(self): ) settle() - ((changed, _), exit_code, _) = Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": "encrypted", - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "name", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": "encrypted", + "unlock_method": (True, (True, 1)), + "id_type": "name", + "key_fd": (False, 0), + }, + ) + else: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": "encrypted", + "unlock_method": (True, (False, 0)), + "id_type": "name", + "key_fd": (False, 0), + }, + ) self.assertFalse(changed) self.assertEqual(exit_code, 1) self.assertEqual( @@ -753,7 +823,7 @@ def _simple_start_by_name_test(self): get_object(TOP_OBJECT), { "id": "unencrypted", - "unlock_method": (False, ""), + "unlock_method": (False, (False, 0)), "id_type": "name", "key_fd": (False, 0), }, @@ -770,15 +840,26 @@ def _simple_start_by_name_test(self): ) settle() - ((changed, _), exit_code, _) = Manager.Methods.StartPool( - get_object(TOP_OBJECT), - { - "id": "encrypted", - "unlock_method": (True, str(EncryptionMethod.KEYRING)), - "id_type": "name", - "key_fd": (False, 0), - }, - ) + if _LEGACY_POOL is not None: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": "encrypted", + "unlock_method": (True, (True, 1)), + "id_type": "name", + "key_fd": (False, 0), + }, + ) + else: + ((changed, _), exit_code, _) = Manager.Methods.StartPool( + get_object(TOP_OBJECT), + { + "id": "encrypted", + "unlock_method": (True, (False, 0)), + "id_type": "name", + "key_fd": (False, 0), + }, + ) self.assertTrue(changed) self.assertEqual(exit_code, 0) self.assertEqual( @@ -789,7 +870,7 @@ def _simple_start_by_name_test(self): get_object(TOP_OBJECT), { "id": "unencrypted", - "unlock_method": (False, ""), + "unlock_method": (False, (False, 0)), "id_type": "name", "key_fd": (False, 0), },