diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 049d30103..8ec997d7c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,7 +7,7 @@ To contribute code or documentation, please submit a [pull request](https://gith A good way to familiarize yourself with the codebase and contribution process is to look for and tackle low-hanging fruit in the [issue tracker](https://github.com/oscal-compass/compliance-trestle/issues). -Before embarking on a more ambitious contribution, please quickly [get in touch](https://oscal-compass.github.io/compliance-trestle/maintainers/) with us. +Before embarking on a more ambitious contribution, please quickly [get in touch](https://oscal-compass.github.io/compliance-trestle/latest/contributing/maintainers/) with us. **Note: We appreciate your effort, and want to avoid a situation where a contribution requires extensive rework (by you or by us), sits in backlog for a long time, or @@ -32,7 +32,7 @@ review to indicate acceptance. A change requires LGTMs from at least two reviewers. One of the reviewers must be a [`CODEOWNER`](https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners). -For a list of the maintainers (also codeowners), see the [maintainers](https://oscal-compass.github.io/compliance-trestle/maintainers/) page. +For a list of the maintainers (also codeowners), see the [maintainers](https://oscal-compass.github.io/compliance-trestle/latest/contributing/maintainers/) ### Trestle updating, testing and release logistics @@ -88,7 +88,7 @@ The devops process does not _strictly_ enforce typing, however, the expectation commits with a focus on quality over quantity (e.g. don't add `Any` everywhere just to meet coverage requirements). Python typing of functions is an active work in progress. -`mkbuild` is used to generate the [trestle documenation site](https://oscal-compass.github.io/compliance-trestle). The `mkbuild` +`mkbuild` is used to generate the [trestle documenation site](https://oscal-compass.github.io/compliance-trestle/latest). The `mkbuild` website includes an API reference section generated from the code. Docstrings within the code are expected to follow [google style docstrings](https://www.sphinx-doc.org/en/master/usage/extensions/example_google.html). @@ -116,7 +116,7 @@ e.g. We have tried to make it as easy as possible to make contributions. This applies to how we handle the legal aspects of contribution. We use the -same approach - the [Developer's Certificate of Origin 1.1 (DCO)](https://oscal-compass.github.io/compliance-trestle/contributing/DCO/) - that the Linux® Kernel [community](https://developercertificate.org/) +same approach - the [Developer's Certificate of Origin 1.1 (DCO)](https://oscal-compass.github.io/compliance-trestle/latest/contributing/DCO/) - that the Linux® Kernel [community](https://developercertificate.org/) uses to manage code contributions. We simply ask that when submitting a patch for review, the developer diff --git a/README.md b/README.md index 731f01d18..be8b71a48 100644 --- a/README.md +++ b/README.md @@ -64,11 +64,11 @@ Users needing to import XML OSCAL artifacts are recommended to look at NIST's XM Trestle runs on almost all Python platforms (e.g. Linux, Mac, Windows), is available on PyPi and can be easily installed via pip. It is under active development and new releases are made available regularly.\ To install run: `pip install compliance-trestle`\ -See [Install trestle in a python virtual environment](https://oscal-compass.github.io/compliance-trestle/python_trestle_setup/) for the full installation guide. +See [Install trestle in a python virtual environment](https://oscal-compass.github.io/compliance-trestle/latest/installation/) for the full installation guide. ## Complete documentation and tutorials -Complete documentation, tutorials, and background on compliance can be found [here](https://oscal-compass.github.io/compliance-trestle). +Complete documentation, tutorials, and background on compliance can be found [here](https://oscal-compass.github.io/compliance-trestle/latest). ## Agile Authoring @@ -101,7 +101,7 @@ Please refer to the community [README](https://github.com/oscal-compass/communit ## Contributing to Trestle -Our project welcomes external contributions. Please consult [contributing](https://oscal-compass.github.io/compliance-trestle/contributing/mkdocs_contributing/) to get started. +Our project welcomes external contributions. Please consult [contributing](https://oscal-compass.github.io/compliance-trestle/latest/contributing/mkdocs_contributing/) to get started. ## Code of Conduct diff --git a/docs/index.md b/docs/index.md index b76cd62bc..abb987ad3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -37,11 +37,61 @@ Trestle provides tooling to help orchestrate the compliance process across a num ## Important Note: -The current version of trestle supports NIST OSCAL 1.0.0-4. There was a breaking change in OSCAL moving from -version 1.0.0 to 1.0.2 mainly due to `prop` becoming `props` in AssessmentResults. As a result, the current development path of trestle requires OSCAL 1.0.4, but for those who require OSCAL 1.0.0 please use trestle version 0.37.x. That version is stable but will not have any features added, and we encourage users to move to OSCAL 1.0.4. +The current version of trestle 3.x supports NIST OSCAL 1.1.2. +Below shows trestle versions correspondence with OSCAL versions: -OSCAL version 1.0.0 files are still handled on import but any AssessmentResults must conform to the OSCAL 1.0.4 schema, with -props instead of prop. And all files created by trestle will be output as OSCAL version 1.0.4. +``` +trestle 3.x => OSCAL 1.1.2 +trestle 2.x => OSCAL 1.0.4 +trestle 1.x => OSCAL 1.0.2 +trestle 0.37.x => OSCAL 1.0.0 +``` + +Visit [pypi](https://pypi.org/project/compliance-trestle/#history) for trestle release history and downloads. + +## Notes for install of current and older versions of trestle + +#### Install of trestle 3.x + +Use python 3.11. + +``` +python3.11 -m venv venv.trestle +source venv.trestle/bin/activate +pip install compliance-trestle==3.6.0 +trestle version +Trestle version v3.6.0 based on OSCAL version 1.1.2 +``` + +#### Install of trestle 2.x + +Use python 3.9. + +``` +python3.9 -m venv venv.trestle +source venv.trestle/bin/activate +pip install compliance-trestle==2.6.0 +trestle version +Trestle version v2.6.0 based on OSCAL version 1.0.4 +``` + +#### Install of trestle 1.x + +Use python 3.9. + +Due to dependency updates since the release of trestle 1.2.0, perform the following in your venv: + +``` +python3.9 -m venv venv.trestle +source venv.trestle/bin/activate +pip install compliance-trestle==1.2.0 +pip uninstall pydantic +pip uninstall pydantic_core +pip install pydantic==1.10.2 +pip install requests +trestle version +Trestle version v1.2.0 based on OSCAL version 1.0.2 +``` ## Why Trestle @@ -79,7 +129,7 @@ Trestle runs on most all python platforms (e.g. Linux, Mac, Windows) and is avai ## Development status -Compliance trestle is currently stable and is based on NIST OSCAL version 1.0.4, with active development continuing. +Compliance trestle is currently stable and is based on NIST OSCAL version 1.1.2, with active development continuing. ## Contributing to Trestle diff --git a/docs/reference/API/trestle/tasks/cis_xlsx_to_oscal_cd.md b/docs/reference/API/trestle/tasks/cis_xlsx_to_oscal_cd.md new file mode 100644 index 000000000..a57dcd57c --- /dev/null +++ b/docs/reference/API/trestle/tasks/cis_xlsx_to_oscal_cd.md @@ -0,0 +1,7 @@ +--- +title: trestle.tasks.cis_xlsx_to_oscal_cd +description: Documentation for trestle.tasks.cis_xlsx_to_oscal_cd module +--- + +::: trestle.tasks.cis_xlsx_to_oscal_cd +handler: python diff --git a/docs/tutorials/Transformers_and_Tasks/OCP4_CIS_profile_to_oscal_cd.md b/docs/tutorials/Transformers_and_Tasks/OCP4_CIS_profile_to_oscal_cd.md index 119647497..ceb12f7f9 100644 --- a/docs/tutorials/Transformers_and_Tasks/OCP4_CIS_profile_to_oscal_cd.md +++ b/docs/tutorials/Transformers_and_Tasks/OCP4_CIS_profile_to_oscal_cd.md @@ -17,7 +17,7 @@ The second is a one-command transformation from `.profile` to `OSCAL.json`. ## Step 1: Install trestle in a Python virtual environment -Follow the instructions [here](https://oscal-compass.github.io/compliance-trestle/python_trestle_setup/) to install trestle in a virtual environment. +Follow the instructions [here](https://oscal-compass.github.io/compliance-trestle/latest/installation/) to install trestle in a virtual environment. ## Step 2: Transform profile data (CIS benchmarks) diff --git a/docs/tutorials/Transformers_and_Tasks/csv_to_oscal_cd.md b/docs/tutorials/Transformers_and_Tasks/csv_to_oscal_cd.md index 13aa4898d..90c16d220 100644 --- a/docs/tutorials/Transformers_and_Tasks/csv_to_oscal_cd.md +++ b/docs/tutorials/Transformers_and_Tasks/csv_to_oscal_cd.md @@ -169,7 +169,7 @@ The below table represents the expectations of trestle task `csv-to-oscal-cd` fo ## *Step 1: Install trestle in a Python virtual environment* -Follow the instructions [here](https://oscal-compass.github.io/compliance-trestle/python_trestle_setup/) to install trestle in a virtual environment. +Follow the instructions [here](https://oscal-compass.github.io/compliance-trestle/latest/installation/) to install trestle in a virtual environment. ## *Step 2: Transform profile data (CIS benchmarks)* diff --git a/docs/tutorials/Trestle_authoring/ssp_profile_catalog_authoring.md b/docs/tutorials/Trestle_authoring/ssp_profile_catalog_authoring.md index 3a8881692..ff3185d37 100644 --- a/docs/tutorials/Trestle_authoring/ssp_profile_catalog_authoring.md +++ b/docs/tutorials/Trestle_authoring/ssp_profile_catalog_authoring.md @@ -321,7 +321,7 @@ Access control policy and procedures address the controls in the AC family that - + ## Control Implementation Guidance diff --git a/docs/tutorials/Trestle_authoring/trestle_author.md b/docs/tutorials/Trestle_authoring/trestle_author.md index 9ddffa333..3340861f1 100644 --- a/docs/tutorials/Trestle_authoring/trestle_author.md +++ b/docs/tutorials/Trestle_authoring/trestle_author.md @@ -690,7 +690,7 @@ CLI evocation: > trestle author catalog-assemble -The `catalog` author commands allow you to convert a control catalog to markdown and edit its control statement, then assemble markdown back into an OSCAL catalog with the modifications to the statement. Items in the statement may be edited or added. For more details on its usage please see [the catalog authoring tutorial](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring). +The `catalog` author commands allow you to convert a control catalog to markdown and edit its control statement, then assemble markdown back into an OSCAL catalog with the modifications to the statement. Items in the statement may be edited or added. For more details on its usage please see [the catalog authoring tutorial](https://oscal-compass.github.io/compliance-trestle/latest/tutorials/Trestle_authoring/ssp_profile_catalog_authoring/). ### Profile authoring @@ -704,7 +704,7 @@ CLI evocation: > trestle author profile-assemble -The `profile` author commands allow you to edit additions made by a profile to its imported controls that end up in the final resolved profile catalog. Only the additions may be edited or added to the generated markdown control files - and those additions can then be assembled into a new version of the original profile, with those additions. For more details on its usage please see [the profile authoring tutorial](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring). +The `profile` author commands allow you to edit additions made by a profile to its imported controls that end up in the final resolved profile catalog. Only the additions may be edited or added to the generated markdown control files - and those additions can then be assembled into a new version of the original profile, with those additions. For more details on its usage please see [the profile authoring tutorial](https://oscal-compass.github.io/compliance-trestle/latest/tutorials/Trestle_authoring/ssp_profile_catalog_authoring/). ### Profile generation with inheritance @@ -719,7 +719,7 @@ All components must have exported provided statements, no exported responsibilit As with the other related author commands, if an existing destination file already exists, it is not updated if no changes would be made. -For more details on its usage please see [the ssp-filter tutorial](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring). +For more details on its usage please see [the ssp-filter tutorial](https://oscal-compass.github.io/compliance-trestle/latest/tutorials/Trestle_authoring/ssp_profile_catalog_authoring/). ### SSP authoring @@ -735,7 +735,7 @@ CLI evocation: The `ssp-generate` sub-command creates a partial SSP (System Security Plan) from a profile and optional yaml header file. `ssp-assemble` can then assemble the markdown files into a single json SSP file. -For more details on its usage please see [the ssp authoring tutorial](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring). +For more details on its usage please see [the ssp authoring tutorial](https://oscal-compass.github.io/compliance-trestle/latest/tutorials/Trestle_authoring/ssp_profile_catalog_authoring/). ### SSP Content Filtering @@ -757,6 +757,6 @@ You may filter by a combination of a profile, list of component names, implement As with the other related author commands, if an existing destination file already exists, it is not updated if no changes would be made. -For more details on its usage please see [the ssp-filter tutorial](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring). +For more details on its usage please see [the ssp-filter tutorial](https://oscal-compass.github.io/compliance-trestle/latest/tutorials/Trestle_authoring/ssp_profile_catalog_authoring/). diff --git a/docs/tutorials/cli.md b/docs/tutorials/cli.md index ae91b4662..150e0fab2 100644 --- a/docs/tutorials/cli.md +++ b/docs/tutorials/cli.md @@ -148,7 +148,7 @@ This command will return the current version of Trestle and OSCAL it is using. Running `trestle version` will return: -> Trestle version v2.0.0 based on OSCAL version 1.0.4 +> Trestle version v3.x.x based on OSCAL version 1.1.2 It can also be used to retrieve the metadata version of the OSCAL object: @@ -165,7 +165,7 @@ It can also be used to retrieve the metadata version of the OSCAL object: "version": "0.1.10", <<< this version here - "oscal-version": "1.0.4" + "oscal-version": "1.1.2" }, ... @@ -176,7 +176,7 @@ It can also be used to retrieve the metadata version of the OSCAL object: Running `trestle version -n nist -t catalog` will return: -> Version of OSCAL object of nist catalog is: 1.0.0 +> Version of OSCAL object of nist catalog is: 1.1.2 ## `trestle init` diff --git a/mkdocs.yml b/mkdocs.yml index 5f808ec26..92d6975f1 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,5 +1,7 @@ edit_uri: '' extra: + version: + provider: mike analytics: property: G-XT3KGMHSY8 provider: google diff --git a/setup.cfg b/setup.cfg index 4d32d3ba8..6d28c5506 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,7 +29,7 @@ install_requires = attrs ilcli cryptography==43.0.3 - paramiko==3.4.0 + paramiko==3.5.0 ruamel.yaml furl pydantic[email]>=2.0.0 diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet.xlsx b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet.xlsx new file mode 100644 index 000000000..73086a493 Binary files /dev/null and b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet.xlsx differ diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_bad_control.xlsx b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_bad_control.xlsx new file mode 100644 index 000000000..789d83356 Binary files /dev/null and b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_bad_control.xlsx differ diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_combined.xlsx b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_combined.xlsx new file mode 100644 index 000000000..b90484a27 Binary files /dev/null and b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_combined.xlsx differ diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_merge.xlsx b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_merge.xlsx new file mode 100644 index 000000000..3201c2768 Binary files /dev/null and b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_merge.xlsx differ diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_missing_column.xlsx b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_missing_column.xlsx new file mode 100644 index 000000000..f997fee35 Binary files /dev/null and b/tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_missing_column.xlsx differ diff --git a/tests/data/tasks/cis-xlsx-to-oscal-cd/test-cis-xlsx-to-oscal-cd.db2.snippet.config b/tests/data/tasks/cis-xlsx-to-oscal-cd/test-cis-xlsx-to-oscal-cd.db2.snippet.config new file mode 100644 index 000000000..e9bd631b5 --- /dev/null +++ b/tests/data/tasks/cis-xlsx-to-oscal-cd/test-cis-xlsx-to-oscal-cd.db2.snippet.config @@ -0,0 +1,18 @@ +[task.cis-xlsx-to-oscal-cd] + +benchmark-file = tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet.xlsx +benchmark-title = CIS IBM Db2 11 Benchmark +benchmark-version = 1.1.0 + +namespace = https://oscal-compass/compliance-trestle/schemas/oscal/cd + +component-name = IBM Db2 11 +component-description = IBM Db2 11 +component-type = software + +profile-version = v8 +profile-source = catalogs/CIS_controls_v8/catalog.json +profile-description = CIS catalog v8 + +output-dir = tests/data/tasks/cis-xlsx-to-oscal-cd/output +output-overwrite = true diff --git a/tests/trestle/core/commands/author/ssp_test.py b/tests/trestle/core/commands/author/ssp_test.py index ec4df8d7e..0df5ce34f 100644 --- a/tests/trestle/core/commands/author/ssp_test.py +++ b/tests/trestle/core/commands/author/ssp_test.py @@ -1160,14 +1160,19 @@ def test_ssp_gen_and_assemble_add_props(tmp_trestle_dir: pathlib.Path) -> None: impl_reqs = assem_ssp.control_implementation.implemented_requirements impl_req = next((i_req for i_req in impl_reqs if i_req.control_id == 'ac-1'), None) assert len(impl_req.props) == 1 - assert impl_req.props[0].name == 'prop_with_ns' - assert impl_req.props[0].value == 'prop with ns' - assert impl_req.props[0].ns == 'https://my_new_namespace' + assert impl_req.props[0].name == 'prop_with_ns' # type: ignore + assert impl_req.props[0].value == 'prop with ns' # type: ignore + assert impl_req.props[0].ns == 'https://my_new_namespace' # type: ignore smt_a = next((smt for smt in impl_req.statements if smt.statement_id == 'ac-1_smt.a'), None) assert len(smt_a.props) == 1 - assert smt_a.props[0].name == 'smt_prop' - assert smt_a.props[0].value == 'smt prop' + assert smt_a.props[0].name == 'smt_prop' # type: ignore + assert smt_a.props[0].value == 'smt prop' # type: ignore + + # Run again and check that there is no change + assert ssp_assemble._run(args) == 0 + assem_ssp_2, _ = ModelUtils.load_model_for_class(tmp_trestle_dir, ssp_name, ossp.SystemSecurityPlan) + assert assem_ssp_2.metadata.last_modified == assem_ssp.metadata.last_modified def test_ssp_gen_and_assemble_implementation_parts(tmp_trestle_dir: pathlib.Path, monkeypatch: MonkeyPatch) -> None: diff --git a/tests/trestle/core/utils_test.py b/tests/trestle/core/utils_test.py index ab6a097ed..16f39ece4 100644 --- a/tests/trestle/core/utils_test.py +++ b/tests/trestle/core/utils_test.py @@ -41,6 +41,7 @@ from trestle.common.err import TrestleError from trestle.common.model_utils import ModelUtils from trestle.common.str_utils import AliasMode +from trestle.common.str_utils import as_bool def load_good_catalog() -> catalog.Catalog: @@ -352,3 +353,10 @@ def test_prune_empty_dirs(tmp_path: pathlib.Path) -> None: assert not (tmp_path / 'sub1/sub11/sub111').exists() assert foo_path.exists() assert bar_path.exists() + + +def test_as_bool(tmp_path: pathlib.Path) -> None: + """Test as_bool function.""" + assert as_bool('true') + assert not as_bool('false') + assert not as_bool(None) diff --git a/tests/trestle/tasks/cis_xlsx_to_oscal_cd_test.py b/tests/trestle/tasks/cis_xlsx_to_oscal_cd_test.py new file mode 100644 index 000000000..07a68212c --- /dev/null +++ b/tests/trestle/tasks/cis_xlsx_to_oscal_cd_test.py @@ -0,0 +1,242 @@ +# Copyright (c) 2025 The OSCAL Compass Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""cis-xlsx-to-oscal-cd task tests.""" + +import configparser +import os +import pathlib +from typing import Dict +from unittest.mock import patch + +import trestle.tasks.cis_xlsx_to_oscal_cd as cis_xlsx_to_oscal_cd +from trestle.oscal.component import ComponentDefinition +from trestle.tasks.base_task import TaskOutcome + +db2_config = 'tests/data/tasks/cis-xlsx-to-oscal-cd/test-cis-xlsx-to-oscal-cd.db2.snippet.config' + + +def _get_section(tmp_path: pathlib.Path, file_: str) -> Dict: + """Get section.""" + config = configparser.ConfigParser() + config_path = pathlib.Path(file_) + config.read(config_path) + section = config['task.cis-xlsx-to-oscal-cd'] + section['output-dir'] = str(tmp_path) + return section + + +def test_cis_xlsx_to_oscal_cd_compare(tmp_path: pathlib.Path): + """Test compare.""" + x = cis_xlsx_to_oscal_cd.SortHelper.compare('A', 'B') + assert x == -1 + x = cis_xlsx_to_oscal_cd.SortHelper.compare('0.0', '1.0') + assert x == -1 + x = cis_xlsx_to_oscal_cd.SortHelper.compare('1.0', '0.0') + assert x == 1 + x = cis_xlsx_to_oscal_cd.SortHelper.compare('1.1', '1.1') + assert x == 0 + + +def test_cis_xlsx_to_oscal_cd_print_info(tmp_path: pathlib.Path): + """Test print_info call.""" + section = _get_section(tmp_path, db2_config) + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.print_info() + assert retval is None + + +def test_cis_xlsx_to_oscal_cd_simulate(tmp_path: pathlib.Path): + """Test simulate call.""" + section = _get_section(tmp_path, db2_config) + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.simulate() + assert retval == TaskOutcome.SIM_SUCCESS + assert len(os.listdir(str(tmp_path))) == 0 + + +def test_cis_xlsx_to_oscal_cd_execute(tmp_path: pathlib.Path): + """Test execute call - db2.""" + section = _get_section(tmp_path, db2_config) + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_combined(tmp_path: pathlib.Path): + """Test execute call - db2.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-file' + ] = 'tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_combined.xlsx' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_missing_column(tmp_path: pathlib.Path): + """Test execute call - missing column.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-file' + ] = 'tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_missing_column.xlsx' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_bad_config(tmp_path: pathlib.Path): + """Test execute call - bad config.""" + section = _get_section(tmp_path, db2_config) + del section['benchmark-file'] + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_bad_overwrite(tmp_path: pathlib.Path): + """Test execute call - bad overwrite.""" + section = _get_section(tmp_path, db2_config) + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + section['output-overwrite'] = 'false' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_merge(tmp_path: pathlib.Path): + """Test execute call - merge.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-file' + ] = 'tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_merge.xlsx' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_rule_prefix(tmp_path: pathlib.Path): + """Test execute call - rule prefix.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-rule-prefix'] = 'CIS' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_control_prefix(tmp_path: pathlib.Path): + """Test execute call - control prefix.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-control-prefix'] = 'cisc' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_control_bad(tmp_path: pathlib.Path): + """Test execute call - control bad.""" + section = _get_section(tmp_path, db2_config) + section['benchmark-file' + ] = 'tests/data/tasks/cis-xlsx-to-oscal-cd/CIS_IBM_Db2_11_Benchmark_v1.1.0.snippet_bad_control.xlsx' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_columns_exclude(tmp_path: pathlib.Path): + """Test execute call - control prefix.""" + section = _get_section(tmp_path, db2_config) + section['columns-exclude'] = '"Recommendation #", "Profile", "Description"' + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + _validate_db2(tmp_path) + + +def test_cis_xlsx_to_oscal_cd_execute_config_missing(tmp_path: pathlib.Path): + """Test execute call - config_missing.""" + section = None + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_count_mismatch(tmp_path: pathlib.Path): + """Test execute call - count mismatch.""" + with patch('trestle.tasks.cis_xlsx_to_oscal_cd.CombineHelper._populate_combined_map') as mock_original_function: + mock_original_function.return_value = -5 + section = _get_section(tmp_path, db2_config) + tgt = cis_xlsx_to_oscal_cd.CisXlsxToOscalCd(section) + retval = tgt.execute() + assert retval == TaskOutcome.FAILURE + + +def test_cis_xlsx_to_oscal_cd_execute_csv_row_mgr(tmp_path: pathlib.Path): + """Test execute call - csv row mgr.""" + row_names = ['row1', 'row2', 'row3'] + # create new row mgr + csv_row_mgr = cis_xlsx_to_oscal_cd.CsvRowMgr(row_names) + # test valid case + try: + csv_row_mgr.put('row1', '') + except RuntimeError: + assert 0 == 1 + # test invalid case + try: + csv_row_mgr.put('rowX', '') + assert 0 == 1 + except RuntimeError: + pass + + +def _validate_db2(tmp_path: pathlib.Path): + """Validate produced OSCAL for db2 cd.""" + # read catalog + file_path = tmp_path / 'component-definition.json' + component_definition = ComponentDefinition.oscal_read(file_path) + # spot check + assert len(component_definition.components) == 1 + assert component_definition.metadata.title == 'CIS IBM Db2 11 Benchmark' + assert component_definition.metadata.version == '1.1.0' + component = component_definition.components[0] + assert component.type == 'software' + assert len(component.props) == 552 + prop = component.props[0] + assert prop.name == 'Rule_Id' + assert prop.ns == 'https://oscal-compass/compliance-trestle/schemas/oscal/cd' + assert prop.value == 'CIS-1.1.1' + assert prop.remarks == 'rule_set_00' + assert len(component.control_implementations) == 1 + prop = component.props[551] + assert prop.name == 'Group_Description_Level_1' + assert prop.ns == 'https://oscal-compass/compliance-trestle/schemas/oscal/cd' + assert prop.value.startswith('This section provides guidance on various database configuration parameters.') + assert prop.remarks == 'rule_set_22' + assert len(component.control_implementations) == 1 + control_implementation = component.control_implementations[0] + assert len(control_implementation.implemented_requirements) == 6 + assert control_implementation.source == 'catalogs/CIS_controls_v8/catalog.json' + assert control_implementation.description == 'CIS catalog v8' + implemented_requirement = control_implementation.implemented_requirements[0] + assert implemented_requirement.control_id == 'cisc-7.4' + assert len(implemented_requirement.props) == 1 + prop = implemented_requirement.props[0] + assert prop.name == 'Rule_Id' + assert prop.ns == 'https://oscal-compass/compliance-trestle/schemas/oscal/cd' + assert prop.value == 'CIS-1.1.1' + implemented_requirement = control_implementation.implemented_requirements[5] + assert implemented_requirement.control_id == 'cisc-3.10' diff --git a/trestle/common/str_utils.py b/trestle/common/str_utils.py index f247801a8..88f98ad53 100644 --- a/trestle/common/str_utils.py +++ b/trestle/common/str_utils.py @@ -123,6 +123,18 @@ def as_string(string_or_none: Optional[str]) -> str: return string_or_none if string_or_none else '' +def as_bool(string_or_none: Optional[str]) -> bool: + """Convert string to boolean.""" + if string_or_none: + if string_or_none.lower() in ['false']: + rval = False + else: + rval = True + else: + rval = False + return rval + + def string_from_root(item_with_root: Optional[Any]) -> str: """Convert root to string if present.""" return as_string(item_with_root.__root__) if item_with_root else '' diff --git a/trestle/core/catalog/catalog_reader.py b/trestle/core/catalog/catalog_reader.py index 4ed76d132..2e951be00 100644 --- a/trestle/core/catalog/catalog_reader.py +++ b/trestle/core/catalog/catalog_reader.py @@ -349,7 +349,7 @@ def _add_props_to_imp_req( # add the props at control level if props: imp_req.props = as_list(imp_req.props) - imp_req.props.extend(props) + ControlInterface.reconcile_props(imp_req, props) # add the props at the part level for label, part_id in control_part_id_map.items(): @@ -359,7 +359,7 @@ def _add_props_to_imp_req( for statement in as_list(imp_req.statements): if statement.statement_id == part_id: statement.props = as_list(statement.props) - statement.props.extend(props) + ControlInterface.reconcile_props(statement, props) @staticmethod def _update_ssp_with_md_header( diff --git a/trestle/core/control_interface.py b/trestle/core/control_interface.py index ab3c4c95e..69611ae06 100644 --- a/trestle/core/control_interface.py +++ b/trestle/core/control_interface.py @@ -1121,6 +1121,18 @@ def insert_status_in_props(item: TypeWithProps, status: common.ImplementationSta prop = ControlInterface._status_as_prop(status) ControlInterface._replace_prop(item, prop) + @staticmethod + def reconcile_props(item: TypeWithProps, props: List[common.Property]) -> None: + """Add properties to an item with properties while replacing existing.""" + names = [prop.name for prop in as_list(item.props)] + item.props = as_list(item.props) + for prop in props: + if prop.name in names: + index = names.index(prop.name) + item.props[index] = prop + else: + item.props.append(prop) + @staticmethod def _copy_status_in_props(dest: TypeWithProps, src: TypeWithProps) -> None: """Copy status in props from one object to another.""" diff --git a/trestle/tasks/cis_xlsx_to_oscal_cd.py b/trestle/tasks/cis_xlsx_to_oscal_cd.py new file mode 100644 index 000000000..a7011eda3 --- /dev/null +++ b/trestle/tasks/cis_xlsx_to_oscal_cd.py @@ -0,0 +1,965 @@ +# Copyright (c) 2025 The OSCAL Compass Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""OSCAL transformation tasks.""" +import configparser +import csv +import datetime +import logging +import pathlib +import tempfile +import traceback +from configparser import SectionProxy +from functools import cmp_to_key +from typing import Dict, Iterator, List, Optional + +from openpyxl import load_workbook +from openpyxl.workbook.workbook import Workbook + +from trestle.common.str_utils import as_bool +from trestle.tasks.base_task import TaskBase +from trestle.tasks.base_task import TaskOutcome +from trestle.tasks.csv_to_oscal_cd import CsvToOscalComponentDefinition + +logger = logging.getLogger(__name__) + +timestamp = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat() + +default_benchmark_control_prefix = 'cisc-' +default_benchmark_rule_prefix = 'CIS-' +output_file = 'component-definition.json' + +head_recommendation_no = 'Recommendation #' +head_section_no = 'Section #' + + +class CisXlsxToOscalCd(TaskBase): + """ + Task to transform CIS .xlsx to OSCAL component definition. + + Attributes: + name: Name of the task. + """ + + name = 'cis-xlsx-to-oscal-cd' + + def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None: + """ + Initialize trestle task. + + Args: + config_object: Config section associated with the task. + """ + self.config_object = config_object + super().__init__(config_object) + self._default_benchmark_sheet_name = 'Combined Profiles' + self._default_component_type = 'software' + self._default_output_overwrite = 'True' + # + self._example_benchmark_file = 'data/CIS_IBM_Db2_11_Benchmark_v1.1.0.xlsx' + self._example_benchmark_title = 'CIS IBM Db2 11 Benchmark' + self._example_benchmark_version = '1.1.0' + self._example_oscal_cd_dir = 'data/component-definitions/CIS_IBM_Db2_11_Benchmark_v1.1.0' + self._example_namespace = 'https://oscal-compass/compliance-trestle/schemas/oscal/cd' + self._example_profile_version = 'v8' + self._example_profile_source = 'catalogs/CIS_controls_v8/catalog.json' + self._example_profile_description = 'CIS catalog v8' + self._example_component_name = 'Db2 11' + + def print_info(self) -> None: + """Print the help string.""" + logger.info(f'Help information for {self.name} task.') + logger.info('') + logger.info('Purpose: Create component definition from standard CIS benchmark') + logger.info('') + logger.info('Configuration flags sit under [task.cis-xlsx-to-oscal-cd]:') + text1 = ' benchmark-file = ' + text2 = f'(required) path of file to read the CIS benchmark .xlsx, e.g., "{self._example_benchmark_file}".' + logger.info(text1 + text2) + text1 = ' benchmark-title = ' + text2 = f'(required) title of the CIS benchmark, e.g., "{self._example_benchmark_title}".' + logger.info(text1 + text2) + text1 = ' benchmark-version = ' + text2 = f'(required) version of the CIS benchmark .xlsx, e.g., "{self._example_benchmark_version}".' + logger.info(text1 + text2) + text1 = ' benchmark-control-prefix = ' + text2 = f'(optional) benchmark control prefix, default = "{default_benchmark_control_prefix}".' + logger.info(text1 + text2) + text1 = ' benchmark-rule-prefix = ' + text2 = f'(optional) benchmark rule prefix, default = "{default_benchmark_rule_prefix}".' + logger.info(text1 + text2) + text1 = ' benchmark-sheet-name = ' + text2 = f'(optional) benchmark sheet name, default = "{self._default_benchmark_sheet_name}".' + logger.info(text1 + text2) + text1 = ' component-name = ' + text2 = f'(required) component name, e.g., "{self._example_component_name}".' + logger.info(text1 + text2) + text1 = ' component-description = ' + text2 = f'(required) component description, e.g., "{self._example_component_name}".' + logger.info(text1 + text2) + text1 = ' component-type = ' + text2 = f'(required) component type, e.g., "{self._default_component_type}".' + logger.info(text1 + text2) + text1 = ' namespace = ' + text2 = f'(required) namespace, e.g., "{self._example_namespace}".' + logger.info(text1 + text2) + text1 = ' output-dir = ' + text2 = f'(required) path of folder to write the OSCAL {output_file}, e.g., "{self._example_oscal_cd_dir}".' + logger.info(text1 + text2) + text1 = ' output-overwrite = ' + text2 = f'(optional) output overwrite, default = "{self._default_output_overwrite}".' + logger.info(text1 + text2) + text1 = ' profile-version = ' + text2 = f'(required) profile version, e.g., "{self._example_profile_version}".' + logger.info(text1 + text2) + text1 = ' profile-source = ' + text2 = f'(required) profile source, e.g., "{self._example_profile_source}".' + logger.info(text1 + text2) + text1 = ' profile-description = ' + text2 = f'(required) profile description, e.g., "{self._example_profile_description}".' + logger.info(text1 + text2) + + def simulate(self) -> TaskOutcome: + """Provide a simulated outcome.""" + return TaskOutcome('simulated-success') + + def execute(self) -> TaskOutcome: + """Provide an actual outcome.""" + try: + return self._execute() + except Exception: + logger.info(traceback.format_exc()) + return TaskOutcome('failure') + + def _execute(self) -> TaskOutcome: + """Wrap the execute for exception handling.""" + if not self.config_object: + logger.warning('config missing') + return TaskOutcome('failure') + # required + try: + self._benchmark_file = self.config_object['benchmark-file'] + self._oscal_cd_dir = self.config_object['output-dir'] + self._namespace = self.config_object['namespace'] + self._profile_version = self.config_object['profile-version'] + except KeyError as e: + logger.info(f'key {e.args[0]} missing') + return TaskOutcome('failure') + # output + self._oscal_cd_path = pathlib.Path(self._oscal_cd_dir) + # insure output dir exists + self._oscal_cd_path.mkdir(exist_ok=True, parents=True) + # calculate output file name & check writability + oname = 'component-definition.json' + ofile = self._oscal_cd_path / oname + overwrite = self.config_object.get('output-overwrite', self._default_output_overwrite) + overwrite = as_bool(overwrite) + if not overwrite and pathlib.Path(ofile).exists(): + logger.warning(f'output: {ofile} already exists') + return TaskOutcome('failure') + with self._get_tempdir() as tmpdir: + # step 1 - add combined sheet, if needed + combine_helper = CombineHelper(self.config_object, tmpdir) + combine_helper.run() + # step 2 - create trestle ready csv file from xlsx file + xlsx_to_csv_helper = XlsxToCsvHelper(self.config_object, tmpdir) + xlsx_to_csv_helper.run() + # step 3 - create OSCAL json file from csv file + csv_to_json_helper = CsvToJsonHelper(self.config_object, tmpdir) + task_outcome = csv_to_json_helper.run() + return task_outcome + + def _get_tempdir(self) -> tempfile.TemporaryDirectory(): + """Get tmpdir.""" + return tempfile.TemporaryDirectory() + + +class SortHelper: + """SortHelper.""" + + @staticmethod + def compare(item1: str, item2: str) -> int: + """Compare.""" + # get parts + parts1 = ''.split('.') + if item1 is not None: + parts1 = str(item1).split('.') + parts2 = ''.split('.') + if item2 is not None: + parts2 = str(item2).split('.') + # normalize parts length + while len(parts1) < len(parts2): + parts1.append('0') + while len(parts2) < len(parts1): + parts2.append('0') + # comparison + rval = 0 + for i in range(len(parts1)): + try: + v1 = int(parts1[i]) + except Exception: + rval = -1 + break + try: + v2 = int(parts2[i]) + except Exception: + rval = 1 + break + if v1 < v2: + rval = -1 + break + if v1 > v2: + rval = 1 + break + text = f'compare rval: {rval} item1: {item1} item2: {item2}' + logger.debug(f'{text}') + return rval + + +class SheetHelper: + """SheetHelper.""" + + def __init__(self, wb: Workbook, sn: str) -> None: + """Initialize.""" + self.wb = wb + self.sn = sn + self.ws = self.wb[self.sn] + + def get_sn(self) -> int: + """Get sheet name.""" + return self.sn + + def get_max_col(self) -> int: + """Get max column.""" + return self.ws.max_column + + def row_generator(self) -> Iterator[int]: + """Generate rows until max reached.""" + row = 1 + while row <= self.ws.max_row: + yield row + row += 1 + + def get_cell_value(self, row: int, col: int) -> str: + """Get cell value for given row and column name.""" + cell = self.ws.cell(row, col) + return cell.value + + def put_cell_value(self, row: int, col: int, value: str) -> None: + """Get cell value for given row and column name.""" + cell = self.ws.cell(row, col) + cell.value = value + + @staticmethod + def get_sheetname_prefixes() -> List[str]: + """Get sheetnames prefixes.""" + rval = ['Level 1', 'Level 2'] + return rval + + @staticmethod + def get_sheetname() -> str: + """Get sheetname output.""" + rval = 'Combined Profiles' + return rval + + +class ColHelper: + """Col Helper.""" + + @staticmethod + def get_section() -> int: + """Get section col no.""" + return 1 + + @staticmethod + def get_recommendation() -> int: + """Get recommendation col no.""" + return 2 + + +class Int: + """Int.""" + + def __init__(self, value=0): + """Initialize.""" + self.value = value + + def inc_value(self) -> None: + """Increment.""" + self.value += 1 + + def get_value(self) -> int: + """Get.""" + return self.value + + +class CombineHelper: + """Combine helper.""" + + tgt_col_profile = 3 + + def __init__(self, config: SectionProxy, tmpdir: str) -> None: + """Initialize.""" + benchmark_file = config['benchmark-file'] + self.ipath = pathlib.Path(benchmark_file) + self.opath = pathlib.Path(tmpdir) / self.ipath.name + self.wb = load_workbook(self.ipath) + self.ws_map = {} + self.combined_map = {} + + def run(self) -> None: + """Run.""" + self._add_sheet_combined_profiles() + self._save() + + def _gather_sheets(self) -> None: + """Gather sheets.""" + for sn in self.wb.sheetnames: + for pn in self.sheetnames_prefixes: + if sn.startswith(pn): + self.ws_map[sn] = SheetHelper(self.wb, sn) + logger.debug(f'input sheet {sn} to be combined.') + break + + def _validate_columns_count(self) -> None: + """Validate columns count.""" + columns = -1 + for sn in self.ws_map.keys(): + sheet_helper = self.ws_map[sn] + if columns < 0: + columns = sheet_helper.get_max_col() + if columns != sheet_helper.get_max_col(): + raise RuntimeError(f'{sn} unexpected columns count {sheet_helper.get_max_col()} for sheet {sn}') + + def _populate_combined_map(self) -> int: + """Populate combined map.""" + src_col_section_no = ColHelper.get_section() + src_col_recommendation_no = ColHelper.get_recommendation() + rec_count_sheets = 0 + # populate combined map + for sn in self.ws_map.keys(): + sheet_helper = SheetHelper(self.wb, sn) + # process all rows from individual sheet + rec_count_sheets += self._process_sheet(sheet_helper, src_col_section_no, src_col_recommendation_no) + return rec_count_sheets + + def _process_sheet(self, sheet_helper: SheetHelper, src_col_section_no: int, src_col_recommendation_no: int) -> int: + """Process sheet.""" + rec_count = 0 + for row in sheet_helper.row_generator(): + # section + section_no = sheet_helper.get_cell_value(row, src_col_section_no) + if section_no not in self.combined_map.keys(): + self.combined_map[section_no] = {} + # recommendation + recommendation_no = sheet_helper.get_cell_value(row, src_col_recommendation_no) + if recommendation_no not in self.combined_map[section_no].keys(): + self.combined_map[section_no][recommendation_no] = {} + # combine head or data + if row == 1: + self._combine_head(sheet_helper, row, section_no, recommendation_no, CombineHelper.tgt_col_profile) + else: + self._combine_data(sheet_helper, row, section_no, recommendation_no, CombineHelper.tgt_col_profile) + if recommendation_no: + rec_count += 1 + return rec_count + + def _handle_head_row( + self, combined_helper: SheetHelper, row: Int, kvset: Dict, section_no: str, recommendation_no: str + ) -> None: + """Handle head row.""" + for col in kvset.keys(): + value = self.combined_map[section_no][recommendation_no][col] + if col == CombineHelper.tgt_col_profile: + value = value[0] + combined_helper.put_cell_value(row.get_value(), col, value) + row.inc_value() + + def _handle_data_row_control( + self, + combined_helper: SheetHelper, + row: Int, + kvset: Dict, + section_no: str, + recommendation_no: str, + rec_count_merged: Int + ) -> None: + """Handle data row control.""" + profiles = kvset[CombineHelper.tgt_col_profile] + for profile in profiles: + for col in kvset.keys(): + value = self.combined_map[section_no][recommendation_no][col] + if col == CombineHelper.tgt_col_profile: + value = profile + combined_helper.put_cell_value(row.get_value(), col, value) + row.inc_value() + rec_count_merged.inc_value() + + def _handle_data_row_non_control( + self, combined_helper: SheetHelper, row: Int, kvset: Dict, section_no: str, recommendation_no: str + ) -> None: + """Handle data row non-control.""" + for col in kvset.keys(): + value = self.combined_map[section_no][recommendation_no][col] + if col == CombineHelper.tgt_col_profile: + value = None + combined_helper.put_cell_value(row.get_value(), col, value) + row.inc_value() + + def _handle_data_row( + self, + combined_helper: SheetHelper, + row: Int, + kvset: Dict, + section_no: str, + recommendation_no: str, + rec_count_merged: Int + ) -> None: + """Handle data row.""" + if recommendation_no: + self._handle_data_row_control(combined_helper, row, kvset, section_no, recommendation_no, rec_count_merged) + else: + self._handle_data_row_non_control(combined_helper, row, kvset, section_no, recommendation_no) + + def _populate_combined_sheet(self, combined_helper: SheetHelper) -> int: + """Populate combined sheet.""" + rec_count_merged = Int(0) + row = Int(1) + keys1 = list(self.combined_map.keys()) + keys1.sort(key=cmp_to_key(SortHelper.compare)) + for section_no in keys1: + section = self.combined_map[section_no] + keys2 = list(section.keys()) + keys2.sort(key=cmp_to_key(SortHelper.compare)) + for recommendation_no in keys2: + kvset = self.combined_map[section_no][recommendation_no] + if row.get_value() == 1: + self._handle_head_row(combined_helper, row, kvset, section_no, recommendation_no) + else: + self._handle_data_row(combined_helper, row, kvset, section_no, recommendation_no, rec_count_merged) + return rec_count_merged.get_value() + + def _add_sheet_combined_profiles(self) -> None: + """Add sheet combined profiles.""" + # output sheet + self.sheetname_output = SheetHelper.get_sheetname() + exists = self.sheetname_output in self.wb.sheetnames + if exists: + logger.debug(f'output sheet {self.sheetname_output} exists.') + return + # input sheets + self.sheetnames_prefixes = SheetHelper.get_sheetname_prefixes() + # sheets + self._gather_sheets() + # validate + self._validate_columns_count() + # key columns mappings + rec_count_sheets = self._populate_combined_map() + # add combined sheet + sn = self.sheetname_output + self.wb.create_sheet(sn) + combined_helper = SheetHelper(self.wb, sn) + self.ws_map[sn] = combined_helper + # populate combined sheet + rec_count_merged = self._populate_combined_sheet(combined_helper) + # correctness check + if rec_count_sheets != rec_count_merged: + raise RuntimeError(f'recommendation counts original: {rec_count_sheets} merged: {rec_count_merged}') + + def _combine_head( + self, sheet_helper: SheetHelper, row: int, section_no: str, recommendation_no: str, tgt_col_profile: int + ) -> None: + """Combine head.""" + if not len(self.combined_map[section_no][recommendation_no].keys()): + self.combined_map[section_no][recommendation_no][tgt_col_profile] = ['profile'] + for col in range(1, sheet_helper.get_max_col() + 1): + if col < tgt_col_profile: + tcol = col + else: + tcol = col + 1 + value = sheet_helper.get_cell_value(row, col) + self.combined_map[section_no][recommendation_no][tcol] = value + + def _combine_data( + self, sheet_helper: SheetHelper, row: int, section_no: str, recommendation_no: str, tgt_col_profile: int + ) -> None: + """Combine data.""" + if not len(self.combined_map[section_no][recommendation_no].keys()): + self.combined_map[section_no][recommendation_no][tgt_col_profile] = [] + sn = sheet_helper.get_sn() + self.combined_map[section_no][recommendation_no][tgt_col_profile].append(sn) + for col in range(1, sheet_helper.get_max_col() + 1): + if col < tgt_col_profile: + tcol = col + else: + tcol = col + 1 + self.combined_map[section_no][recommendation_no][tcol] = sheet_helper.get_cell_value(row, col) + + def _save(self) -> None: + """Save.""" + self.wb.save(self.opath) + logger.debug(f'{self.opath} saved') + + +class PropertyHelper: + """Property Helper.""" + + @staticmethod + def normalize_name(name: str) -> str: + """Normalize name.""" + rval = name.replace('(', '').replace(')', '').replace('#', '').strip() + rval = rval.replace(' ', '_') + rval = rval.replace('__', '_') + return rval + + +class CsvHelper: + """Csv Helper.""" + + def __init__(self, path: pathlib.Path) -> None: + """Initialize.""" + self.path = path + self.path.parent.mkdir(parents=True, exist_ok=True) + self.rows = [] + + def add_row(self, row: List[str]) -> None: + """Add row.""" + self.rows.append(row) + + def delete_last_row(self) -> None: + """Delete last row.""" + self.rows = self.rows[:-1] + + def write(self) -> None: + """Write csv file.""" + with open(self.path, 'w', newline='', encoding='utf-8') as output: + csv_writer = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + for row in self.rows: + csv_writer.writerow(row) + + def save(self) -> None: + """Save csv file.""" + self.write() + logger.debug(f'{self.path} saved') + + @staticmethod + def columns() -> Dict: + """Columns.""" + return { + 'Component_Title': 'A human readable name for the component.', # noqa + 'Component_Description': 'A description of the component including information about its function.', # noqa + 'Component_Type': 'A category describing the purpose of the component. ALLOWED VALUES interconnection:software:hardware:service:physical:process-procedure:plan:guidance:standard:validation:', # noqa + 'Profile': 'List of CIS profiles', # noqa + 'Rule_Id': 'A textual label that uniquely identifies a policy (desired state) that can be used to reference it elsewhere in this or other documents.', # noqa + 'Rule_Description': 'A description of the policy (desired state) including information about its purpose and scope.', # noqa + 'Profile_Source': 'A URL reference to the source catalog or profile for which this component is implementing controls for. A profile designates a selection and configuration of controls from one or more catalogs.', # noqa + 'Profile_Description': 'A description of the profile.', # noqa + 'Control_Id_List': 'A list of textual labels that uniquely identify the controls or statements that the component implements.', # noqa + 'Namespace': 'A namespace qualifying the property\'s name. This allows different organizations to associate distinct semantics with the same name. Used in conjunction with "class" as the ontology concept.', # noqa + } + + +class CsvRowMgr: + """Csv row manager.""" + + def __init__(self, row_names: List) -> None: + """Initialize.""" + self.row_names = row_names + self.map = {} + + def put(self, key: str, val: str) -> None: + """Put.""" + if key not in self.row_names: + raise RuntimeError(f'{key} not found') + self.map[key] = val + + def get(self) -> List[str]: + """Get.""" + row = [] + for name in self.row_names: + if name in self.map.keys(): + row.append(self.map[name]) + else: + row.append('') + return row + + +class CisControlsHelper: + """Cis controls helper.""" + + def __init__(self, cis_controls: str) -> None: + """Initialize.""" + self.cis_controls = cis_controls + + def ctl_generator(self) -> Iterator[Dict]: + """Generate ctls until finished.""" + parts = self.cis_controls.split(';TITLE:') + # restore TITLE: + for n in range(len(parts)): + parts[n] = f'TITLE:{parts[n]}' + # process triples TITLE, CONTROL, DESCRIPTION + for part in parts: + if part: + s1 = part.split('DESCRIPTION:') + description = s1[1].strip() + s2 = s1[0].split('CONTROL:') + control = s2[1].strip() + control_version = control.split()[0] + control_id = control.split()[1] + s3 = s2[0].split('TITLE:') + title = s3[1].strip() + ctl = { + 'description': description, + 'control-version': control_version, + 'control-id': control_id, + 'title': title + } + yield ctl + + def get_ctl_list(self, ctl_pfx: str, profile_version: List[str]) -> List[str]: + """Get control list.""" + ctl_list = [] + if self.cis_controls: + for ctl in self.ctl_generator(): + if ctl['control-version'] not in profile_version: + continue + ctl_id = ctl['control-id'] + try: + float(ctl_id) + except Exception: + text = f'missing or invalid control-id: "{ctl_id}"' + raise RuntimeError(text) + ctl_list.append(f'{ctl_pfx}{ctl_id}') + return ctl_list + + +class XlsxToCsvHelper: + """Xlsx to csv helper.""" + + def __init__(self, config_object: SectionProxy, tmpdir: str) -> None: + """Initialize.""" + self.config_object = config_object + benchmark_file = self.config_object['benchmark-file'] + self.ipath = pathlib.Path(benchmark_file) + self.xpath = pathlib.Path(tmpdir) / self.ipath.name + path = pathlib.Path(tmpdir) / self.xpath.name + self.opath = path.with_suffix('.csv') + self.wb = load_workbook(self.xpath) + # worksheet + self.ws = self.wb[SheetHelper.get_sheetname()] + self._create_maps() + # excluded columns + default_columns_exclude = [f'"{head_recommendation_no}"', '"Profile"', '"Description"'] + columns_exclude = self.config_object.get('columns-exclude') + if columns_exclude: + columns_exclude = columns_exclude.strip().split(',') + else: + columns_exclude = default_columns_exclude + self._columns_exclude = [] + for col in columns_exclude: + name = PropertyHelper.normalize_name(col.replace('"', '')) + self._columns_exclude.append(name) + # benchmark control prefix + self._benchmark_control_prefix = self.config_object.get( + 'benchmark-control-prefix', default_benchmark_control_prefix + ) + if not self._benchmark_control_prefix.endswith('-'): + self._benchmark_control_prefix = f'{self._benchmark_control_prefix}-' + # benchmark rule prefix + self._benchmark_rule_prefix = self.config_object.get('benchmark-rule-prefix', default_benchmark_rule_prefix) + if not self._benchmark_rule_prefix.endswith('-'): + self._benchmark_rule_prefix = f'{self._benchmark_rule_prefix}-' + + def _create_maps(self) -> Dict: + """Create maps.""" + self._map_col_key_to_number = {} + self._map_name_to_col_key = {} + row = 1 + cols = self.ws.max_column + 1 + for col in range(row, cols): + cell = self.ws.cell(row, col) + if cell.value: + key = self._name_to_key(cell.value) + self._map_col_key_to_number[key] = col + self._map_name_to_col_key[cell.value] = key + + def _name_to_key(self, name: str) -> str: + """Name to key.""" + rval = name.lower() + return rval + + def _sanitize(self, value: str) -> str: + """Sanitize value.""" + rval = value + if value: + rval = value.replace('\n', ' ') + return rval + + def _get_map(self) -> Dict: + """Get map.""" + return self._map_name_to_col_key + + def get_all_columns(self) -> List: + """Get all columns.""" + return self._get_map().keys() + + def row_generator(self) -> Iterator[int]: + """Generate rows until max reached.""" + row = 2 + while row <= self.ws.max_row: + yield row + row += 1 + + def get(self, row: int, name: str) -> str: + """Get cell value for given row and column name.""" + key = self._name_to_key(name) + col = self._map_col_key_to_number[key] + cell = self.ws.cell(row, col) + return self._sanitize(cell.value) + + def is_same_rule(self, row_a: int, row_b: str) -> str: + """Is same rule.""" + rule_a = self.get(row_a, head_recommendation_no) + rule_b = self.get(row_b, head_recommendation_no) + rval = rule_a == rule_b + logger.debug(f'{rval} {rule_a} {rule_b}') + return rval + + def is_excluded_column(self, column: str) -> bool: + """Is excluded column.""" + for item in self._columns_exclude: + if item.lower() == column.lower(): + return True + return False + + def merge_row(self, prev_row: int, curr_row: int) -> bool: + """Merge row.""" + rval = False + if prev_row and self.is_same_rule(prev_row, curr_row): + prof2 = self.get(prev_row, 'Profile') + prof1 = self.get(curr_row, 'Profile') + prof = f'"{prof1}; {prof2}"' + self.csv_helper.delete_last_row() + # col Profile + self.csv_row_mgr.put('Profile', prof) + # add body row + row_body = self.csv_row_mgr.get() + self.csv_helper.add_row(row_body) + rval = True + return rval + + def _is_column(self, c1: str, c2: str) -> bool: + """Is column.""" + if c1.lower() == c2.lower(): + rval = True + else: + rval = False + return rval + + def heading_row_1(self) -> None: + """Heading row 1.""" + self.row_names = [] + for col_name in CsvHelper.columns().keys(): + name = PropertyHelper.normalize_name(col_name) + self.row_names.append(name) + + def heading_row_2(self) -> None: + """Heading row 2.""" + self.row_descs = [] + for col_desc in CsvHelper.columns().values(): + desc = col_desc + self.row_descs.append(desc) + # additional user columns + for col in self.get_all_columns(): + name = PropertyHelper.normalize_name(col) + if self.is_excluded_column(col): + continue + self.row_names.append(name) + self.row_descs.append(name) + # additional non-rule columns + for col in self.non_rule_helper.get_all_columns(): + name = PropertyHelper.normalize_name(col) + self.row_names.append(name) + self.row_descs.append(name) + + def _get_ctl_list(self, prev_row: int, curr_row: int) -> List[str]: + """Get_ctl_list.""" + ctl_list = [] + # if merged row, list is empty + if not self.merge_row(prev_row, curr_row): + # if non-rule row, list is empty + rec_no = self.get(curr_row, head_recommendation_no) + if rec_no is not None: + # get list + cis_controls = self.get(curr_row, 'CIS Controls') + cis_control_helper = CisControlsHelper(cis_controls) + ctl_list = cis_control_helper.get_ctl_list( + self._benchmark_control_prefix, self.config_object['profile-version'] + ) + return ctl_list + + def run(self) -> None: + """Run.""" + self.csv_helper = CsvHelper(self.opath) + self.non_rule_helper = NonRuleHelper(self.config_object, self) + # heading row 1 - names + self.heading_row_1() + # heading row 2 - descriptions + self.heading_row_2() + # add heading rows + self.csv_helper.add_row(self.row_names) + self.csv_helper.add_row(self.row_descs) + # body + user_columns = [] + for col in self.get_all_columns(): + if self.is_excluded_column(col): + continue + user_columns.append(col) + # process each data row of CIS Benchmark file + prev_row = None + for curr_row in self.row_generator(): + ctl_list = self._get_ctl_list(prev_row, curr_row) + if not ctl_list: + continue + # create new row + self.csv_row_mgr = CsvRowMgr(self.row_names) + # col Component_Title + self.csv_row_mgr.put('Component_Title', self.config_object['component-name']) + # col Component_Description + self.csv_row_mgr.put('Component_Description', self.config_object['component-description']) + # col Component_Type + self.csv_row_mgr.put('Component_Type', self.config_object['component-type']) + # col Rule_ID + rec_no = self.get(curr_row, head_recommendation_no) + rule_id = f'{self._benchmark_rule_prefix}{rec_no}' + self.csv_row_mgr.put('Rule_Id', rule_id) + # col Rule_Description + rule_desc = self.get(curr_row, 'Description') + self.csv_row_mgr.put('Rule_Description', rule_desc) + # col Profile_Source + self.csv_row_mgr.put('Profile_Source', self.config_object['profile-source']) + # col Profile_Description + self.csv_row_mgr.put('Profile_Description', self.config_object['profile-description']) + # col Control_Id_List + ctl_list = ' '.join(ctl_list) + self.csv_row_mgr.put('Control_Id_List', ctl_list) + # col Namespace + self.csv_row_mgr.put('Namespace', self.config_object['namespace']) + # col Profile + prof = self.get(curr_row, 'Profile') + self.csv_row_mgr.put('Profile', prof) + # col user + for col in user_columns: + val = self.get(curr_row, col) + if val: + if self._is_column(col, 'cis controls'): + val = f'"{val}"' + name = PropertyHelper.normalize_name(col) + self.csv_row_mgr.put(name, val) + # col group levels + self.non_rule_helper.add_group_levels(rec_no, self.csv_row_mgr) + # add body row + row_body = self.csv_row_mgr.get() + self.csv_helper.add_row(row_body) + # previous row + prev_row = curr_row + # write body + self.csv_helper.save() + + +class NonRuleHelper: + """Non-rule Helper.""" + + def __init__(self, config: SectionProxy, xlsx_helper: XlsxToCsvHelper) -> None: + """Initialize.""" + self.col_prefix = 'Group' + self.col_level = 'Level' + self.config = config + self.xlsx_helper = xlsx_helper + default_columns_carry_forward = [head_section_no, 'Title', 'Description'] + self.columns_carry_forward = self.config.get('columns-carry-forward', default_columns_carry_forward) + self.col_list = [] + self.sec_map = {} + if self.columns_carry_forward: + self._init_col_list() + self._init_sec_map() + + def _init_col_list(self) -> None: + """Init col list.""" + biggest = -1 + for row in self.xlsx_helper.row_generator(): + rec_no = self.xlsx_helper.get(row, head_recommendation_no) + if rec_no: + continue + sec_no = self.xlsx_helper.get(row, head_section_no) + count = sec_no.count('.') + if count > biggest: + biggest = count + for i in range(biggest + 1): + for col_name in self.columns_carry_forward: + name = f'{self.col_prefix}_{col_name}_{self.col_level}_{i}' + name = PropertyHelper.normalize_name(name) + self.col_list.append(name) + + def _init_sec_map(self) -> None: + """Init sec map.""" + for row in self.xlsx_helper.row_generator(): + rec_no = self.xlsx_helper.get(row, head_recommendation_no) + if rec_no: + continue + sec_no = self.xlsx_helper.get(row, head_section_no) + _map = {} + for name in self.columns_carry_forward: + _map[name] = self.xlsx_helper.get(row, name) + self.sec_map[sec_no] = _map + + def get_all_columns(self) -> List: + """Get all columns.""" + return self.col_list + + def add_group_levels(self, rec_no: str, csv_row_mgr: CsvRowMgr) -> None: + """Add group levels.""" + parts = rec_no.split('.')[:-1] + for i, part in enumerate(parts): + if not i: + sec_no = f'{part}' + else: + sec_no = f'{sec_no}.{part}' + if sec_no in self.sec_map.keys(): + _map = self.sec_map[sec_no] + for key in _map.keys(): + val = _map[key] + name = f'{self.col_prefix}_{key}_{self.col_level}_{i}' + name = PropertyHelper.normalize_name(name) + csv_row_mgr.put(name, val) + + +class CsvToJsonHelper: + """Csv to json helper.""" + + def __init__(self, config_object: Optional[configparser.SectionProxy], tmpdir: str) -> None: + """Initialize trestle task.""" + self.config_object = config_object + benchmark_file = self.config_object['benchmark-file'] + self.ipath = pathlib.Path(benchmark_file) + self.xpath = pathlib.Path(tmpdir) / self.ipath.name + path = pathlib.Path(tmpdir) / self.xpath.name + self.opath = path.with_suffix('.csv') + self.config_object['csv-file'] = str(self.opath) + self.csv_to_oscal_cd = CsvToOscalComponentDefinition(self.config_object) + self.config_object['title'] = self.config_object['benchmark-title'] + self.config_object['version'] = self.config_object['benchmark-version'] + + def run(self) -> None: + """Run.""" + return self.csv_to_oscal_cd.execute() diff --git a/trestle/tasks/csv_to_oscal_cd.py b/trestle/tasks/csv_to_oscal_cd.py index c8bf32dff..358dece29 100644 --- a/trestle/tasks/csv_to_oscal_cd.py +++ b/trestle/tasks/csv_to_oscal_cd.py @@ -1576,7 +1576,7 @@ class _CsvMgr(): def __init__(self, csv_path: pathlib.Path) -> None: """Initialize.""" self._csv = [] - with open(csv_path, 'r', newline='') as f: + with open(csv_path, 'r', newline='', encoding='utf8') as f: csv_reader = csv.reader(f, delimiter=',', quoting=csv.QUOTE_MINIMAL) for row in csv_reader: self._csv.append(row)