diff --git a/README.md b/README.md index 6abffc8..548eba5 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,14 @@ Default: `true` Type: `bool` +### sudo_check_if_configured + +This variable provides semantic check of already configured sudoers in case ansible setup is not needed and it is skipped. + +Default: `true` + +Type: `bool` + ### sudo_remove_unauthorized_included_files ***Dangerous!*** Setting this to `true` removes each existing sudoers file in the `include_directories` dictionary that are not defined in the`sudo_sudoers_files` variable. diff --git a/ansible_pytest_extra_requirements.txt b/ansible_pytest_extra_requirements.txt new file mode 100644 index 0000000..6bafb6f --- /dev/null +++ b/ansible_pytest_extra_requirements.txt @@ -0,0 +1,6 @@ +# SPDX-License-Identifier: MIT + +# ansible and dependencies for all supported platforms +ansible ; python_version > "2.6" +idna<2.8 ; python_version < "2.7" +PyYAML<5.1 ; python_version < "2.7" diff --git a/defaults/main.yml b/defaults/main.yml index 89274f4..680b3fe 100644 --- a/defaults/main.yml +++ b/defaults/main.yml @@ -2,6 +2,8 @@ sudo_rewrite_default_sudoers_file: true sudo_remove_unauthorized_included_files: false +sudo_check_if_configured: true + sudo_visudo_path: /usr/sbin/visudo sudo_transactional_update_reboot_ok: null diff --git a/library/scan_sudoers.py b/library/scan_sudoers.py new file mode 100644 index 0000000..7bcfc7a --- /dev/null +++ b/library/scan_sudoers.py @@ -0,0 +1,686 @@ +#!/usr/bin/python + +# Copyright: (c) 2019, Andrew J. Huffman +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = """ +--- +module: "scan_sudoers" +short_description: "Parses the /etc/sudoers and /etc/sudoers.d/* files." +author: + - "Andrew J. Huffman (@ahuffman)" +description: + - "This module is designed to collect information from C(/etc/sudoers)." + - "The #include (files) and #includedir (directories) will be dynamically calculated and all included files will be parsed." + - "This module is compatible with Linux and Unix systems." + - "You will need to run the playbook as a privileged user or a user with appropriate privilege escalation" +options: + output_raw_configs: + description: + - Whether or not to output raw configuration lines (excluding comments) from the scanned sudoers files + type: bool + default: true + required: false + output_parsed_configs: + description: + - Whether or not to output parsed data from the scanned sudoers files + type: bool + default: true + required: false +""" + +EXAMPLES = """ +- name: "Scan sudoers files - output everything" + scan_sudoers: + +- name: "Scan sudoers files - output raw configuration lines only" + scan_sudoers: + output_parsed_configs: false + +- name: "Scan sudoers files - output parsed configurations only" + scan_sudoers: + output_raw_configs: false + +- name: "Scan sudoers files - output only included files and paths (minimal output)" + scan_sudoers: + output_raw_configs: false + output_parsed_configs: false +""" + +RETURN = """ +sudoers: + description: "List of parsed sudoers data and included sudoers data" + returned: "success" + type: "list" + sample: + ansible_facts: + sudoers: + all_scanned_files: + - /etc/sudoers.d/group1 + - /etc/sudoers.d/group2 + - /etc/sudoers + sudoers_files: + - aliases: + cmnd_alias: + host_alias: + runas_alias: + user_alias: + configuration: + - 'Host_Alias SOMEHOSTS = server1, server2' + - ... + - '#includedir /etc/sudoers.d' + defaults: + - '!visiblepw' + - env_reset + - secure_path: + - /usr/local/sbin + - /usr/local/bin + - /usr/sbin + - /usr/bin + - /sbin + - /bin + - env_keep: + - COLORS + - DISPLAY + - ... + - ... + include_directories: + - /etc/sudoers.d + include_files: + - /etc/sudoers.d/file1 + - /etc/sudoers.d/file2 + - /tmp/some/file + - ... + path: /etc/sudoers + user_specifications: + - commands: + - ALL + hosts: + - ALL + operators: + - ALL + tags: + - NOPASSWD + users: + - '%wheel' + - defaults: + - '!requiretty' + type: user + users: + - STAFF + - INTERNS + - aliases: + ... +""" + +from ansible.module_utils.basic import AnsibleModule +import os +from os.path import isfile, join +import re + + +def get_includes(path): + # Get includes + fp = open(path, "r") + sudoers_file = fp.read() + fp.close() + + includes = dict() + includes["include_files"] = list() + include_dir = "" + includes["include_directories"] = list() + + # Regex for "#includedir" and "#include" sudoers options + includedir_re = re.compile(r"(^#includedir)+\s+(.*$)") + include_re = re.compile(r"(^#include)+\s+(.*$)") + + for sline in sudoers_file.strip().split("\n"): + line = sline.replace("\n", "").replace("\t", " ") + # Search for '#includedir' + if includedir_re.search(line): + include_dir = includedir_re.search(line).group(2) + + # Search for '#include' + if include_re.search(line): + includes["include_files"].append(include_re.search(line).group(2)) + + if include_dir: + # build multi-file output + includes["include_directories"].append(include_dir) + # Get list of all included sudoers files + includes["include_files"] += [ + join(include_dir, filename) + for filename in os.listdir(include_dir) + if isfile(join(include_dir, filename)) + ] + elif not includes["include_files"]: + includes.pop("include_files") + + if not includes["include_directories"]: + includes.pop("include_directories") + + return includes + + +def get_config_lines(path, params): + # Read sudoers file + fp = open(path, "r") + all_lines = fp.read() + fp.close() + + # Initialize empty return dict + sudoer_file = dict() + # Initialize aliases vars + sudoer_aliases = dict() + user_aliases = list() + runas_aliases = list() + host_aliases = list() + command_aliases = list() + user_specifications = list() + # Raw config lines output + config_lines = list() + # Regex for Parsers + comment_re = re.compile(r"^#+") + include_re = re.compile(r"^#include") + defaults_re = re.compile(r"^(Defaults)+\s+(.*$)") + cmnd_alias_re = re.compile( + r"(^Cmnd_Alias)+\s+(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+)\s*(\:)*(.*)*$" + ) + host_alias_re = re.compile( + r"(^Host_Alias)+\s+(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+)\s*(\:)*(.*)*$" + ) + runas_alias_re = re.compile( + r"(^Runas_Alias)+\s+(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+)\s*(\:)*(.*)*$" + ) + user_alias_re = re.compile( + r"(^User_Alias)+\s+(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+)\s*(\:)*(.*)*$" + ) + + # Defaults Parsing vars + config_defaults = list() + env_keep_opts = list() + + # Get includes from file + includes = get_includes(path) + # if we have included files add them to the list + try: + sudoer_file["include_files"] = includes["include_files"] + except KeyError: # ignore keyerror + pass + try: + sudoer_file["include_directories"] = includes["include_directories"] + except KeyError: # ignore keyerror + pass + # Work on each line of sudoers file + for sline in all_lines.strip().split("\n"): + line = sline.replace("\n", "").replace( + "\t", " " + ) # cleaning up chars we don't want + # only output raw config lines if we ask for them + if params["output_raw_configs"]: + # All raw (non-comment) config lines out + if comment_re.search(line) is None and line != "" and line is not None: + config_lines.append(line) + if include_re.search(line): + config_lines.append(line) + + # only output parsed configs if we ask for them + if params["output_parsed_configs"]: + # Parser for defaults + if defaults_re.search(line): + defaults_config_line = defaults_re.search(line).group(2) + defaults_env_keep_re = re.compile( + r"^(env_keep)+((\s\=)|(\s\+\=))+(\s)+(.*$)" + ) + defaults_sec_path_re = re.compile( + r"^(secure_path)+(\s)+(\=)+(\s)+(.*$)" + ) + # Break up multi-line defaults config lines into single config options + if defaults_env_keep_re.search(defaults_config_line): + defaults_multi = ( + defaults_env_keep_re.search(defaults_config_line) + .group(6) + .split() + ) + # env_keep default options + for i in defaults_multi: + env_keep_opts.append(i.replace('"', "")) + # build secure path dict and append to defaults list + elif defaults_sec_path_re.search(defaults_config_line): + secure_paths = ( + defaults_sec_path_re.search(defaults_config_line) + .group(5) + .split(":") + ) + config_defaults.append({"secure_path": secure_paths}) + # single defaults option case + else: + config_defaults.append(defaults_config_line) + # Aliases: + # Parser for Command Alias + if cmnd_alias_re.search(line): + if cmnd_alias_re.search(line).group(5) == ":": + # We have a multi line alias + cmnd_multi_line_aliases = line.split(":") + # Process each alias + ca_multi_re = re.compile( + r"(^Cmnd_Alias)*\s*(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+).*$" + ) + for ca in cmnd_multi_line_aliases: + ca_fields = ca_multi_re.search(ca) + cmnds_name = ca_fields.group(2) + ca_cmnds = list() + ca_cmnds_split = ca_fields.group(3).split(",") + for cmnd in ca_cmnds_split: + ca_cmnds.append(cmnd.lstrip()) + cmnd_alias_formatted = { + "name": cmnds_name, + "commands": ca_cmnds, + } + command_aliases.append(cmnd_alias_formatted) + else: + command_name = cmnd_alias_re.search(line).group(2) + commands = list() + for i in cmnd_alias_re.search(line).group(3).split(","): + # Append a space free item to the list + commands.append(i.replace(" ", "")) + # Build command alias dict + cmnd_alias_formatted = {"name": command_name, "commands": commands} + command_aliases.append(cmnd_alias_formatted) + + # Parser for Host Alias + if host_alias_re.search(line): + if host_alias_re.search(line).group(5) == ":": + # We have a multi line alias + host_multi_line_aliases = line.split(":") + # Process each alias + ha_multi_re = re.compile( + r"(^Host_Alias)*\s*(\S+)+\s*\={1}\s*((\S+,{1}\s*)+\S+|\S+).*$" + ) + for ha in host_multi_line_aliases: + ha_fields = ha_multi_re.search(ha) + hosts_name = ha_fields.group(2) + ha_hosts = list() + ha_hosts_split = ha_fields.group(3).split(",") + for host in ha_hosts_split: + ha_hosts.append(host.lstrip()) + host_alias_formatted = {"name": hosts_name, "hosts": ha_hosts} + host_aliases.append(host_alias_formatted) + else: + host_name = host_alias_re.search(line).group(2) + hosts = list() + for i in host_alias_re.search(line).group(3).split(","): + # Append a space free item to the list + hosts.append(i.replace(" ", "")) + # Build command alias dict + host_alias_formatted = {"name": host_name, "hosts": hosts} + host_aliases.append(host_alias_formatted) + + # Parser for RunAs Alias + if runas_alias_re.search(line): + if runas_alias_re.search(line).group(5) == ":": + # We have a multi line alias + runas_multi_line_aliases = line.split(":") + # Process each alias + ra_multi_re = re.compile( + r""" + (^Runas_Alias)* # Optionally match 'Runas_Alias' at the start of the line + \s* # Optional whitespace + (\S+)+ # One or more non-whitespace characters (alias name) + \s* # Optional whitespace + \={1} # Exactly one equals sign + \s* # Optional whitespace + ( # Start of the group for the right-hand side of the equals sign + (\S+,{1}\s*)+ # One or more sequences of non-whitespace characters followed by a comma and optional whitespace + \S+ # Followed by one or more non-whitespace characters + |\S+ # Or just a single non-whitespace string + ) # End of the group + .*$ # Match the rest of the line + """, + re.VERBOSE, + ) + for ra in runas_multi_line_aliases: + ra_fields = ra_multi_re.search(ra) + runas_name = ra_fields.group(2) + ra_users = list() + ra_users_split = ra_fields.group(3).split(",") + for user in ra_users_split: + ra_users.append(user.lstrip()) + runas_alias_formatted = {"name": runas_name, "users": ra_users} + runas_aliases.append(runas_alias_formatted) + else: + runas_name = runas_alias_re.search(line).group(2) + ra_users = list() + for i in runas_alias_re.search(line).group(3).split(","): + # Append a space free item to the list + ra_users.append(i.replace(" ", "")) + # Build command alias dict + runas_alias_formatted = {"name": runas_name, "users": ra_users} + runas_aliases.append(runas_alias_formatted) + + # Parser for User Alias + if user_alias_re.search(line): + if user_alias_re.search(line).group(5) == ":": + # We have a multi line alias + user_multi_line_aliases = line.split(":") + # Process each alias + ua_multi_re = re.compile( + r""" + (^User_Alias)* # Optionally match 'User_Alias' at the beginning + \s* # Optional whitespace + (\S+)+ # One or more non-whitespace characters (alias name) + \s* # Optional whitespace + \={1} # Exactly one equals sign + \s* # Optional whitespace + ((\S+,{1}\s*)+\S+|\S+) # One or more non-whitespace characters separated by commas, or a single non-whitespace string + .*$ # Match the rest of the line + """, + re.VERBOSE, + ) + for ua in user_multi_line_aliases: + ua_fields = ua_multi_re.search(ua) + users_name = ua_fields.group(2) + ua_users = list() + ua_users_split = ua_fields.group(3).split(",") + for user in ua_users_split: + ua_users.append(user.lstrip()) + user_alias_formatted = {"name": users_name, "users": ua_users} + user_aliases.append(user_alias_formatted) + else: + users_name = user_alias_re.search(line).group(2) + ua_users = list() + for i in user_alias_re.search(line).group(3).split(","): + # Append a space free item to the list + ua_users.append(i.lstrip()) + # Build command alias dict + user_alias_formatted = {"name": users_name, "users": ua_users} + user_aliases.append(user_alias_formatted) + + # Parser for user_specs + if ( + not user_alias_re.search(line) + and not runas_alias_re.search(line) + and not host_alias_re.search(line) + and not cmnd_alias_re.search(line) + and not include_re.search(line) + and not comment_re.search(line) + and not defaults_re.search(line) + and line != "" + and line is not None + ): + user_spec = get_user_specs(line) + user_specifications.append(user_spec) + # Build the sudoer file's dict output + sudoer_file["path"] = path + + # only output raw configs if we ask for it + if params["output_raw_configs"]: + sudoer_file["configuration"] = config_lines + + if params["output_parsed_configs"]: + # Build defaults env_keep dict and append to the rest of the config_defaults list + if env_keep_opts: + config_defaults.append({"env_keep": env_keep_opts}) + if config_defaults: + sudoer_file["defaults"] = config_defaults + # Build aliases output dictionary + sudoer_aliases = { + "user_alias": user_aliases, + "runas_alias": runas_aliases, + "cmnd_alias": command_aliases, + "host_alias": host_aliases, + } + # cleanup unused outputs + if not sudoer_aliases["user_alias"]: + sudoer_aliases.pop("user_alias") + if not sudoer_aliases["runas_alias"]: + sudoer_aliases.pop("runas_alias") + if not sudoer_aliases["cmnd_alias"]: + sudoer_aliases.pop("cmnd_alias") + if not sudoer_aliases["host_alias"]: + sudoer_aliases.pop("host_alias") + if sudoer_aliases: + sudoer_file["aliases"] = sudoer_aliases + sudoer_file["user_specifications"] = user_specifications + + return sudoer_file + + +def get_user_specs(line): + user_spec = dict() + user_spec_re = re.compile( + r""" + (^\S+,{1}\s*\S+|^\S+) # First part of the string (non-whitespace characters) + \s* # Optional whitespace + (\S+,{1}\s*|\S+){1} # Second part (comma-separated or not) + \s*={1}\s* # Equals sign, with optional surrounding whitespace + (\({1}(.*)\){1})* # Optional parentheses around content + \s* # Optional whitespace + (ROLE\s*=\s*(\S+)|TYPE\s*=\s*(\S+))* # Optional ROLE or TYPE with values + \s* # Optional whitespace + (ROLE\s*=\s*(\S+)|TYPE\s*=\s*(\S+))* # Optional ROLE or TYPE repeated + \s* # Optional whitespace + (PRIVS\s*=\s*(\S+)|LIMITPRIVS\s*=\s*(\S+))* # Optional PRIVS or LIMITPRIVS with values + \s* # Optional whitespace + (PRIVS\s*=\s*(\S+)|LIMITPRIVS\s*=\s*(\S+))* # Optional PRIVS or LIMITPRIVS repeated + \s* # Optional whitespace + (\S+:{1})* # Optional section with colon + \s*(.*$) # Capture the rest of the line + """, + re.VERBOSE, + ) + default_override_re = re.compile( + r""" + (Defaults) # Match the word 'Defaults' + {1} # Ensure it's matched exactly once + ([@:!>]){1} # Match one of the special characters: @, :, !, or > + ((\s*\S+,{1})+ # Match one or more non-whitespace characters followed by a comma + \s*\S+|\S+) # Optionally match more non-whitespace characters, or a single non-whitespace string + \s* # Optional whitespace + (.*$) # Match the rest of the line + """, + re.VERBOSE, + ) + spec_fields = user_spec_re.search(line) + if user_spec_re.search(line): + user_spec["users"] = list() + user_spec["hosts"] = list() + user_spec["operators"] = list() + user_spec["selinux_role"] = "" + user_spec["selinux_type"] = "" + user_spec["solaris_privs"] = "" + user_spec["solaris_limitprivs"] = "" + user_spec["tags"] = list() + user_spec["commands"] = list() + # users + users = spec_fields.group(1).split(",") + for user in users: + if user != "" and user is not None: + user_spec["users"].append(user.lstrip()) + # hosts + hosts = spec_fields.group(2).split(",") + for host in hosts: + if host != "" and host is not None: + user_spec["hosts"].append(host.lstrip()) + # operators - optional + if spec_fields.group(4): + operators = spec_fields.group(4).split(",") + for op in operators: + if op != "" and op is not None: + user_spec["operators"].append(op.lstrip()) + # SELinux - optional + if spec_fields.group(5) or spec_fields.group(8): + # TYPE + type_re = re.compile(r"(^TYPE){1}\s*={1}\s*") + if spec_fields.group(5): + if type_re.search(spec_fields.group(5)): + if type_re.search(spec_fields.group(5)).group(1) == "TYPE": + user_spec["selinux_type"] = spec_fields.group(7) + if spec_fields.group(8): + if type_re.search(spec_fields.group(8)): + if type_re.search(spec_fields.group(8)).group(1) == "TYPE": + user_spec["selinux_type"] = spec_fields.group(10) + # ROLE + role_re = re.compile(r"(^ROLE){1}\s*={1}\s*") + if spec_fields.group(5): + if role_re.search(spec_fields.group(5)): + if role_re.search(spec_fields.group(5)).group(1) == "ROLE": + user_spec["selinux_role"] = spec_fields.group(6) + if spec_fields.group(8): + if role_re.search(spec_fields.group(8)): + if role_re.search(spec_fields.group(8)).group(1) == "ROLE": + user_spec["selinux_role"] = spec_fields.group(9) + # Solaris - optional + if spec_fields.group(11) or spec_fields.group(14): + # PRIVS + privs_re = re.compile(r"(^PRIVS){1}\s*={1}\s*") + if spec_fields.group(11): + if privs_re.search(spec_fields.group(11)): + if privs_re.search(spec_fields.group(11)).group(1) == "PRIVS": + user_spec["solaris_privs"] = spec_fields.group(12) + if spec_fields.group(14): + if privs_re.search(spec_fields.group(14)): + if privs_re.search(spec_fields.group(14)).group(1) == "PRIVS": + user_spec["solaris_privs"] = spec_fields.group(17) + # LIMITPRIVS + limitprivs_re = re.compile(r"(^LIMITPRIVS){1}\s*={1}\s*") + if spec_fields.group(11): + if limitprivs_re.search(spec_fields.group(11)): + if ( + limitprivs_re.search(spec_fields.group(11)).group(1) + == "LIMITPRIVS" + ): + user_spec["solaris_limitprivs"] = spec_fields.group(13) + if spec_fields.group(14): + if limitprivs_re.search(spec_fields.group(14)): + if ( + limitprivs_re.search(spec_fields.group(14)).group(1) + == "LIMITPRIVS" + ): + user_spec["solaris_limitprivs"] = spec_fields.group(16) + # tags - optional + if spec_fields.group(17): + tags = spec_fields.group(17).split(":") + for tag in tags: + if tag != "" and tag is not None: + user_spec["tags"].append(tag) + # commands + commands = spec_fields.group(18).split(",") + for command in commands: + if command != "" and command is not None: + user_spec["commands"].append(command.lstrip()) + # Cleanup unused output + if user_spec["selinux_role"] == "": + user_spec.pop("selinux_role") + if user_spec["selinux_type"] == "": + user_spec.pop("selinux_type") + if user_spec["solaris_privs"] == "": + user_spec.pop("solaris_privs") + if user_spec["solaris_limitprivs"] == "": + user_spec.pop("solaris_limitprivs") + if not user_spec["users"]: + user_spec.pop("users") + if not user_spec["hosts"]: + user_spec.pop("hosts") + if not user_spec["operators"]: + user_spec.pop("operators") + if not user_spec["tags"]: + user_spec.pop("tags") + if not user_spec["commands"]: + user_spec.pop("commands") + else: + if default_override_re.search(line): + default_override = default_override_re.search(line) + # type + if default_override.group(2) == "@": + user_spec["type"] = "host" + user_spec["hosts"] = list() + hosts = default_override.group(3).split(",") + for host in hosts: + if host != "" and host is not None: + user_spec["hosts"].append(host.lstrip()) + elif default_override.group(2) == ":": + user_spec["type"] = "user" + user_spec["users"] = list() + users = default_override.group(3).split(",") + for user in users: + if user != "" and user is not None: + user_spec["users"].append(user.lstrip()) + elif default_override.group(2) == "!": + user_spec["type"] = "command" + user_spec["commands"] = list() + commands = default_override.group(3).split(",") + for command in commands: + if command != "" and command is not None: + user_spec["commands"].append(command.lstrip()) + elif default_override.group(2) == ">": + user_spec["type"] = "runas" + user_spec["operators"] = list() + operators = default_override.group(3).split(",") + for op in operators: + if op != "" and op is not None: + user_spec["operators"].append(op.lstrip()) + user_spec["defaults"] = list() + defaults = default_override.group(5).split(",") + for default in defaults: + if default != "" and default is not None: + user_spec["defaults"].append(default.lstrip()) + return user_spec + + +def get_sudoers_configs(path, params): + sudoers = dict() + include_files = list() + + default_sudoers = path + + # Get parsed values from default sudoers file + sudoers["sudoers_files"] = list() + default = get_config_lines(path, params) + if default: + sudoers["sudoers_files"].append(default) + try: + include_files += default["include_files"] + except KeyError: # ignore keyerror + pass + # Capture each included sudoer file + for file in include_files: + include_file = get_config_lines(file, params) + if include_file: + sudoers["sudoers_files"].append(include_file) + # append even more included files as we parse deeper + try: + include_files += include_file["include_files"] + except KeyError: # ignore keyerror + pass + # return back everything that was included off of the default sudoers file + include_files.append(default_sudoers) + sudoers["all_scanned_files"] = include_files + return sudoers + + +def main(): + default_sudoers = "/etc/sudoers" + + module_args = dict( + output_raw_configs=dict(type="bool", default=True, required=False), + output_parsed_configs=dict(type="bool", default=True, required=False), + ) + + result = dict(changed=False, original_message="", message="") + + module = AnsibleModule(argument_spec=module_args, supports_check_mode=True) + params = module.params + + sudoers = get_sudoers_configs(default_sudoers, params) + result = {"ansible_facts": {"sudoers": sudoers}} + + module.exit_json(**result) + + +if __name__ == "__main__": + main() diff --git a/meta/main.yml b/meta/main.yml index c8cebd3..201bfdf 100644 --- a/meta/main.yml +++ b/meta/main.yml @@ -12,8 +12,7 @@ galaxy_info: - all - name: EL versions: - - "8" - - "9" + - all galaxy_tags: - el8 - el9 diff --git a/pylint_extra_requirements.txt b/pylint_extra_requirements.txt new file mode 100644 index 0000000..8d280fc --- /dev/null +++ b/pylint_extra_requirements.txt @@ -0,0 +1,3 @@ +# SPDX-License-Identifier: MIT + +# Write extra requirements for running pylint here: diff --git a/pytest_extra_requirements.txt b/pytest_extra_requirements.txt new file mode 100644 index 0000000..8e11eaa --- /dev/null +++ b/pytest_extra_requirements.txt @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: MIT + +# Write extra requirements for running pytest here: +# If you need ansible then uncomment the following line: +-ransible_pytest_extra_requirements.txt +# If you need mock then uncomment the following line: +mock ; python_version < "3.0" diff --git a/tasks/main.yml b/tasks/main.yml index 1cfffff..74c1c0a 100644 --- a/tasks/main.yml +++ b/tasks/main.yml @@ -2,6 +2,26 @@ - name: Set version specific variables include_tasks: set_vars.yml +- name: Scan sudoers + scan_sudoers: + output_parsed_configs: true + output_raw_configs: false + no_log: true + when: sudo_check_if_configured | bool + +- name: Compare + set_fact: + comparison_result: "{{ ansible_facts['sudoers'].sudoers_files + | combine({'include_files': omit}) + == sudo_sudoers_files }}" + when: sudo_check_if_configured | bool + +- name: Sudo is already configured + ansible.builtin.meta: end_host + when: + - (comparison_result | default(false)) | bool + - sudo_check_if_configured | bool + - name: Ensure sudo is installed package: name: "{{ __sudo_packages }}" diff --git a/tests/files/tests_default_sudoers.in b/tests/files/tests_default_sudoers.in new file mode 100644 index 0000000..2bab6ba --- /dev/null +++ b/tests/files/tests_default_sudoers.in @@ -0,0 +1,21 @@ +# Default specifications +Defaults !visiblepw +Defaults always_set_home +Defaults match_group_by_gid +Defaults always_query_group_plugin +Defaults env_reset +Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin +Defaults env_keep = "COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR" +Defaults env_keep += "LS_COLORS MAIL PS1 PS2 QTDIR" +Defaults env_keep += "USERNAME LANG LC_ADDRESS LC_CTYPE LC_COLLATE" +Defaults env_keep += "LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES LC_MONETARY LC_NAME" +Defaults env_keep += "LC_NUMERIC LC_PAPER LC_TELEPHONE LC_TIME LC_ALL" +Defaults env_keep += "LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY" + +# User specifications +root ALL=(ALL) ALL +%wheel ALL=(ALL) ALL + +# Includes +## Include directories +#includedir /etc/sudoers.d diff --git a/tests/tasks/check_not_present_header.yml b/tests/tasks/check_not_present_header.yml new file mode 100644 index 0000000..3362e18 --- /dev/null +++ b/tests/tasks/check_not_present_header.yml @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: MIT +--- +- name: Get file + slurp: + path: "{{ __file }}" + register: __content + when: not __file_content is defined + +- name: Check for presence of ansible managed header, fingerprint + assert: + that: + - ansible_managed not in content + - __fingerprint not in content + vars: + content: "{{ (__file_content | d(__content)).content | b64decode }}" + ansible_managed: "{{ lookup('template', 'get_ansible_managed.j2') }}" diff --git a/tests/tasks/check_header.yml b/tests/tasks/check_present_header.yml similarity index 100% rename from tests/tasks/check_header.yml rename to tests/tasks/check_present_header.yml diff --git a/tests/tests_check_if_configured.yml b/tests/tests_check_if_configured.yml new file mode 100644 index 0000000..204746e --- /dev/null +++ b/tests/tests_check_if_configured.yml @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: MIT +--- +- name: Ensure that the role runs with default parameters + hosts: all + gather_facts: false # test that role works in this case + tasks: + - name: Run tests + block: + - name: Test setup + include_tasks: tasks/setup.yml + + + - name: Install new sudoers file + ansible.builtin.copy: + src: files/tests_default_sudoers.in # Local file path + dest: /etc/sudoers # Target file path on the remote system + owner: root + group: root + mode: '0440' # Correct permission for the sudoers file + + + - name: Remove all files from sudoers.d directory + ansible.builtin.file: + path: /etc/sudoers.d + state: absent # Recursively remove the entire directory + + + - name: Run the role + include_role: + name: linux-system-roles.sudo + vars: + sudo_check_if_configured: true + + - name: Check header for ansible_managed, fingerprint + include_tasks: tasks/check_not_present_header.yml + vars: + __file: /etc/sudoers + __fingerprint: system_role:sudo + + always: + - name: Test cleanup + include_tasks: tasks/cleanup.yml diff --git a/tests/tests_default.yml b/tests/tests_default.yml index c52706e..ffccc05 100644 --- a/tests/tests_default.yml +++ b/tests/tests_default.yml @@ -12,9 +12,11 @@ - name: Run the role include_role: name: linux-system-roles.sudo + vars: + sudo_check_if_configured: false - name: Check header for ansible_managed, fingerprint - include_tasks: tasks/check_header.yml + include_tasks: tasks/check_present_header.yml vars: __file: /etc/sudoers __fingerprint: system_role:sudo diff --git a/tests/unit/test_scan_sudoers.py b/tests/unit/test_scan_sudoers.py new file mode 100644 index 0000000..09f22b8 --- /dev/null +++ b/tests/unit/test_scan_sudoers.py @@ -0,0 +1,500 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2024, Radovan Sroka +# SPDX-License-Identifier: GPL-2.0-or-later +# +""" Unit tests for the scan_sudoers module """ + + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import sys +import unittest + +try: + from unittest.mock import patch, mock_open +except ImportError: + from mock import patch, mock_open + + +from scan_sudoers import get_includes, get_user_specs, get_config_lines + +from pprint import pprint + +if sys.version_info[0] < 3: + open_path = "__builtin__.open" # Python 2.7 +else: + open_path = "builtins.open" # Python 3+ + + +class TestScanSudoers(unittest.TestCase): + + @patch("scan_sudoers.os.listdir") + @patch("scan_sudoers.isfile") + @patch( + open_path, + new_callable=mock_open, + read_data="#includedir /etc/sudoers.d\n", + ) + def test_get_includes01(self, mock_open, mock_isfile, mock_listdir): + # Arrange + mock_isfile.return_value = True + mock_listdir.return_value = ["file1", "file2"] + expected_output = { + "include_directories": ["/etc/sudoers.d"], + "include_files": ["/etc/sudoers.d/file1", "/etc/sudoers.d/file2"], + } + + # Act + result = get_includes("/etc/sudoers") + + print(result) + # Assert + self.assertEqual(result, expected_output) + mock_open.assert_called_once_with("/etc/sudoers", "r") + mock_isfile.assert_called() + mock_listdir.assert_called_once_with("/etc/sudoers.d") + + @patch("scan_sudoers.os.listdir") + @patch("scan_sudoers.isfile") + @patch( + open_path, + new_callable=mock_open, + read_data="#include /etc/sudoers.d/file1\n#include /etc/sudoers.d/file2\n", + ) + def test_get_includes02(self, mock_open, mock_isfile, mock_listdir): + # Arrange + mock_isfile.return_value = True + mock_listdir.return_value = ["file1", "file2"] + expected_output = { + "include_files": ["/etc/sudoers.d/file1", "/etc/sudoers.d/file2"] + } + + # Act + result = get_includes("/etc/sudoers2") + + print(result) + # Assert + self.assertEqual(result, expected_output) + mock_open.assert_called_once_with("/etc/sudoers2", "r") + + def test_get_user_specs03(self): + # Arrange + expected_output = { + "users": ["%wheel"], + "hosts": ["ALL"], + "operators": ["ALL"], + "tags": ["NOPASSWD"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("%wheel ALL=(ALL) NOPASSWD: ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs04(self): + # Arrange + expected_output = { + "users": ["username1"], + "hosts": ["ALL"], + "operators": ["ALL"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("username1 ALL=(ALL) ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs05(self): + # Arrange + expected_output = { + "users": ["username2"], + "hosts": ["hostname"], + "operators": ["runas_user:runas_group"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("username2 hostname=(runas_user:runas_group) ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs06(self): + # Arrange + expected_output = { + "users": ["john"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("john ALL=(ALL:ALL) ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs07(self): + # Arrange + expected_output = { + "users": ["jane"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "tags": ["NOPASSWD"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("jane ALL=(ALL:ALL) NOPASSWD: ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs08(self): + # Arrange + expected_output = { + "users": ["developer"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "commands": ["/usr/bin/systemctl", "/usr/bin/journalctl"], + } + + # Act + result = get_user_specs( + "developer ALL=(root:ALL) /usr/bin/systemctl, /usr/bin/journalctl" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs09(self): + # Arrange + expected_output = { + "users": ["%admins"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "commands": ["ALL"], + } + + # Act + result = get_user_specs("%admins ALL=(ALL:ALL) ALL") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs10(self): + # Arrange + expected_output = { + "users": ["deploy"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "tags": ["NOPASSWD"], + "commands": ["/usr/sbin/service apache2 restart"], + } + + # Act + result = get_user_specs( + "deploy ALL=(root:ALL) NOPASSWD: /usr/sbin/service apache2 restart" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs11(self): + # Arrange + expected_output = { + "users": ["backup"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "commands": ["/bin/tar", "/usr/bin/rsync"], + } + + # Act + result = get_user_specs("backup ALL=(root:ALL) /bin/tar, /usr/bin/rsync") + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs12(self): + # Arrange + expected_output = { + "users": ["sysadmin"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "commands": ["/usr/sbin/ifconfig", "/sbin/reboot", "/sbin/shutdown"], + } + + # Act + result = get_user_specs( + "sysadmin ALL=(ALL:ALL) /usr/sbin/ifconfig, /sbin/reboot, /sbin/shutdown" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs13(self): + # Arrange + expected_output = { + "users": ["dbadmin"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "tags": ["NOPASSWD"], + "commands": ["/usr/sbin/service mysql restart", "/usr/bin/mysqladmin"], + } + + # Act + result = get_user_specs( + "dbadmin ALL=(root:ALL) NOPASSWD: /usr/sbin/service mysql restart, /usr/bin/mysqladmin" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs14(self): + # Arrange + expected_output = { + "users": ["audit"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "commands": ["/usr/bin/journalctl", "/bin/dmesg", "/usr/bin/uptime"], + } + + # Act + result = get_user_specs( + "audit ALL=(root:ALL) /usr/bin/journalctl, /bin/dmesg, /usr/bin/uptime" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + def test_get_user_specs15(self): + # Arrange + expected_output = { + "users": ["%devops"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "commands": [ + "/usr/bin/docker ps", + "/usr/bin/docker exec", + "/usr/bin/docker logs", + ], + } + + # Act + result = get_user_specs( + "%devops ALL=(root:ALL) /usr/bin/docker ps, /usr/bin/docker exec, /usr/bin/docker logs" + ) + print(result) + print(expected_output) + + # Assert + self.assertEqual(result, expected_output) + + @patch( + open_path, + new_callable=mock_open, + read_data=""" +#includedir /etc/sudoers.d +Cmnd_Alias MY_CMDS = /bin/ls, /bin/cat +Defaults env_keep += \"COLORS DISPLAY\" +%wheel ALL=(ALL) NOPASSWD: ALL +""", + ) + @patch( + "scan_sudoers.get_includes", + return_value={"include_files": ["/etc/sudoers.d/file1"]}, + ) + def test_get_config_lines01(self, mock_open, mock_get_includes): + # Arrange + + params = {"output_raw_configs": True, "output_parsed_configs": True} + expected_output = { + "path": "/etc/sudoers10", + "configuration": [ + "#includedir /etc/sudoers.d", + "Cmnd_Alias MY_CMDS = /bin/ls, /bin/cat", + 'Defaults env_keep += "COLORS DISPLAY"', + "%wheel ALL=(ALL) NOPASSWD: ALL", + ], + "include_files": ["/etc/sudoers.d/file1"], + "aliases": { + "cmnd_alias": [{"commands": ["/bin/ls", "/bin/cat"], "name": "MY_CMDS"}] + }, + "defaults": [{"env_keep": ["COLORS", "DISPLAY"]}], + "user_specifications": [ + { + "commands": ["ALL"], + "hosts": ["ALL"], + "operators": ["ALL"], + "tags": ["NOPASSWD"], + "users": ["%wheel"], + } + ], + } + # Act + result = get_config_lines("/etc/sudoers10", params) + pprint(result) + pprint(expected_output) + + # Assert + self.assertEqual(result["configuration"], expected_output["configuration"]) + self.assertEqual(result["include_files"], expected_output["include_files"]) + self.assertEqual(result, expected_output) + + @patch( + open_path, + new_callable=mock_open, + read_data=""" +#includedir /etc/sudoers.d +Host_Alias WEBSERVERS = web1, web2, web3 +Host_Alias DB_SERVERS = db1, db2 +User_Alias ADMINS = john, jane, %sysadmins +User_Alias DEVS = deploy, dev, %developers +User_Alias DBAS = %db_team +Cmnd_Alias DB_CMDS = /usr/bin/mysql, /usr/bin/psql +Cmnd_Alias NETWORK_CMDS = /sbin/ifconfig, /sbin/ip +Cmnd_Alias SYSTEM_CMDS = /usr/bin/top, /usr/bin/htop, /usr/bin/uptime +ADMINS ALL=(ALL:ALL) ALL +DEVS WEBSERVERS=(root:ALL) NOPASSWD: APACHE_CMDS, NGINX_CMDS +DBAS DB_SERVERS=(root:ALL) NOPASSWD: DB_CMDS +netops ALL=(root:ALL) NOPASSWD: NETWORK_CMDS +monitor ALL=(ALL:ALL) SYSTEM_CMDS +""", + ) + @patch( + "scan_sudoers.get_includes", + return_value={"include_files": ["/etc/sudoers.d/file1"]}, + ) + def test_get_config_lines02(self, mock_open, mock_get_includes): + # Arrange + params = {"output_raw_configs": True, "output_parsed_configs": True} + expected_output = { + "path": "/etc/sudoers10", + "configuration": [ + "#includedir /etc/sudoers.d", + "Host_Alias WEBSERVERS = web1, web2, web3", + "Host_Alias DB_SERVERS = db1, db2", + "User_Alias ADMINS = john, jane, %sysadmins", + "User_Alias DEVS = deploy, dev, %developers", + "User_Alias DBAS = %db_team", + "Cmnd_Alias DB_CMDS = /usr/bin/mysql, /usr/bin/psql", + "Cmnd_Alias NETWORK_CMDS = /sbin/ifconfig, /sbin/ip", + "Cmnd_Alias SYSTEM_CMDS = /usr/bin/top, /usr/bin/htop, /usr/bin/uptime", + "ADMINS ALL=(ALL:ALL) ALL", + "DEVS WEBSERVERS=(root:ALL) NOPASSWD: APACHE_CMDS, NGINX_CMDS", + "DBAS DB_SERVERS=(root:ALL) NOPASSWD: DB_CMDS", + "netops ALL=(root:ALL) NOPASSWD: NETWORK_CMDS", + "monitor ALL=(ALL:ALL) SYSTEM_CMDS", + ], + "include_files": ["/etc/sudoers.d/file1"], + "aliases": { + "cmnd_alias": [ + { + "name": "DB_CMDS", + "commands": ["/usr/bin/mysql", "/usr/bin/psql"], + }, + { + "name": "NETWORK_CMDS", + "commands": ["/sbin/ifconfig", "/sbin/ip"], + }, + { + "name": "SYSTEM_CMDS", + "commands": [ + "/usr/bin/top", + "/usr/bin/htop", + "/usr/bin/uptime", + ], + }, + ], + "host_alias": [ + {"hosts": ["web1", "web2", "web3"], "name": "WEBSERVERS"}, + {"hosts": ["db1", "db2"], "name": "DB_SERVERS"}, + ], + "user_alias": [ + {"name": "ADMINS", "users": ["john", "jane", "%sysadmins"]}, + {"name": "DEVS", "users": ["deploy", "dev", "%developers"]}, + {"name": "DBAS", "users": ["%db_team"]}, + ], + }, + "user_specifications": [ + { + "users": ["ADMINS"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "commands": ["ALL"], + }, + { + "users": ["DEVS"], + "hosts": ["WEBSERVERS"], + "operators": ["root:ALL"], + "commands": ["APACHE_CMDS", "NGINX_CMDS"], + "tags": ["NOPASSWD"], + }, + { + "users": ["DBAS"], + "hosts": ["DB_SERVERS"], + "operators": ["root:ALL"], + "commands": ["DB_CMDS"], + "tags": ["NOPASSWD"], + }, + { + "users": ["netops"], + "hosts": ["ALL"], + "operators": ["root:ALL"], + "commands": ["NETWORK_CMDS"], + "tags": ["NOPASSWD"], + }, + { + "users": ["monitor"], + "hosts": ["ALL"], + "operators": ["ALL:ALL"], + "commands": ["SYSTEM_CMDS"], + }, + ], + } + # Act + result = get_config_lines("/etc/sudoers10", params) + # pprint(result) + # Assert + self.assertEqual(result["configuration"], expected_output["configuration"]) + self.assertEqual(result["include_files"], expected_output["include_files"]) + self.assertEqual( + result["user_specifications"], expected_output["user_specifications"] + ) + self.assertEqual(result["aliases"], expected_output["aliases"]) + # self.assertEqual(result, expected_output) + + +if __name__ == "__main__": + unittest.main()