From defa1e106911002d86bedbb351c8083700ed026d Mon Sep 17 00:00:00 2001 From: ccoueffe Date: Tue, 15 Oct 2024 15:24:13 +0200 Subject: [PATCH] rewrite and fix static function --- .../502_policy_vrf_lite_cross_reference.py | 161 +++++++++--------- 1 file changed, 81 insertions(+), 80 deletions(-) diff --git a/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py b/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py index bb651693..f3e08395 100644 --- a/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py +++ b/roles/validate/files/rules/required_rules/502_policy_vrf_lite_cross_reference.py @@ -27,7 +27,10 @@ class Rule: severity = "HIGH" @classmethod - def match(cls, inventory): + def match(cls, data): + ''' + function used by iac-validate + ''' vrf_lites = [] topology_switches = [] policies = [] @@ -36,16 +39,16 @@ def match(cls, inventory): static_routes_compliance = [] # Get fabric switches - if inventory.get("vxlan"): - if inventory["vxlan"].get("topology"): - if inventory.get("vxlan").get("topology").get("switches"): - topology_switches = inventory.get("vxlan").get("topology").get("switches") + if data.get("vxlan"): + if data["vxlan"].get("topology"): + if data.get("vxlan").get("topology").get("switches"): + topology_switches = data.get("vxlan").get("topology").get("switches") # Get vrf-lites policies - if inventory.get("vxlan", None): - if inventory["vxlan"].get("overlay_extensions", None): - if inventory["vxlan"].get("overlay_extensions").get("vrf_lites", None): - vrf_lites = inventory["vxlan"]["overlay_extensions"]["vrf_lites"] + if data.get("vxlan", None): + if data["vxlan"].get("overlay_extensions", None): + if data["vxlan"].get("overlay_extensions").get("vrf_lites", None): + vrf_lites = data["vxlan"]["overlay_extensions"]["vrf_lites"] for policy in vrf_lites: # Check Global Level if policy.get('name', None): @@ -57,9 +60,11 @@ def match(cls, inventory): if policy.get('switches'): # Check switch Level for switch_policy in policy['switches']: cls.check_switch_level(results, switch_policy, policy, - inventory["vxlan"]['global']['bgp_asn'], + data["vxlan"]['global']['bgp_asn'], topology_switches, static_routes_compliance) + cls.check_route_compliance_across_policies(results, static_routes_compliance) + return results @classmethod @@ -69,9 +74,9 @@ def check_global_ospf_and_bgp(cls, results, policy): ''' if "ospf" in policy and "bgp" in policy: results.append( - f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ," + - f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp." + - "BGP and OSPF are defined in the same policy " + + f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ," + f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp." + "BGP and OSPF are defined in the same policy " "Please use two different policies") @classmethod @@ -84,11 +89,10 @@ def check_global_ospf_area(cls, results, policy): for area in policy['ospf']['areas']: if "id" in area and area.get('area_type'): # Check if AREA 0 is not standard - if ((area['id'] == 0 or area['id'] == '0.0.0.0') - and area['area_type'] != 'standard'): + if ((area['id'] == 0 or area['id'] == '0.0.0.0') and (area['area_type'] != 'standard')): results.append( f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf.areas.id.0. " - f"area_type is defined to {area['area_type']}" + + f"area_type is defined to {area['area_type']}. " "Backbone area is always standard" ) @@ -125,8 +129,8 @@ def check_switch_level(cls, results, switch_policy, policy, # Check if OSPF and BGP is enabled if ospf is True and bgp is True: results.append( - f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. " + - "BGP and OSPF are configured in the same policy at the switch level. " + + f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. " + "BGP and OSPF are configured in the same policy at the switch level. " "Please use two different policies") # Check if switch exists in topology @@ -140,9 +144,7 @@ def check_switch_level(cls, results, switch_policy, policy, static_routes_compliance.append({"policy": policy['name'], "vrf": policy['vrf'], "switch": switch_policy['name'], - "routes": switch_policy['static_routes']}) - - cls.check_route_compliance_across_policies(results, static_routes_compliance) + "prefix": switch_policy['static_routes']}) @classmethod def check_switch_in_topology(cls, results, switch, topology_switches, policy): @@ -153,7 +155,7 @@ def check_switch_in_topology(cls, results, switch, topology_switches, policy): pass else: results.append( - f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} " + + f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} " "is not defined in vxlan.topology.switches" ) @@ -166,7 +168,7 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None): if "auth_type" in ospf and ospf['auth_type'] is not None: if "auth_key" not in ospf: results.append( - f"In the policy: {policy}, auth_type is {ospf['auth_type']} " + + f"In the policy: {policy}, auth_type is {ospf['auth_type']} " "but auth_key is missing" ) @@ -175,7 +177,7 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None): if interface.startswith('Lo') and ospf['network_type'] == "broadcast": results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. " - f"network_type: {ospf['network_type']}" + + f"network_type: {ospf['network_type']}" " is not supported with Loopback" ) @@ -200,7 +202,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn if bgp_peer['remote_as'] != fabric_asn: results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv4_unicast" - f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " + + f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " "is not allowed in eBGP" ) @@ -211,7 +213,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn if bgp_peer['remote_as'] != fabric_asn: results.append( f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv6_unicast" - f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " + + f"route_reflector_client: {bgp_peer['address_family_ipv6_unicast']['route_reflector_client']} " "is not allowed in eBGP" ) @@ -220,62 +222,61 @@ def check_route_compliance_across_policies(cls, results, routes): ''' Check routes compliance across policies ''' + good_route = [] for route in routes: - for route2 in routes: - if route['vrf'] == route2['vrf']: - if route['switch'] == route2['switch']: - for nb_pref in range(len(route['routes'])): - # Check if same number of prefixes - if len(route['routes']) == len(route2['routes']): - # Check if prefixes are equal - base_pref = route['routes'][nb_pref]['prefix'] - list_pref = route2['routes'] - if list(filter(lambda pref: pref['prefix'] == base_pref, list_pref)): - # Check route TAG - if route['routes'][nb_pref]['route_tag'] == route2['routes'][nb_pref]['route_tag']: - for nb_nh in range(len(route['routes'][nb_pref]['next_hops'])): - # Check if number of Next-Hops are equal - if len(route['routes'][nb_pref]['next_hops']) == len(route2['routes'][nb_pref]['next_hops']): - base_nh = route['routes'][nb_pref]['next_hops'][nb_nh]['ip'] - list_nh = route2['routes'][nb_pref]['next_hops'] - # Check if next-hop are is present in the list - if list(filter(lambda nh: nh['ip'] == base_nh, list_nh)): - pass - else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " - f"next_hops are different. " - f"Local: {route['routes'][nb_pref]['next_hops']} " - f"- Remote: {route2['routes'][nb_pref]['next_hops']}" - ) - break - else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " - f"next_hops number is different. " - f"Local: {route['routes'][nb_pref]['next_hops']} " - f"- Remote: {route2['routes'][nb_pref]['next_hops']}" - ) - break + if route['prefix'] is not None: + # Check if route contain mandatory parameter: prefix, next_hop for each prefix + bad_route = False + for index in range(len(route['prefix'])): + if ("prefix" not in route['prefix'][index]) or ("next_hops" not in route['prefix'][index]): + results.append( + f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " + f"prefix or next_hops is not defined. {route['prefix'][index]}" + ) + bad_route = True + if bad_route is False: + good_route.append(route) + # Else no parameter found + else: + results.append( + f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " + f"static_routers is defined without paramater." + ) + + # Compare routes + for index in range(len(good_route) - 1): + for index2 in range(index + 1, len(good_route)): + if ((good_route[index]['vrf'] == good_route[index2]['vrf']) and (good_route[index]['switch'] == good_route[index2]['switch'])): + # Check prefixes are equal + for prefix_to_be_compared in good_route[index]['prefix']: + prefixes_for_comparison = [sub['prefix'] for sub in good_route[index2]['prefix']] + if prefix_to_be_compared['prefix'] in prefixes_for_comparison: + # Find the prefix to compare route_tag and Next_hops + for pref_for_comparison in good_route[index2]['prefix']: + if prefix_to_be_compared['prefix'] == pref_for_comparison['prefix']: + # Check if route_tag exist if equal + if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]): + next_hops_to_be_compared = sorted([sub['ip'] for sub in prefix_to_be_compared['next_hops']]) + next_hops_for_comparison = sorted([sub['ip'] for sub in pref_for_comparison['next_hops']]) + # Check if next_hops are equal for the same prefix. + if next_hops_to_be_compared != next_hops_for_comparison: + results.append( + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} " + f"Static routes are not consistent across policies. next_hop are different in " + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}" + ) + break else: results.append( - f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " - f"route_tag is different. " - f"Local: {route['routes'][nb_pref]['route_tag']} " - f"- Remote: {route2['routes'][nb_pref]['route_tag']}" + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " + f"Static routes are not consistent across policies. route_tag are different in " + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" ) break - else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " - f"prefix is different. " - f"Local: {route['routes'][nb_pref]['prefix']} " - f"- Remote: {route2['routes'][nb_pref]['prefix']}" - ) - break - else: - results.append( - f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. " - f"Prefixes number is different. " - f"Local: {len(route['routes'])} - Remote: {len(route2['routes'])}" - ) + else: + results.append( + f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. " + f"Static routes are not consistent across policies. Prefix not found in " + f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}" + ) + break