Subnet routes refactoring #205

Open · wants to merge 8 commits into stable/yoga-m3
2 changes: 1 addition & 1 deletion octavia_f5/common/config.py
@@ -202,7 +202,7 @@ def setup_logging(conf):
default=[],
help=_('List of vcmp guest names to use for identifying the '
'correct vcmp guest - defaults to the bigip hostname.')),
cfg.BoolOpt('route_on_active',
cfg.BoolOpt('routes_only_on_active',
default=True,
help=_("Sync routes only to active bigip device, this option"
"is useful if automatic full-sync is activated.")),
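The renamed option is consumed by the route tasks in f5_tasks.py further down. As a minimal sketch of that guard, assuming the option lives in the `[networking]` group as the rest of the diff suggests (the standalone registration and the helper name here are illustrative only):

```python
from oslo_config import cfg

CONF = cfg.CONF
# Illustrative registration; in the driver this happens in octavia_f5/common/config.py.
CONF.register_opts([cfg.BoolOpt('routes_only_on_active', default=True)],
                   group='networking')


def should_sync_routes(bigip) -> bool:
    """Skip route syncing on passive devices when routes_only_on_active is set."""
    return not (CONF.networking.routes_only_on_active and not bigip.is_active)
```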
1 change: 1 addition & 0 deletions octavia_f5/common/constants.py
@@ -32,6 +32,7 @@
PREFIX_IRULE = 'irule_'
PREFIX_MEMBER = 'member_'
PREFIX_SECRET = 'secret_'
PREFIX_VLAN = 'vlan-'

APPLICATION_TCP = 'tcp'
APPLICATION_UDP = 'udp'
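`PREFIX_VLAN` complements the existing `PREFIX_NETWORK`/`PREFIX_NETWORK_LEGACY` prefixes and is used below to build and recognise the `vlan-<vlan_id>` names of per-VLAN objects (route domains, default routes, VLANs). A tiny illustrative sketch (the helper is not part of the PR):

```python
PREFIX_VLAN = 'vlan-'


def vlan_object_name(vlan_id: int) -> str:
    """Name under which per-VLAN objects are stored on the BigIP."""
    return f"{PREFIX_VLAN}{vlan_id}"


assert vlan_object_name(1234) == 'vlan-1234'
```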
25 changes: 12 additions & 13 deletions octavia_f5/controller/worker/controller_worker.py
@@ -112,12 +112,12 @@ def as3worker(self):
@lockutils.synchronized("f5sync", fair=True)
def f5sync(network_id, device, *args):
self._metric_as3worker_queue.labels(octavia_host=CONF.host).set(self.queue.qsize())
loadbalancers = self._get_all_loadbalancer(network_id)
loadbalancers = self._get_all_loadbalancers(network_id)
LOG.debug("AS3Worker after pop (queue_size=%d): Refresh tenant '%s' with loadbalancer %s",
self.queue.qsize(), network_id, [lb.id for lb in loadbalancers])
selfips = list(chain.from_iterable(
self.network_driver.ensure_selfips(loadbalancers, CONF.host, cleanup_orphans=True)))
if all(lb.provisioning_status == lib_consts.PENDING_DELETE for lb in loadbalancers):
if not loadbalancers or all(lb.provisioning_status == lib_consts.PENDING_DELETE for lb in loadbalancers):
self.sync.tenant_delete(network_id, device).raise_for_status()
# Cleanup l2 configuration and remove selfip ports
self.l2sync.remove_l2_flow(network_id, device)
@@ -155,6 +155,7 @@ def f5sync(network_id, device, *args):

@periodics.periodic(60*60*24, run_immediately=CONF.f5_agent.sync_immediately)
def cleanup_orphaned_tenants(self):
""" Find tenants without load balancers and put them on the sync queue """
LOG.info("Running (24h) tenant cleanup")
session = db_apis.get_session(autocommit=False)

@@ -180,6 +181,7 @@ def cleanup_orphaned_tenants(self):

@periodics.periodic(60*4, run_immediately=CONF.f5_agent.sync_immediately)
def full_sync_reappearing_devices(self):
""" For all devices that just came online, put all their tenants on the sync queue """
session = db_apis.get_session(autocommit=False)

# Get all pending devices
@@ -207,20 +209,17 @@ def full_sync_reappearing_devices(self):

@periodics.periodic(60*60*24, run_immediately=CONF.f5_agent.sync_immediately)
def full_sync_l2(self):
""" For all tenants on this host, do a L2 sync"""
session = db_apis.get_session()

# get all load balancers (of this host)
loadbalancers = self._loadbalancer_repo.get_all_from_host(
session, show_deleted=False)
loadbalancers = self._loadbalancer_repo.get_all_from_host(session, show_deleted=False)
self.l2sync.full_sync(loadbalancers)

@periodics.periodic(60*2, run_immediately=CONF.f5_agent.sync_immediately)
def pending_sync(self):
"""
Reconciliation loop that
- synchronizes load balancers that are in a PENDING state
- synchronizes load balancers that are in a PENDING state or have child objects in a PENDING state
- deletes load balancers that are PENDING_DELETE
- executes a full sync on F5 devices that were offline but are now back online
"""

# delete load balancers that are PENDING_DELETE
@@ -231,7 +230,7 @@ def pending_sync(self):
LOG.info("Found pending deletion of lb %s", lb.id)
self.delete_load_balancer(lb.id)

# Find pending loadbalancer not yet finally assigned to this host
# Find load balancers pending creation that are not yet finally assigned to this host
lbs = []
pending_create_lbs = self._loadbalancer_repo.get_all(
db_apis.get_session(),
@@ -243,7 +242,7 @@ def pending_sync(self):
self.ensure_host_set(lb)
lbs.append(lb)

# Find pending loadbalancer
# Find load balancers pending an update
lbs.extend(self._loadbalancer_repo.get_all_from_host(
db_apis.get_session(),
provisioning_status=lib_consts.PENDING_UPDATE))
@@ -275,7 +274,7 @@ def pending_sync(self):
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def _get_all_loadbalancer(self, network_id):
def _get_all_loadbalancers(self, network_id):
LOG.debug("Get load balancers from DB for network id: %s ", network_id)
return self._loadbalancer_repo.get_all_by_network(
db_apis.get_session(), network_id=network_id, show_deleted=False)
@@ -606,7 +605,7 @@ def add_loadbalancer(self, load_balancer_id):
LOG.debug("add_loadbalancer: force adding loadbalancer '%s' for tenant '%s'",
load_balancer_id, network_id)

loadbalancers = self._get_all_loadbalancer(network_id)
loadbalancers = self._get_all_loadbalancers(network_id)
if load_balancer_id not in [_lb.id for _lb in loadbalancers]:
loadbalancers.append(lb)

@@ -635,7 +634,7 @@ def remove_loadbalancer(self, load_balancer_id):
LOG.debug("remove_loadbalancer: force removing loadbalancer '%s' for tenant '%s'",
load_balancer_id, network_id)

loadbalancers = self._get_all_loadbalancer(network_id)
loadbalancers = self._get_all_loadbalancers(network_id)
loadbalancers = [_lb for _lb in loadbalancers if _lb.id != load_balancer_id]

selfips = list(chain.from_iterable(
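Beyond the `_get_all_loadbalancers` rename, the behavioural change in `f5sync` is that a tenant is now also torn down when its network has no load balancers at all, not only when every remaining one is PENDING_DELETE. A condensed restatement of that condition, assuming the usual `octavia_lib` constants (the helper name is illustrative):

```python
from octavia_lib.common import constants as lib_consts


def tenant_should_be_removed(loadbalancers) -> bool:
    """True when the network is empty or every LB on it is awaiting deletion."""
    return not loadbalancers or all(
        lb.provisioning_status == lib_consts.PENDING_DELETE
        for lb in loadbalancers)
```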
2 changes: 1 addition & 1 deletion octavia_f5/controller/worker/flows/f5_flows.py
@@ -30,7 +30,7 @@ def ensure_l2(self, selfips: [network_models.Port]) -> flow.Flow:

ensure_routedomain = f5_tasks.EnsureRouteDomain()
ensure_default_route = f5_tasks.EnsureDefaultRoute()
ensure_static_routes = f5_tasks.SyncSubnetRoutes(inject={'selfips': selfips})
ensure_static_routes = f5_tasks.EnsureSubnetRoutes(inject={'selfips': selfips})
ensure_vlan = f5_tasks.EnsureVLAN()

ensure_l2_flow = linear_flow.Flow('ensure-l2-flow')
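The flow itself only swaps `SyncSubnetRoutes` for the renamed `EnsureSubnetRoutes`. For readers unfamiliar with the taskflow wiring, a sketch of how such an ensure-l2 flow is assembled; the task classes and the `inject` pattern are taken from this PR, while the exact ordering and any omitted tasks are illustrative:

```python
from taskflow.patterns import linear_flow

from octavia_f5.controller.worker.tasks import f5_tasks


def build_ensure_l2_flow(selfips):
    """Assemble a linear flow that ensures VLAN, route domain and routes exist."""
    flow = linear_flow.Flow('ensure-l2-flow')
    flow.add(f5_tasks.EnsureVLAN(),
             f5_tasks.EnsureRouteDomain(),
             f5_tasks.EnsureDefaultRoute(),
             f5_tasks.EnsureSubnetRoutes(inject={'selfips': selfips}))
    return flow
```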
84 changes: 51 additions & 33 deletions octavia_f5/controller/worker/l2_sync_manager.py
@@ -124,7 +124,7 @@ def _do_sync_l2_selfips_flow(self, expected_selfips: [network_models.Port], stor
e.run()

def _do_sync_l2_static_routes_flow(self, selfips: [network_models.Port], store: dict):
ensure_static_routes = f5_tasks.SyncSubnetRoutes(inject={'selfips': selfips})
ensure_static_routes = f5_tasks.EnsureSubnetRoutes(inject={'selfips': selfips})
e = self.taskflow_load(ensure_static_routes, store=store)
with tf_logging.LoggingListener(e, log=LOG):
e.run()
@@ -269,8 +269,8 @@ def sync_l2_selfips_flow(self, selfips: [network_models.Port], network_id: str,
def full_sync(self, loadbalancers: [octavia_models.LoadBalancer]):
""" Initiates a full sync for all L2 entities, this is a very api-heavy function. """
network_ids = set(lb.vip.network_id for lb in loadbalancers)
networks = {net.id: net for net in
self.executor.map(self._network_driver.get_network, network_ids, timeout=60)}
existing_networks = {net.id: net for net in
self.executor.map(self._network_driver.get_network, network_ids, timeout=60)}
selfip_ports = list(chain.from_iterable(
self._network_driver.ensure_selfips(loadbalancers, cleanup_orphans=True)))

@@ -281,36 +281,51 @@ def full_sync(self, loadbalancers: [octavia_models.LoadBalancer]):
executor = futures.ThreadPoolExecutor(max_workers=1)
fs = []
for bigip in self._bigips:

""" 1. Delete all orphaned routes """
res = bigip.get(path='/mgmt/tm/net/route')
res.raise_for_status()
for route in res.json().get('items', []):
# Check if route name is a legacy route net-{network_id}
if route['name'] in [f"net-{id}" for id in network_ids]:
net_id = route['name'][len('net-'):]
# Consider route with unexpected vlan to be obsoloted
if net_id in networks and route['network'].endswith(str(networks[net_id].vlan_id)):

def ignore_route(route):
route_name = route['name']

# Ignore unmanaged routes
if not (route_name.startswith(constants.PREFIX_NETWORK_LEGACY) # legacy routes
or route_name.startswith(constants.PREFIX_VLAN) # VLAN routes
or route_name.startswith(constants.PREFIX_NETWORK) # subnet routes
):
return True

# Check if route is relevant to a network
for network_id in network_ids:
existing_network = existing_networks.get(network_id)

# Delete (don't ignore) routes of networks that don't exist
if not existing_network:
continue

if route['name'] in [f"vlan-{network.vlan_id}" for network in networks.values()]:
continue
# Ignore VLAN routes for existing networks
if route_name == f"{constants.PREFIX_VLAN}{existing_network.vlan_id}":
return True

# Skip unmanaged routes
if not (route['name'].startswith(constants.PREFIX_NETWORK_LEGACY)
or route['name'].startswith('vlan-')
or route['name'].startswith(constants.PREFIX_NETWORK)):
continue
# Ignore legacy routes for existing networks
if route_name == f"{constants.PREFIX_NETWORK_LEGACY}{network_id}" and \
route['network'].endswith(str(existing_network.vlan_id)):
return True

# Skip routes for existing subnets
for network_id in networks:
for subnet_id in networks[network_id].subnets:
# Ignore subnet routes for existing subnets
for subnet_id in existing_network.subnets:
subnet_route_name = f5_tasks.get_subnet_route_name(network_id, subnet_id)
if route['name'] == subnet_route_name:
continue
if route_name == subnet_route_name:
return True

# Cleanup
path = f"/mgmt/tm/net/route/{route['fullPath'].replace('/', '~')}"
fs.append(executor.submit(bigip.delete, path=path))
# Delete the route
return False

for route in res.json().get('items', []):
if not ignore_route(route):
path = f"/mgmt/tm/net/route/{route['fullPath'].replace('/', '~')}"
fs.append(executor.submit(bigip.delete, path=path))

""" 2. Delete all orphaned selfips """
res = bigip.get(path='/mgmt/tm/net/self')
@@ -331,17 +346,19 @@ def full_sync(self, loadbalancers: [octavia_models.LoadBalancer]):
res = bigip.get(path='/mgmt/tm/net/route-domain')
res.raise_for_status()
for route_domain in res.json().get('items', []):
if route_domain['name'] in [f"net-{id}" for id in network_ids]:
net_id = route_domain['name'][len('net-'):]
# Consider routedomain with unexpected vlan to be obsoloted
if net_id in networks and route_domain['id'] == networks[net_id].vlan_id:
if route_domain['name'] in [f"{constants.PREFIX_NETWORK_LEGACY}{id}" for id in network_ids]:
net_id = route_domain['name'][len(constants.PREFIX_NETWORK_LEGACY):]
# Consider route domain with unexpected vlan to be obsolete
if net_id in existing_networks and route_domain['id'] == existing_networks[net_id].vlan_id:
continue

if route_domain['name'] in [f"vlan-{network.vlan_id}" for network in networks.values()]:
if route_domain['name'] in [f"{constants.PREFIX_VLAN}{network.vlan_id}"
for network in existing_networks.values()]:
continue

# Skip unmanaged route domains
if not (route_domain['name'].startswith('net-') or route_domain['name'].startswith('vlan-')):
if not (route_domain['name'].startswith(constants.PREFIX_NETWORK_LEGACY) or
route_domain['name'].startswith(constants.PREFIX_VLAN)):
continue

# Cleanup Route-Domain
@@ -352,7 +369,8 @@ def full_sync(self, loadbalancers: [octavia_models.LoadBalancer]):
res = bigip.get(path='/mgmt/tm/net/vlan')
res.raise_for_status()
for vlan in res.json().get('items', []):
if vlan['name'] in [f"vlan-{network.vlan_id}" for network in networks.values()]:
if vlan['name'] in [f"{constants.PREFIX_VLAN}{network.vlan_id}"
for network in existing_networks.values()]:
continue

# Skip unmanaged vlans
@@ -364,12 +382,12 @@ def full_sync(self, loadbalancers: [octavia_models.LoadBalancer]):
fs.append(executor.submit(bigip.delete, path=path))

""" 5. Full sync """
for network in networks.values():
for network in existing_networks.values():
selfips = [sip for sip in selfip_ports if sip.network_id == network.id]
fs.append(executor.submit(self.ensure_l2_flow, selfips=selfips, network_id=network.id))

""" 6. Execute cleanup and full-sync and collect any errors """
done, not_done = futures.wait(fs, timeout=CONF.networking.l2_timeout * len(networks))
done, not_done = futures.wait(fs, timeout=CONF.networking.l2_timeout * len(existing_networks))
for f in done | not_done:
try:
res = f.result(0)
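The orphaned-route cleanup in `full_sync` is now expressed as an `ignore_route()` predicate instead of a chain of `continue` statements. A condensed restatement of the three name families it distinguishes, reusing `constants` and `f5_tasks.get_subnet_route_name()` from this PR (the flattened control flow here is illustrative only):

```python
def keep_route(route, network_ids, existing_networks):
    """Return True for routes that must not be deleted during full sync."""
    name = route['name']

    # Never touch routes whose name does not match a managed prefix.
    if not name.startswith((constants.PREFIX_NETWORK_LEGACY,
                            constants.PREFIX_VLAN,
                            constants.PREFIX_NETWORK)):
        return True

    for network_id in network_ids:
        network = existing_networks.get(network_id)
        if not network:
            continue  # network is gone, nothing to protect

        # Default route of an existing VLAN.
        if name == f"{constants.PREFIX_VLAN}{network.vlan_id}":
            return True

        # Legacy route still pointing at the expected VLAN.
        if (name == f"{constants.PREFIX_NETWORK_LEGACY}{network_id}"
                and route['network'].endswith(str(network.vlan_id))):
            return True

        # Subnet route for a subnet that still exists.
        if name in {f5_tasks.get_subnet_route_name(network_id, subnet_id)
                    for subnet_id in network.subnets}:
            return True

    return False  # orphaned: the caller deletes it
```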
19 changes: 12 additions & 7 deletions octavia_f5/controller/worker/tasks/f5_tasks.py
@@ -155,6 +155,8 @@ def execute(self, network: f5_network_models.Network,
rd = {'name': f"vlan-{network.vlan_id}", 'vlans': vlans, 'id': network.vlan_id}

device_response = bigip.get(path=f"/mgmt/tm/net/route-domain/{rd['name']}")

# check for legacy name
if device_response.status_code == 404:
path = f"/mgmt/tm/net/route-domain/net-{network.id}"
device_response = bigip.get(path=path)
@@ -241,8 +243,8 @@ def execute(self, bigip: bigip_restclient.BigIPRestClient,
subnet_id: str,
network: f5_network_models.Network):

if CONF.networking.route_on_active and not bigip.is_active:
# Skip passive device if route_on_active is enabled
# Skip passive device if routes_only_on_active is enabled
if CONF.networking.routes_only_on_active and not bigip.is_active:
return None

name = f"vlan-{network.vlan_id}"
@@ -251,12 +253,14 @@ def execute(self, bigip: bigip_restclient.BigIPRestClient,
route = {'name': name, 'gw': gw, 'network': network_name}

device_response = bigip.get(path=f"/mgmt/tm/net/route/~Common~{route['name']}")

# check for legacy name
if device_response.status_code == 404:
path=f"/mgmt/tm/net/route/~Common~net-{network.id}"
device_response = bigip.get(path=path)

# Create route if not existing
if device_response.status_code == 404:
# Create route_domain if not existing
res = bigip.post(path='/mgmt/tm/net/route', json=route)
res.raise_for_status()
return res.json()
@@ -278,19 +282,20 @@ def execute(self, bigip: bigip_restclient.BigIPRestClient,
return device_route


class SyncSubnetRoutes(task.Task):
class EnsureSubnetRoutes(task.Task):
""" Task to create static subnet routes """

@decorators.RaisesIControlRestError()
def execute(self, bigip: bigip_restclient.BigIPRestClient,
selfips: [network_models.Port],
network: f5_network_models.Network):

# Skip passive device if route_on_active is enabled
if CONF.networking.route_on_active and not bigip.is_active:
# Skip passive device if routes_only_on_active is enabled
if CONF.networking.routes_only_on_active and not bigip.is_active:
return None

def subnet_in_selfips(subnet, selfips):
"""Check whether a SelfIP exists for the subnet."""
for selfip in selfips:
for fixed_ip in selfip.fixed_ips:
if fixed_ip.subnet_id == subnet:
@@ -372,7 +377,7 @@ def execute(self, network: f5_network_models.Network,
class CleanupSubnetRoutes(task.Task):
""" Task to clean up static subnet routes.

We cannot clean up static subnet routes in SyncSubnetRoutes, because that function does not know whether the
We cannot clean up static subnet routes in EnsureSubnetRoutes, because that function does not know whether the
network/tenant is to be removed completely, and thus blindly syncs routes for all subnets of the network,
even though they have to be deleted.
"""
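`EnsureRouteDomain` and `EnsureDefaultRoute` now look objects up under their `vlan-<vlan_id>` name first and fall back to the legacy `net-<network_id>` name before creating anything. A sketch of that lookup pattern; the REST paths are taken from the diff, the helper itself is illustrative:

```python
def get_with_legacy_fallback(bigip, path: str, legacy_path: str):
    """GET the vlan-<id> named object, falling back to the legacy net-<id> name."""
    response = bigip.get(path=path)
    if response.status_code == 404:
        response = bigip.get(path=legacy_path)
    return response


# Example for a network's default route (404 on both means it must be created):
# get_with_legacy_fallback(
#     bigip,
#     f"/mgmt/tm/net/route/~Common~vlan-{network.vlan_id}",
#     f"/mgmt/tm/net/route/~Common~net-{network.id}")
```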
@@ -199,7 +199,7 @@ def test_SyncSubnetRoutes(self, mock_get_subnet):
]
)

engines.run(f5_tasks.SyncSubnetRoutes(),
engines.run(f5_tasks.EnsureSubnetRoutes(),
store={'network': mock_network,
'bigip': mock_bigip,
'selfips': [mock_selfip]})
@@ -213,7 +213,7 @@ def test_SyncSubnetRoutes(self, mock_get_subnet):
mock_bigip = mock.Mock(spec=as3restclient.AS3RestClient)
mock_route_response = test_f5_flows.MockResponse({'items': []}, status_code=200)
mock_bigip.get.return_value = mock_route_response
engines.run(f5_tasks.SyncSubnetRoutes(),
engines.run(f5_tasks.EnsureSubnetRoutes(),
store={'network': mock_network,
'bigip': mock_bigip,
'selfips': [mock_selfip]})