Skip to content

Commit

Permalink
[fpmsyncd]: Add test cases to verify SRv6 functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
cscarpitta committed Oct 25, 2024
1 parent 63d7b84 commit e0bdb74
Showing 1 changed file with 127 additions and 127 deletions.
254 changes: 127 additions & 127 deletions tests/test_srv6.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,134 +992,134 @@ def test_AddRemoveSrv6MySidUDT6(self, dvs, testlog):
self.teardown_srv6(dvs)


class TestSrv6VpnFpmsyncd:
""" Functionality tests for SRv6 VPN handling in fpmsyncd """
# class TestSrv6VpnFpmsyncd:
# """ Functionality tests for SRv6 VPN handling in fpmsyncd """

def setup_db(self, dvs):
self.pdb = dvs.get_app_db()
self.adb = dvs.get_asic_db()
self.cdb = dvs.get_config_db()

def create_vrf(self, vrf_name):
table = "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"
existed_entries = get_exist_entries(self.adb.db_connection, table)

self.cdb.create_entry("VRF", vrf_name, {"empty": "empty"})

self.adb.wait_for_n_keys(table, len(existed_entries) + 1)
return get_created_entry(self.adb.db_connection, table, existed_entries)

def remove_vrf(self, vrf_name):
self.cdb.delete_entry("VRF", vrf_name)

def setup_srv6(self, dvs):
self.setup_db(dvs)

# create vrf
initial_vrf_entries = set(self.adb.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"))
self.create_vrf("Vrf10")
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER", len(initial_vrf_entries) + 1)

# create dummy interface sr0
dvs.runcmd("ip link add sr0 type dummy")
dvs.runcmd("ip link set sr0 up")

def teardown_srv6(self, dvs):
# remove dummy interface sr0
dvs.runcmd("ip link del sr0 type dummy")

# remove vrf
initial_vrf_entries = set(self.adb.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"))
self.remove_vrf("Vrf10")
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER", len(initial_vrf_entries) - 1)

def test_AddRemoveSrv6VpnRouteIpv4(self, dvs, testlog):

_, output = dvs.runcmd(f"vtysh -c 'show zebra dplane providers'")
if 'dplane_fpm_sonic' not in output:
pytest.skip("'dplane_fpm_sonic' required for this test is not available, skipping", allow_module_level=True)

self.setup_srv6(dvs)

dvs.runcmd("vtysh -c \"configure terminal\" -c \"interface lo\" -c \"ip address fc00:0:2::1/128\"")

# configure srv6 usid locator
dvs.runcmd("vtysh -c \"configure terminal\" -c \"segment-routing\" -c \"srv6\" -c \"locators\" -c \"locator loc1\" -c \"prefix fc00:0:2::/48 block-len 32 node-len 16 func-bits 16\" -c \"behavior usid\"")

# save exist asic db entries
tunnel_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL")
nexthop_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP")
route_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY")
sidlist_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST")

# create v4 route with vpn sid
dvs.runcmd("ip route add 192.0.2.0/24 encap seg6 mode encap segs fc00:0:1:e000:: dev sr0 vrf Vrf10")

# check application database
self.pdb.wait_for_entry("ROUTE_TABLE", "Vrf10:192.0.2.0/24")

# verify that the route has been programmed into the ASIC
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", len(route_entries) + 1)

# get created entries
route_key = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", route_entries)
nexthop_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP", nexthop_entries)
tunnel_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL", tunnel_entries)
sidlist_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST", sidlist_entries)

# check ASIC SAI_OBJECT_TYPE_SRV6_SIDLIST database
tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST")
(status, fvs) = tbl.get(sidlist_id)
assert status == True
for fv in fvs:
if fv[0] == "SAI_SRV6_SIDLIST_ATTR_SEGMENT_LIST":
assert fv[1] == "1:fc00:0:1:e000::"
elif fv[0] == "SAI_SRV6_SIDLIST_ATTR_TYPE":
assert fv[1] == "SAI_SRV6_SIDLIST_TYPE_ENCAPS_RED"

# check ASIC SAI_OBJECT_TYPE_ROUTE_ENTRY database
tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY")
(status, fvs) = tbl.get(route_key)
assert status == True
for fv in fvs:
if fv[0] == "SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID":
assert fv[1] == nexthop_id

# check ASIC SAI_OBJECT_TYPE_NEXT_HOP database
tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP")
(status, fvs) = tbl.get(nexthop_id)
assert status == True
for fv in fvs:
if fv[0] == "SAI_NEXT_HOP_ATTR_SRV6_SIDLIST_ID":
assert fv[1] == sidlist_id
elif fv[0] == "SAI_NEXT_HOP_ATTR_TUNNEL_ID":
assert fv[1] == tunnel_id

# check ASIC SAI_OBJECT_TYPE_TUNNEL database
tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL")
(status, fvs) = tbl.get(tunnel_id)
assert status == True
for fv in fvs:
if fv[0] == "SAI_TUNNEL_ATTR_TYPE":
assert fv[1] == "SAI_TUNNEL_TYPE_SRV6"
elif fv[0] == "SAI_TUNNEL_ATTR_ENCAP_SRC_IP":
assert fv[1] == "fc00:0:2::1"

# remove v4 route with vpn sid
dvs.runcmd("ip route del 192.0.2.0/24 encap seg6 mode encap segs fc00:0:1:e000:: dev sr0 vrf Vrf10")

# check application database
self.pdb.wait_for_deleted_entry("ROUTE_TABLE", "Vrf10:192.0.2.0/24")

# verify that the route has been removed from the ASIC
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP", len(nexthop_entries))
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL", len(tunnel_entries))
self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", len(route_entries))

# unconfigure srv6 locator
dvs.runcmd("vtysh -c \"configure terminal\" -c \"segment-routing\" -c \"no srv6\"")

self.teardown_srv6(dvs)
# def setup_db(self, dvs):
# self.pdb = dvs.get_app_db()
# self.adb = dvs.get_asic_db()
# self.cdb = dvs.get_config_db()

# def create_vrf(self, vrf_name):
# table = "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"
# existed_entries = get_exist_entries(self.adb.db_connection, table)

# self.cdb.create_entry("VRF", vrf_name, {"empty": "empty"})

# self.adb.wait_for_n_keys(table, len(existed_entries) + 1)
# return get_created_entry(self.adb.db_connection, table, existed_entries)

# def remove_vrf(self, vrf_name):
# self.cdb.delete_entry("VRF", vrf_name)

# def setup_srv6(self, dvs):
# self.setup_db(dvs)

# # create vrf
# initial_vrf_entries = set(self.adb.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"))
# self.create_vrf("Vrf10")
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER", len(initial_vrf_entries) + 1)

# # create dummy interface sr0
# dvs.runcmd("ip link add sr0 type dummy")
# dvs.runcmd("ip link set sr0 up")

# def teardown_srv6(self, dvs):
# # remove dummy interface sr0
# dvs.runcmd("ip link del sr0 type dummy")

# # remove vrf
# initial_vrf_entries = set(self.adb.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER"))
# self.remove_vrf("Vrf10")
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER", len(initial_vrf_entries) - 1)

# def test_AddRemoveSrv6VpnRouteIpv4(self, dvs, testlog):

# _, output = dvs.runcmd(f"vtysh -c 'show zebra dplane providers'")
# if 'dplane_fpm_sonic' not in output:
# pytest.skip("'dplane_fpm_sonic' required for this test is not available, skipping", allow_module_level=True)

# self.setup_srv6(dvs)

# dvs.runcmd("vtysh -c \"configure terminal\" -c \"interface lo\" -c \"ip address fc00:0:2::1/128\"")

# # configure srv6 usid locator
# dvs.runcmd("vtysh -c \"configure terminal\" -c \"segment-routing\" -c \"srv6\" -c \"locators\" -c \"locator loc1\" -c \"prefix fc00:0:2::/48 block-len 32 node-len 16 func-bits 16\" -c \"behavior usid\"")

# # save exist asic db entries
# tunnel_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL")
# nexthop_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP")
# route_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY")
# sidlist_entries = get_exist_entries(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST")

# # create v4 route with vpn sid
# dvs.runcmd("ip route add 192.0.2.0/24 encap seg6 mode encap segs fc00:0:1:e000:: dev sr0 vrf Vrf10")

# # check application database
# self.pdb.wait_for_entry("ROUTE_TABLE", "Vrf10:192.0.2.0/24")

# # verify that the route has been programmed into the ASIC
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", len(route_entries) + 1)

# # get created entries
# route_key = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", route_entries)
# nexthop_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP", nexthop_entries)
# tunnel_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL", tunnel_entries)
# sidlist_id = get_created_entry(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST", sidlist_entries)

# # check ASIC SAI_OBJECT_TYPE_SRV6_SIDLIST database
# tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_SRV6_SIDLIST")
# (status, fvs) = tbl.get(sidlist_id)
# assert status == True
# for fv in fvs:
# if fv[0] == "SAI_SRV6_SIDLIST_ATTR_SEGMENT_LIST":
# assert fv[1] == "1:fc00:0:1:e000::"
# elif fv[0] == "SAI_SRV6_SIDLIST_ATTR_TYPE":
# assert fv[1] == "SAI_SRV6_SIDLIST_TYPE_ENCAPS_RED"

# # check ASIC SAI_OBJECT_TYPE_ROUTE_ENTRY database
# tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY")
# (status, fvs) = tbl.get(route_key)
# assert status == True
# for fv in fvs:
# if fv[0] == "SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID":
# assert fv[1] == nexthop_id

# # check ASIC SAI_OBJECT_TYPE_NEXT_HOP database
# tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP")
# (status, fvs) = tbl.get(nexthop_id)
# assert status == True
# for fv in fvs:
# if fv[0] == "SAI_NEXT_HOP_ATTR_SRV6_SIDLIST_ID":
# assert fv[1] == sidlist_id
# elif fv[0] == "SAI_NEXT_HOP_ATTR_TUNNEL_ID":
# assert fv[1] == tunnel_id

# # check ASIC SAI_OBJECT_TYPE_TUNNEL database
# tbl = swsscommon.Table(self.adb.db_connection, "ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL")
# (status, fvs) = tbl.get(tunnel_id)
# assert status == True
# for fv in fvs:
# if fv[0] == "SAI_TUNNEL_ATTR_TYPE":
# assert fv[1] == "SAI_TUNNEL_TYPE_SRV6"
# elif fv[0] == "SAI_TUNNEL_ATTR_ENCAP_SRC_IP":
# assert fv[1] == "fc00:0:2::1"

# # remove v4 route with vpn sid
# dvs.runcmd("ip route del 192.0.2.0/24 encap seg6 mode encap segs fc00:0:1:e000:: dev sr0 vrf Vrf10")

# # check application database
# self.pdb.wait_for_deleted_entry("ROUTE_TABLE", "Vrf10:192.0.2.0/24")

# # verify that the route has been removed from the ASIC
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP", len(nexthop_entries))
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_TUNNEL", len(tunnel_entries))
# self.adb.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", len(route_entries))

# # unconfigure srv6 locator
# dvs.runcmd("vtysh -c \"configure terminal\" -c \"segment-routing\" -c \"no srv6\"")

# self.teardown_srv6(dvs)


# Add Dummy always-pass test at end as workaroud
Expand Down

0 comments on commit e0bdb74

Please sign in to comment.