diff --git a/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py b/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py index f3e08395..8290e7ee 100644 --- a/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py +++ b/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py @@ -25,6 +25,7 @@ class Rule: id = "502" description = "Verify VRF-Lites Cross Reference Between Policies, Groups, and Switches" severity = "HIGH" + results = [] @classmethod def match(cls, data): @@ -34,7 +35,6 @@ def match(cls, data): vrf_lites = [] topology_switches = [] policies = [] - results = [] switch_policy = [] static_routes_compliance = [] @@ -54,33 +54,33 @@ def match(cls, data): if policy.get('name', None): policies.append(policy['name']) - cls.check_global_ospf_and_bgp(results, policy) - cls.check_global_ospf_area(results, policy) + cls.check_global_ospf_and_bgp(policy) + cls.check_global_ospf_area(policy) if policy.get('switches'): # Check switch Level for switch_policy in policy['switches']: - cls.check_switch_level(results, switch_policy, policy, + cls.check_switch_level(switch_policy, policy, data["vxlan"]['global']['bgp_asn'], topology_switches, static_routes_compliance) - cls.check_route_compliance_across_policies(results, static_routes_compliance) + cls.check_route_compliance_across_policies(static_routes_compliance) - return results + return cls.results @classmethod - def check_global_ospf_and_bgp(cls, results, policy): + def check_global_ospf_and_bgp(cls, policy): ''' Check if OSPF and BGP is enabled in the global policy ''' if "ospf" in policy and "bgp" in policy: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ," f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp." "BGP and OSPF are defined in the same policy " "Please use two different policies") @classmethod - def check_global_ospf_area(cls, results, policy): + def check_global_ospf_area(cls, policy): ''' Check OSPF if backbone area is standard ''' @@ -90,14 +90,14 @@ def check_global_ospf_area(cls, results, policy): if "id" in area and area.get('area_type'): # Check if AREA 0 is not standard if ((area['id'] == 0 or area['id'] == '0.0.0.0') and (area['area_type'] != 'standard')): - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf.areas.id.0. " f"area_type is defined to {area['area_type']}. " "Backbone area is always standard" ) @classmethod - def check_switch_level(cls, results, switch_policy, policy, + def check_switch_level(cls, switch_policy, policy, infra_bgp_asn, topology_switches, static_routes_compliance): ''' Check switch level @@ -109,8 +109,7 @@ def check_switch_level(cls, results, switch_policy, policy, if "bgp" in switch_policy or "bgp_peers" in switch_policy: bgp = True if "bgp_peers" in switch_policy: - cls.check_switch_bgp_route_reflector(results, - switch=switch_policy['name'], + cls.check_switch_bgp_route_reflector(switch=switch_policy['name'], bgp_peers=switch_policy['bgp_peers'], fabric_asn=infra_bgp_asn, policy=policy['name']) @@ -120,22 +119,20 @@ def check_switch_level(cls, results, switch_policy, policy, for interface in switch_policy['interfaces']: if interface.get('ospf'): ospf = True - cls.check_switch_ospf(results, - interface['ospf'], + cls.check_switch_ospf(interface['ospf'], switch_policy['name'], interface['name'], policy['name']) # Check if OSPF and BGP is enabled if ospf is True and bgp is True: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. " "BGP and OSPF are configured in the same policy at the switch level. " "Please use two different policies") # Check if switch exists in topology - cls.check_switch_in_topology(results, - switch_policy['name'], + cls.check_switch_in_topology(switch_policy['name'], topology_switches, policy['name']) @@ -147,27 +144,27 @@ def check_switch_level(cls, results, switch_policy, policy, "prefix": switch_policy['static_routes']}) @classmethod - def check_switch_in_topology(cls, results, switch, topology_switches, policy): + def check_switch_in_topology(cls, switch, topology_switches, policy): ''' Check if switch is in the topology ''' if list(filter(lambda topo: topo['name'] == switch, topology_switches)): pass else: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} " "is not defined in vxlan.topology.switches" ) @classmethod - def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None): + def check_switch_ospf(cls, ospf, switch, interface=None, policy=None): ''' Check OSPF parameters ''' # Check if key exists if authentication is enabled if "auth_type" in ospf and ospf['auth_type'] is not None: if "auth_key" not in ospf: - results.append( + cls.results.append( f"In the policy: {policy}, auth_type is {ospf['auth_type']} " "but auth_key is missing" ) @@ -175,7 +172,7 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None): # Check if Network type for Loopback interface is not Broadcast if ospf.get('network_type'): if interface.startswith('Lo') and ospf['network_type'] == "broadcast": - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. " f"network_type: {ospf['network_type']}" " is not supported with Loopback" @@ -184,13 +181,13 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None): # Check if Adversise-subnet is used only for Loopback in ospf if ospf.get('advertise_subnet'): if not interface.startswith('Lo') and ospf['advertise_subnet']: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. " f"advertise_subnet: True is only supported with Loopback" ) @classmethod - def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn, policy): + def check_switch_bgp_route_reflector(cls, switch, bgp_peers, fabric_asn, policy): ''' Check if route-reflector is enabled in eBGP ''' @@ -200,7 +197,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn if bgp_peer['address_family_ipv4_unicast'].get('route_reflector_client'): if bgp_peer['address_family_ipv4_unicast']['route_reflector_client'] is True: if bgp_peer['remote_as'] != fabric_asn: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv4_unicast" f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " "is not allowed in eBGP" @@ -211,14 +208,14 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn if bgp_peer['address_family_ipv6_unicast'].get('route_reflector_client'): if bgp_peer['address_family_ipv6_unicast']['route_reflector_client'] is True: if bgp_peer['remote_as'] != fabric_asn: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv6_unicast" f"route_reflector_client: {bgp_peer['address_family_ipv6_unicast']['route_reflector_client']} " "is not allowed in eBGP" ) @classmethod - def check_route_compliance_across_policies(cls, results, routes): + def check_route_compliance_across_policies(cls, routes): ''' Check routes compliance across policies ''' @@ -229,7 +226,7 @@ def check_route_compliance_across_policies(cls, results, routes): bad_route = False for index in range(len(route['prefix'])): if ("prefix" not in route['prefix'][index]) or ("next_hops" not in route['prefix'][index]): - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " f"prefix or next_hops is not defined. {route['prefix'][index]}" ) @@ -238,7 +235,7 @@ def check_route_compliance_across_policies(cls, results, routes): good_route.append(route) # Else no parameter found else: - results.append( + cls.results.append( f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " f"static_routers is defined without paramater." ) @@ -255,28 +252,35 @@ def check_route_compliance_across_policies(cls, results, routes): for pref_for_comparison in good_route[index2]['prefix']: if prefix_to_be_compared['prefix'] == pref_for_comparison['prefix']: # Check if route_tag exist if equal - if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]): + if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and + prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]): next_hops_to_be_compared = sorted([sub['ip'] for sub in prefix_to_be_compared['next_hops']]) next_hops_for_comparison = sorted([sub['ip'] for sub in pref_for_comparison['next_hops']]) # Check if next_hops are equal for the same prefix. if next_hops_to_be_compared != next_hops_for_comparison: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} " + cls.results.append( + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches." + f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} " f"Static routes are not consistent across policies. next_hop are different in " - f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}" + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches." + f"{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}" ) break else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " + cls.results.append( + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches." + f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " f"Static routes are not consistent across policies. route_tag are different in " - f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches." + f"{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" ) break else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " + cls.results.append( + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches." + f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " f"Static routes are not consistent across policies. Prefix not found in " - f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches." + f"{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" ) break