Skip to content

Commit

Permalink
rewrite and fix static function
Browse files Browse the repository at this point in the history
  • Loading branch information
ccoueffe committed Oct 15, 2024
1 parent 998842c commit defa1e1
Showing 1 changed file with 81 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ class Rule:
severity = "HIGH"

@classmethod
def match(cls, inventory):
def match(cls, data):
'''
function used by iac-validate
'''
vrf_lites = []
topology_switches = []
policies = []
Expand All @@ -36,16 +39,16 @@ def match(cls, inventory):
static_routes_compliance = []

# Get fabric switches
if inventory.get("vxlan"):
if inventory["vxlan"].get("topology"):
if inventory.get("vxlan").get("topology").get("switches"):
topology_switches = inventory.get("vxlan").get("topology").get("switches")
if data.get("vxlan"):
if data["vxlan"].get("topology"):
if data.get("vxlan").get("topology").get("switches"):
topology_switches = data.get("vxlan").get("topology").get("switches")

# Get vrf-lites policies
if inventory.get("vxlan", None):
if inventory["vxlan"].get("overlay_extensions", None):
if inventory["vxlan"].get("overlay_extensions").get("vrf_lites", None):
vrf_lites = inventory["vxlan"]["overlay_extensions"]["vrf_lites"]
if data.get("vxlan", None):
if data["vxlan"].get("overlay_extensions", None):
if data["vxlan"].get("overlay_extensions").get("vrf_lites", None):
vrf_lites = data["vxlan"]["overlay_extensions"]["vrf_lites"]

for policy in vrf_lites: # Check Global Level
if policy.get('name', None):
Expand All @@ -57,9 +60,11 @@ def match(cls, inventory):
if policy.get('switches'): # Check switch Level
for switch_policy in policy['switches']:
cls.check_switch_level(results, switch_policy, policy,
inventory["vxlan"]['global']['bgp_asn'],
data["vxlan"]['global']['bgp_asn'],
topology_switches, static_routes_compliance)

cls.check_route_compliance_across_policies(results, static_routes_compliance)

return results

@classmethod
Expand All @@ -69,9 +74,9 @@ def check_global_ospf_and_bgp(cls, results, policy):
'''
if "ospf" in policy and "bgp" in policy:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ," +
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp." +
"BGP and OSPF are defined in the same policy " +
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ,"
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp."
"BGP and OSPF are defined in the same policy "
"Please use two different policies")

@classmethod
Expand All @@ -84,11 +89,10 @@ def check_global_ospf_area(cls, results, policy):
for area in policy['ospf']['areas']:
if "id" in area and area.get('area_type'):
# Check if AREA 0 is not standard
if ((area['id'] == 0 or area['id'] == '0.0.0.0')
and area['area_type'] != 'standard'):
if ((area['id'] == 0 or area['id'] == '0.0.0.0') and (area['area_type'] != 'standard')):
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf.areas.id.0. "
f"area_type is defined to {area['area_type']}" +
f"area_type is defined to {area['area_type']}. "
"Backbone area is always standard"
)

Expand Down Expand Up @@ -125,8 +129,8 @@ def check_switch_level(cls, results, switch_policy, policy,
# Check if OSPF and BGP is enabled
if ospf is True and bgp is True:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. " +
"BGP and OSPF are configured in the same policy at the switch level. " +
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. "
"BGP and OSPF are configured in the same policy at the switch level. "
"Please use two different policies")

# Check if switch exists in topology
Expand All @@ -140,9 +144,7 @@ def check_switch_level(cls, results, switch_policy, policy,
static_routes_compliance.append({"policy": policy['name'],
"vrf": policy['vrf'],
"switch": switch_policy['name'],
"routes": switch_policy['static_routes']})

cls.check_route_compliance_across_policies(results, static_routes_compliance)
"prefix": switch_policy['static_routes']})

@classmethod
def check_switch_in_topology(cls, results, switch, topology_switches, policy):
Expand All @@ -153,7 +155,7 @@ def check_switch_in_topology(cls, results, switch, topology_switches, policy):
pass
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} " +
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} "
"is not defined in vxlan.topology.switches"
)

Expand All @@ -166,7 +168,7 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None):
if "auth_type" in ospf and ospf['auth_type'] is not None:
if "auth_key" not in ospf:
results.append(
f"In the policy: {policy}, auth_type is {ospf['auth_type']} " +
f"In the policy: {policy}, auth_type is {ospf['auth_type']} "
"but auth_key is missing"
)

Expand All @@ -175,7 +177,7 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None):
if interface.startswith('Lo') and ospf['network_type'] == "broadcast":
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. "
f"network_type: {ospf['network_type']}" +
f"network_type: {ospf['network_type']}"
" is not supported with Loopback"
)

Expand All @@ -200,7 +202,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn
if bgp_peer['remote_as'] != fabric_asn:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv4_unicast"
f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " +
f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} "
"is not allowed in eBGP"
)

Expand All @@ -211,7 +213,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn
if bgp_peer['remote_as'] != fabric_asn:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv6_unicast"
f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} " +
f"route_reflector_client: {bgp_peer['address_family_ipv6_unicast']['route_reflector_client']} "
"is not allowed in eBGP"
)

Expand All @@ -220,62 +222,61 @@ def check_route_compliance_across_policies(cls, results, routes):
'''
Check routes compliance across policies
'''
good_route = []
for route in routes:
for route2 in routes:
if route['vrf'] == route2['vrf']:
if route['switch'] == route2['switch']:
for nb_pref in range(len(route['routes'])):
# Check if same number of prefixes
if len(route['routes']) == len(route2['routes']):
# Check if prefixes are equal
base_pref = route['routes'][nb_pref]['prefix']
list_pref = route2['routes']
if list(filter(lambda pref: pref['prefix'] == base_pref, list_pref)):
# Check route TAG
if route['routes'][nb_pref]['route_tag'] == route2['routes'][nb_pref]['route_tag']:
for nb_nh in range(len(route['routes'][nb_pref]['next_hops'])):
# Check if number of Next-Hops are equal
if len(route['routes'][nb_pref]['next_hops']) == len(route2['routes'][nb_pref]['next_hops']):
base_nh = route['routes'][nb_pref]['next_hops'][nb_nh]['ip']
list_nh = route2['routes'][nb_pref]['next_hops']
# Check if next-hop are is present in the list
if list(filter(lambda nh: nh['ip'] == base_nh, list_nh)):
pass
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"next_hops are different. "
f"Local: {route['routes'][nb_pref]['next_hops']} "
f"- Remote: {route2['routes'][nb_pref]['next_hops']}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"next_hops number is different. "
f"Local: {route['routes'][nb_pref]['next_hops']} "
f"- Remote: {route2['routes'][nb_pref]['next_hops']}"
)
break
if route['prefix'] is not None:
# Check if route contain mandatory parameter: prefix, next_hop for each prefix
bad_route = False
for index in range(len(route['prefix'])):
if ("prefix" not in route['prefix'][index]) or ("next_hops" not in route['prefix'][index]):
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"prefix or next_hops is not defined. {route['prefix'][index]}"
)
bad_route = True
if bad_route is False:
good_route.append(route)
# Else no parameter found
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"static_routers is defined without paramater."
)

# Compare routes
for index in range(len(good_route) - 1):
for index2 in range(index + 1, len(good_route)):
if ((good_route[index]['vrf'] == good_route[index2]['vrf']) and (good_route[index]['switch'] == good_route[index2]['switch'])):
# Check prefixes are equal
for prefix_to_be_compared in good_route[index]['prefix']:
prefixes_for_comparison = [sub['prefix'] for sub in good_route[index2]['prefix']]
if prefix_to_be_compared['prefix'] in prefixes_for_comparison:
# Find the prefix to compare route_tag and Next_hops
for pref_for_comparison in good_route[index2]['prefix']:
if prefix_to_be_compared['prefix'] == pref_for_comparison['prefix']:
# Check if route_tag exist if equal
if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]):
next_hops_to_be_compared = sorted([sub['ip'] for sub in prefix_to_be_compared['next_hops']])
next_hops_for_comparison = sorted([sub['ip'] for sub in pref_for_comparison['next_hops']])
# Check if next_hops are equal for the same prefix.
if next_hops_to_be_compared != next_hops_for_comparison:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} "
f"Static routes are not consistent across policies. next_hop are different in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"route_tag is different. "
f"Local: {route['routes'][nb_pref]['route_tag']} "
f"- Remote: {route2['routes'][nb_pref]['route_tag']}"
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
f"Static routes are not consistent across policies. route_tag are different in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"prefix is different. "
f"Local: {route['routes'][nb_pref]['prefix']} "
f"- Remote: {route2['routes'][nb_pref]['prefix']}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"Prefixes number is different. "
f"Local: {len(route['routes'])} - Remote: {len(route2['routes'])}"
)
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
f"Static routes are not consistent across policies. Prefix not found in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
)
break

0 comments on commit defa1e1

Please sign in to comment.