diff --git a/.gitignore b/.gitignore index 68137b43..d29a9872 100644 --- a/.gitignore +++ b/.gitignore @@ -142,6 +142,7 @@ dmypy.json .DS_Store .AppleDouble .LSOverride +.icloud # Thumbnails ._* diff --git a/docs/user/hier_config/advanced-topics.md b/docs/user/hier_config/advanced-topics.md new file mode 100644 index 00000000..a553b67c --- /dev/null +++ b/docs/user/hier_config/advanced-topics.md @@ -0,0 +1,377 @@ +# Advanced Topics + +## Lineage Rules + +Lineage rules are rules that are written in YAML. They allow users to seek out very specific sections of configurations or even seek out very generalized lines within a configuration. For example, suppose you just wanted to seek out interface descriptions. Your lineage rule would look like: + +```yaml +- lineage: + - startswith: interface + - startswith: description +``` + +In the above example, a start of a lineage is defined with the **- lineage:** syntax. From there the interface is defined with the **- startswith: interface** syntax under the **- lineage:** umbrella. This tells hier_config to search for any configuration that starts with the string **interface** as the parent of a configuration line. When it finds an **interface** parent, it then looks at any child configuration line of the interface that starts with the string **description**. + +With lineage rules, you can get as deep into the children or as shallow as you need. Suppose you want to inspect the existence or absence of http, ssh, snmp, and logging within a configuration. This can be done with a single lineage rule, like so: + +```yaml +- lineage: + - startswith: + - ip ssh + - no ip ssh + - ip http + - no ip http + - snmp-server + - no snmp-server + - logging + - no logging +``` + +Or suppose, you want to inspect whether BGP IPv4 AFIs are activated. You can do this with the following: + +```yaml +- lineage: + - startswith: router bgp + - startswith: address-family ipv4 + - endswith: activate +``` + +In the above example, I utilized a different keyword to look for activated BGP neighbors. The keywords that can be utilized within lineage rules are: +- startswith +- endswith +- contains +- equals +- re_search + +You can also put all of the above examples together in the same set of lineage rules like so: + +```yaml +- lineage: + - startswith: interface + - startswith: description +- lineage: + - startswith: + - ip ssh + - no ip ssh + - ip http + - no ip http + - snmp-server + - no snmp-server + - logging + - no logging +- lineage: + - startswith: router bgp + - startswith: address-family ipv4 + - endswith: activate +``` + +When hier_config consumes the lineage rules, it consumes them as a list of lineage rules and processes them individually. + +## Working with Tags + +With a firm understanding of lineage rules, more complex use cases become available within hier_config. A powerful use case is the ability to tag specific sections of configuration and only display remediations based on those tags. This becomes very handy when you're attempting to execute a maintenance that only targets low risk configuration changes or isolate the more risky configuration changes to scrutinize their execution during a maintenance. + +Tagging expands on the use of the lineage rules by creating an **add_tags** keyword to a lineage rule. + +Suppose you had a running configuration that had an ntp configuration that looked like: + +```text +ntp server 192.0.2.1 prefer version 2 +``` + +However, your intended configuration utilized a publicly available NTP server on the internet: + +```text +ip name-server 1.1.1.1 +ip name-server 8.8.8.8 +ntp server time.nist.gov +``` + +You could create a lineage rule that targeted that specific remediation like this: + +```yaml +- lineage: + - startswith: + - ip name-server + - no ip name-server + - ntp + - no ntp + add_tags: ntp +``` + +Now we can modify the script above to load the tags and create a remediation of the said tags: + +```python +#!/usr/bin/env python3 + +# Import the hier_config Host library +from netutils.hier_config import Host + +# Create a hier_config Host object +host = Host(hostname="aggr-example.rtr", os="ios") + +# Load the tagged lineage rules +host.load_tags_from_file("./tests/fixtures/tags_ios.yml") + +# Load a running configuration from a file +host.load_running_config_from_file("./tests/fixtures/running_config.conf") + +# Load an intended configuration from a file +host.load_generated_config_from_file("./tests/fixtures/generated_config.conf") + +# Create the remediation steps +host.remediation_config() + +# Display the remediation steps for only the "ntp" tags +print(host.remediation_config_filtered_text(include_tags={"ntp"}, exclude_tags={})) +``` + +In the script, we made two changes. The first change is to load the tagged lineage rules: +`host.load_tags_from_file("./tests/fixtures/tags_ios.yml")`. +And the second is to filter the remediation steps by only including steps that are tagged with **ntp** via the **include_tags** argument. + +The remediation looks like: + +```text +no ntp server 192.0.2.1 prefer version 2 +ip name-server 1.1.1.1 +ip name-server 8.8.8.8 +ntp server time.nist.gov +``` + +## hier_config Options + +There are a number of options that can be loaded into hier_config to make it better conform to the nuances of your network device. By default, hier_config loads a set of [sane defaults](https://github.com/netdevops/hier_config/blob/master/hier_config/options.py) for Cisco IOS, IOS XE, IOS XR, NX-OS, and Arista EOS. + +Below are the configuration options available for manipulation. + +```python +base_options: dict = { + "style": None, + "negation": "no", + "syntax_style": "cisco", + "sectional_overwrite": [], + "sectional_overwrite_no_negate": [], + "ordering": [], + "indent_adjust": [], + "parent_allows_duplicate_child": [], + "sectional_exiting": [], + "full_text_sub": [], + "per_line_sub": [], + "idempotent_commands_blacklist": [], + "idempotent_commands": [], + "negation_default_when": [], + "negation_negate_with": [], +} +``` + +The default options can be completely overwritten and loaded from a yaml file, or individual components of the options can be manipulated to provide the functionality that is desired. + +Here is an example of manipulating the built-in options. + +```python +# Import the hier_config Host library +from netutils.hier_config import Host + +# Create a hier_config Host object +host = Host(hostname="aggr-example.rtr", os="ios") + +# Create an NTP negation ordered lineage rule +ordered_negate_ntp = {"lineage": [{"startswith": ["no ntp"], "order": 700}]} + +# Update the hier_config options "ordering" key. +host.hconfig_options["ordering"].append(ordered_negate_ntp) +``` + +Here is an example of completely overwriting the default options and loading in your own. + +```python +# import YAML +import yaml + +# Import the hier_config Host library +from netutils.hier_config import Host + +# Load the hier_config options into memory +with open("./tests/fixtures/options_ios.yml") as f: + options = yaml.load(f.read(), Loader=yaml.SafeLoader) + +# Create a hier_config Host object +host = Host(hostame="aggr-example.rtr", os="ios", hconfig_options=options) +``` + +In the following sections, I'll cover the available options. + +#### style + +The **style** defines the os family. Such as **ios**, **iosxr**, etc. + +Example: + +```yaml +style: ios +``` + +#### negation + +The **negation** defines how an os handles negation. The default is **no**. However, in some circumstances, the negation method is different. Comware, for instance uses **undo** as the negation method and set based syntax uses **delete** for negation. + +```yaml +negation: no +``` + +#### syntax_style + +**syntax_style** is used when using a configuration syntax that is different than Cisco ios-style configuration syntax. The only non-Cisco based syntax supported is **juniper**. Calling the juniper syntax style will call additional parsing methods when loading configurations into memory. + +Default: +```yaml +syntax_style: cisco +``` + +Juniper: +```yaml +syntax_style: juniper +``` + +#### sectional_overwrite_no_negate + +The sectional overwrite with no negate hier_config option will completely overwrite sections of configuration without negating them. This option is often used with the RPL sections of IOS XR devices that require that the entire RPL be re-created when making modifications to them, rather than editing individual lines within the RPL. + +An example of sectional overwrite with no negate is: + +```yaml +sectional_overwrite_no_negate: +- lineage: + - startswith: as-path-set +- lineage: + - startswith: prefix-set +- lineage: + - startswith: route-policy +- lineage: + - startswith: extcommunity-set +- lineage: + - startswith: community-set +``` + +#### sectional_overwrite + +Sectional overwrite is just like sectional overwrite with no negate, except that hier_config will negate a section of configuration and then completely re-create it. + +#### ordering + +Ordering is one of the most useful hier_config options. This allows you to use lineage rules to define the order in which remediation steps are presented to the user. For the ntp example above, the ntp server was negated (`no ntp server 192.0.2.1`) before the new ntp server was added. In most cases, this wouldn't be advantageous. Thus, ordering can be used to define the proper order to execute commands. + +All commands are assigned a default order weight of 500, with a usable order weight of 1 - 999. The smaller the weight value, the higher on the list of steps a command is to be executed. The larger the weight value, the lower on the list of steps a command is to be executed. To create an order in which new ntp servers are added before old ntp servers are removed, you can create an order lineage that weights the negation to the bottom. + +Example: + +```yaml +ordering: +- lineage: + - startswith: no ntp + order: 700 +``` + +With the above order lineage applied, the output of the above ntp example would look like: + +```text +ip name-server 1.1.1.1 +ip name-server 8.8.8.8 +ntp server time.nist.gov +no ntp server 192.0.2.1 prefer version 2 +``` + +#### indent_adjust + +coming soon... + +#### parent_allows_duplicate_child + +coming soon... + +#### sectional_exiting + +Sectional exiting features configuration sections that have a configuration syntax that defines the end of a configuration section. Examples of this are RPL (route policy language) configurations in IOS XR or peer policy and peer session configurations in IOS BGP sections. The sectional exiting configuration allows you to define the configuration syntax so that hier_config can render a remediation that properly exits those configurations. + +An example of sectional exiting is: + +```yaml +sectional_exiting: +- lineage: + - startswith: router bgp + - startswith: template peer-policy + exit_text: exit-peer-policy +- lineage: + - startswith: router bgp + - startswith: template peer-session + exit_text: exit-peer-session +``` + +#### full_text_sub + +Full text sub allows for substitutions of a multi-line string. Regular expressions are commonly used and allowed in this section. An example of this would be: + +```yaml +full_text_sub: +- search: "banner\s(exec|motd)\s(\S)\n(.*\n){1,}(\2)" + replace: "" +``` + +This example simply searches for a banner message in the configuration and replaces it with an empty string. + +#### per_line_sub + +Per line sub allows for substitutions of individual lines. This is commonly used to remove artifacts from a running configuration that don't provide any value when creating remediation steps. + +An example is removing lines such as: + +```text +Building configuration... + +Current configuration : 3781 bytes +``` + +Per line sub can be used to remove those lines: + +```yaml +per_line_sub: +- search: "Building configuration.*" + replace: "" +- search: "Current configuration.*" + replace: "" +``` + +#### idempotent_commands_blacklist + +coming soon... + +#### idempotent_commands + +Idempotent commands are commands that can just be overwritten and don't need negation. Lineage rules can be created to define those commands that are idempotent. + +An example of idempotent commands are: + +```yaml +idempotent_commands: +- lineage: + - startswith: vlan + - startswith: name +- lineage: + - startswith: interface + - startswith: description +``` + +The lineage rules above specify that defining a vlan name and updating an interface description are both idempotent commands. + +#### negation_default_when + +coming soon... + +#### negation_default_with + +coming soon... + +## Custom hier_config Workflows + +Coming soon... diff --git a/docs/user/hier_config/experimental-features.md b/docs/user/hier_config/experimental-features.md new file mode 100644 index 00000000..8f33d27a --- /dev/null +++ b/docs/user/hier_config/experimental-features.md @@ -0,0 +1,342 @@ +# Experimental Features + +Experimental features are those features that work, but haven't been thoroughly tested enough to feel confident to use in production. + +## Rollback Configuration + +Starting in version 2.0.2, a featured called rollback configuraiton was introduced. The rollback configuration is exactly what it sounds like. It renders a rollback configuration in the event that a remediation causes a hiccup when being deployed. The rollback configuration does the inverse on a remediation. Instead of a remediation being renedered based upon the generated config, a rollback remediation is rendered from the generated config based upon the running configuration. + +A rollback configuration can be rendered once the running and generated configurations are loaded. Below is an example. + +```bash +>>> from netutils.hier_config import Host +>>> host = Host(hostname="aggr-example.rtr", os="ios") +>>> host.load_running_config_from_file("./tests/fixtures/running_config.conf") +>>> host.load_generated_config_from_file("./tests/fixtures/generated_config.conf") +>>> rollback = host.rollback_config() +>>> for line in rollback.all_children_sorted(): +... print(line.cisco_style_text()) +... +no vlan 4 +no interface Vlan4 +vlan 3 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + no mtu 9000 + no ip access-group TEST in + shutdown +interface Vlan3 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 +>>> +``` + + +## Unified diff + +Starting in version 2.1.0, a featured called unified diff was introduced. It provides a similar output to difflib.unified_diff() but is aware of out of order lines and the parent child relationships present in the hier_config model of the configurations being diffed. + +This feature is useful in cases where you need to compare the differences of two network device configurations. Such as comparing the configs of redundant device pairs. Or, comparing running and intended configs. + +In its current state, this algorithm does not consider duplicate child differences. e.g. two instances `endif` in an IOS-XR route-policy. It also does not respect the order of commands where it may count, such as in ACLs. In the case of ACLs, they should contain sequence numbers if order is important. + +```bash +In [1]: list(running_config.unified_diff(generated_config)) +Out[1]: +['vlan 3', + ' - name switch_mgmt_10.0.4.0/24', + ' + name switch_mgmt_10.0.3.0/24', + 'interface Vlan2', + ' - shutdown', + ' + mtu 9000', + ' + ip access-group TEST in', + ' + no shutdown', + 'interface Vlan3', + ' - description switch_mgmt_10.0.4.0/24', + ' - ip address 10.0.4.1 255.255.0.0', + ' + description switch_mgmt_10.0.3.0/24', + ' + ip address 10.0.3.1 255.255.0.0', + '+ vlan 4', + ' + name switch_mgmt_10.0.4.0/24', + '+ interface Vlan4', + ' + mtu 9000', + ' + description switch_mgmt_10.0.4.0/24', + ' + ip address 10.0.4.1 255.255.0.0', + ' + ip access-group TEST in', + ' + no shutdown'] +``` + + + +## Future Config + +Starting in version 2.2.0, a featured called future config was introduced. It attempts to predict the running config after a change is applied. + +This feature is useful in cases where you need to determine what the configuration state will be after a change is applied. Such as: +- Ensuring that a configuration change was applied successfully to a device. + - i.e. Does the post-change config match the predicted future config? +- Providing a future state config that can be fed into batfish, or similar, to predict if a change will cause an impact. +- Building rollback configs. If you have the future config state, then generating a rollback config can be done by simply building the remediation config in the reverse direction `rollback = future.config_to_get_to(running)`. + - If you are building rollbacks for a series of config changes, you can feed the post-change-1 future config into the process for determining the post-change-2 future config e.g. + ```shell + post_change_1_config = running_config.future(change_1_config) + change_1_rollback_config = post_change_1_config.config_to_get_to(running_config) + post_change_2_config = post_change_1_config.future(change_2_config) + change_2_rollback_config = post_change_2_config.config_to_get_to(post_change_1_config) + ... + ``` + +In its current state, this algorithm does not consider: +- negate a numbered ACL when removing an item +- sectional exiting +- negate with +- idempotent command blacklist +- idempotent_acl_check +- and likely others + +```bash +In [1]: from netutils.hier_config import HConfig, Host + ...: + ...: + ...: host = Host("test.dfw1", "ios") + ...: running_config = HConfig(host) + ...: running_config.load_from_file("./tests/fixtures/running_config.conf") + ...: remediation_config = HConfig(host) + ...: remediation_config.load_from_file("./tests/fixtures/remediation_config_without_tags.conf") + ...: future_config = running_config.future(remediation_config) + ...: + ...: print("\n##### running config") + ...: for line in running_config.all_children(): + ...: print(line.cisco_style_text()) + ...: + ...: print("\n##### remediation config") + ...: for line in remediation_config.all_children(): + ...: print(line.cisco_style_text()) + ...: + ...: print("\n##### future config") + ...: for line in future_config.all_children(): + ...: print(line.cisco_style_text()) + ...: + +##### running config +hostname aggr-example.rtr +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +vlan 2 + name switch_mgmt_10.0.2.0/24 +vlan 3 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 + shutdown +interface Vlan3 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown + +##### remediation config +vlan 3 + name switch_mgmt_10.0.3.0/24 +vlan 4 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + mtu 9000 + ip access-group TEST in + no shutdown +interface Vlan3 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown + +##### future config +vlan 3 + name switch_mgmt_10.0.3.0/24 +vlan 4 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + mtu 9000 + ip access-group TEST in + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 +interface Vlan3 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 + mtu 9000 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown +hostname aggr-example.rtr +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +vlan 2 + name switch_mgmt_10.0.2.0/24 +``` + +## JunOS-style Syntax Remediation +"set" based operating systems can now be remediated in experimental capacity. Here is an example of a JunOS style remediation. + +``` +$ cat ./tests/fixtures/running_config_flat_junos.confset system host-name aggr-example.rtr + +set firewall family inet filter TEST term 1 from source-address 10.0.0.0/29 +set firewall family inet filter TEST term 1 then accept + +set vlans switch_mgmt_10.0.2.0/24 vlan-id 2 +set vlans switch_mgmt_10.0.2.0/24 l3-interface irb.2 + +set vlans switch_mgmt_10.0.4.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.3 + +set interfaces irb unit 2 family inet address 10.0.2.1/24 +set interfaces irb unit 2 family inet description "switch_10.0.2.0/24" +set interfaces irb unit 2 family inet disable + +set interfaces irb unit 3 family inet address 10.0.4.1/16 +set interfaces irb unit 3 family inet filter input TEST +set interfaces irb unit 3 family inet mtu 9000 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" + + +$ python3 +Python 3.8.10 (default, Nov 22 2023, 10:22:35) +[GCC 9.4.0] on linux +Type "help", "copyright", "credits" or "license" for more information. +>>> import yaml +>>> from netutils.hier_config import Host +>>> +>>> host = Host('example.rtr', 'junos') +>>> +>>> # Build Hierarchical Configuration object for the Running Config +>>> host.load_running_config_from_file("./tests/fixtures/running_config_flat_junos.conf") +>>> +>>> # Build Hierarchical Configuration object for the Generated Config +>>> host.load_generated_config_from_file("./tests/fixtures/generated_config_flat_junos.conf") +>>> +>>> # Build and Print the all lines of the remediation config +>>> print(host.remediation_config_filtered_text({}, {})) +delete vlans switch_mgmt_10.0.4.0/24 vlan-id 3 +delete vlans switch_mgmt_10.0.4.0/24 l3-interface irb.3 +delete interfaces irb unit 2 family inet disable +delete interfaces irb unit 3 family inet address 10.0.4.1/16 +delete interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" +set vlans switch_mgmt_10.0.3.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.3.0/24 l3-interface irb.3 +set vlans switch_mgmt_10.0.4.0/24 vlan-id 4 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.4 +set interfaces irb unit 2 family inet filter input TEST +set interfaces irb unit 2 family inet mtu 9000 +set interfaces irb unit 3 family inet address 10.0.3.1/16 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.3.0/24" +set interfaces irb unit 4 family inet address 10.0.4.1/16 +set interfaces irb unit 4 family inet filter input TEST +set interfaces irb unit 4 family inet mtu 9000 +set interfaces irb unit 4 family inet description "switch_mgmt_10.0.4.0/24" +``` + +Configurations loaded into Hier Config as Juniper-style syntax are converted to a flat `set` based configuration format. Remediations are then rendered using this `set` style syntax. + +``` +$ cat ./tests/fixtures/running_config_junos.conf +system { + host-name aggr-example.rtr; +} + +firewall { + family inet { + filter TEST { + term 1 { + from { + source-address 10.0.0.0/29; + } + then { + accept; + } + } + } + } +} + +vlans { + switch_mgmt_10.0.2.0/24 { + vlan-id 2; + l3-interface irb.2; + } + switch_mgmt_10.0.4.0/24 { + vlan-id 3; + l3-interface irb.3; + } +} + +interfaces { + irb { + unit 2 { + family inet { + address 10.0.2.1/24; + description "switch_10.0.2.0/24"; + disable; + } + } + unit 3 { + family inet { + address 10.0.4.1/16; + filter { + input TEST; + } + mtu 9000; + description "switch_mgmt_10.0.4.0/24"; + } + } + } +} + +$ python3 +Python 3.8.10 (default, Nov 22 2023, 10:22:35) +[GCC 9.4.0] on linux +Type "help", "copyright", "credits" or "license" for more information. +>>> import yaml +>>> from netutils.hier_config import Host +>>> +>>> host = Host('example.rtr', 'junos') +>>> +>>> # Build Hierarchical Configuration object for the Running Config +>>> host.load_running_config_from_file("./tests/fixtures/running_config_junos.conf") +>>> +>>> # Build Hierarchical Configuration object for the Generated Config +>>> host.load_generated_config_from_file("./tests/fixtures/generated_config_junos.conf") +>>> +>>> # Build and Print the all lines of the remediation config +>>> print(host.remediation_config_filtered_text({}, {})) +delete vlans switch_mgmt_10.0.4.0/24 vlan-id 3 +delete vlans switch_mgmt_10.0.4.0/24 l3-interface irb.3 +delete interfaces irb unit 2 family inet description "switch_10.0.2.0/24" +delete interfaces irb unit 2 family inet disable +delete interfaces irb unit 3 family inet address 10.0.4.1/16 +delete interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" +set vlans switch_mgmt_10.0.3.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.3.0/24 l3-interface irb.3 +set vlans switch_mgmt_10.0.4.0/24 vlan-id 4 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.4 +set interfaces irb unit 2 family inet filter input TEST +set interfaces irb unit 2 family inet mtu 9000 +set interfaces irb unit 2 family inet description "switch_mgmt_10.0.2.0/24" +set interfaces irb unit 3 family inet address 10.0.3.1/16 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.3.0/24" +set interfaces irb unit 4 family inet address 10.0.4.1/16 +set interfaces irb unit 4 family inet filter input TEST +set interfaces irb unit 4 family inet mtu 9000 +set interfaces irb unit 4 family inet description "switch_mgmt_10.0.4.0/24" +``` \ No newline at end of file diff --git a/docs/user/hier_config/getting-started.md b/docs/user/hier_config/getting-started.md new file mode 100644 index 00000000..18b30f6c --- /dev/null +++ b/docs/user/hier_config/getting-started.md @@ -0,0 +1,124 @@ +# hier_config Up and Running + +Hierarchical Configuration doesn't communicate with devices themselves. It simply reads configuration data and creates a remediation plan based on the input from a running config and the input from a generated config. + +The very first thing that needs to happen is that a hier_config Host object needs to be initiated for a device. To do this, import the hier_config Host class. + +```python +from netutils.hier_config import Host +``` + +With the Host class imported, it can be utilized to create host objects. + +```python +host = Host(hostname="aggr-example.rtr", os="ios") +``` + +Once a host object has been created, the running configuration and generated configurations of a network device can be loaded into the host object. These configurations can be loaded in two ways. If you already have the configurations loaded as strings in memory, you can load them from the strings. + +*Example of loading configs from in memory strings*: +```python + +running_config = """hostname aggr-example.rtr +! +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +! +vlan 2 + name switch_mgmt_10.0.2.0/24 +! +vlan 3 + name switch_mgmt_10.0.4.0/24 +! +interface Vlan2 + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 + shutdown +! +interface Vlan3 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown""" + +generated_config = """hostname aggr-example.rtr +! +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +! +vlan 2 + name switch_mgmt_10.0.2.0/24 +! +vlan 3 + name switch_mgmt_10.0.3.0/24 +! +vlan 4 + name switch_mgmt_10.0.4.0/24 +! +interface Vlan2 + mtu 9000 + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 + ip access-group TEST in + no shutdown +! +interface Vlan3 + mtu 9000 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 + ip access-group TEST in + no shutdown +! +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown""" + +host.load_running_config(config_text=running_config) +host.load_generated_config(config_text=generated_config) +``` + +The second method for loading configs into the host object is loading the configs from files. + +*Example of loading configs from files.* +```python +host.load_running_config_from_file("./tests/fixtures/running_config.conf") +host.load_generated_config_from_file("./tests/fixtures/generated_config.conf") +``` + +Once the configs are loaded into the host object, a remediation can be created. + +```python +host.remediation_config() +``` + +`host.remediation_config()` is loaded as a python object. To view the results of the remediation, call the `host.remediation_config_filtered_text(include_tags={}, exclude_tags={})` method. + +```python +print(host.remediation_config_filtered_text(include_tags={}, exclude_tags={})) +``` + +> If you're using the examples from the `/tests/fixtures` folder in the [github](https://github.com/netdevops/hier_config/) repository, you should see an output that resembles: + +```text +vlan 3 + name switch_mgmt_10.0.3.0/24 +vlan 4 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + mtu 9000 + ip access-group TEST in + no shutdown +interface Vlan3 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown +``` diff --git a/docs/user/hier_config/index.md b/docs/user/hier_config/index.md new file mode 100644 index 00000000..bf09b397 --- /dev/null +++ b/docs/user/hier_config/index.md @@ -0,0 +1,12 @@ +# Introduction + +Welcome to the Hierarchical Configuration documentation site. Hierarchical Configuration, also known as `hier_config`, is a python library is able to take a running configuration of a network device, compare it to its intended configuration, and build the remediation steps necessary to bring a device into spec with its intended configuration. + +Hierarchical Configuraiton has been used extensively on: + +- [x] Cisco IOS +- [x] Cisco IOSXR +- [x] Cisco NXOS +- [x] Arista EOS + +However, any NOS that utilizes a CLI syntax that is structured in a similar fasion to IOS should work mostly out of the box. \ No newline at end of file diff --git a/netutils/hier_config/__init__.py b/netutils/hier_config/__init__.py new file mode 100644 index 00000000..3b2b66e6 --- /dev/null +++ b/netutils/hier_config/__init__.py @@ -0,0 +1,8 @@ +from netutils.hier_config.base import HConfigBase +from netutils.hier_config.root import HConfig +from netutils.hier_config.child import HConfigChild +from netutils.hier_config.host import Host +from netutils.hier_config import text_match + + +__all__ = [HConfigBase, HConfigChild, HConfig, Host, text_match] diff --git a/netutils/hier_config/base.py b/netutils/hier_config/base.py new file mode 100644 index 00000000..fcbab532 --- /dev/null +++ b/netutils/hier_config/base.py @@ -0,0 +1,490 @@ +from __future__ import annotations +from typing import ( + Optional, + List, + Iterator, + Dict, + Tuple, + Union, + Set, + TYPE_CHECKING, + Type, +) +from logging import getLogger +from abc import ABC, abstractmethod +from functools import cached_property +from itertools import chain + +from netutils.hier_config import text_match + +if TYPE_CHECKING: + from .child import HConfigChild + from .root import HConfig + from .host import Host + +logger = getLogger(__name__) + + +class HConfigBase(ABC): # pylint: disable=too-many-public-methods + def __init__(self) -> None: + self.children: List[HConfigChild] = [] + self.children_dict: Dict[str, HConfigChild] = {} + self.host: Host + + def __str__(self) -> str: + return "\n".join(c.cisco_style_text() for c in self.all_children()) + + def __len__(self) -> int: + return len(list(self.all_children())) + + def __bool__(self) -> bool: + return True + + def __contains__(self, item: str) -> bool: + return item in self.children_dict + + def __eq__(self, other: object) -> bool: + if not isinstance(other, HConfigBase): + return NotImplemented + + if len(self.children) != len(other.children): + return False + + for self_child, other_child in zip(sorted(self.children), sorted(other.children)): + if self_child != other_child: + return False + + return True + + @abstractmethod + def _duplicate_child_allowed_check(self) -> bool: + pass + + @property + @abstractmethod + def options(self) -> dict: + pass + + @property + @abstractmethod + def root(self) -> HConfig: + pass + + @abstractmethod + def lineage(self) -> Iterator[HConfigChild]: + pass + + @abstractmethod + def depth(self) -> int: + pass + + @property + @abstractmethod + def logs(self) -> List[str]: + pass + + @property + @abstractmethod + def _child_class(self) -> Type[HConfigChild]: + pass + + def has_children(self) -> bool: + return bool(self.children) + + def add_children_deep(self, lines: List[str]) -> None: + """Add child instances of HConfigChild deeply""" + if lines: + child = self.add_child(lines.pop(0)) + child.add_children_deep(lines) + + def add_children(self, lines: List[str]) -> None: + """Add child instances of HConfigChild""" + for line in lines: + self.add_child(line) + + def add_child( + self, + text: str, + alert_on_duplicate: bool = False, + idx: Optional[int] = None, + force_duplicate: bool = False, + ) -> HConfigChild: + """Add a child instance of HConfigChild""" + + if idx is None: + idx = len(self.children) + # if child does not exist + if text not in self: + new_item = self._child_class(self, text) # type: ignore + self.children.insert(idx, new_item) + self.children_dict[text] = new_item + return new_item + # if child does exist and is allowed to be installed as a duplicate + if self._duplicate_child_allowed_check() or force_duplicate: + new_item = self._child_class(self, text) # type: ignore + self.children.insert(idx, new_item) + self.rebuild_children_dict() + return new_item + + # If the child is already present and the parent does not allow + # duplicate children, return the existing child + # Ignore duplicate remarks in ACLs + if alert_on_duplicate and not text.startswith("remark "): + self.logs.append(f"Found a duplicate section: {list(self.path()) + [text]}") + return self.children_dict[text] + + def path(self) -> Iterator[str]: + yield from () + + def add_deep_copy_of(self, child_to_add: HConfigChild, merged: bool = False) -> HConfigChild: + """Add a nested copy of a child to self""" + new_child = self.add_shallow_copy_of(child_to_add, merged=merged) + for child in child_to_add.children: + new_child.add_deep_copy_of(child, merged=merged) + + return new_child + + def to_tag_spec(self, tags: Set[str]) -> List[dict]: + """ + Returns the configuration as a tag spec definition + + This is handy when you have a segment of config and need to + generate a tag spec to tag configuration in another instance + """ + tag_spec = [] + for child in self.all_children(): + if not child.children: + child_spec = [{"equals": t} for t in child.path()] + tag_spec.append({"section": child_spec, "add_tags": tags}) + return tag_spec + + def del_child_by_text(self, text: str) -> None: + """Delete all children with the provided text""" + if text in self.children_dict: + self.children[:] = [c for c in self.children if c.text != text] + self.rebuild_children_dict() + + def del_child(self, child: HConfigChild) -> None: + """Delete a child from self.children and self.children_dict""" + try: + self.children.remove(child) + except ValueError: + pass + else: + self.rebuild_children_dict() + + def all_children_sorted_untagged(self) -> Iterator[HConfigChild]: + """Yield all children recursively that are untagged""" + yield from (c for c in self.all_children_sorted() if None in c.tags) + + def all_children_sorted(self) -> Iterator[HConfigChild]: + """Recursively find and yield all children sorted at each hierarchy""" + for child in sorted(self.children): + yield child + yield from child.all_children_sorted() + + def all_children_sorted_with_lineage_rules(self, rules: List[dict]) -> Iterator[HConfigChild]: + """Recursively find and yield all children sorted at each hierarchy given lineage rules""" + yielded = set() + matched: Set[HConfigChild] = set() + # pylint: disable=too-many-nested-blocks + for child in self.all_children_sorted(): + for ancestor in child.lineage(): + if ancestor in matched: + yield child + yielded.add(child) + break + else: + for rule in rules: + if child.lineage_test(rule, False): + matched.add(child) + for ancestor in child.lineage(): + if ancestor in yielded: + continue + yield ancestor + yielded.add(ancestor) + break + + def all_children(self) -> Iterator[HConfigChild]: + """Recursively find and yield all children at each hierarchy""" + for child in self.children: + yield child + yield from child.all_children() + + def get_child(self, test: str, expression: str) -> Optional[HConfigChild]: + """Find a child by text_match rule. If it is not found, return None""" + if test == "equals" and isinstance(expression, str): + return self.children_dict.get(expression, None) + + return next(self.get_children(test, expression), None) + + def get_child_deep(self, test_expression_pairs: List[Tuple[str, str]]) -> Optional[HConfigChild]: + """ + Find a child recursively with a list of test/expression pairs + + e.g. + + .. code:: python + + result = hier_obj.get_child_deep([('equals', 'control-plane'), + ('equals', 'service-policy input system-cpp-policy')]) + """ + + test, expression = test_expression_pairs.pop(0) + if test == "equals": + result = self.children_dict.get(expression, None) + if result and test_expression_pairs: + return result.get_child_deep(test_expression_pairs) + return result + + try: + result = next(self.get_children(test, expression)) + except StopIteration: + return None + if result and test_expression_pairs: + return result.get_child_deep(test_expression_pairs) + return result + + def get_children(self, test: str, expression: str) -> Iterator[HConfigChild]: + """Find all children matching a text_match rule and return them.""" + for child in self.children: + if text_match.dict_call(test, child.text, expression): + yield child + + def add_shallow_copy_of(self, child_to_add: HConfigChild, merged: bool = False) -> HConfigChild: + """Add a nested copy of a child_to_add to self.children""" + + new_child = self.add_child(child_to_add.text) + + if merged: + new_child.instances.append( + { + "hostname": child_to_add.host.hostname, + "comments": child_to_add.comments, + "tags": child_to_add.tags, + } + ) + new_child.comments.update(child_to_add.comments) + new_child.order_weight = child_to_add.order_weight + if child_to_add.is_leaf: + new_child.append_tags({t for t in child_to_add.tags if isinstance(t, str)}) + + return new_child + + def rebuild_children_dict(self) -> None: + """Rebuild self.children_dict""" + self.children_dict = {} + for child in self.children: + self.children_dict.setdefault(child.text, child) + + def delete_all_children(self) -> None: + """Delete all children""" + self.children.clear() + self.rebuild_children_dict() + + def unified_diff(self, target: Union[HConfig, HConfigChild]) -> Iterator[str]: + """ + provides a similar output to difflib.unified_diff() + + In its current state, this algorithm does not consider duplicate child differences. + e.g. two instances `endif` in an IOS-XR route-policy. It also does not respect the + order of commands where it may count, such as in ACLs. In the case of ACLs, they + should contain sequence numbers if order is important. + """ + # if a self child is missing from the target "- self_child.text" + for self_child in self.children: + self_iter = iter((f"{self_child.indentation}{self_child.text}",)) + if target_child := target.children_dict.get(self_child.text, None): + found = self_child.unified_diff(target_child) + if peek := next(found, None): + yield from chain(self_iter, (peek,), found) + else: + yield f"{self_child.indentation}- {self_child.text}" + yield from (f"{c.indentation}- {c.text}" for c in self_child.all_children_sorted()) + # if a target child is missing from self "+ target_child.text" + for target_child in target.children: + if target_child.text not in self.children_dict: + yield f"{target_child.indentation}+ {target_child.text}" + yield from (f"{c.indentation}+ {c.text}" for c in target_child.all_children_sorted()) + + def _future( + self, + config: Union[HConfig, HConfigChild], + future_config: Union[HConfig, HConfigChild], + ) -> None: + """ + The below cases still need to be accounted for: + - negate a numbered ACL when removing an item + - sectional exiting + - negate with + - idempotent command blacklist + - idempotent_acl_check + - and likely others + """ + negated_or_recursed = set() + for config_child in config.children: + # sectional_overwrite + if config_child.sectional_overwrite_check(): + future_config.add_deep_copy_of(config_child) + # sectional_overwrite_no_negate + elif config_child.sectional_overwrite_no_negate_check(): + future_config.add_deep_copy_of(config_child) + # Idempotent commands + elif self_child := config_child.idempotent_for(self.children): + future_config.add_deep_copy_of(config_child) + negated_or_recursed.add(self_child.text) + # config_child is already in self + elif self_child := self.get_child("equals", config_child.text): + future_child = future_config.add_shallow_copy_of(self_child) + # pylint: disable=protected-access + self_child._future(config_child, future_child) + negated_or_recursed.add(config_child.text) + # config_child is being negated + elif config_child.text.startswith(self._negation_prefix): + unnegated_command = config_child.text[len(self._negation_prefix) :] # noqa: E203 + if self.get_child("equals", unnegated_command): + negated_or_recursed.add(unnegated_command) + # Account for "no ..." commands in the running config + else: + future_config.add_shallow_copy_of(config_child) + # config_child is not in self and doesn't match a special case + else: + future_config.add_deep_copy_of(config_child) + + for self_child in self.children: + # self_child matched an above special case and should be ignored + if self_child.text in negated_or_recursed: + continue + # self_child was not modified above and should be present in the future config + future_config.add_deep_copy_of(self_child) + + def _with_tags(self, tags: Set[str], new_instance: Union[HConfig, HConfigChild]) -> Union[HConfig, HConfigChild]: + """ + Returns a new instance containing only sub-objects + with one of the tags in tags + """ + for child in self.children: + if tags.intersection(child.tags): + new_child = new_instance.add_shallow_copy_of(child) + # pylint: disable=protected-access + child._with_tags(tags, new_instance=new_child) + + return new_instance + + def _config_to_get_to( + self, target: Union[HConfig, HConfigChild], delta: Union[HConfig, HConfigChild] + ) -> Union[HConfig, HConfigChild]: + """ + Figures out what commands need to be executed to transition from self to target. + self is the source data structure(i.e. the running_config), + target is the destination(i.e. generated_config) + + """ + self._config_to_get_to_left(target, delta) + self._config_to_get_to_right(target, delta) + + return delta + + @staticmethod + def _strip_acl_sequence_number(hier_child: HConfigChild) -> str: + words = hier_child.text.split() + if words[0].isdecimal(): + words.pop(0) + return " ".join(words) + + def _difference( + self, + target: Union[HConfig, HConfigChild], + delta: Union[HConfig, HConfigChild], + in_acl: bool = False, + target_acl_children: Optional[Dict[str, HConfigChild]] = None, + ) -> Union[HConfig, HConfigChild]: + for self_child in self.children: + # Not dealing with negations and defaults for now + if self_child.text.startswith((self._negation_prefix, "default ")): + continue + + if in_acl: + # Ignore ACL sequence numbers + if not isinstance(target_acl_children, dict): + raise TypeError(f"{target_acl_children} is not a dict.") + else: + target_child = target_acl_children.get(self._strip_acl_sequence_number(self_child)) + else: + target_child = target.get_child("equals", self_child.text) + + if target_child is None: + delta.add_deep_copy_of(self_child) + else: + delta_child = delta.add_child(self_child.text) + sw_matches = tuple(f"ip{x} access-list " for x in ("", "v4", "v6")) + + if self_child.text.startswith(sw_matches): + # pylint: disable=protected-access + self_child._difference( + target_child, + delta_child, + in_acl=True, + target_acl_children={self._strip_acl_sequence_number(c): c for c in target_child.children}, + ) + else: + self_child._difference(target_child, delta_child) # pylint: disable=protected-access + + if not delta_child.children: + delta_child.delete() + + return delta + + def _config_to_get_to_left(self, target: Union[HConfig, HConfigChild], delta: Union[HConfig, HConfigChild]) -> None: + # find self.children that are not in target.children + # i.e. what needs to be negated or defaulted + # Also, find out if another command in self.children will overwrite + # i.e. be idempotent + for self_child in self.children: + if self_child.text in target: + continue + if self_child.is_idempotent_command(target.children): + continue + + # in other but not self + # add this node but not any children + if self_child.text.startswith("set") and self.options["negation"] == "delete": + self_child.text = self_child.text.replace("set ", "", 1) + + deleted = delta.add_child(self_child.text) + deleted.negate() + if self_child.children: + deleted.comments.add(f"removes {len(self_child.children) + 1} lines") + + def _config_to_get_to_right( + self, target: Union[HConfig, HConfigChild], delta: Union[HConfig, HConfigChild] + ) -> None: + # find what would need to be added to source_config to get to self + for target_child in target.children: + # if the child exist, recurse into its children + if self_child := self.get_child("equals", target_child.text): + # This creates a new HConfigChild object just in case there are some delta children + # Not very efficient, think of a way to not do this + subtree = delta.add_child(target_child.text) + # pylint: disable=protected-access + self_child._config_to_get_to(target_child, subtree) + if not subtree.children: + subtree.delete() + # Do we need to rewrite the child and its children as well? + elif self_child.sectional_overwrite_check(): + target_child.overwrite_with(self_child, delta, True) + elif self_child.sectional_overwrite_no_negate_check(): + target_child.overwrite_with(self_child, delta, False) + # the child is absent, add it + else: + new_item = delta.add_deep_copy_of(target_child) + # mark the new item and all of its children as new_in_config + new_item.new_in_config = True + for child in new_item.all_children(): + child.new_in_config = True + if new_item.children: + new_item.comments.add("new section") + + @cached_property + def _negation_prefix(self) -> str: + return str(self.options["negation"]) + " " diff --git a/netutils/hier_config/child.py b/netutils/hier_config/child.py new file mode 100644 index 00000000..a1499ab1 --- /dev/null +++ b/netutils/hier_config/child.py @@ -0,0 +1,486 @@ +from __future__ import annotations +from typing import ( + Optional, + Set, + Union, + Iterator, + List, + TYPE_CHECKING, + Type, + Iterable, + Tuple, +) +from logging import getLogger +from itertools import chain + +from netutils.hier_config.base import HConfigBase +from netutils.hier_config import text_match + +if TYPE_CHECKING: + from .root import HConfig + + +logger = getLogger(__name__) + + +# pylint: disable=too-many-instance-attributes,too-many-public-methods +class HConfigChild(HConfigBase): + def __init__(self, parent: Union[HConfig, HConfigChild], text: str): + super().__init__() + self.parent = parent + self.host = self.root.host + self._text: str = text.strip() + self.real_indent_level: int + # The intent is for self.order_weight values to range from 1 to 999 + # with the default weight being 500 + self.order_weight: int = 500 + self._tags: Set[str] = set() + self.comments: Set[str] = set() + self.new_in_config: bool = False + self.instances: List[dict] = [] + self.facts: dict = {} # To store externally inserted facts + + def __repr__(self) -> str: + return f"HConfigChild(HConfig{'' if self.parent is self.root else 'Child'}, {self.text})" + + def __lt__(self, other: HConfigChild) -> bool: + return self.order_weight < other.order_weight + + def __hash__(self) -> int: + return id(self) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, HConfigChild): + return NotImplemented + + if ( + self.text != other.text + or self.tags != other.tags + or self.comments != other.comments + or self.new_in_config != other.new_in_config + ): + return False + return super().__eq__(other) + + def __ne__(self, other: object) -> bool: + return not self.__eq__(other) + + @property + def text(self) -> str: + return self._text + + @text.setter + def text(self, value: str) -> None: + """ + Used for when self.text is changed after the object + is instantiated to rebuild the children dictionary + """ + self._text = value.strip() + self.parent.rebuild_children_dict() + + @property + def root(self) -> HConfig: + """returns the HConfig object at the base of the tree""" + return self.parent.root + + @property + def logs(self) -> List[str]: + return self.root.logs + + @property + def options(self) -> dict: + return self.root.options + + @property + def _child_class(self) -> Type[HConfigChild]: + return HConfigChild + + def depth(self) -> int: + """Returns the distance to the root HConfig object i.e. indent level""" + return self.parent.depth() + 1 + + def move(self, new_parent: Union[HConfig, HConfigChild]) -> None: + """ + move one HConfigChild object to different HConfig parent object + + .. code:: python + + hier1 = HConfig(host=host) + interface1 = hier1.add_child('interface Vlan2') + interface1.add_child('ip address 10.0.0.1 255.255.255.252') + + hier2 = Hconfig(host) + + interface1.move(hier2) + + :param new_parent: HConfigChild object -> type list + :return: None + """ + new_parent.children.append(self) + new_parent.rebuild_children_dict() + self.delete() + + def lineage(self) -> Iterator[HConfigChild]: + """Yields the lineage of parent objects, up to but excluding the root""" + yield from self.parent.lineage() + yield self + + def path(self) -> Iterator[str]: + """Return a list of the text instance variables from self.lineage""" + for hier_object in self.lineage(): + yield hier_object.text + + def cisco_style_text(self, style: str = "without_comments", tag: Optional[str] = None) -> str: + """Return a Cisco style formated line i.e. indentation_level + text ! comments""" + + comments = [] + if style == "without_comments": + pass + elif style == "merged": + # count the number of instances that have the tag + instance_count = 0 + instance_comments: Set[str] = set() + for instance in self.instances: + if tag is None or tag in instance["tags"]: + instance_count += 1 + instance_comments.update(instance["comments"]) + + # should the word 'instance' be plural? + word = "instance" if instance_count == 1 else "instances" + + comments.append(f"{instance_count} {word}") + comments.extend(instance_comments) + elif style == "with_comments": + comments.extend(self.comments) + + comments_str = f" !{', '.join(sorted(comments))}" if comments else "" + return f"{self.indentation}{self.text}{comments_str}" + + @property + def indentation(self) -> str: + return " " * (self.depth() - 1) + + def delete(self) -> None: + """Delete the current object from its parent""" + self.parent.del_child(self) + + def append_tag(self, tag: str) -> None: + """ + Add a tag to self._tags on all leaf nodes + """ + if self.is_branch: + for child in self.children: + child.append_tag(tag) + else: + self._tags.add(tag) + + def append_tags(self, tags: Union[str, List[str], Set[str]]) -> None: + """ + Add tags to self._tags on all leaf nodes + """ + tags = self._to_set(tags) + if self.is_branch: + for child in self.children: + child.append_tags(tags) + else: + self._tags.update(tags) + + def remove_tag(self, tag: str) -> None: + """ + Remove a tag from self._tags on all leaf nodes + """ + if self.is_branch: + for child in self.children: + child.remove_tag(tag) + else: + self._tags.remove(tag) + + def remove_tags(self, tags: Union[str, List[str], Set[str]]) -> None: + """ + Remove tags from self._tags on all leaf nodes + """ + tags = self._to_set(tags) + if self.is_branch: + for child in self.children: + child.remove_tags(tags) + else: + self._tags.difference_update(tags) + + def negate(self) -> HConfigChild: + """Negate self.text""" + for rule in self.options["negation_negate_with"]: + if self.lineage_test(rule): + self.text = rule["use"] + return self + + for rule in self.options["negation_default_when"]: + if self.lineage_test(rule): + return self._default() + + return self._swap_negation() + + @property + def is_leaf(self) -> bool: + """returns True if there are no children and is not an instance of HConfig""" + return not self.is_branch + + @property + def is_branch(self) -> bool: + """returns True if there are children or is an instance of HConfig""" + return bool(self.children) + + @property + def tags(self) -> Set[Optional[str]]: + """Recursive access to tags on all leaf nodes""" + if self.is_branch: + found_tags = set() + for child in self.children: + found_tags.update(child.tags) + return found_tags + + # The getter can return a set containing None + # while the setter only accepts a set containing strs. + # mypy doesn't like this + return self._tags or {None} # type: ignore + + @tags.setter + def tags(self, value: Set[str]) -> None: + """Recursive access to tags on all leaf nodes""" + if self.is_branch: + for child in self.children: + # see comment in getter + child.tags = value # type: ignore + else: + self._tags = value + + def is_idempotent_command(self, other_children: Iterable[HConfigChild]) -> bool: + """Determine if self.text is an idempotent change.""" + # Blacklist commands from matching as idempotent + for rule in self.options["idempotent_commands_blacklist"]: + if self.lineage_test(rule, True): + return False + + # Handles idempotent acl entry identification + if self._idempotent_acl_check(): + if self.host.os in {"iosxr"}: + self_sn = self.text.split(" ", 1)[0] + for other_child in other_children: + other_sn = other_child.text.split(" ", 1)[0] + if self_sn == other_sn: + return True + + # Idempotent command identification + return bool(self.idempotent_for(other_children)) + + def idempotent_for(self, other_children: Iterable[HConfigChild]) -> Optional[HConfigChild]: + for rule in self.options["idempotent_commands"]: + if self.lineage_test(rule, True): + for other_child in other_children: + if other_child.lineage_test(rule, True): + return other_child + return None + + def sectional_overwrite_no_negate_check(self) -> bool: + """ + Check self's text to see if negation should be handled by + overwriting the section without first negating it + """ + for rule in self.options["sectional_overwrite_no_negate"]: + if self.lineage_test(rule): + return True + return False + + def sectional_overwrite_check(self) -> bool: + """Determines if self.text matches a sectional overwrite rule""" + for rule in self.options["sectional_overwrite"]: + if self.lineage_test(rule): + return True + return False + + def overwrite_with( + self, + other: HConfigChild, + delta: Union[HConfig, HConfigChild], + negate: bool = True, + ) -> None: + """Deletes delta.child[self.text], adds a deep copy of self to delta""" + if other.children != self.children: + if negate: + delta.del_child_by_text(self.text) + deleted = delta.add_child(self.text).negate() + deleted.comments.add("dropping section") + if self.children: + delta.del_child_by_text(self.text) + new_item = delta.add_deep_copy_of(self) + new_item.comments.add("re-create section") + + def line_inclusion_test(self, include_tags: Set[str], exclude_tags: Set[str]) -> bool: + """ + Given the line_tags, include_tags, and exclude_tags, + determine if the line should be included + """ + include_line = False + + if include_tags: + include_line = bool(self.tags.intersection(include_tags)) + if exclude_tags and (include_line or not include_tags): + include_line = not bool(self.tags.intersection(exclude_tags)) + + return include_line + + def all_children_sorted_by_tags(self, include_tags: Set[str], exclude_tags: Set[str]) -> Iterator[HConfigChild]: + """Yield all children recursively that match include/exclude tags""" + if self.is_leaf: + if self.line_inclusion_test(include_tags, exclude_tags): + yield self + else: + self_iter = iter((self,)) + for child in sorted(self.children): + included_children = child.all_children_sorted_by_tags(include_tags, exclude_tags) + if peek := next(included_children, None): + yield from chain(self_iter, (peek,), included_children) + + def lineage_test(self, rule: dict, strip_negation: bool = False) -> bool: + """A generic test against a lineage of HConfigChild objects""" + if rule.get("match_leaf", False): + lineage_obj: Iterator[HConfigChild] = (o for o in (self,)) + lineage_depth = 1 + else: + lineage_obj = self.lineage() + lineage_depth = self.depth() + + rule_lineage_len = len(rule["lineage"]) + if rule_lineage_len != lineage_depth: + return False + + matches = 0 + for lineage_rule, section in zip(rule["lineage"], lineage_obj): + object_rules, text_match_rules = self._explode_lineage_rule(lineage_rule) + + if not self._lineage_eval_object_rules(object_rules, section): + return False + + # This removes negations for each section but honestly, + # we really only need to do this on the last one + if strip_negation: + if section.text.startswith(self._negation_prefix): + text = section.text[len(self._negation_prefix) :] # noqa: E203 + elif section.text.startswith("default "): + text = section.text[8:] + else: + text = section.text + else: + text = section.text + + if self._lineage_eval_text_match_rules(text_match_rules, text): + matches += 1 + continue + return False + + return matches == rule_lineage_len + + def _swap_negation(self) -> HConfigChild: + """Swap negation of a self.text""" + if self.text.startswith(self._negation_prefix): + self.text = self.text[len(self._negation_prefix) :] # noqa: E203 + else: + self.text = self._negation_prefix + self.text + + return self + + def _default(self) -> HConfigChild: + """Default self.text""" + if self.text.startswith(self._negation_prefix): + self.text = "default " + self.text[len(self._negation_prefix) :] # noqa: E203 + else: + self.text = "default " + self.text + return self + + def _idempotent_acl_check(self) -> bool: + """ + Handle conditional testing to determine if idempotent acl handling for iosxr should be used + """ + if self.host.os in {"iosxr"}: + if isinstance(self.parent, HConfigChild): + acl = ("ipv4 access-list ", "ipv6 access-list ") + if self.parent.text.startswith(acl): + return True + return False + + @staticmethod + def _explode_lineage_rule(rule: dict) -> Tuple[list, list]: + text_match_rules: List[dict] = [] + object_rules = [] + for test, expression in rule.items(): + if test in {"new_in_config", "negative_intersection_tags"}: + object_rules.append({"test": test, "expression": expression}) + elif test == "equals": + if isinstance(expression, list): + text_match_rules.append({"test": test, "expression": set(expression)}) + else: + text_match_rules.append({"test": test, "expression": {expression}}) + elif test in {"startswith", "endswith"}: + if isinstance(expression, list): + text_match_rules.append({"test": test, "expression": tuple(expression)}) + else: + text_match_rules.append({"test": test, "expression": (expression,)}) + elif isinstance(expression, list): + text_match_rules += [{"test": test, "expression": e} for e in expression] + else: + text_match_rules += [{"test": test, "expression": expression}] + return object_rules, text_match_rules + + def _lineage_eval_object_rules(self, rules: list, section: HConfigChild) -> bool: + """ + Evaluate a list of lineage object rules. + + All object rules must match in order to return True + + """ + matches = 0 + for rule in rules: + if rule["test"] == "new_in_config": + if rule["expression"] == section.new_in_config: + matches += 1 + continue + return False + if rule["test"] == "negative_intersection_tags": + rule["expression"] = self._to_list(rule["expression"]) + if not set(rule["expression"]).intersection(section.tags): + matches += 1 + continue + return False + return matches == len(rules) + + @staticmethod + def _lineage_eval_text_match_rules(rules: list, text: str) -> bool: + """ + Evaluate a list of lineage text_match rules. + + Only one text_match rule must match in order to return True + """ + for rule in rules: + if text_match.dict_call(rule["test"], text, rule["expression"]): + return True + return False + + @staticmethod + def _to_list(obj: Union[list, object]) -> list: + return obj if isinstance(obj, list) else [obj] + + @staticmethod + def _to_set(items: Union[str, List[str], Set[str]]) -> Set[str]: + # There's code out in the wild that passes List[str] or str, need to normalize for now + if isinstance(items, list): + return set(items) + if isinstance(items, str): + return {items} + # Assume it's a set of str + return items + + def _duplicate_child_allowed_check(self) -> bool: + """Determine if duplicate(identical text) children are allowed under the parent""" + for rule in self.options["parent_allows_duplicate_child"]: + if self.lineage_test(rule): + return True + return False diff --git a/netutils/hier_config/exceptions.py b/netutils/hier_config/exceptions.py new file mode 100644 index 00000000..0c5b5891 --- /dev/null +++ b/netutils/hier_config/exceptions.py @@ -0,0 +1,20 @@ +"""Hier Config Exceptions.""" + + +class HostAttrError(Exception): + """Hier Config Host attribute exception.""" + + def __init__(self, expression, message): + """Initialize exception.""" + self.expression = expression + self.message = message + super().__init__(f"{expression}: {message}") + + +class HierConfigError(Exception): + """Hier Config generic exception.""" + + def __init__(self, message): + """Initialize exception.""" + self.message = message + super().__init__(f"{message}") diff --git a/netutils/hier_config/host.py b/netutils/hier_config/host.py new file mode 100644 index 00000000..1ea5c03e --- /dev/null +++ b/netutils/hier_config/host.py @@ -0,0 +1,178 @@ +from functools import lru_cache +from typing import List, Set, Union, Optional + +import yaml + +from netutils.hier_config.root import HConfig +from netutils.hier_config.options import options_for + + +class Host: + """ + A host object is a convenient way to loading host inventory + items into a single object. + + The default is to load "hostname", "os", and "options" to the host object, + however, it can easily be extended for developer needs. + + .. code:: python + + import yaml + from hier_config.host import Host + + options = yaml.load(open("./tests/fixtures/options_ios.yml"), loader=yaml.SafeLoader()) + host = Host("example.rtr", "ios", options) + + # Example of loading running config and generated configs into a host object + host.load_running_config_from_file("./tests/files/running_config.conf) + host.load_generated_config_from_file("./tests/files/generated_config.conf) + + # Example of loading hier-config tags into a host object + host.load_tags("./tests/fixtures/tags_ios.yml") + + # Example of creating a remediation config without a tag targeting specific config + host.remediation_config() + + # Example of creating a remediation config with a tag ("safe") targeting a specific config. + host.remediation_config_filtered_text({"safe"}, set()}) + """ + + def __init__( # pylint: disable=dangerous-default-value + self, + hostname: str, + os: str, + hconfig_options: dict = {}, + ): + self.hostname = hostname + self.os = os + self.hconfig_options = hconfig_options if hconfig_options else options_for(self.os) + self._hconfig_tags: List[dict] = [] + self._running_config: Optional[HConfig] = None + self._generated_config: Optional[HConfig] = None + + def __repr__(self) -> str: + return f"Host(hostname={self.hostname})" + + @property + def running_config(self) -> Optional[HConfig]: + """running configuration property""" + if self._running_config is None: + self._running_config = self._get_running_config() + return self._running_config + + @property + def generated_config(self) -> Optional[HConfig]: + """generated configuration property""" + if self._generated_config is None: + self._generated_config = self._get_generated_config() + return self._generated_config + + @lru_cache() + def remediation_config(self) -> HConfig: + """ + Once self.running_config and self.generated_config have been created, + create self.remediation_config + """ + if self.running_config and self.generated_config: + remediation = self.running_config.config_to_get_to(self.generated_config) + else: + raise AttributeError("Missing host.running_config or host.generated_config") + + remediation.add_sectional_exiting() + remediation.set_order_weight() + remediation.add_tags(self.hconfig_tags) + + return remediation + + @lru_cache() + def rollback_config(self) -> HConfig: + """ + Once a self.running_config and self.generated_config have been created, + generate a self.rollback_config + """ + if self.running_config and self.generated_config: + rollback = self.generated_config.config_to_get_to(self.running_config) + else: + raise AttributeError("Missing host.running_config or host.generated_config") + + rollback.add_sectional_exiting() + rollback.set_order_weight() + rollback.add_tags(self.hconfig_tags) + + return rollback + + @property + def hconfig_tags(self) -> List[dict]: + """hier-config tags property""" + return self._hconfig_tags + + def load_running_config_from_file(self, file: str) -> None: + config = self._load_from_file(file) + if not isinstance(config, str): + raise TypeError + self.load_running_config(config) + + def load_running_config(self, config_text: str) -> None: + self._running_config = self._load_config(config_text) + + def load_generated_config_from_file(self, file: str) -> None: + config = self._load_from_file(file) + if not isinstance(config, str): + raise TypeError + self.load_generated_config(config) + + def load_generated_config(self, config_text: str) -> None: + self._generated_config = self._load_config(config_text) + + def remediation_config_filtered_text(self, include_tags: Set[str], exclude_tags: Set[str]) -> str: + config = self.remediation_config() + if include_tags or exclude_tags: + children = config.all_children_sorted_by_tags(include_tags, exclude_tags) + else: + children = config.all_children_sorted() + + return "\n".join(c.cisco_style_text() for c in children) + + def load_tags(self, tags: list) -> None: + """ + Loads lineage rules that set tags + + Example: + Specify to load lineage rules from a dictionary. + + .. code:: python + + tags = [{"lineage": [{"startswith": "interface"}], "add_tags": "interfaces"}] + host.load_tags(tags) + + :param tags: tags + """ + self._hconfig_tags = tags + + def load_tags_from_file(self, file: str) -> None: + tags_from_file = self._load_from_file(file, True) + if not isinstance(tags_from_file, list): + raise TypeError + self.load_tags(tags_from_file) + + def _load_config(self, config_text: str) -> HConfig: + hier = HConfig(host=self) + hier.load_from_string(config_text) + return hier + + @staticmethod + def _load_from_file(name: str, parse_yaml: bool = False) -> Union[list, dict, str]: + """Opens a config file and loads it as a string.""" + with open(name) as file: # pylint: disable=unspecified-encoding + content = file.read() + + if parse_yaml: + content = yaml.safe_load(content) + + return content + + def _get_running_config(self) -> HConfig: + return NotImplemented + + def _get_generated_config(self) -> HConfig: + return NotImplemented diff --git a/netutils/hier_config/options.py b/netutils/hier_config/options.py new file mode 100644 index 00000000..de4b87ab --- /dev/null +++ b/netutils/hier_config/options.py @@ -0,0 +1,709 @@ +base_options: dict = { + "style": None, + "negation": "no", + "syntax_style": "cisco", + "sectional_overwrite": [], + "sectional_overwrite_no_negate": [], + "ordering": [], + "indent_adjust": [], + "parent_allows_duplicate_child": [], + "sectional_exiting": [], + "full_text_sub": [], + "per_line_sub": [], + "idempotent_commands_blacklist": [], + "idempotent_commands": [], + "negation_default_when": [], + "negation_negate_with": [], +} +ios_options: dict = { + "style": "ios", + "ordering": [ + {"lineage": [{"startswith": "no vlan filter"}], "order": 700}, + { + "lineage": [ + {"startswith": "interface"}, + {"startswith": "no shutdown"}, + ], + "order": 700, + }, + ], + "sectional_exiting": [ + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-policy"}, + ], + "exit_text": "exit-peer-policy", + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-session"}, + ], + "exit_text": "exit-peer-session", + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "address-family"}, + ], + "exit_text": "exit-address-family", + }, + ], + "per_line_sub": [ + {"search": "^Building configuration.*", "replace": ""}, + {"search": "^Current configuration.*", "replace": ""}, + {"search": "^! Last configuration change.*", "replace": ""}, + {"search": "^! NVRAM config last updated.*", "replace": ""}, + {"search": "^ntp clock-period .*", "replace": ""}, + {"search": "^version.*", "replace": ""}, + {"search": "^ logging event link-status$", "replace": ""}, + {"search": "^ logging event subif-link-status$", "replace": ""}, + {"search": "^\\s*ipv6 unreachables disable$", "replace": ""}, + {"search": "^end$", "replace": ""}, + {"search": "^\\s*[#!].*", "replace": ""}, + {"search": "^ no ip address", "replace": ""}, + {"search": "^ exit-peer-policy", "replace": ""}, + {"search": "^ exit-peer-session", "replace": ""}, + {"search": "^ exit-address-family", "replace": ""}, + {"search": "^crypto key generate rsa general-keys.*$", "replace": ""}, + ], + "idempotent_commands": [ + {"lineage": [{"startswith": "vlan"}, {"startswith": "name"}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": "description"}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": "ip address"}]}, + ], +} +iosxe_options: dict = { + "style": "ios", + "sectional_overwrite": [{"lineage": [{"startswith": "ipv6 access-list"}]}], + "sectional_exiting": [ + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-policy"}, + ], + "exit_text": "exit-peer-policy", + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-session"}, + ], + "exit_text": "exit-peer-session", + }, + { + "lineage": [{"startswith": "router bgp"}, {"startswith": "address-family"}], + "exit_text": "exit-address-family", + }, + ], + "per_line_sub": [ + {"search": "^Building configuration.*", "replace": ""}, + {"search": "^Current configuration.*", "replace": ""}, + {"search": "^! Last configuration change.*", "replace": ""}, + {"search": "^! NVRAM config last updated.*", "replace": ""}, + {"search": "^ntp clock-period .*", "replace": ""}, + {"search": "^version.*", "replace": ""}, + {"search": "^ logging event link-status$", "replace": ""}, + {"search": "^ logging event subif-link-status$", "replace": ""}, + {"search": "^\\s*ipv6 unreachables disable$", "replace": ""}, + {"search": "^end$", "replace": ""}, + {"search": "^ no ip address", "replace": ""}, + {"search": "^ exit-peer-policy", "replace": ""}, + {"search": "^ exit-peer-session", "replace": ""}, + {"search": "^ exit-address-family", "replace": ""}, + ], + "idempotent_commands": [ + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": ["log-adjacency-changes"]}, + ] + }, + {"lineage": [{"startswith": "router ospf"}, {"startswith": ["router-id"]}]}, + { + "lineage": [ + {"startswith": "ipv6 router ospf"}, + {"startswith": ["log-adjacency-changes"]}, + ] + }, + { + "lineage": [ + {"startswith": "ipv6 router ospf"}, + {"startswith": ["router-id"]}, + ] + }, + {"lineage": [{"startswith": "router bgp"}, {"startswith": "bgp router-id"}]}, + { + "lineage": [ + {"startswith": "router bgp"}, + {"re_search": "neighbor \\S+ description"}, + ] + }, + {"lineage": [{"startswith": ["hostname"]}]}, + {"lineage": [{"contains": ["source-interface", "trap-source"]}]}, + {"lineage": [{"startswith": ["snmp-server community"]}]}, + {"lineage": [{"startswith": ["mac address-table aging-time"]}]}, + {"lineage": [{"startswith": ["aaa authentication"]}]}, + {"lineage": [{"startswith": ["aaa authorization"]}]}, + {"lineage": [{"startswith": ["errdisable recovery"]}]}, + {"lineage": [{"startswith": "line"}, {"startswith": ["access-class"]}]}, + {"lineage": [{"startswith": "line"}, {"startswith": ["ipv6 access-class"]}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": ["ip ospf cost"]}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": ["ipv6 ospf cost"]}]}, + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": ["standby \\d authentication"]}, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": ["standby \\d priority"]}, + ] + }, + {"lineage": [{"startswith": "username admin "}]}, + { + "lineage": [ + {"startswith": "policy-map system-cpp-policy"}, + {"startswith": "class"}, + {"startswith": "police"}, + ] + }, + {"lineage": [{"startswith": "banner"}]}, + {"lineage": [{"startswith": "logging facility"}]}, + {"lineage": [{"startswith": "ip tftp source-interface"}]}, + {"lineage": [{"startswith": "snmp-server trap-source"}]}, + {"lineage": [{"startswith": "power redundancy-mode"}]}, + ], +} +iosxr_options: dict = { + "style": "iosxr", + "ordering": [ + {"lineage": [{"startswith": "vrf "}], "order": 300}, + {"lineage": [{"startswith": "no vrf "}], "order": 700}, + ], + "sectional_overwrite": [{"lineage": [{"startswith": "template"}]}], + "sectional_overwrite_no_negate": [ + {"lineage": [{"startswith": "as-path-set"}]}, + {"lineage": [{"startswith": "prefix-set"}]}, + {"lineage": [{"startswith": "route-policy"}]}, + {"lineage": [{"startswith": "extcommunity-set"}]}, + {"lineage": [{"startswith": "community-set"}]}, + ], + "parent_allows_duplicate_child": [{"lineage": [{"startswith": "route-policy"}]}], + "sectional_exiting": [ + {"lineage": [{"startswith": "route-policy"}], "exit_text": "end-policy"}, + {"lineage": [{"startswith": "prefix-set"}], "exit_text": "end-set"}, + {"lineage": [{"startswith": "policy-map"}], "exit_text": "end-policy-map"}, + {"lineage": [{"startswith": "class-map"}], "exit_text": "end-class-map"}, + {"lineage": [{"startswith": "community-set"}], "exit_text": "end-set"}, + {"lineage": [{"startswith": "extcommunity-set"}], "exit_text": "end-set"}, + {"lineage": [{"equals": "rsvp"}], "exit_text": "exit"}, + {"lineage": [{"equals": "mpls traffic-eng"}], "exit_text": "exit"}, + {"lineage": [{"startswith": "mpls ldp"}], "exit_text": "exit"}, + {"lineage": [{"startswith": "router ospf"}], "exit_text": "exit"}, + {"lineage": [{"startswith": "router ospfv3"}], "exit_text": "exit"}, + {"lineage": [{"startswith": "template"}], "exit_text": "end-template"}, + {"lineage": [{"startswith": "interface"}], "exit_text": "root"}, + {"lineage": [{"startswith": "router bgp"}], "exit_text": "root"}, + ], + "indent_adjust": [{"start_expression": "^\\s*template", "end_expression": "^\\s*end-template"}], + "per_line_sub": [ + {"search": "^Building configuration.*", "replace": ""}, + {"search": "^Current configuration.*", "replace": ""}, + {"search": "^ntp clock-period .*", "replace": ""}, + {"search": ".*speed.*", "replace": ""}, + {"search": ".*duplex.*", "replace": ""}, + {"search": ".*negotiation auto.*", "replace": ""}, + {"search": ".*parity none.*", "replace": ""}, + {"search": "^end-policy$", "replace": " end-policy"}, + {"search": "^end-set$", "replace": " end-set"}, + {"search": "^end$", "replace": ""}, + {"search": "^\\s*[#!].*", "replace": ""}, + ], + "idempotent_commands": [ + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "address-family"}, + {"startswith": "additional-paths selection route-policy"}, + ] + }, + {"lineage": [{"startswith": "router bgp"}, {"startswith": "bgp router-id"}]}, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "neighbor-group"}, + {"startswith": "address-family"}, + {"startswith": "soft-reconfiguration inbound"}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "neighbor"}, + {"startswith": "address-family"}, + {"startswith": ["soft-reconfiguration inbound", "maximum-prefix"]}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "neighbor"}, + {"startswith": ["password", "description"]}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "neighbor"}, + {"startswith": ["description", "password"]}, + ] + }, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": "area"}, + {"startswith": "interface"}, + {"startswith": "cost"}, + ] + }, + {"lineage": [{"startswith": "router ospf"}, {"startswith": "router-id"}]}, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": "area"}, + {"startswith": "message-digest-key"}, + ] + }, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": "max-metric router-lsa"}, + ] + }, + {"lineage": [{"equals": "l2vpn"}, {"startswith": "router-id"}]}, + {"lineage": [{"re_search": "logging \\d+.\\d+.\\d+.\\d+ vrf MGMT"}]}, + { + "lineage": [ + {"equals": "line default"}, + {"startswith": "access-class ingress"}, + ] + }, + {"lineage": [{"equals": "line default"}, {"startswith": "transport input"}]}, + {"lineage": [{"startswith": "hostname"}]}, + {"lineage": [{"startswith": "logging source-interface"}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": "ipv4 address"}]}, + {"lineage": [{"startswith": "snmp-server community"}]}, + {"lineage": [{"startswith": "snmp-server location"}]}, + {"lineage": [{"equals": "line console"}, {"startswith": "exec-timeout"}]}, + { + "lineage": [ + {"equals": "mpls ldp"}, + {"startswith": "session protection duration"}, + ] + }, + {"lineage": [{"equals": "mpls ldp"}, {"startswith": "igp sync delay"}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": ["mtu"]}]}, + {"lineage": [{"startswith": "banner"}]}, + ], +} +nxos_options: dict = { + "style": "nxos", + "per_line_sub": [ + {"search": "^Building configuration.*", "replace": ""}, + {"search": "^Current configuration.*", "replace": ""}, + {"search": "^ntp clock-period .*", "replace": ""}, + {"search": "^snmp-server location ", "replace": "snmp-server location "}, + {"search": "^version.*", "replace": ""}, + {"search": "^boot (system|kickstart) .*", "replace": ""}, + {"search": "!.*", "replace": ""}, + ], + "idempotent_commands_blacklist": [ + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": "ip address.*secondary"}, + ] + } + ], + "idempotent_commands": [ + { + "lineage": [ + { + "startswith": [ + "power redundancy-mode", + "cli alias name wr ", + "aaa authentication login console", + "port-channel load-balance", + "hostname", + "ip tftp source-interface", + "ip telnet source-interface", + "ip tacacs source-interface", + "logging source-interface", + ], + "re_search": "^spanning-tree vlan ([\\d,-]+) priority", + } + ] + }, + {"lineage": [{"startswith": ["hardware access-list tcam region ifacl"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region vacl"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region qos"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region racl"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region ipv6-racl"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region e-ipv6-racl"]}]}, + {"lineage": [{"startswith": ["hardware access-list tcam region l3qos"]}]}, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": "vrf"}, + {"startswith": ["maximum-paths", "log-adjacency-changes"]}, + ] + }, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": ["maximum-paths", "log-adjacency-changes"]}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "address-family"}, + {"startswith": ["maximum-paths"]}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "address-family"}, + {"startswith": ["maximum-paths"]}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template"}, + {"startswith": "address-family"}, + {"startswith": "send-community"}, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": "^hsrp \\d+"}, + {"startswith": ["ip", "priority", "authentication md5 key-string"]}, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + { + "startswith": [ + "ip address", + "duplex", + "speed", + "switchport mode", + "switchport access vlan", + "switchport trunk native vlan", + "switchport trunk allowed vlan", + "udld port", + "ip ospf cost", + "ipv6 link-local", + "ospfv3 cost", + ] + }, + ] + }, + {"lineage": [{"startswith": "interface"}, {"startswith": "mtu"}]}, + {"lineage": [{"equals": "line console"}, {"startswith": "exec-timeout"}]}, + { + "lineage": [ + {"startswith": "line vty"}, + { + "startswith": [ + "transport input", + "ipv6 access-class", + "access-class", + ] + }, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + { + "startswith": "bgp router-id", + "re_search": "neighbor \\S+ description", + }, + ] + }, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": ["router-id", "log-adjacency-changes"]}, + ] + }, + { + "lineage": [ + {"startswith": "ipv6 router ospf"}, + {"startswith": ["router-id", "log-adjacency-changes"]}, + ] + }, + { + "lineage": [ + { + "startswith": [ + "mac address-table aging-time", + "snmp-server community", + "snmp-server location", + ] + } + ] + }, + {"lineage": [{"startswith": "vpc domain"}, {"startswith": "role priority"}]}, + {"lineage": [{"startswith": "banner"}]}, + {"lineage": [{"startswith": "username admin password 5"}]}, + { + "lineage": [ + {"equals": "policy-map type control-plane copp-system-policy"}, + {"startswith": "class"}, + {"startswith": "police"}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "neighbor"}, + {"startswith": "address-family"}, + {"startswith": "soft-reconfiguration inbound"}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "neighbor"}, + {"startswith": "password"}, + ] + }, + ], + "negation_default_when": [ + { + "lineage": [ + {"startswith": "interface"}, + { + "startswith": "ip ospf bfd", + "re_search": "standby \\d+ authentication md5 key-string", + }, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "neighbor"}, + {"startswith": "address-family"}, + {"equals": "send-community"}, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + {"contains": "ip ospf passive-interface"}, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + {"contains": "ospfv3 passive-interface"}, + ] + }, + ], + "negation_negate_with": [ + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "address-family"}, + {"startswith": "maximum-paths ibgp"}, + ], + "use": "default maximum-paths ibgp", + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "vrf"}, + {"startswith": "address-family"}, + {"startswith": "maximum-paths ibgp"}, + ], + "use": "default maximum-paths ibgp", + }, + { + "lineage": [{"equals": "line vty"}, {"startswith": "session-limit"}], + "use": "session-limit 32", + }, + ], +} +eos_options: dict = { + "style": "eos", + "sectional_exiting": [ + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-policy"}, + ], + "exit_text": "exit-peer-policy", + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"startswith": "template peer-session"}, + ], + "exit_text": "exit-peer-session", + }, + { + "lineage": [{"startswith": "router bgp"}, {"startswith": "address-family"}], + "exit_text": "exit-address-family", + }, + ], + "per_line_sub": [ + {"search": "^Building configuration.*", "replace": ""}, + {"search": "^Current configuration.*", "replace": ""}, + {"search": "^! Last configuration change.*", "replace": ""}, + {"search": "^! NVRAM config last updated.*", "replace": ""}, + {"search": "^ntp clock-period .*", "replace": ""}, + {"search": "^version.*", "replace": ""}, + {"search": "^ logging event link-status$", "replace": ""}, + {"search": "^ logging event subif-link-status$", "replace": ""}, + {"search": "^\\s*ipv6 unreachables disable$", "replace": ""}, + {"search": "^end$", "replace": ""}, + {"search": "^\\s*[#!].*", "replace": ""}, + {"search": "^ no ip address", "replace": ""}, + {"search": "^ exit-peer-policy", "replace": ""}, + {"search": "^ exit-peer-session", "replace": ""}, + {"search": "^ exit-address-family", "replace": ""}, + ], + "idempotent_commands": [ + {"lineage": [{"startswith": "hostname"}]}, + {"lineage": [{"startswith": "logging source-interface"}]}, + {"lineage": [{"startswith": "interface"}, {"startswith": "ip address"}]}, + { + "lineage": [ + {"startswith": "line vty"}, + { + "startswith": [ + "transport input", + "access-class", + "ipv6 access-class", + ] + }, + ] + }, + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": "standby \\d+ (priority|authentication md5)"}, + ] + }, + {"lineage": [{"startswith": "router bgp"}, {"startswith": "bgp router-id"}]}, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": ["router-id", "max-lsa", "maximum-paths"]}, + ] + }, + {"lineage": [{"startswith": "ipv6 router ospf"}, {"startswith": "router-id"}]}, + { + "lineage": [ + {"startswith": "router ospf"}, + {"startswith": "log-adjacency-changes"}, + ] + }, + { + "lineage": [ + {"startswith": "ipv6 router ospf"}, + {"startswith": "log-adjacency-changes"}, + ] + }, + { + "lineage": [ + {"startswith": "router bgp"}, + {"re_search": "neighbor \\S+ description"}, + ] + }, + {"lineage": [{"startswith": "snmp-server community"}]}, + {"lineage": [{"startswith": "snmp-server location"}]}, + {"lineage": [{"equals": "line con 0"}, {"startswith": "exec-timeout"}]}, + { + "lineage": [ + {"startswith": "interface"}, + {"startswith": "ip ospf message-digest-key"}, + ] + }, + {"lineage": [{"startswith": "logging buffered"}]}, + {"lineage": [{"startswith": "tacacs-server key"}]}, + {"lineage": [{"startswith": "logging facility"}]}, + {"lineage": [{"startswith": "vlan internal allocation policy"}]}, + {"lineage": [{"startswith": "username admin"}]}, + {"lineage": [{"startswith": "snmp-server user"}]}, + {"lineage": [{"startswith": "banner"}]}, + {"lineage": [{"startswith": "ntp source"}]}, + {"lineage": [{"startswith": "management"}, {"startswith": "idle-timeout"}]}, + {"lineage": [{"startswith": "aaa authentication enable default group tacacs+"}]}, + { + "lineage": [ + {"equals": "control-plane"}, + {"equals": "ip access-group CPP in"}, + ] + }, + {"lineage": [{"startswith": "interface"}, {"startswith": "mtu"}]}, + {"lineage": [{"startswith": "snmp-server source-interface"}]}, + {"lineage": [{"startswith": "ip tftp client source-interface"}]}, + ], + "negation_default_when": [ + { + "lineage": [ + {"startswith": "interface"}, + {"equals": "logging event link-status"}, + ] + } + ], +} + + +junos_options: dict = { + "style": "junos", + "negation": "delete", + "syntax_style": "juniper", +} + + +vyos_options: dict = { + "style": "vyos", + "negation": "delete", + "syntax_style": "juniper", +} + + +def options_for(os: str) -> dict: + """Create base options on an OS level.""" + options: dict = { + "ios": ios_options, + "iosxe": iosxe_options, + "iosxr": iosxr_options, + "nxos": nxos_options, + "eos": eos_options, + "junos": junos_options, + "vyos": vyos_options, + } + + if options.get(os): + return {**base_options, **options[os]} + + return {**base_options, "style": os} diff --git a/netutils/hier_config/root.py b/netutils/hier_config/root.py new file mode 100644 index 00000000..6f9aa7f5 --- /dev/null +++ b/netutils/hier_config/root.py @@ -0,0 +1,478 @@ +from __future__ import annotations +from itertools import islice +import re +from pathlib import Path +from typing import Optional, Set, Union, Iterator, List, TYPE_CHECKING, Tuple, Type +from logging import getLogger + +from netutils.hier_config.exceptions import HostAttrError, HierConfigError +from netutils.hier_config.base import HConfigBase +from netutils.hier_config.child import HConfigChild + +if TYPE_CHECKING: + from .host import Host + +logger = getLogger(__name__) + + +class HConfig(HConfigBase): # pylint: disable=too-many-public-methods + """ + A class for representing and comparing Cisco configurations in a + hierarchical tree data structure. + + Example usage: + + .. code:: python + + # Setup basic environment + + from hier_config import HConfig, Host + import yaml + + options = yaml.safe_load(open('./tests/fixtures/options_ios.yml')) + host = Host('example.rtr', 'ios', options) + + # Build HConfig object for the Running Config + + running_config_hier = HConfig(host=host) + running_config_hier.load_from_file('./tests/fixtures/running_config.conf') + + # Build Hierarchical Configuration object for the Generated Config + + generated_config_hier = HConfig(host=host) + generated_config_hier.load_from_file('./tests/fixtures/generated_config.conf') + + # Build Hierarchical Configuration object for the Remediation Config + + remediation_config_hier = running_config_hier.config_to_get_to(generated_config_hier) + + for line in remediation_config_hier.all_children(): + print(line.cisco_style_text()) + + See: + + ./tests/fixtures/tags_ios.yml and ./tests/fixtures/options_ios.yml + + for test examples of options and tags. + """ + + def __init__(self, host: Host): + super().__init__() + if not all([hasattr(host, "hostname"), hasattr(host, "os"), hasattr(host, "hconfig_options")]): + raise HostAttrError(host, "Missing attributes - hostname, os, hconfig_options.") + + self.host = host + self.parent = self + self.real_indent_level = -1 + + self.options.setdefault("negation", "no") + self.options.setdefault("syntax_style", "cisco") + self._logs: List[str] = [] + + def __repr__(self) -> str: + return f"HConfig(host={self.host})" + + def __hash__(self) -> int: + return id(self) + + @property + def root(self) -> HConfig: + """returns the HConfig object at the base of the tree""" + return self + + @property + def options(self) -> dict: + return self.host.hconfig_options + + @property + def is_leaf(self) -> bool: + """returns True if there are no children and is not an instance of HConfig""" + return False + + @property + def logs(self) -> List[str]: + return self._logs + + @property + def is_branch(self) -> bool: + """returns True if there are children or is an instance of HConfig""" + return True + + @property + def _child_class(self) -> Type[HConfigChild]: + return HConfigChild + + @property + def tags(self) -> Set[Optional[str]]: + """Recursive access to tags on all leaf nodes""" + found_tags: Set[Optional[str]] = set() + for child in self.children: + found_tags.update(child.tags) + return found_tags + + @tags.setter + def tags(self, value: Set[str]) -> None: + """Recursive access to tags on all leaf nodes""" + for child in self.children: + child.tags = value # type: ignore + + def merge(self, other: HConfig) -> None: + """Merges two HConfig objects""" + for child in other.children: + self.add_deep_copy_of(child, merged=True) + + def lineage(self) -> Iterator[HConfigChild]: + """ + Yields the lineage of parent objects, up to but excluding the root + """ + yield from () + + def load_from_file(self, file_path: Union[str, Path]) -> None: + """Load configuration text from a file""" + with open(file_path) as file: # pylint: disable=unspecified-encoding + config_text = file.read() + self.load_from_string(config_text) + + def load_from_string(self, config_text: str) -> None: + """Create Hierarchical Configuration nested objects from text""" + config_text = self._convert_to_set_commands(config_text) + + for sub in self.options["full_text_sub"]: + config_text = re.sub(sub["search"], sub["replace"], config_text) + + self._load_from_string_lines(config_text) + + if self.host.os == "ios": + self._remove_acl_remarks() + self._add_acl_sequence_numbers() + self._rm_ipv6_acl_sequence_numbers() + + def load_from_dump(self, dump: List[dict]) -> None: + """Load an HConfig dump""" + last_item: Union[HConfig, HConfigChild] = self + for item in dump: + # parent is the root + if item["depth"] == 1: + parent: Union[HConfig, HConfigChild] = self + # has the same parent + elif last_item.depth() == item["depth"]: + parent = last_item.parent + # is a child object + elif last_item.depth() + 1 == item["depth"]: + parent = last_item + # has a parent somewhere closer to the root but not the root + else: + # last_item.lineage() = (a, b, c, d, e), new_item['depth'] = 2, + # parent = a + parent = next(islice(last_item.lineage(), item["depth"] - 2, item["depth"] - 1)) + # also accept 'line' + # obj = parent.add_child(item.get('text', item['line']), force_duplicate=True) + obj = parent.add_child(item["text"], force_duplicate=True) + obj.tags = set(item["tags"]) + obj.comments = set(item["comments"]) + obj.new_in_config = item["new_in_config"] + last_item = obj + + def dump(self, lineage_rules: Optional[List[dict]] = None) -> List[dict]: + """Dump a list of loaded HConfig data""" + if lineage_rules: + children = self.all_children_sorted_with_lineage_rules(lineage_rules) + else: + children = self.all_children_sorted() + + output = [] + for child in children: + output.append( + { + "depth": child.depth(), + "text": child.text, + "tags": list(child.tags), + "comments": list(child.comments), + "new_in_config": child.new_in_config, + } + ) + + return output + + def add_tags(self, tag_rules: list, strip_negation: bool = False) -> None: + """ + Handler for tagging sections of Hierarchical Configuration data structure + for inclusion and exclusion. + """ + for rule in tag_rules: + for child in self.all_children(): + if child.lineage_test(rule, strip_negation): + if "add_tags" in rule: + child.append_tags(rule["add_tags"]) + if "remove_tags" in rule: + child.remove_tags(rule["remove_tags"]) + + def depth(self) -> int: + """Returns the distance to the root HConfig object i.e. indent level""" + return 0 + + def difference(self, target: HConfig) -> HConfig: + """ + Creates a new HConfig object with the config from self that is not in target + + Example usage: + whats in the config.lines v.s. in running config + i.e. did all my configuration changes get written to the running config + + :param target: HConfig - The configuration to check against + :return: HConfig - missing config additions + """ + delta = HConfig(host=self.host) + difference = self._difference(target, delta) + # Makes mypy happy + if not isinstance(difference, HConfig): + raise TypeError + return difference + + def config_to_get_to(self, target: HConfig, delta: Optional[HConfig] = None) -> HConfig: + """ + Figures out what commands need to be executed to transition from self to target. + self is the source data structure(i.e. the running_config), + target is the destination(i.e. generated_config) + + """ + if delta is None: + delta = HConfig(host=self.host) + + root_config = self._config_to_get_to(target, delta) + if not isinstance(root_config, HConfig): + raise TypeError + + return root_config + + def add_ancestor_copy_of(self, parent_to_add: HConfigChild) -> Union[HConfig, HConfigChild]: + """ + Add a copy of the ancestry of parent_to_add to self + and return the deepest child which is equivalent to parent_to_add + """ + base: Union[HConfig, HConfigChild] = self + for parent in parent_to_add.lineage(): + base = base.add_shallow_copy_of(parent) + + return base + + def set_order_weight(self) -> None: + """Sets self.order integer on all children""" + for child in self.all_children(): + for rule in self.options["ordering"]: + if child.lineage_test(rule): + child.order_weight = rule["order"] + + def add_sectional_exiting(self) -> None: + """ + Adds the sectional exiting text as a child + """ + for child in self.all_children(): + for rule in self.options["sectional_exiting"]: + if child.lineage_test(rule): + exit_line = child.get_child("equals", rule["exit_text"]) + if exit_line is None: + exit_line = child.add_child(rule["exit_text"]) + + exit_line.tags = child.tags + exit_line.order_weight = 999 + + def future(self, config: HConfig) -> HConfig: + """ + EXPERIMENTAL - predict the future config after config is applied to self + + The quality of the this method's output will in part depend on how well + the OS options are tuned. Ensuring that idempotency rules are accurate is + especially important. + """ + future_config = HConfig(host=self.host) + self._future(config, future_config) + return future_config + + def with_tags(self, tags: Set[str]) -> HConfig: + """ + Returns a new instance containing only sub-objects + with one of the tags in tags + """ + new_instance = HConfig(self.host) + result = self._with_tags(tags, new_instance) + # Makes mypy happy + if not isinstance(result, HConfig): + raise ValueError + return new_instance + + def all_children_sorted_by_tags(self, include_tags: Set[str], exclude_tags: Set[str]) -> Iterator[HConfigChild]: + """Yield all children recursively that match include/exclude tags""" + for child in sorted(self.children): + yield from child.all_children_sorted_by_tags(include_tags, exclude_tags) + + @staticmethod + def _load_from_string_lines_end_of_banner_test( + config_line: str, banner_end_lines: Set[str], banner_end_contains: List[str] + ) -> bool: + if config_line.startswith("^"): + return True + if config_line in banner_end_lines: + return True + if any([c in config_line for c in banner_end_contains]): + return True + return False + + # pylint: disable=too-many-locals,too-many-branches,too-many-statements + def _load_from_string_lines(self, config_text: str) -> None: + current_section: Union[HConfig, HConfigChild] = self + most_recent_item: Union[HConfig, HConfigChild] = current_section + indent_adjust = 0 + end_indent_adjust = [] + temp_banner = [] + banner_end_lines = {"EOF", "%", "!"} + banner_end_contains: List[str] = [] + in_banner = False + + for line in config_text.splitlines(): + # Process banners in configuration into one line + if in_banner: + if line != "!": + temp_banner.append(line) + + # Test if this line is the end of a banner + if self._load_from_string_lines_end_of_banner_test(str(line), banner_end_lines, banner_end_contains): + in_banner = False + most_recent_item = self.add_child("\n".join(temp_banner), True) + most_recent_item.real_indent_level = 0 + current_section = self + temp_banner = [] + continue + + # Test if this line is the start of a banner and not an empty banner + # Empty banners matching the below expression have been seen on NX-OS + if line.startswith("banner ") and line != "banner motd ##": + in_banner = True + temp_banner.append(line) + banner_words = line.split() + try: + banner_end_contains.append(banner_words[2]) + banner_end_lines.add(banner_words[2][:1]) + banner_end_lines.add(banner_words[2][:2]) + except IndexError: + pass + continue + + actual_indent = len(line) - len(line.lstrip()) + line = " " * actual_indent + " ".join(line.split()) + for sub in self.options["per_line_sub"]: + line = re.sub(sub["search"], sub["replace"], line) + line = line.rstrip() + + # If line is now empty, move to the next + if not line: + continue + + # Determine indentation level + this_indent = len(line) - len(line.lstrip()) + indent_adjust + + line = line.lstrip() + + # Walks back up the tree + while this_indent <= current_section.real_indent_level: + current_section = current_section.parent + + # Walks down the tree by one step + if this_indent > most_recent_item.real_indent_level: + current_section = most_recent_item + + most_recent_item = current_section.add_child(line, True) + most_recent_item.real_indent_level = this_indent + + for expression in self.options["indent_adjust"]: + if re.search(expression["start_expression"], line): + indent_adjust += 1 + end_indent_adjust.append(expression["end_expression"]) + break + if end_indent_adjust and re.search(end_indent_adjust[0], line): + indent_adjust -= 1 + del end_indent_adjust[0] + + if in_banner: + raise HierConfigError("we are still in a banner for some reason.") + + def _add_acl_sequence_numbers(self) -> None: + """ + Add ACL sequence numbers for use on configurations with a style of 'ios' + """ + ipv4_acl_sw = "ip access-list" + # ipv6_acl_sw = ('ipv6 access-list') + if self.host.os in ["ios"]: + acl_line_sw: Tuple[str, ...] = ("permit", "deny") + else: + acl_line_sw = ("permit", "deny", "remark") + for child in self.children: + if child.text.startswith(ipv4_acl_sw): + sequence_number = 10 + for sub_child in child.children: + if sub_child.text.startswith(acl_line_sw): + sub_child.text = f"{sequence_number} {sub_child.text}" + sequence_number += 10 + + def _rm_ipv6_acl_sequence_numbers(self) -> None: + """If there are sequence numbers in the IPv6 ACL, remove them""" + for acl in self.get_children("startswith", "ipv6 access-list "): + for entry in acl.children: + if entry.text.startswith("sequence"): + entry.text = " ".join(entry.text.split()[2:]) + + def _remove_acl_remarks(self) -> None: + for acl in self.get_children("startswith", "ip access-list "): + for entry in acl.children: + if entry.text.startswith("remark"): + acl.children.remove(entry) + + def _duplicate_child_allowed_check(self) -> bool: + """Determine if duplicate(identical text) children are allowed under the parent""" + return False + + def _convert_to_set_commands(self, config_str: str) -> str: + """ + Convert a Junupier style config string into a list of set commands. + + Args: + config_str (str): The config string to convert to set commands + Returns: + config_str (str): Configuration string + """ + if self.options["syntax_style"] == "juniper": + lines = config_str.split("\n") + path: List[str] = [] + set_commands: List[str] = [] + + for line in lines: + stripped_line = line.strip() + + # Skip empty lines + if not stripped_line: + continue + + # Strip ; from the end of the line + if stripped_line.endswith(";"): + stripped_line = stripped_line.replace(";", "") + + # Count the number of spaces at the beginning to determine the level + level = line.find(stripped_line) // 4 + + # Adjust the current path based on the level + path = path[:level] + + # If the line ends with '{' or '}', it starts a new block + if stripped_line.endswith(("{", "}")): + path.append(stripped_line[:-1].strip()) + elif stripped_line.startswith(("set", "delete")): + # It's already a set command, so just add it to the list + set_commands.append(stripped_line) + else: + # It's a command line, construct the full command + command = "set " + " ".join(path) + " " + stripped_line + set_commands.append(command) + + config_str = "\n".join(set_commands) + + return config_str diff --git a/netutils/hier_config/text_match.py b/netutils/hier_config/text_match.py new file mode 100644 index 00000000..dc511b42 --- /dev/null +++ b/netutils/hier_config/text_match.py @@ -0,0 +1,57 @@ +import re +from typing import Tuple, Union, Set + + +def equals(text: str, expression: Union[str, Set[str]]) -> bool: + """Text equivalence test""" + if isinstance(expression, str): + return text == expression + return text in expression + + +def startswith(text: str, expression: Union[str, Tuple[str, ...]]) -> bool: + """Text starts with test""" + return text.startswith(expression) + + +def endswith(text: str, expression: Union[str, Tuple[str, ...]]) -> bool: + """Text ends with test""" + return text.endswith(expression) + + +def contains(text: str, expression: str) -> bool: + """Text contains test""" + return expression in text + + +def anything(text: str, expression: str) -> bool: # pylint: disable=unused-argument + """Always returns True""" + return True + + +def nothing(text: str, expression: str) -> bool: # pylint: disable=unused-argument + """Always returns False""" + return False + + +def re_search(text: str, expression: str) -> bool: + """ + Test regex match. This method is comparatively + very slow and should be avoided where possible. + """ + return re.search(expression, text) is not None + + +def dict_call(test: str, text: str, expression: str) -> bool: + """ + Allows test methods to be called easily from variables + """ + return { + "equals": equals, + "startswith": startswith, + "endswith": endswith, + "contains": contains, + "re_search": re_search, + "anything": anything, + "nothing": nothing, + }[test](text, expression) diff --git a/tests/unit/hier_config/__init__.py b/tests/unit/hier_config/__init__.py new file mode 100644 index 00000000..e5004546 --- /dev/null +++ b/tests/unit/hier_config/__init__.py @@ -0,0 +1 @@ +# coverage needs this file to be present in the tests directory for some reason. diff --git a/tests/unit/hier_config/conftest.py b/tests/unit/hier_config/conftest.py new file mode 100644 index 00000000..343f11c8 --- /dev/null +++ b/tests/unit/hier_config/conftest.py @@ -0,0 +1,73 @@ +from os import path + +import yaml +import pytest + + +@pytest.fixture(scope="module") +def generated_config_junos(): + return open(f"{_fixture_dir()}/generated_config_junos.conf").read() + + +@pytest.fixture(scope="module") +def running_config_junos(): + return open(f"{_fixture_dir()}/running_config_junos.conf").read() + + +@pytest.fixture(scope="module") +def generated_config_flat_junos(): + return open(f"{_fixture_dir()}/generated_config_flat_junos.conf").read() + + +@pytest.fixture(scope="module") +def running_config_flat_junos(): + return open(f"{_fixture_dir()}/running_config_flat_junos.conf").read() + + +@pytest.fixture(scope="module") +def remediation_config_flat_junos(): + return open(f"{_fixture_dir()}/remediation_config_flat_junos.conf").read() + + +@pytest.fixture(scope="module") +def options_junos(): + return yaml.safe_load(open(f"{_fixture_dir()}/options_junos.yml").read()) + + +@pytest.fixture(scope="module") +def generated_config(): + return open(f"{_fixture_dir()}/generated_config.conf").read() + + +@pytest.fixture(scope="module") +def running_config(): + return open(f"{_fixture_dir()}/running_config.conf").read() + + +@pytest.fixture(scope="module") +def remediation_config_with_safe_tags(): + return open(f"{_fixture_dir()}/remediation_config_with_safe_tags.conf").read() + + +@pytest.fixture(scope="module") +def remediation_config_without_tags(): + return open(f"{_fixture_dir()}/remediation_config_without_tags.conf").read() + + +@pytest.fixture(scope="module") +def options_ios(): + return yaml.safe_load(open(f"{_fixture_dir()}/options_ios.yml").read()) + + +@pytest.fixture(scope="module") +def tags_ios(): + return yaml.safe_load(open(f"{_fixture_dir()}/tags_ios.yml").read()) + + +@pytest.fixture(scope="module") +def options_negate_with_undo(): + return yaml.safe_load(open(f"{_fixture_dir()}/options_negate_with_undo.yml").read()) + + +def _fixture_dir(): + return path.join(path.dirname(path.realpath(__file__)), "fixtures") diff --git a/tests/unit/hier_config/fixtures/generated_config.conf b/tests/unit/hier_config/fixtures/generated_config.conf new file mode 100644 index 00000000..f86b8faa --- /dev/null +++ b/tests/unit/hier_config/fixtures/generated_config.conf @@ -0,0 +1,34 @@ +hostname aggr-example.rtr +! +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +! +vlan 2 + name switch_mgmt_10.0.2.0/24 +! +vlan 3 + name switch_mgmt_10.0.3.0/24 +! +vlan 4 + name switch_mgmt_10.0.4.0/24 +! +interface Vlan2 + mtu 9000 + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 + ip access-group TEST in + no shutdown +! +interface Vlan3 + mtu 9000 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 + ip access-group TEST in + no shutdown +! +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown diff --git a/tests/unit/hier_config/fixtures/generated_config_flat_junos.conf b/tests/unit/hier_config/fixtures/generated_config_flat_junos.conf new file mode 100644 index 00000000..77592f07 --- /dev/null +++ b/tests/unit/hier_config/fixtures/generated_config_flat_junos.conf @@ -0,0 +1,28 @@ +set system host-name aggr-example.rtr + +set firewall family inet filter TEST term 1 from source-address 10.0.0.0/29 +set firewall family inet filter TEST term 1 then accept + +set vlans switch_mgmt_10.0.2.0/24 vlan-id 2 +set vlans switch_mgmt_10.0.2.0/24 l3-interface irb.2 + +set vlans switch_mgmt_10.0.3.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.3.0/24 l3-interface irb.3 + +set vlans switch_mgmt_10.0.4.0/24 vlan-id 4 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.4 + +set interfaces irb unit 2 family inet address 10.0.2.1/24 +set interfaces irb unit 2 family inet filter input TEST +set interfaces irb unit 2 family inet mtu 9000 +set interfaces irb unit 2 family inet description "switch_10.0.2.0/24" + +set interfaces irb unit 3 family inet address 10.0.3.1/16 +set interfaces irb unit 3 family inet filter input TEST +set interfaces irb unit 3 family inet mtu 9000 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.3.0/24" + +set interfaces irb unit 4 family inet address 10.0.4.1/16 +set interfaces irb unit 4 family inet filter input TEST +set interfaces irb unit 4 family inet mtu 9000 +set interfaces irb unit 4 family inet description "switch_mgmt_10.0.4.0/24" \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/generated_config_junos.conf b/tests/unit/hier_config/fixtures/generated_config_junos.conf new file mode 100644 index 00000000..5327abbd --- /dev/null +++ b/tests/unit/hier_config/fixtures/generated_config_junos.conf @@ -0,0 +1,66 @@ +system { + host-name aggr-example.rtr; +} + +firewall { + family inet { + filter TEST { + term 1 { + from { + source-address 10.0.0.0/29; + } + then accept; + } + } + } +} + +vlans { + switch_mgmt_10.0.2.0/24 { + vlan-id 2; + l3-interface irb.2; + } + switch_mgmt_10.0.3.0/24 { + vlan-id 3; + l3-interface irb.3; + } + switch_mgmt_10.0.4.0/24 { + vlan-id 4; + l3-interface irb.4; + } +} + +interfaces { + irb { + unit 2 { + family inet { + address 10.0.2.1/24; + filter { + input TEST; + } + mtu 9000; + description "switch_mgmt_10.0.2.0/24"; + } + } + unit 3 { + family inet { + address 10.0.3.1/16; + filter { + input TEST; + } + mtu 9000; + description "switch_mgmt_10.0.3.0/24"; + } + } + unit 4 { + family inet { + address 10.0.4.1/16; + filter { + input TEST; + } + mtu 9000; + description "switch_mgmt_10.0.4.0/24"; + } + } + } +} diff --git a/tests/unit/hier_config/fixtures/options_ios.yml b/tests/unit/hier_config/fixtures/options_ios.yml new file mode 100644 index 00000000..b0cf9078 --- /dev/null +++ b/tests/unit/hier_config/fixtures/options_ios.yml @@ -0,0 +1,123 @@ +--- +# Indicates the style of the configuration +style: ios + +negation: "no" + +syntax_style: "cisco" + +# if there is a delta, negate the parents and re-write the parents with children +sectional_overwrite: [] + +# if there is a delta, overwrite these parents instead of one of their children +sectional_overwrite_no_negate: [] + +# The default order value is 500, with a range between 1 - 999. +# Commands with smaller order values float to the top in the order of execution. +# Commands with larger order values float to the bottom in the order of execution. +# Syntax Example: +# - lineage: +# - startswith: +# - no route-map +# order: 600 +ordering: +- lineage: + - startswith: no vlan filter + order: 700 +- lineage: + - startswith: interface + - startswith: no shutdown + order: 700 + +# adds +1 indent to lines following start_expression and removes the +1 indent for lines following end_expression +indent_adjust: [] + +parent_allows_duplicate_child: [] + +sectional_exiting: +# This rule is used in the hierarchical_configuration unit test for .add_section_exiting() +- lineage: + - startswith: router bgp + - startswith: template peer-policy + exit_text: exit-peer-policy +- lineage: + - startswith: router bgp + - startswith: template peer-session + exit_text: exit-peer-session +- lineage: + - startswith: router bgp + - startswith: address-family + exit_text: exit-address-family + +# substitions against the full multi-line config text +full_text_sub: [] +#- search: 'banner\s(exec|motd)\s(\S)\n(.*\n){1,}(\2)' +# replace: '' +#- search: 'banner\s(exec|motd)\s(\S.).+\n(.*\n){1,}.*(\2)' +# replace: '' +#- search: 'banner\s(exec|motd)\s(\S.)\n(.*\n){1,}(\2)' +# replace: '' + +# substitions against each line of the config text +per_line_sub: +- search: ^Building configuration.* + replace: '' +- search: ^Current configuration.* + replace: '' +- search: ^! Last configuration change.* + replace: '' +- search: ^! NVRAM config last updated.* + replace: '' +- search: ^ntp clock-period .* + replace: '' +- search: ^version.* + replace: '' +- search: ^ logging event link-status$ + replace: '' +- search: ^ logging event subif-link-status$ + replace: '' +- search: ^\s*ipv6 unreachables disable$ + replace: '' +- search: ^end$ + replace: '' +- search: '^\s*[#!].*' + replace: '' +- search: ^ no ip address + replace: '' +- search: ^ exit-peer-policy + replace: '' +- search: ^ exit-peer-session + replace: '' +- search: ^ exit-address-family + replace: '' +- search: ^crypto key generate rsa general-keys.*$ + replace: '' + +idempotent_commands_blacklist: [] + +# These commands do not require negation, they simply overwrite themselves +# Example Syntax +# - lineage: +# - startswith: interface +# - startswith: description +idempotent_commands: +- lineage: + - startswith: vlan + - startswith: name +- lineage: + - startswith: interface + - startswith: description +- lineage: + - startswith: interface + - startswith: ip address + +# Default when expression: list of expressions +negation_default_when: [] + +# Negate substitutions: expression -> negate with +# Example Syntax: +# - lineage: +# - startswith: route-map +# - startswith: description +# use: no description +negation_negate_with: [] diff --git a/tests/unit/hier_config/fixtures/options_junos.yml b/tests/unit/hier_config/fixtures/options_junos.yml new file mode 100644 index 00000000..5edf7dc8 --- /dev/null +++ b/tests/unit/hier_config/fixtures/options_junos.yml @@ -0,0 +1,64 @@ +--- +# Indicates the style of the configuration +style: junos + +# negation prefix +negation: "delete" + +syntax_style: "juniper" + +# if there is a delta, negate the parents and re-write the parents with children +sectional_overwrite: [] + +# if there is a delta, overwrite these parents instead of one of their children +sectional_overwrite_no_negate: [] + +# The default order value is 500, with a range between 1 - 999. +# Commands with smaller order values float to the top in the order of execution. +# Commands with larger order values float to the bottom in the order of execution. +# Syntax Example: +# - lineage: +# - startswith: +# - no route-map +# order: 600 +ordering: [] + +# adds +1 indent to lines following start_expression and removes the +1 indent for lines following end_expression +indent_adjust: [] + +parent_allows_duplicate_child: [] + +sectional_exiting: [] +# This rule is used in the hierarchical_configuration unit test for .add_section_exiting() + +# substitions against the full multi-line config text +full_text_sub: [] +#- search: 'banner\s(exec|motd)\s(\S)\n(.*\n){1,}(\2)' +# replace: '' +#- search: 'banner\s(exec|motd)\s(\S.).+\n(.*\n){1,}.*(\2)' +# replace: '' +#- search: 'banner\s(exec|motd)\s(\S.)\n(.*\n){1,}(\2)' +# replace: '' + +# substitions against each line of the config text +per_line_sub: [] + +idempotent_commands_blacklist: [] + +# These commands do not require negation, they simply overwrite themselves +# Example Syntax +# - lineage: +# - startswith: interface +# - startswith: description +idempotent_commands: [] + +# Default when expression: list of expressions +negation_default_when: [] + +# Negate substitutions: expression -> negate with +# Example Syntax: +# - lineage: +# - startswith: route-map +# - startswith: description +# use: no description +negation_negate_with: [] diff --git a/tests/unit/hier_config/fixtures/options_negate_with_undo.yml b/tests/unit/hier_config/fixtures/options_negate_with_undo.yml new file mode 100644 index 00000000..99589b05 --- /dev/null +++ b/tests/unit/hier_config/fixtures/options_negate_with_undo.yml @@ -0,0 +1,21 @@ +--- +# Indicates the style of the configuration +style: comware5 + +# negation prefix +negation: 'undo' + +syntax_style: "cisco" + +sectional_overwrite: [] +sectional_overwrite_no_negate: [] +ordering: [] +indent_adjust: [] +parent_allows_duplicate_child: [] +sectional_exiting: [] +full_text_sub: [] +per_line_sub: [] +idempotent_commands_blacklist: [] +idempotent_commands: [] +negation_default_when: [] +negation_negate_with: [] \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/remediation_config_flat_junos.conf b/tests/unit/hier_config/fixtures/remediation_config_flat_junos.conf new file mode 100644 index 00000000..33e01795 --- /dev/null +++ b/tests/unit/hier_config/fixtures/remediation_config_flat_junos.conf @@ -0,0 +1,19 @@ +delete vlans switch_mgmt_10.0.4.0/24 vlan-id 3 +delete vlans switch_mgmt_10.0.4.0/24 l3-interface irb.3 +delete interfaces irb unit 2 family inet description "switch_10.0.2.0/24" +delete interfaces irb unit 2 family inet disable +delete interfaces irb unit 3 family inet address 10.0.4.1/16 +delete interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" +set vlans switch_mgmt_10.0.3.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.3.0/24 l3-interface irb.3 +set vlans switch_mgmt_10.0.4.0/24 vlan-id 4 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.4 +set interfaces irb unit 2 family inet filter input TEST +set interfaces irb unit 2 family inet mtu 9000 +set interfaces irb unit 2 family inet description "switch_mgmt_10.0.2.0/24" +set interfaces irb unit 3 family inet address 10.0.3.1/16 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.3.0/24" +set interfaces irb unit 4 family inet address 10.0.4.1/16 +set interfaces irb unit 4 family inet filter input TEST +set interfaces irb unit 4 family inet mtu 9000 +set interfaces irb unit 4 family inet description "switch_mgmt_10.0.4.0/24" \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/remediation_config_with_safe_tags.conf b/tests/unit/hier_config/fixtures/remediation_config_with_safe_tags.conf new file mode 100644 index 00000000..fa97c343 --- /dev/null +++ b/tests/unit/hier_config/fixtures/remediation_config_with_safe_tags.conf @@ -0,0 +1,4 @@ +interface Vlan3 + description switch_mgmt_10.0.3.0/24 +interface Vlan4 + description switch_mgmt_10.0.4.0/24 \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/remediation_config_without_tags.conf b/tests/unit/hier_config/fixtures/remediation_config_without_tags.conf new file mode 100644 index 00000000..e1dfed02 --- /dev/null +++ b/tests/unit/hier_config/fixtures/remediation_config_without_tags.conf @@ -0,0 +1,17 @@ +vlan 3 + name switch_mgmt_10.0.3.0/24 +vlan 4 + name switch_mgmt_10.0.4.0/24 +interface Vlan2 + mtu 9000 + ip access-group TEST in + no shutdown +interface Vlan3 + description switch_mgmt_10.0.3.0/24 + ip address 10.0.3.1 255.255.0.0 +interface Vlan4 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/running_config.conf b/tests/unit/hier_config/fixtures/running_config.conf new file mode 100644 index 00000000..9ff2ce05 --- /dev/null +++ b/tests/unit/hier_config/fixtures/running_config.conf @@ -0,0 +1,22 @@ +hostname aggr-example.rtr +! +ip access-list extended TEST + 10 permit ip 10.0.0.0 0.0.0.7 any +! +vlan 2 + name switch_mgmt_10.0.2.0/24 +! +vlan 3 + name switch_mgmt_10.0.4.0/24 +! +interface Vlan2 + descripton switch_10.0.2.0/24 + ip address 10.0.2.1 255.255.255.0 + shutdown +! +interface Vlan3 + mtu 9000 + description switch_mgmt_10.0.4.0/24 + ip address 10.0.4.1 255.255.0.0 + ip access-group TEST in + no shutdown diff --git a/tests/unit/hier_config/fixtures/running_config_flat_junos.conf b/tests/unit/hier_config/fixtures/running_config_flat_junos.conf new file mode 100644 index 00000000..c94da491 --- /dev/null +++ b/tests/unit/hier_config/fixtures/running_config_flat_junos.conf @@ -0,0 +1,19 @@ +set system host-name aggr-example.rtr + +set firewall family inet filter TEST term 1 from source-address 10.0.0.0/29 +set firewall family inet filter TEST term 1 then accept + +set vlans switch_mgmt_10.0.2.0/24 vlan-id 2 +set vlans switch_mgmt_10.0.2.0/24 l3-interface irb.2 + +set vlans switch_mgmt_10.0.4.0/24 vlan-id 3 +set vlans switch_mgmt_10.0.4.0/24 l3-interface irb.3 + +set interfaces irb unit 2 family inet address 10.0.2.1/24 +set interfaces irb unit 2 family inet description "switch_10.0.2.0/24" +set interfaces irb unit 2 family inet disable + +set interfaces irb unit 3 family inet address 10.0.4.1/16 +set interfaces irb unit 3 family inet filter input TEST +set interfaces irb unit 3 family inet mtu 9000 +set interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" \ No newline at end of file diff --git a/tests/unit/hier_config/fixtures/running_config_junos.conf b/tests/unit/hier_config/fixtures/running_config_junos.conf new file mode 100644 index 00000000..df57fcbd --- /dev/null +++ b/tests/unit/hier_config/fixtures/running_config_junos.conf @@ -0,0 +1,51 @@ +system { + host-name aggr-example.rtr; +} + +firewall { + family inet { + filter TEST { + term 1 { + from { + source-address 10.0.0.0/29; + } + then { + accept; + } + } + } + } +} + +vlans { + switch_mgmt_10.0.2.0/24 { + vlan-id 2; + l3-interface irb.2; + } + switch_mgmt_10.0.4.0/24 { + vlan-id 3; + l3-interface irb.3; + } +} + +interfaces { + irb { + unit 2 { + family inet { + address 10.0.2.1/24; + description "switch_10.0.2.0/24"; + disable; + } + } + unit 3 { + family inet { + address 10.0.4.1/16; + filter { + input TEST; + } + mtu 9000; + description "switch_mgmt_10.0.4.0/24"; + } + } + } +} diff --git a/tests/unit/hier_config/fixtures/tags_ios.yml b/tests/unit/hier_config/fixtures/tags_ios.yml new file mode 100644 index 00000000..4d4b0a9d --- /dev/null +++ b/tests/unit/hier_config/fixtures/tags_ios.yml @@ -0,0 +1,32 @@ +--- +- lineage: + - equals: + - no ip http secure-server + - no ip http server + - vlan + - no vlan + add_tags: safe +- lineage: + - startswith: interface Vlan + - startswith: + - description + add_tags: safe +- lineage: + - startswith: + - ip access-list + - no ip access-list + - access-list + - no access-list + add_tags: manual +- lineage: + - startswith: interface Vlan + - startswith: + - ip address + - no ip address + - mtu + - no mtu + - ip access-group + - no ip access-group + - shutdown + - no shutdown + add_tags: manual diff --git a/tests/unit/hier_config/test_hier_config.py b/tests/unit/hier_config/test_hier_config.py new file mode 100644 index 00000000..ecdd1fd5 --- /dev/null +++ b/tests/unit/hier_config/test_hier_config.py @@ -0,0 +1,530 @@ +import tempfile +import os +import types + +import pytest + +from netutils.hier_config import HConfig, Host + + +# pylint: ignore=too-many-public-methods +class TestHConfig: + @pytest.fixture(autouse=True) + def setup(self, options_ios): + self.os = "ios" + self.host_a = Host("example1.rtr", self.os, options_ios) + self.host_b = Host("example2.rtr", self.os, options_ios) + + def test_bool(self): + config = HConfig(host=self.host_a) + assert config + + def test_merge(self): + hier1 = HConfig(host=self.host_a) + hier1.add_child("interface Vlan2") + hier2 = HConfig(host=self.host_b) + hier2.add_child("interface Vlan3") + + assert len(list(hier1.all_children())) == 1 + assert len(list(hier2.all_children())) == 1 + + hier1.merge(hier2) + + assert len(list(hier1.all_children())) == 2 + + def test_load_from_file(self): + hier = HConfig(host=self.host_a) + config = "interface Vlan2\n ip address 1.1.1.1 255.255.255.0" + + with tempfile.NamedTemporaryFile(mode="r+", delete=False) as myfile: + myfile.file.write(config) + myfile.file.flush() + myfile.close() + hier.load_from_file(myfile.name) + os.remove(myfile.name) + + assert len(list(hier.all_children())) == 2 + + def test_load_from_config_text(self): + hier = HConfig(host=self.host_a) + config = "interface Vlan2\n ip address 1.1.1.1 255.255.255.0" + + hier.load_from_string(config) + assert len(list(hier.all_children())) == 2 + + def test_dump_and_load_from_dump_and_compare(self): + hier_pre_dump = HConfig(host=self.host_a) + a1 = hier_pre_dump.add_child("a1") + b2 = a1.add_child("b2") + + b2.order_weight = 400 + b2.tags.add("test") + b2.comments.add("test comment") + b2.new_in_config = True + + dump = hier_pre_dump.dump() + + hier_post_dump = HConfig(host=self.host_a) + hier_post_dump.load_from_dump(dump) + + assert hier_pre_dump == hier_post_dump + + def test_add_tags(self): + hier = HConfig(host=self.host_a) + tag_rules = [{"lineage": [{"equals": "interface Vlan2"}], "add_tags": "test"}] + child = hier.add_child("interface Vlan2") + + hier.add_tags(tag_rules) + + assert {"test"} == child.tags + + def test_all_children_sorted_by_lineage_rules(self, tags_ios): + hier = HConfig(host=self.host_a) + svi = hier.add_child("interface Vlan2") + svi.add_child("description switch-mgmt-10.0.2.0/24") + + mgmt = hier.add_child("interface FastEthernet0") + mgmt.add_child("description mgmt-192.168.0.0/24") + + assert len(list(hier.all_children())) == 4 + assert isinstance(hier.all_children(), types.GeneratorType) + + assert len(list(hier.all_children_sorted_with_lineage_rules(tags_ios))) == 2 + + assert isinstance( + hier.all_children_sorted_with_lineage_rules(tags_ios), + types.GeneratorType, + ) + + def test_add_ancestor_copy_of(self): + hier1 = HConfig(host=self.host_a) + interface = hier1.add_child("interface Vlan2") + interface.add_children(["description switch-mgmt-192.168.1.0/24", "ip address 192.168.1.0/24"]) + hier1.add_ancestor_copy_of(interface) + + assert len(list(hier1.all_children())) == 3 + assert isinstance(hier1.all_children(), types.GeneratorType) + + def test_has_children(self): + hier = HConfig(host=self.host_a) + assert not hier.has_children() + hier.add_child("interface Vlan2") + assert hier.has_children() + + def test_depth(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + ip_address = interface.add_child("ip address 192.168.1.1 255.255.255.0") + assert ip_address.depth() == 2 + + def test_get_child(self): + hier = HConfig(host=self.host_a) + hier.add_child("interface Vlan2") + child = hier.get_child("equals", "interface Vlan2") + assert child.text == "interface Vlan2" + + def test_get_child_deep(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + interface.add_child("ip address 192.168.1.1 255.255.255.0") + child = hier.get_child_deep( + [ + ("equals", "interface Vlan2"), + ("equals", "ip address 192.168.1.1 255.255.255.0"), + ] + ) + assert child is not None + + def test_get_children(self): + hier = HConfig(host=self.host_a) + hier.add_child("interface Vlan2") + hier.add_child("interface Vlan3") + children = list(hier.get_children("startswith", "interface")) + assert len(children) == 2 + for child in children: + assert child.text.startswith("interface Vlan") + + def test_move(self): + hier1 = HConfig(host=self.host_a) + interface1 = hier1.add_child("interface Vlan2") + interface1.add_child("192.168.0.1/30") + + assert len(list(hier1.all_children())) == 2 + + hier2 = HConfig(host=self.host_b) + + assert len(list(hier2.all_children())) == 0 + + interface1.move(hier2) + + assert len(list(hier1.all_children())) == 0 + assert len(list(hier2.all_children())) == 2 + + def test_del_child_by_text(self): + hier = HConfig(host=self.host_a) + hier.add_child("interface Vlan2") + hier.del_child_by_text("interface Vlan2") + + assert len(list(hier.all_children())) == 0 + + def test_del_child(self): + hier1 = HConfig(host=self.host_a) + hier1.add_child("interface Vlan2") + + assert len(list(hier1.all_children())) == 1 + + hier1.del_child(hier1.get_child("startswith", "interface")) + + assert len(list(hier1.all_children())) == 0 + + def test_rebuild_children_dict(self): + hier1 = HConfig(host=self.host_a) + interface = hier1.add_child("interface Vlan2") + interface.add_children(["description switch-mgmt-192.168.1.0/24", "ip address 192.168.1.0/24"]) + delta_a = hier1 + hier1.rebuild_children_dict() + delta_b = hier1 + + assert list(delta_a.all_children()) == list(delta_b.all_children()) + + def test_add_children(self): + interface_items1 = [ + "description switch-mgmt 192.168.1.0/24", + "ip address 192.168.1.1/24", + ] + hier1 = HConfig(host=self.host_a) + interface1 = hier1.add_child("interface Vlan2") + interface1.add_children(interface_items1) + + assert len(list(hier1.all_children())) == 3 + + interface_items2 = ["description switch-mgmt 192.168.1.0/24"] + hier2 = HConfig(host=self.host_a) + interface2 = hier2.add_child("interface Vlan2") + interface2.add_children(interface_items2) + + assert len(list(hier2.all_children())) == 2 + + def test_add_child(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + assert interface.depth() == 1 + assert interface.text == "interface Vlan2" + assert not isinstance(interface, list) + + def test_add_deep_copy_of(self): + hier1 = HConfig(host=self.host_a) + interface1 = hier1.add_child("interface Vlan2") + interface1.add_children(["description switch-mgmt-192.168.1.0/24", "ip address 192.168.1.0/24"]) + + hier2 = HConfig(host=self.host_b) + hier2.add_deep_copy_of(interface1) + + assert len(list(hier2.all_children())) == 3 + assert isinstance(hier2.all_children(), types.GeneratorType) + + def test_lineage(self): + """This is covered by test_path""" + pass + + def test_path(self): + hier = HConfig(host=self.host_a) + config_a = hier.add_child("a") + config_aa = config_a.add_child("aa") + config_aaa = config_aa.add_child("aaa") + assert list(config_aaa.path()) == ["a", "aa", "aaa"] + + def test_cisco_style_text(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + ip_address = interface.add_child("ip address 192.168.1.1 255.255.255.0") + assert ip_address.cisco_style_text() == " ip address 192.168.1.1 255.255.255.0" + assert isinstance(ip_address.cisco_style_text(), str) + assert not isinstance(ip_address.cisco_style_text(), list) + + def test_all_children_sorted_untagged(self): + config = HConfig(host=self.host_a) + interface = config.add_child("interface Vlan2") + ip_address_a = interface.add_child("ip address 192.168.1.1/24") + ip_address_a.append_tags("a") + ip_address_none = interface.add_child("ip address 192.168.2.1/24") + + assert ip_address_none is list(config.all_children_sorted_untagged())[1] + assert len(list(config.all_children_sorted_untagged())) == 2 + assert ip_address_none is list(config.all_children_sorted_untagged())[1] + + def test_all_children_sorted_by_tags(self): + config = HConfig(host=self.host_a) + config_a = config.add_child("a") + config_aa = config_a.add_child("aa") + config_a.add_child("ab") + config_aaa = config_aa.add_child("aaa") + config_aab = config_aa.add_child("aab") + config_aaa.append_tags("aaa") + config_aab.append_tags("aab") + + case_1_matches = [c.text for c in config.all_children_sorted_by_tags({"aaa"}, set())] + assert ["a", "aa", "aaa"] == case_1_matches + case_2_matches = [c.text for c in config.all_children_sorted_by_tags(set(), {"aab"})] + assert ["a", "aa", "aaa", "ab"] == case_2_matches + case_3_matches = [c.text for c in config.all_children_sorted_by_tags({"aaa"}, {"aab"})] + assert ["a", "aa", "aaa"] == case_3_matches + + def test_all_children_sorted(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + interface.add_child("standby 1 ip 10.15.11.1") + assert len(list(hier.all_children_sorted())) == 2 + + def test_all_children(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + interface.add_child("standby 1 ip 10.15.11.1") + assert len(list(hier.all_children())) == 2 + + def test_delete(self): + hier = HConfig(host=self.host_a) + config_a = hier.add_child("a") + config_a.delete() + assert not hier.children + + def test_set_order_weight(self): + hier = HConfig(host=self.host_a) + child = hier.add_child("no vlan filter") + hier.set_order_weight() + assert child.order_weight == 700 + + def test_add_sectional_exiting(self): + hier = HConfig(host=self.host_a) + bgp = hier.add_child("router bgp 64500") + template = bgp.add_child("template peer-policy") + hier.add_sectional_exiting() + sectional_exit = template.get_child("equals", "exit-peer-policy") + assert sectional_exit is not None + + def test_to_tag_spec(self): + pass + + def test_tags(self): + config = HConfig(host=self.host_a) + interface = config.add_child("interface Vlan2") + ip_address = interface.add_child("ip address 192.168.1.1/24") + assert None in interface.tags + assert None in ip_address.tags + ip_address.append_tags("a") + assert "a" in interface.tags + assert "a" in ip_address.tags + assert "b" not in interface.tags + assert "b" not in ip_address.tags + + def test_append_tags(self): + config = HConfig(host=self.host_a) + interface = config.add_child("interface Vlan2") + ip_address = interface.add_child("ip address 192.168.1.1/24") + ip_address.append_tags("test_tag") + assert "test_tag" in config.tags + assert "test_tag" in interface.tags + assert "test_tag" in ip_address.tags + + def test_remove_tags(self): + config = HConfig(host=self.host_a) + interface = config.add_child("interface Vlan2") + ip_address = interface.add_child("ip address 192.168.1.1/24") + ip_address.append_tags("test_tag") + assert "test_tag" in config.tags + assert "test_tag" in interface.tags + assert "test_tag" in ip_address.tags + ip_address.remove_tags("test_tag") + assert "test_tag" not in config.tags + assert "test_tag" not in interface.tags + assert "test_tag" not in ip_address.tags + + def test_with_tags(self): + pass + + def test_negate(self): + hier = HConfig(host=self.host_a) + interface = hier.add_child("interface Vlan2") + interface.negate() + assert interface.text == "no interface Vlan2" + + def test_config_to_get_to(self): + running_config_hier = HConfig(host=self.host_a) + interface = running_config_hier.add_child("interface Vlan2") + interface.add_child("ip address 192.168.1.1/24") + generated_config_hier = HConfig(host=self.host_a) + generated_config_hier.add_child("interface Vlan3") + remediation_config_hier = running_config_hier.config_to_get_to(generated_config_hier) + assert len(list(remediation_config_hier.all_children())) == 2 + + def test_config_to_get_to_right(self): + running_config_hier = HConfig(host=self.host_a) + running_config_hier.add_child("do not add me") + generated_config_hier = HConfig(host=self.host_a) + generated_config_hier.add_child("do not add me") + generated_config_hier.add_child("add me") + delta = HConfig(host=self.host_a) + running_config_hier._config_to_get_to_right(generated_config_hier, delta) + assert "do not add me" not in delta + assert "add me" in delta + + def test_sectional_overwrite_no_negate_check(self): + pass + + def test_sectional_overwrite_check(self): + pass + + def test_overwrite_with(self): + pass + + def test_add_shallow_copy_of(self): + base_config = HConfig(host=self.host_a) + + config_a = HConfig(host=self.host_a) + interface_a = config_a.add_child("interface Vlan2") + interface_a.append_tags({"ta", "tb"}) + interface_a.comments.add("ca") + interface_a.order_weight = 200 + + config_b = HConfig(host=self.host_b) + interface_b = config_b.add_child("interface Vlan2") + interface_b.append_tags({"tc"}) + interface_b.comments.add("cc") + interface_b.order_weight = 201 + + copied_interface = base_config.add_shallow_copy_of(interface_a, merged=True) + assert copied_interface.tags == {"ta", "tb"} + assert copied_interface.comments == {"ca"} + assert copied_interface.order_weight == 200 + assert copied_interface.instances == [ + { + "hostname": interface_a.host.hostname, + "comments": interface_a.comments, + "tags": interface_a.tags, + } + ] + + copied_interface = base_config.add_shallow_copy_of(interface_b, merged=True) + + assert copied_interface.tags == {"ta", "tb", "tc"} + assert copied_interface.comments == {"ca", "cc"} + assert copied_interface.order_weight == 201 + assert copied_interface.instances == [ + { + "hostname": interface_a.host.hostname, + "comments": interface_a.comments, + "tags": interface_a.tags, + }, + { + "hostname": interface_b.host.hostname, + "comments": interface_b.comments, + "tags": interface_b.tags, + }, + ] + + def test_line_inclusion_test(self): + config = HConfig(host=self.host_a) + interface = config.add_child("interface Vlan2") + ip_address_ab = interface.add_child("ip address 192.168.2.1/24") + ip_address_ab.append_tags(["a", "b"]) + + assert not ip_address_ab.line_inclusion_test({"a"}, {"b"}) + assert not ip_address_ab.line_inclusion_test(set(), {"a"}) + assert ip_address_ab.line_inclusion_test({"a"}, set()) + assert not ip_address_ab.line_inclusion_test(set(), set()) + + def test_lineage_test(self): + pass + + def test_future_config(self): + running_config = HConfig(host=self.host_a) + running_config.add_children_deep(["a", "aa", "aaa", "aaaa"]) + running_config.add_children_deep(["a", "ab", "aba", "abaa"]) + config = HConfig(host=self.host_a) + config.add_children_deep(["a", "ac"]) + config.add_children_deep(["a", "no ab"]) + config.add_children_deep(["a", "no az"]) + + future_config = running_config.future(config) + assert list(c.cisco_style_text() for c in future_config.all_children()) == [ + "a", + " ac", # config lines are added first + " no az", + " aa", # self lines not in config are added last + " aaa", + " aaaa", + ] + + def test_difference1(self): + rc = ["a", " a1", " a2", " a3", "b"] + step = ["a", " a1", " a2", " a3", " a4", " a5", "b", "c", "d", " d1"] + rc_hier = HConfig(host=self.host_a) + rc_hier.load_from_string("\n".join(rc)) + step_hier = HConfig(host=self.host_a) + step_hier.load_from_string("\n".join(step)) + + difference = step_hier.difference(rc_hier) + difference_children = list(c.cisco_style_text() for c in difference.all_children_sorted()) + + assert len(difference_children) == 6 + assert "c" in difference + assert "d" in difference + assert "a4" in difference.get_child("equals", "a") + assert "a5" in difference.get_child("equals", "a") + assert "d1" in difference.get_child("equals", "d") + + @staticmethod + def test_difference2(options_ios): + host = Host(hostname="test_host", os="ios", hconfig_options=options_ios) + rc = ["a", " a1", " a2", " a3", "b"] + step = ["a", " a1", " a2", " a3", " a4", " a5", "b", "c", "d", " d1"] + rc_hier = HConfig(host=host) + rc_hier.load_from_string("\n".join(rc)) + step_hier = HConfig(host=host) + step_hier.load_from_string("\n".join(step)) + + difference = step_hier.difference(rc_hier) + difference_children = list(c.cisco_style_text() for c in difference.all_children_sorted()) + assert len(difference_children) == 6 + + @staticmethod + def test_difference3(options_ios): + host = Host(hostname="test_host", os="ios", hconfig_options=options_ios) + rc = ["ip access-list extended test", " 10 a", " 20 b"] + step = ["ip access-list extended test", " 10 a", " 20 b", " 30 c"] + rc_hier = HConfig(host=host) + rc_hier.load_from_string("\n".join(rc)) + step_hier = HConfig(host=host) + step_hier.load_from_string("\n".join(step)) + + difference = step_hier.difference(rc_hier) + difference_children = list(c.cisco_style_text() for c in difference.all_children_sorted()) + assert difference_children == ["ip access-list extended test", " 30 c"] + + @staticmethod + def test_unified_diff(options_ios): + host = Host(hostname="test_host", os="ios", hconfig_options=options_ios) + config_a = HConfig(host=host) + config_b = HConfig(host=host) + # deep differences + config_a.add_children_deep(["a", "aa", "aaa", "aaaa"]) + config_b.add_children_deep(["a", "aa", "aab", "aaba"]) + # these children will be the same and should not appear in the diff + config_a.add_children_deep(["b", "ba", "baa"]) + config_b.add_children_deep(["b", "ba", "baa"]) + # root level differences + config_a.add_children_deep(["c", "ca"]) + config_b.add_child("d") + + diff = list(config_a.unified_diff(config_b)) + assert diff == [ + "a", + " aa", + " - aaa", + " - aaaa", + " + aab", + " + aaba", + "- c", + " - ca", + "+ d", + ] diff --git a/tests/unit/hier_config/test_hier_options.py b/tests/unit/hier_config/test_hier_options.py new file mode 100644 index 00000000..cd3ff912 --- /dev/null +++ b/tests/unit/hier_config/test_hier_options.py @@ -0,0 +1,15 @@ +import pytest + +from netutils.hier_config.options import options_for, base_options + + +class TestHConfigOptions: + @pytest.fixture(autouse=True) + def setup(self, options_ios, options_junos): + self.ios_options = options_ios + self.junos_options = options_junos + + def test_options(self): + assert self.ios_options == options_for("ios") + assert self.junos_options == options_for("junos") + assert {**base_options, **{"style": "example"}} == options_for("example") diff --git a/tests/unit/hier_config/test_host.py b/tests/unit/hier_config/test_host.py new file mode 100644 index 00000000..69dec267 --- /dev/null +++ b/tests/unit/hier_config/test_host.py @@ -0,0 +1,66 @@ +import pytest + +from netutils.hier_config.host import Host + + +class TestHost: + @pytest.fixture(autouse=True) + def setup(self, options_ios): + self.host = Host("example.rtr", "ios", options_ios) + self.host_bltn_opts = Host("example.rtr", "ios") + + def test_load_config_from(self, running_config, generated_config): + self.host.load_running_config(running_config) + self.host.load_generated_config(generated_config) + self.host_bltn_opts.load_running_config(running_config) + self.host_bltn_opts.load_generated_config(generated_config) + + assert len(self.host.generated_config) > 0 + assert len(self.host.running_config) > 0 + assert len(self.host_bltn_opts.running_config) > 0 + assert len(self.host_bltn_opts.generated_config) > 0 + + def test_load_remediation(self, running_config, generated_config): + self.host.load_running_config(running_config) + self.host.load_generated_config(generated_config) + self.host.remediation_config() + self.host_bltn_opts.load_running_config(running_config) + self.host_bltn_opts.load_generated_config(generated_config) + self.host_bltn_opts.remediation_config() + + assert len(self.host.remediation_config().children) > 0 + assert len(self.host_bltn_opts.remediation_config().children) > 0 + + def test_load_rollback(self, running_config, generated_config): + self.host.load_running_config(running_config) + self.host.load_generated_config(generated_config) + self.host.rollback_config() + self.host_bltn_opts.load_running_config(running_config) + self.host_bltn_opts.load_generated_config(generated_config) + self.host_bltn_opts.rollback_config() + + assert len(self.host.rollback_config().children) > 0 + assert len(self.host_bltn_opts.rollback_config().children) > 0 + + def test_load_tags(self, tags_ios): + self.host.load_tags(tags_ios) + assert len(self.host.hconfig_tags) > 0 + + def test_filter_remediation( + self, + running_config, + generated_config, + tags_ios, + remediation_config_with_safe_tags, + remediation_config_without_tags, + ): + self.host.load_running_config(running_config) + self.host.load_generated_config(generated_config) + self.host.load_tags(tags_ios) + + rem1 = self.host.remediation_config_filtered_text(set(), set()) + rem2 = self.host.remediation_config_filtered_text({"safe"}, set()) + + assert rem1 != rem2 + assert rem1 == remediation_config_without_tags + assert rem2 == remediation_config_with_safe_tags diff --git a/tests/unit/hier_config/test_juniper_syntax.py b/tests/unit/hier_config/test_juniper_syntax.py new file mode 100644 index 00000000..6a885679 --- /dev/null +++ b/tests/unit/hier_config/test_juniper_syntax.py @@ -0,0 +1,46 @@ +import pytest + +from netutils.hier_config.host import Host + + +class TestJuniperSyntax: + @pytest.fixture(autouse=True) + def setUpClass( + self, + options_junos, + running_config_junos, + running_config_flat_junos, + generated_config_junos, + generated_config_flat_junos, + remediation_config_flat_junos, + ): + self.os = "junos" + self.host = Host("example1.rtr", self.os, options_junos) + self.running_config_str = "set vlans switch_mgmt_10.0.2.0/24 vlan-id 2" + self.generated_config_str = "set vlans switch_mgmt_10.0.3.0/24 vlan-id 3" + self.remediation_str = ( + "delete vlans switch_mgmt_10.0.2.0/24 vlan-id 2\nset vlans switch_mgmt_10.0.3.0/24 vlan-id 3" + ) + self.running_config_junos = running_config_junos + self.running_config_flat_junos = running_config_flat_junos + self.generated_config_junos = generated_config_junos + self.generated_config_flat_junos = generated_config_flat_junos + self.remediation_config_flat_junos = remediation_config_flat_junos + + def test_junos_basic_remediation(self): + self.host.load_running_config(self.running_config_str) + self.host.load_generated_config(self.generated_config_str) + self.host.remediation_config() + assert self.remediation_str == str(self.host.remediation_config()) + + def test_junos_convert_to_set(self): + self.host.load_running_config(self.running_config_junos) + self.host.load_generated_config(self.generated_config_junos) + assert self.remediation_config_flat_junos == str(self.host.remediation_config()) + + def test_flat_junos_remediation(self): + self.host.load_running_config(self.running_config_flat_junos) + self.host.load_generated_config(self.generated_config_flat_junos) + remediation_list = self.remediation_config_flat_junos.splitlines() + for line in str(self.host.remediation_config()).splitlines(): + assert line in remediation_list diff --git a/tests/unit/hier_config/test_negate_with_undo.py b/tests/unit/hier_config/test_negate_with_undo.py new file mode 100644 index 00000000..2c486480 --- /dev/null +++ b/tests/unit/hier_config/test_negate_with_undo.py @@ -0,0 +1,19 @@ +import pytest + +from netutils.hier_config.host import Host + + +class TestNegateWithUndo: + @pytest.fixture(autouse=True) + def setUpClass(self, options_negate_with_undo): + self.os = "comware5" + self.running_config = "test_for_undo\nundo test_for_redo" + self.generated_config = "undo test_for_undo\ntest_for_redo" + self.remediation = "undo test_for_undo\ntest_for_redo" + self.host = Host("example1.rtr", self.os, options_negate_with_undo) + + def test_merge(self): + self.host.load_running_config(self.running_config) + self.host.load_generated_config(self.generated_config) + self.host.remediation_config() + assert self.remediation == str(self.host.remediation_config()) diff --git a/tests/unit/hier_config/test_text_match.py b/tests/unit/hier_config/test_text_match.py new file mode 100644 index 00000000..8c83fa64 --- /dev/null +++ b/tests/unit/hier_config/test_text_match.py @@ -0,0 +1,45 @@ +from netutils.hier_config import text_match + + +text = " ip address 192.168.100.1/24" +expression1 = text +expression2 = " ip address" +expression3 = "ip access-list" +expression4 = "/30" + + +def test_equals(): + assert text_match.equals(text, expression1) + assert not text_match.equals(text, expression2) + + +def test_startswith(): + assert text_match.startswith(text, expression2) + assert not text_match.startswith(text, expression3) + + +def test_endswith(): + assert text_match.endswith(text, expression1) + assert not text_match.endswith(text, expression4) + + +def test_contains(): + assert text_match.contains(text, expression2) + assert not text_match.contains(text, expression3) + + +def test_re_search(): + assert text_match.re_search(text, expression2) + assert not text_match.re_search(text, expression3) + + +def test_anything(): + assert text_match.anything(text, expression1) + assert text_match.anything(text, expression2) + + +def test_nothing(): + assert not text_match.nothing(text, expression1) + assert not text_match.nothing(text, expression2) + assert not text_match.nothing(text, expression3) + assert not text_match.nothing(text, expression4) diff --git a/tests/unit/hier_config/test_various.py b/tests/unit/hier_config/test_various.py new file mode 100644 index 00000000..408980ad --- /dev/null +++ b/tests/unit/hier_config/test_various.py @@ -0,0 +1,44 @@ +from netutils.hier_config import HConfig, Host + + +def test_issue104() -> None: + running_config_raw = "tacacs-server deadtime 3\n" "tacacs-server host 192.168.1.99 key 7 Test12345\n" + generated_config_raw = ( + "tacacs-server host 192.168.1.98 key 0 Test135 timeout 3\n" + "tacacs-server host 192.168.100.98 key 0 test135 timeout 3\n" + ) + + host = Host(hostname="test", os="nxos") + running_config = HConfig(host=host) + running_config.load_from_string(running_config_raw) + generated_config = HConfig(host=host) + generated_config.load_from_string(generated_config_raw) + rem = running_config.config_to_get_to(generated_config) + expected_rem_lines = { + "no tacacs-server deadtime 3", + "no tacacs-server host 192.168.1.99 key 7 Test12345", + "tacacs-server host 192.168.1.98 key 0 Test135 timeout 3", + "tacacs-server host 192.168.100.98 key 0 test135 timeout 3", + } + rem_lines = {line.cisco_style_text() for line in rem.all_children()} + assert expected_rem_lines == rem_lines + + +def test_issue_113() -> None: + running_config_raw = ( + "interface Ethernet1/1\n" " description test\n" " ip address 192.0.2.1 255.255.255.0\n" " switchport\n" + ) + generated_config_raw = "interface Ethernet1/1\n" " ip address 192.0.2.1 255.255.255.0\n" " switchport\n" + + host = Host(hostname="test", os="ios") + running_config = HConfig(host=host) + running_config.load_from_string(running_config_raw) + generated_config = HConfig(host=host) + generated_config.load_from_string(generated_config_raw) + rem = running_config.config_to_get_to(generated_config) + expected_rem_lines = { + "interface Ethernet1/1", + " no description test", + } + rem_lines = {line.cisco_style_text() for line in rem.all_children()} + assert expected_rem_lines == rem_lines