diff --git a/fmf/__init__.py b/fmf/__init__.py index 12af8b8..5e574ba 100644 --- a/fmf/__init__.py +++ b/fmf/__init__.py @@ -11,6 +11,7 @@ __version__ = importlib.metadata.version("fmf") __all__ = [ + "__version__", "Context", "Tree", "filter", diff --git a/fmf/__main__.py b/fmf/__main__.py new file mode 100644 index 0000000..4e28416 --- /dev/null +++ b/fmf/__main__.py @@ -0,0 +1,3 @@ +from .cli import main + +main() diff --git a/fmf/cli.py b/fmf/cli.py index f5dfbf6..49442ec 100644 --- a/fmf/cli.py +++ b/fmf/cli.py @@ -16,205 +16,189 @@ of available options. """ -import argparse -import os -import os.path -import shlex -import sys +import functools +from pathlib import Path + +import click +from click_option_group import optgroup import fmf import fmf.utils as utils # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# Parser +# Common option groups # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class Parser: - """ Command line options parser """ - - def __init__(self, arguments=None, path=None): - """ Prepare the parser. """ - # Change current working directory (used for testing) - if path is not None: - os.chdir(path) - # Split command line if given as a string (used for testing) - if isinstance(arguments, str): - self.arguments = shlex.split(arguments) - # Otherwise use sys.argv - if arguments is None: - self.arguments = sys.argv - # Enable debugging output if requested - if "--debug" in self.arguments: - utils.log.setLevel(utils.LOG_DEBUG) - # Show current version and exit - if "--version" in self.arguments: - self.output = f"{fmf.__version__}" - print(self.output) - return - - # Handle subcommands (mapped to format_* methods) - self.parser = argparse.ArgumentParser( - usage="fmf command [options]\n" + __doc__) - self.parser.add_argument( - "--version", action="store_true", - help="print fmf version with commit hash and exit") - self.parser.add_argument('command', help='Command to run') - self.command = self.parser.parse_args(self.arguments[1:2]).command - if not hasattr(self, "command_" + self.command): - self.parser.print_help() - raise utils.GeneralError( - "Unrecognized command: '{0}'".format(self.command)) - # Initialize the rest and run the subcommand - self.output = "" - getattr(self, "command_" + self.command)() - - def options_select(self): - """ Select by name, filter """ - group = self.parser.add_argument_group("Select") - group.add_argument( - "--key", dest="keys", action="append", default=[], - help="Key content definition (required attributes)") - group.add_argument( - "--name", dest="names", action="append", default=[], - help="List objects with name matching regular expression") - group.add_argument( - "--source", dest="sources", action="append", default=[], - help="List objects defined in specified source files") - group.add_argument( - "--filter", dest="filters", action="append", default=[], - help="Apply advanced filter (see 'pydoc fmf.filter')") - group.add_argument( - "--condition", dest="conditions", action="append", default=[], - metavar="EXPR", - help="Use arbitrary Python expression for filtering") - group.add_argument( - "--whole", dest="whole", action="store_true", - help="Consider the whole tree (leaves only by default)") - - def options_formatting(self): - """ Formating options """ - group = self.parser.add_argument_group("Format") - group.add_argument( - "--format", dest="formatting", default=None, - help="Custom output format using the {} expansion") - group.add_argument( - "--value", dest="values", action="append", default=[], - help="Values for the custom formatting string") - - def options_utils(self): - """ Utilities """ - group = self.parser.add_argument_group("Utils") - group.add_argument( - "--path", action="append", dest="paths", - help="Path to the metadata tree (default: current directory)") - group.add_argument( - "--verbose", action="store_true", - help="Print information about parsed files to stderr") - group.add_argument( - "--debug", action="store_true", - help="Turn on debugging output, do not catch exceptions") - - def command_ls(self): - """ List names """ - self.parser = argparse.ArgumentParser( - description="List names of available objects") - self.options_select() - self.options_utils() - self.options = self.parser.parse_args(self.arguments[2:]) - self.show(brief=True) - - def command_clean(self): - """ Clean cache """ - self.parser = argparse.ArgumentParser( - description="Remove cache directory and its content") - self.clean() - - def command_show(self): - """ Show metadata """ - self.parser = argparse.ArgumentParser( - description="Show metadata of available objects") - self.options_select() - self.options_formatting() - self.options_utils() - self.options = self.parser.parse_args(self.arguments[2:]) - self.show(brief=False) - - def command_init(self): - """ Initialize tree """ - self.parser = argparse.ArgumentParser( - description="Initialize a new metadata tree") - self.options_utils() - self.options = self.parser.parse_args(self.arguments[2:]) - # For each path create an .fmf directory and version file - for path in self.options.paths or ["."]: - root = fmf.Tree.init(path) - print("Metadata tree '{0}' successfully initialized.".format(root)) - - def show(self, brief=False): - """ Show metadata for each path given """ - output = [] - for path in self.options.paths or ["."]: - if self.options.verbose: - utils.info("Checking {0} for metadata.".format(path)) - tree = fmf.Tree(path) - for node in tree.prune( - self.options.whole, - self.options.keys, - self.options.names, - self.options.filters, - self.options.conditions, - self.options.sources): - if brief: - show = node.show(brief=True) - else: - show = node.show( - brief=False, - formatting=self.options.formatting, - values=self.options.values) - # List source files when in debug mode - if self.options.debug: - for source in node.sources: - show += utils.color("{0}\n".format(source), "blue") - if show is not None: - output.append(show) - - # Print output and summary - if brief or self.options.formatting: - joined = "".join(output) - else: - joined = "\n".join(output) - print(joined, end="") - if self.options.verbose: - utils.info("Found {0}.".format( - utils.listed(len(output), "object"))) - self.output = joined - - def clean(self): - """ Remove cache directory """ - try: - cache = utils.get_cache_directory(create=False) - utils.clean_cache_directory() - print("Cache directory '{0}' has been removed.".format(cache)) - except Exception as error: # pragma: no cover - utils.log.error( - "Unable to remove cache, exception was: {0}".format(error)) +def _select_options(func): + """Select group options""" + + @optgroup.group("Select") + @optgroup.option("--key", "keys", metavar="KEY", default=[], multiple=True, + help="Key content definition (required attributes)") + @optgroup.option("--name", "names", metavar="NAME", default=[], multiple=True, + help="List objects with name matching regular expression") + @optgroup.option("--source", "sources", metavar="SOURCE", default=[], multiple=True, + help="List objects defined in specified source files") + @optgroup.option("--filter", "filters", metavar="FILTER", default=[], multiple=True, + help="Apply advanced filter (see 'pydoc fmf.filter')") + @optgroup.option("--condition", "conditions", metavar="EXPR", default=[], multiple=True, + help="Use arbitrary Python expression for filtering") + @optgroup.option("--whole", is_flag=True, default=False, + help="Consider the whole tree (leaves only by default)") + @functools.wraps(func) + def wrapper(*args, **kwargs): + # Hack to group the options into one variable + select = { + opt: kwargs.pop(opt) + for opt in ("keys", "names", "sources", "filters", "conditions", "whole") + } + return func(*args, select=select, **kwargs) + + return wrapper + + +def _format_options(func): + """Formating group options""" + + @optgroup.group("Format") + @optgroup.option("--format", "formatting", metavar="FORMAT", default=None, + help="Custom output format using the {} expansion") + @optgroup.option("--value", "values", metavar="VALUE", default=[], multiple=True, + help="Values for the custom formatting string") + @functools.wraps(func) + def wrapper(*args, **kwargs): + # Hack to group the options into one variable + format = { + opt: kwargs.pop(opt) + for opt in ("formatting", "values") + } + return func(*args, format=format, **kwargs) + + return wrapper + + +def _utils_options(func): + """Utilities group options""" + + @optgroup.group("Utils") + @optgroup.option("--path", "paths", metavar="PATH", multiple=True, + type=Path, default=["."], + show_default="current directory", + help="Path to the metadata tree") + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Main # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -def main(arguments=None, path=None): - """ Parse options, do what is requested """ - parser = Parser(arguments, path) - return parser.output +class CatchAllExceptions(click.Group): + def __call__(self, *args, **kwargs): + # TODO: This actually has no effect + try: + return self.main(*args, **kwargs) + except fmf.utils.GeneralError as error: + # TODO: Better handling of --debug + if "--debug" not in kwargs: + fmf.utils.log.error(error) + raise + + +@click.group("fmf", cls=CatchAllExceptions) +@click.version_option(fmf.__version__, message="%(version)s") +@click.option("--verbose", is_flag=True, default=False, type=bool, + help="Print information about parsed files to stderr") +@click.option("--debug", "-d", count=True, default=0, type=int, + help="Provide debugging information. Repeat to see more details.") +@click.pass_context +def main(ctx, debug, verbose) -> None: + """This is command line interface for the Flexible Metadata Format.""" + ctx.ensure_object(dict) + if debug: + utils.log.setLevel(debug) + ctx.obj["verbose"] = verbose + ctx.obj["debug"] = debug -def cli_entry(): +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Sub-commands +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +@main.command("ls") +@_select_options +@_utils_options +@click.pass_context +def ls(ctx, paths, select) -> None: + """List names of available objects""" + _show(ctx, paths, select, brief=True) + + +@main.command("clean") +def clean() -> None: + """Remove cache directory and its content""" + _clean() + + +@main.command("show") +@_select_options +@_format_options +@_utils_options +@click.pass_context +def show(ctx, paths, select, format) -> None: + """Show metadata of available objects""" + _show(ctx, paths, select, format_opts=format, brief=False) + + +@main.command("init") +@_utils_options +def init(paths) -> None: + """Initialize a new metadata tree""" + # For each path create an .fmf directory and version file + for path in paths: + root = fmf.Tree.init(path) + click.echo("Metadata tree '{0}' successfully initialized.".format(root)) + + +def _show(ctx, paths, select_opts, format_opts=None, brief=False): + """ Show metadata for each path given """ + output = [] + for path in paths: + if ctx.obj["verbose"]: + utils.info("Checking {0} for metadata.".format(path)) + tree = fmf.Tree(path) + for node in tree.prune(**select_opts): + if brief: + show = node.show(brief=True) + else: + assert format_opts is not None + show = node.show(brief=False, **format_opts) + # List source files when in debug mode + if ctx.obj["debug"]: + for source in node.sources: + show += utils.color("{0}\n".format(source), "blue") + if show is not None: + output.append(show) + + # Print output and summary + if brief or format_opts and format_opts["formatting"]: + joined = "".join(output) + else: + joined = "\n".join(output) + click.echo(joined, nl=False) + if ctx.obj["verbose"]: + utils.info("Found {0}.".format( + utils.listed(len(output), "object"))) + + +def _clean(): + """Remove cache directory""" try: - main() - except fmf.utils.GeneralError as error: - if "--debug" not in sys.argv: - fmf.utils.log.error(error) - raise + cache = utils.get_cache_directory(create=False) + utils.clean_cache_directory() + click.echo("Cache directory '{0}' has been removed.".format(cache)) + except Exception as error: # pragma: no cover + utils.log.error( + "Unable to remove cache, exception was: {0}".format(error)) diff --git a/pyproject.toml b/pyproject.toml index 5d7141b..0dbf8e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,8 @@ dependencies = [ 'ruamel.yaml', 'filelock', 'jsonschema', + 'click', + 'click-option-group', ] dynamic = ['version'] @@ -50,7 +52,7 @@ docs = [ ] [project.scripts] -fmf = 'fmf.cli:cli_entry' +fmf = 'fmf.cli:main' [tool.hatch] version.source = 'vcs' diff --git a/tests/unit/test_base.py b/tests/unit/test_base.py index b925594..acae830 100644 --- a/tests/unit/test_base.py +++ b/tests/unit/test_base.py @@ -6,6 +6,7 @@ from shutil import rmtree import pytest +from click.testing import CliRunner from ruamel.yaml import YAML import fmf.cli @@ -231,7 +232,7 @@ def test_find_root(self): def test_yaml_syntax_errors(self, tmp_path): """ Handle YAML syntax errors """ with utils.cd(tmp_path): - fmf.cli.main("fmf init") + CliRunner().invoke(fmf.cli.main, "init") with (tmp_path / "main.fmf").open("w") as main: main.write("missing\ncolon:") with pytest.raises(utils.FileError): @@ -241,7 +242,7 @@ def test_yaml_syntax_errors(self, tmp_path): def test_yaml_duplicate_keys(self, tmp_path): """ Handle YAML duplicate keys """ with utils.cd(tmp_path): - fmf.cli.main("fmf init") + CliRunner().invoke(fmf.cli.main, "init") # Simple test with (tmp_path / "main.fmf").open("w") as main: diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 62698dd..1e8a7af 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -1,7 +1,7 @@ import os -import sys import pytest +from click.testing import CliRunner import fmf.cli import fmf.utils as utils @@ -16,117 +16,122 @@ class TestCommandLine: def test_smoke(self): """ Smoke test """ + runner = CliRunner() with utils.cd(WGET): - fmf.cli.main("fmf show") - fmf.cli.main("fmf show --debug") - fmf.cli.main("fmf show --verbose") - fmf.cli.main("fmf --version") + runner.invoke(fmf.cli.main, "show") + runner.invoke(fmf.cli.main, "show --debug") + runner.invoke(fmf.cli.main, "show --verbose") + runner.invoke(fmf.cli.main, "--version") def test_missing_root(self): """ Missing root """ with utils.cd("/"): - with pytest.raises(utils.FileError): - fmf.cli.main("fmf show") + with pytest.raises(utils.RootError): + CliRunner().invoke(fmf.cli.main, "show", catch_exceptions=False) def test_invalid_path(self): """ Missing root """ with pytest.raises(utils.FileError): - fmf.cli.main("fmf show --path /some-non-existent-path") + CliRunner().invoke( + fmf.cli.main, + "show --path /some-non-existent-path", + catch_exceptions=False) def test_wrong_command(self): """ Wrong command """ - with pytest.raises(utils.GeneralError): - fmf.cli.main("fmf wrongcommand") + result = CliRunner().invoke(fmf.cli.main, "wrongcommand", catch_exceptions=False) + assert result.exit_code == 2 + assert "No such command 'wrongcommand'" in result.stdout def test_output(self): """ There is some output """ with utils.cd(WGET): - output = fmf.cli.main("fmf show") - assert "download" in output + result = CliRunner().invoke(fmf.cli.main, "show") + assert "download" in result.output def test_recursion(self): """ Recursion """ with utils.cd(WGET): - output = fmf.cli.main("fmf show --name recursion/deep") - assert "1000" in output + result = CliRunner().invoke(fmf.cli.main, "show --name recursion/deep") + assert "1000" in result.output def test_inheritance(self): """ Inheritance """ with utils.cd(WGET): - output = fmf.cli.main("fmf show --name protocols/https") - assert "psplicha" in output - - def test_sys_argv(self): - """ Parsing sys.argv """ - backup = sys.argv - sys.argv = ['fmf', 'show', '--path', WGET, '--name', 'recursion/deep'] - output = fmf.cli.main() - assert "1000" in output - sys.argv = backup + result = CliRunner().invoke(fmf.cli.main, "show --name protocols/https") + assert "psplicha" in result.output def test_missing_attribute(self): """ Missing attribute """ with utils.cd(WGET): - output = fmf.cli.main("fmf show --filter x:y") - assert "wget" not in output + result = CliRunner().invoke(fmf.cli.main, "show --filter x:y") + assert "wget" not in result.output def test_filtering_by_source(self): """ By source """ with utils.cd(WGET): - output = fmf.cli.main("fmf show --source protocols/ftp/main.fmf") - assert "/protocols/ftp" in output + result = CliRunner().invoke(fmf.cli.main, "show --source protocols/ftp/main.fmf") + assert "/protocols/ftp" in result.output def test_filtering(self): """ Filtering """ + runner = CliRunner() with utils.cd(WGET): - output = fmf.cli.main( - "fmf show --filter tags:Tier1 --filter tags:TierSecurity") - assert "/download/test" in output - output = fmf.cli.main( - "fmf show --filter tags:Tier1 --filter tags:Wrong") - assert "wget" not in output - output = fmf.cli.main( - " fmf show --filter 'tags: Tier[A-Z].*'") - assert "/download/test" in output - assert "/recursion" not in output + result = runner.invoke( + fmf.cli.main, + "show --filter tags:Tier1 --filter tags:TierSecurity") + assert "/download/test" in result.output + result = runner.invoke( + fmf.cli.main, + "show --filter tags:Tier1 --filter tags:Wrong") + assert "wget" not in result.output + result = runner.invoke( + fmf.cli.main, + "show --filter 'tags: Tier[A-Z].*'") + assert "/download/test" in result.output + assert "/recursion" not in result.output def test_key_content(self): """ Key content """ with utils.cd(WGET): - output = fmf.cli.main("fmf show --key depth") - assert "/recursion/deep" in output - assert "/download/test" not in output + result = CliRunner().invoke(fmf.cli.main, "show --key depth") + assert "/recursion/deep" in result.output + assert "/download/test" not in result.output def test_format_basic(self): """ Custom format (basic) """ - output = fmf.cli.main(WGET + "fmf show --format foo") - assert "wget" not in output - assert "foo" in output + with utils.cd(WGET): + result = CliRunner().invoke(fmf.cli.main, "show --format foo") + assert "wget" not in result.output + assert "foo" in result.output def test_format_key(self): """ Custom format (find by key, check the name) """ with utils.cd(WGET): - output = fmf.cli.main( - "fmf show --key depth --format {0} --value name") - assert "/recursion/deep" in output + result = CliRunner().invoke( + fmf.cli.main, + "show --key depth --format {0} --value name") + assert "/recursion/deep" in result.output def test_format_functions(self): """ Custom format (using python functions) """ with utils.cd(WGET): - output = fmf.cli.main( - "fmf show --key depth --format {0} --value os.path.basename(name)") - assert "deep" in output - assert "/recursion" not in output + result = CliRunner().invoke( + fmf.cli.main, + "show --key depth --format {0} --value os.path.basename(name)") + assert "deep" in result.output + assert "/recursion" not in result.output @pytest.mark.skipif(os.geteuid() == 0, reason="Running as root") def test_init(self, tmp_path): """ Initialize metadata tree """ + runner = CliRunner() with utils.cd(tmp_path): - fmf.cli.main("fmf init") - fmf.cli.main("fmf show") + runner.invoke(fmf.cli.main, "init") + runner.invoke(fmf.cli.main, "show") # Already exists with pytest.raises(utils.FileError): - fmf.cli.main("fmf init") + runner.invoke(fmf.cli.main, "init", catch_exceptions=False) version_path = tmp_path / ".fmf" / "version" with version_path.open() as version: assert "1" in version.read() @@ -134,35 +139,42 @@ def test_init(self, tmp_path): secret_path = tmp_path / "denied" secret_path.mkdir(0o666) with pytest.raises(utils.FileError): - fmf.cli.main('fmf init --path {}'.format(secret_path)) + runner.invoke( + fmf.cli.main, + "init --path {}".format(secret_path), + catch_exceptions=False) secret_path.chmod(0o777) # Invalid version with version_path.open("w") as version: version.write("bad") with pytest.raises(utils.FormatError): - fmf.cli.main("fmf ls") + runner.invoke(fmf.cli.main, "ls", catch_exceptions=False) # Missing version version_path.unlink() with pytest.raises(utils.FormatError): - fmf.cli.main("fmf ls") + runner.invoke(fmf.cli.main, "ls", catch_exceptions=False) def test_conditions(self): """ Advanced filters via conditions """ path = PATH + "/../../examples/conditions" # Compare numbers + runner = CliRunner() with utils.cd(path): - output = fmf.cli.main("fmf ls --condition 'float(release) >= 7'") - assert len(output.splitlines()) == 3 - output = fmf.cli.main("fmf ls --condition 'float(release) > 7'") - assert len(output.splitlines()) == 2 + # Compare numbers + result = runner.invoke(fmf.cli.main, "ls --condition 'float(release) >= 7'") + assert len(result.output.splitlines()) == 3 + result = runner.invoke(fmf.cli.main, "ls --condition 'float(release) > 7'") + assert len(result.output.splitlines()) == 2 # Access a dictionary key - output = fmf.cli.main( - "fmf ls --condition \"execute['how'] == 'dependency'\"") - assert output.strip() == "/top/rhel7" + result = runner.invoke( + fmf.cli.main, + "ls --condition \"execute['how'] == 'dependency'\"") + assert result.output.strip() == "/top/rhel7" # Wrong key means unsatisfied condition - output = fmf.cli.main( - "fmf ls --condition \"execute['wrong key'] == 0\"") - assert output == '' + result = runner.invoke( + fmf.cli.main, + "ls --condition \"execute['wrong key'] == 0\"") + assert result.output == '' def test_clean(self, tmpdir, monkeypatch): """ Cache cleanup """ @@ -170,5 +182,5 @@ def test_clean(self, tmpdir, monkeypatch): monkeypatch.setattr('fmf.utils._CACHE_DIRECTORY', str(tmpdir)) testing_file = tmpdir.join("something") testing_file.write("content") - fmf.cli.main("fmf clean") + CliRunner().invoke(fmf.cli.main, "clean") assert not os.path.isfile(str(testing_file)) diff --git a/tests/unit/test_smoke.py b/tests/unit/test_smoke.py index b8cfcda..04b56a7 100644 --- a/tests/unit/test_smoke.py +++ b/tests/unit/test_smoke.py @@ -1,7 +1,9 @@ import os -import fmf.cli +from click.testing import CliRunner + import fmf.utils as utils +from fmf.cli import main # Prepare path to examples PATH = os.path.dirname(os.path.realpath(__file__)) @@ -14,10 +16,10 @@ class TestSmoke: def test_smoke(self): """ Smoke test """ with utils.cd(WGET): - fmf.cli.main("fmf ls") + CliRunner().invoke(main, ['ls']) def test_output(self): """ There is some output """ with utils.cd(WGET): - output = fmf.cli.main("fmf ls") - assert "download" in output + result = CliRunner().invoke(main, ['ls']) + assert "download" in result.output