Skip to content

Commit

Permalink
move result variable
Browse files Browse the repository at this point in the history
  • Loading branch information
ccoueffe committed Oct 15, 2024
1 parent defa1e1 commit 5d4b466
Showing 1 changed file with 44 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Rule:
id = "502"
description = "Verify VRF-Lites Cross Reference Between Policies, Groups, and Switches"
severity = "HIGH"
results = []

@classmethod
def match(cls, data):
Expand All @@ -34,7 +35,6 @@ def match(cls, data):
vrf_lites = []
topology_switches = []
policies = []
results = []
switch_policy = []
static_routes_compliance = []

Expand All @@ -54,33 +54,33 @@ def match(cls, data):
if policy.get('name', None):
policies.append(policy['name'])

cls.check_global_ospf_and_bgp(results, policy)
cls.check_global_ospf_area(results, policy)
cls.check_global_ospf_and_bgp(policy)
cls.check_global_ospf_area(policy)

if policy.get('switches'): # Check switch Level
for switch_policy in policy['switches']:
cls.check_switch_level(results, switch_policy, policy,
cls.check_switch_level(switch_policy, policy,
data["vxlan"]['global']['bgp_asn'],
topology_switches, static_routes_compliance)

cls.check_route_compliance_across_policies(results, static_routes_compliance)
cls.check_route_compliance_across_policies(static_routes_compliance)

return results
return cls.results

@classmethod
def check_global_ospf_and_bgp(cls, results, policy):
def check_global_ospf_and_bgp(cls, policy):
'''
Check if OSPF and BGP is enabled in the global policy
'''
if "ospf" in policy and "bgp" in policy:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf ,"
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.bgp."
"BGP and OSPF are defined in the same policy "
"Please use two different policies")

@classmethod
def check_global_ospf_area(cls, results, policy):
def check_global_ospf_area(cls, policy):
'''
Check OSPF if backbone area is standard
'''
Expand All @@ -90,14 +90,14 @@ def check_global_ospf_area(cls, results, policy):
if "id" in area and area.get('area_type'):
# Check if AREA 0 is not standard
if ((area['id'] == 0 or area['id'] == '0.0.0.0') and (area['area_type'] != 'standard')):
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.ospf.areas.id.0. "
f"area_type is defined to {area['area_type']}. "
"Backbone area is always standard"
)

@classmethod
def check_switch_level(cls, results, switch_policy, policy,
def check_switch_level(cls, switch_policy, policy,
infra_bgp_asn, topology_switches, static_routes_compliance):
'''
Check switch level
Expand All @@ -109,8 +109,7 @@ def check_switch_level(cls, results, switch_policy, policy,
if "bgp" in switch_policy or "bgp_peers" in switch_policy:
bgp = True
if "bgp_peers" in switch_policy:
cls.check_switch_bgp_route_reflector(results,
switch=switch_policy['name'],
cls.check_switch_bgp_route_reflector(switch=switch_policy['name'],
bgp_peers=switch_policy['bgp_peers'],
fabric_asn=infra_bgp_asn,
policy=policy['name'])
Expand All @@ -120,22 +119,20 @@ def check_switch_level(cls, results, switch_policy, policy,
for interface in switch_policy['interfaces']:
if interface.get('ospf'):
ospf = True
cls.check_switch_ospf(results,
interface['ospf'],
cls.check_switch_ospf(interface['ospf'],
switch_policy['name'],
interface['name'],
policy['name'])

# Check if OSPF and BGP is enabled
if ospf is True and bgp is True:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy['name']}.switches.{switch_policy['name']}. "
"BGP and OSPF are configured in the same policy at the switch level. "
"Please use two different policies")

# Check if switch exists in topology
cls.check_switch_in_topology(results,
switch_policy['name'],
cls.check_switch_in_topology(switch_policy['name'],
topology_switches,
policy['name'])

Expand All @@ -147,35 +144,35 @@ def check_switch_level(cls, results, switch_policy, policy,
"prefix": switch_policy['static_routes']})

@classmethod
def check_switch_in_topology(cls, results, switch, topology_switches, policy):
def check_switch_in_topology(cls, switch, topology_switches, policy):
'''
Check if switch is in the topology
'''
if list(filter(lambda topo: topo['name'] == switch, topology_switches)):
pass
else:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch} "
"is not defined in vxlan.topology.switches"
)

@classmethod
def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None):
def check_switch_ospf(cls, ospf, switch, interface=None, policy=None):
'''
Check OSPF parameters
'''
# Check if key exists if authentication is enabled
if "auth_type" in ospf and ospf['auth_type'] is not None:
if "auth_key" not in ospf:
results.append(
cls.results.append(
f"In the policy: {policy}, auth_type is {ospf['auth_type']} "
"but auth_key is missing"
)

# Check if Network type for Loopback interface is not Broadcast
if ospf.get('network_type'):
if interface.startswith('Lo') and ospf['network_type'] == "broadcast":
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. "
f"network_type: {ospf['network_type']}"
" is not supported with Loopback"
Expand All @@ -184,13 +181,13 @@ def check_switch_ospf(cls, results, ospf, switch, interface=None, policy=None):
# Check if Adversise-subnet is used only for Loopback in ospf
if ospf.get('advertise_subnet'):
if not interface.startswith('Lo') and ospf['advertise_subnet']:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.interfaces.{interface}.ospf. "
f"advertise_subnet: True is only supported with Loopback"
)

@classmethod
def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn, policy):
def check_switch_bgp_route_reflector(cls, switch, bgp_peers, fabric_asn, policy):
'''
Check if route-reflector is enabled in eBGP
'''
Expand All @@ -200,7 +197,7 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn
if bgp_peer['address_family_ipv4_unicast'].get('route_reflector_client'):
if bgp_peer['address_family_ipv4_unicast']['route_reflector_client'] is True:
if bgp_peer['remote_as'] != fabric_asn:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv4_unicast"
f"route_reflector_client: {bgp_peer['address_family_ipv4_unicast']['route_reflector_client']} "
"is not allowed in eBGP"
Expand All @@ -211,14 +208,14 @@ def check_switch_bgp_route_reflector(cls, results, switch, bgp_peers, fabric_asn
if bgp_peer['address_family_ipv6_unicast'].get('route_reflector_client'):
if bgp_peer['address_family_ipv6_unicast']['route_reflector_client'] is True:
if bgp_peer['remote_as'] != fabric_asn:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{policy}.switches.{switch}.bgp_peers.{bgp_peer['address']}.address_family_ipv6_unicast"
f"route_reflector_client: {bgp_peer['address_family_ipv6_unicast']['route_reflector_client']} "
"is not allowed in eBGP"
)

@classmethod
def check_route_compliance_across_policies(cls, results, routes):
def check_route_compliance_across_policies(cls, routes):
'''
Check routes compliance across policies
'''
Expand All @@ -229,7 +226,7 @@ def check_route_compliance_across_policies(cls, results, routes):
bad_route = False
for index in range(len(route['prefix'])):
if ("prefix" not in route['prefix'][index]) or ("next_hops" not in route['prefix'][index]):
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"prefix or next_hops is not defined. {route['prefix'][index]}"
)
Expand All @@ -238,7 +235,7 @@ def check_route_compliance_across_policies(cls, results, routes):
good_route.append(route)
# Else no parameter found
else:
results.append(
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{route['policy']}.switches.{route['switch']}.static_routes. "
f"static_routers is defined without paramater."
)
Expand All @@ -255,28 +252,35 @@ def check_route_compliance_across_policies(cls, results, routes):
for pref_for_comparison in good_route[index2]['prefix']:
if prefix_to_be_compared['prefix'] == pref_for_comparison['prefix']:
# Check if route_tag exist if equal
if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]):
if ("route_tag" in prefix_to_be_compared and "route_tag" in pref_for_comparison and
prefix_to_be_compared["route_tag"] == pref_for_comparison["route_tag"]):
next_hops_to_be_compared = sorted([sub['ip'] for sub in prefix_to_be_compared['next_hops']])
next_hops_for_comparison = sorted([sub['ip'] for sub in pref_for_comparison['next_hops']])
# Check if next_hops are equal for the same prefix.
if next_hops_to_be_compared != next_hops_for_comparison:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} "
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches."
f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}.{next_hops_to_be_compared} "
f"Static routes are not consistent across policies. next_hop are different in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}"
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches."
f"{good_route[index2]['switch']}.static_routes.{pref_for_comparison['prefix']}.{next_hops_for_comparison}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches."
f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
f"Static routes are not consistent across policies. route_tag are different in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches."
f"{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
)
break
else:
results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches.{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
cls.results.append(
f"vxlan.overlay_extensions.vrf_lites.{good_route[index]['policy']}.switches."
f"{good_route[index]['switch']}.static_routes.{prefix_to_be_compared['prefix']}. "
f"Static routes are not consistent across policies. Prefix not found in "
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches.{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
f"vxlan.overlay_extensions.vrf_lites.{good_route[index2]['policy']}.switches."
f"{good_route[index2]['switch']}.static_routes.{prefixes_for_comparison}"
)
break

0 comments on commit 5d4b466

Please sign in to comment.