Skip to content

Commit

Permalink
Refactor SSOHost and make test parametrization more understandable
Browse files Browse the repository at this point in the history
  • Loading branch information
lhellebr committed Jan 28, 2025
1 parent 4495774 commit bb9a399
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 129 deletions.
13 changes: 8 additions & 5 deletions pytest_fixtures/component/satellite_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
LDAP_ATTR,
LDAP_SERVER_TYPE,
)
from robottelo.hosts import IPAHost, SSOHost
from robottelo.hosts import IPAHost, RHBKHost, RHSSOHost
from robottelo.logging import logger
from robottelo.utils.datafactory import gen_string
from robottelo.utils.installer import InstallerCommand

Expand All @@ -24,9 +25,11 @@
@pytest.fixture(scope='module')
def default_sso_host(request, module_target_sat):
"""Returns default sso host"""
if hasattr(request, 'param'):
return SSOHost(module_target_sat, rhbk=request.param)
return SSOHost(module_target_sat)
if hasattr(request, 'param') and request.param:
logger.info("Using RHBK host for SSO")
return RHBKHost(module_target_sat)
logger.info("Using RHSSO host for SSO")
return RHSSOHost(module_target_sat)


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -346,7 +349,7 @@ def enable_external_auth_rhsso(
enroll_configure_rhsso_external_auth, default_sso_host, module_target_sat
):
"""register the satellite with RH-SSO Server for single sign-on"""
client_id = default_sso_host.get_rhsso_client_id()
client_id = default_sso_host.get_sso_client_id()
default_sso_host.create_mapper(GROUP_MEMBERSHIP_MAPPER, client_id)
audience_mapper = copy.deepcopy(AUDIENCE_MAPPER)
audience_mapper['config']['included.client.audience'] = audience_mapper['config'][
Expand Down
12 changes: 12 additions & 0 deletions robottelo/config/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@
must_exist=True,
),
],
rhbk=[
Validator(
'rhbk.host_name',
'rhbk.host_port',
'rhbk.host_url',
'rhbk.rhbk_user',
'rhbk.rhbk_password',
'rhbk.realm',
'rhbk.totp_secret',
must_exist=True,
),
],
remotedb=[
Validator(
'remotedb.server',
Expand Down
109 changes: 60 additions & 49 deletions robottelo/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2421,40 +2421,19 @@ def run_orphan_cleanup(self, smart_proxy_id=None):


class SSOHost(Host):
"""Class for RHSSO functions and setup"""
"""Class for SSO functions and setup"""

def __init__(self, sat_obj, rhbk=False, **kwargs):
def __init__(self, sat_obj, **kwargs):
self.satellite = sat_obj
if rhbk:
self.rhbk = True
self.host_url = settings.rhbk.host_url
self.host_name = settings.rhbk.host_name
self.host_port = settings.rhbk.host_port
self.realm = settings.rhbk.realm
self.user = settings.rhbk.rhbk_user
self.password = settings.rhbk.rhbk_password
self.kcadm = RHBK_CLI
kwargs['hostname'] = kwargs.get('hostname', settings.rhbk.host_name)
else:
self.rhbk = False
self.host_url = settings.rhsso.host_url
self.host_name = settings.rhsso.host_name
self.host_port = 443
self.realm = settings.rhsso.realm
self.user = settings.rhsso.rhsso_user
self.password = settings.rhsso.rhsso_password
self.kcadm = KEY_CLOAK_CLI
kwargs['hostname'] = kwargs.get('hostname', settings.rhsso.host_name)
kwargs['ipv6'] = kwargs.get('ipv6', settings.server.is_ipv6)
super().__init__(**kwargs)

def get_rhsso_client_id(self):
def get_sso_client_id(self):
"""getter method for fetching the client id and can be used other functions"""
client_name = f'{self.satellite.hostname}-foreman-openidc'
uri = self.host_url if self.rhbk else self.host_url.replace("https://", "http://")
self.execute(
f'{self.kcadm} config credentials '
f'--server {uri}/auth '
f'--server {self.uri}/auth '
f'--realm {self.realm} '
f'--user {self.user} '
f'--password {self.password}'
Expand All @@ -2470,21 +2449,21 @@ def get_rhsso_client_id(self):
return client_id

@lru_cache
def get_rhsso_user_details(self, username):
def get_sso_user_details(self, username):
"""Getter method to receive the user id"""
result = self.execute(f"{self.kcadm} get users -r {self.realm} -q username={username}")
result_json = json.loads(result.stdout)
return result_json[0]

@lru_cache
def get_rhsso_groups_details(self, group_name):
def get_sso_groups_details(self, group_name):
"""Getter method to receive the group id"""
result = self.execute(f"{self.kcadm} get groups -r {self.realm}")
group_list = json.loads(result.stdout)
query_group = [group for group in group_list if group['name'] == group_name]
return query_group[0]

def upload_rhsso_entity(self, json_content, entity_name):
def upload_sso_entity(self, json_content, entity_name):
"""Helper method to upload the RHSSO entity file on RHSSO Server.
Overwrites already existing file with the same name.
"""
Expand All @@ -2499,13 +2478,13 @@ def upload_rhsso_entity(self, json_content, entity_name):

def create_mapper(self, json_content, client_id):
"""Helper method to create the RH-SSO Client Mapper"""
self.upload_rhsso_entity(json_content, "mapper_file")
self.upload_sso_entity(json_content, "mapper_file")
self.execute(
f'{self.kcadm} create clients/{client_id}/protocol-mappers/models -r '
f'{self.realm} -f {"mapper_file"}'
)

def create_new_rhsso_user(self, username=None):
def create_new_sso_user(self, username=None):
"""create new user in RHSSO instance and set the password"""
update_data_user = Box(RHSSO_NEW_USER)
update_data_pass = Box(RHSSO_RESET_PASSWORD)
Expand All @@ -2514,54 +2493,52 @@ def create_new_rhsso_user(self, username=None):
update_data_user.username = username
update_data_user.email = username + random.choice(valid_emails_list())
update_data_pass.value = self.password
self.upload_rhsso_entity(update_data_user, "create_user")
self.upload_rhsso_entity(update_data_pass, "reset_password")
self.upload_sso_entity(update_data_user, "create_user")
self.upload_sso_entity(update_data_pass, "reset_password")
self.execute(f"{self.kcadm} create users -r {self.realm} -f create_user")
user_details = self.get_rhsso_user_details(update_data_user.username)
user_details = self.get_sso_user_details(update_data_user.username)
self.execute(
f'{self.kcadm} update -r {self.realm} '
f'users/{user_details["id"]}/reset-password -f {"reset_password"}'
)
return update_data_user

def update_rhsso_user(self, username, group_name=None):
def update_sso_user(self, username, group_name=None):
update_data_user = Box(RHSSO_USER_UPDATE)
user_details = self.get_rhsso_user_details(username)
user_details = self.get_sso_user_details(username)
update_data_user.realm = self.realm
update_data_user.userId = f"{user_details['id']}"
if group_name:
group_details = self.get_rhsso_groups_details(group_name=group_name)
group_details = self.get_sso_groups_details(group_name=group_name)
update_data_user['groupId'] = f"{group_details['id']}"
self.upload_rhsso_entity(update_data_user, "update_user")
self.upload_sso_entity(update_data_user, "update_user")
group_path = f"users/{user_details['id']}/groups/{group_details['id']}"
self.execute(f"{self.kcadm} update -r {self.realm} {group_path} -f update_user")

def delete_rhsso_user(self, username):
def delete_sso_user(self, username):
"""Delete the RHSSO user"""
user_details = self.get_rhsso_user_details(username)
self.execute(f"{self.kcadm} delete -r {settings.rhsso.realm} users/{user_details['id']}")
user_details = self.get_sso_user_details(username)
self.execute(f"{self.kcadm} delete -r {settings.sso.realm} users/{user_details['id']}")

def create_group(self, group_name=None):
"""Create the RHSSO group"""
update_user_group = Box(RHSSO_NEW_GROUP)
if not group_name:
group_name = gen_string('alphanumeric')
update_user_group.name = group_name
self.upload_rhsso_entity(update_user_group, "create_group")
result = self.execute(
f"{self.kcadm} create groups -r {settings.rhsso.realm} -f create_group"
)
self.upload_sso_entity(update_user_group, "create_group")
result = self.execute(f"{self.kcadm} create groups -r {settings.sso.realm} -f create_group")
return result.stdout

def delete_rhsso_group(self, group_name):
def delete_sso_group(self, group_name):
"""Delete the RHSSO group"""
group_details = self.get_rhsso_groups_details(group_name)
self.execute(f"{self.kcadm} delete -r {settings.rhsso.realm} groups/{group_details['id']}")
group_details = self.get_sso_groups_details(group_name)
self.execute(f"{self.kcadm} delete -r {settings.sso.realm} groups/{group_details['id']}")

def update_client_configuration(self, json_content):
"""Update the client configuration"""
client_id = self.get_rhsso_client_id()
self.upload_rhsso_entity(json_content, "update_client_info")
client_id = self.get_sso_client_id()
self.upload_sso_entity(json_content, "update_client_info")
update_cmd = (
f"{self.kcadm} update clients/{client_id} " # EOL space important
"-f update_client_info -s enabled=true --merge"
Expand Down Expand Up @@ -2605,6 +2582,40 @@ def set_the_redirect_uri(self):
self.update_client_configuration(client_config)


class RHBKHost(SSOHost):
"""Class for RHBK functions and setup"""

def __init__(self, sat_obj, **kwargs):
self.rhbk = True
self.host_url = settings.rhbk.host_url
self.uri = self.host_url
self.host_name = settings.rhbk.host_name
self.host_port = settings.rhbk.host_port
self.realm = settings.rhbk.realm
self.user = settings.rhbk.rhbk_user
self.password = settings.rhbk.rhbk_password
self.kcadm = RHBK_CLI
kwargs['hostname'] = kwargs.get('hostname', settings.rhbk.host_name)
super().__init__(sat_obj, **kwargs)


class RHSSOHost(SSOHost):
"""Class for RHSSO functions and setup"""

def __init__(self, sat_obj, **kwargs):
self.rhbk = False
self.host_url = settings.rhsso.host_url
self.uri = self.host_url.replace("https://", "http://")
self.host_name = settings.rhsso.host_name
self.host_port = 443
self.realm = settings.rhsso.realm
self.user = settings.rhsso.rhsso_user
self.password = settings.rhsso.rhsso_password
self.kcadm = KEY_CLOAK_CLI
kwargs['hostname'] = kwargs.get('hostname', settings.rhsso.host_name)
super().__init__(sat_obj, **kwargs)


class IPAHost(Host):
def __init__(self, sat_obj, **kwargs):
self.satellite = sat_obj
Expand Down
82 changes: 51 additions & 31 deletions tests/foreman/api/test_templatesync.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
FOREMAN_TEMPLATES_NOT_IMPORTED_COUNT,
)
from robottelo.logging import logger
from robottelo.utils.issue_handlers import is_open

git = settings.git

Expand Down Expand Up @@ -1066,29 +1065,58 @@ def test_positive_export_log_to_production(
@pytest.mark.tier2
@pytest.mark.skip_if_not_set('git')
@pytest.mark.parametrize(
'url',
('url', 'git_repository', 'use_proxy', 'setup_http_proxy_without_global_settings'),
[
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
f'ssh://git@{git.hostname}:{git.ssh_port}',
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
True,
True,
True,
),
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
True,
True,
False,
),
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
True,
False,
True,
),
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
False,
True,
True,
),
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
False,
False,
True,
),
(
f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}',
False,
True,
False,
),
(f'ssh://git@{git.hostname}:{git.ssh_port}', True, False, True),
(f'ssh://git@{git.hostname}:{git.ssh_port}', False, False, True),
],
ids=['http', 'ssh'],
)
@pytest.mark.parametrize(
'git_repository',
[True, False],
indirect=True,
ids=['non_empty_repo', 'empty_repo'],
)
@pytest.mark.parametrize(
'use_proxy',
[True, False],
ids=['use_proxy', 'do_not_use_proxy'],
)
@pytest.mark.parametrize(
'setup_http_proxy_without_global_settings',
[True, False],
indirect=True,
ids=['auth_http_proxy', 'unauth_http_proxy'],
ids=[
'auth_http_proxy-use_proxy-non_empty_repo-http',
'unauth_http_proxy-use_proxy-non_empty_repo-http',
'auth_http_proxy-do_not_use_proxy-non_empty_repo-http',
'auth_http_proxy-use_proxy-empty_repo-http',
'auth_http_proxy-do_not_use_proxy-empty_repo-http',
'unauth_http_proxy-use_proxy-empty_repo-http',
'auth_http_proxy-do_not_use_proxy-non_empty_repo-ssh',
'auth_http_proxy-do_not_use_proxy-empty_repo-ssh',
],
indirect=['git_repository', 'setup_http_proxy_without_global_settings'],
)
def test_positive_export_all_templates_to_repo(
self,
Expand Down Expand Up @@ -1119,15 +1147,7 @@ def test_positive_export_all_templates_to_repo(
:CaseImportance: Low
"""
# TODO remove this
if is_open('SAT-28933') and 'ssh' in url:
pytest.skip("Temporary skip of SSH tests")
proxy, param = setup_http_proxy_without_global_settings
if not use_proxy and not param:
# only do-not-use one kind of proxy
pytest.skip(
"Invalid parameter combination. DO NOT USE PROXY scenario should only be tested once."
)
proxy, _ = setup_http_proxy_without_global_settings
try:
data = {
'repo': f'{url}/{git.username}/{git_repository["name"]}',
Expand Down
Loading

0 comments on commit bb9a399

Please sign in to comment.