diff --git a/capirca/lib/policy.py b/capirca/lib/policy.py index f38a6357..75d18d06 100644 --- a/capirca/lib/policy.py +++ b/capirca/lib/policy.py @@ -357,6 +357,7 @@ class Term: source-service-accounts: VarType.SOURCE_SERVICE_ACCOUNTS target-service-accounts: VarType.TARGET_SERVICE_ACCOUNTS source-zone: VarType.SZONE + versa-application: VarType.VERSA_APPLICATION vpn: VarType.VPN """ # fmt: skip ICMP_TYPE = { @@ -457,6 +458,7 @@ def __init__(self, obj): self.protocol_except = [] self.qos = None self.pan_application = [] + self.versa_application = [] self.routing_instance = None self.source_address = [] self.source_address_exclude = [] @@ -827,6 +829,9 @@ def __str__(self): ret_str.append(' qos: %s' % self.qos) if self.pan_application: ret_str.append(' pan_application: %s' % self.pan_application) + if self.versa_application: + # pylint: disable=consider-using-f-string + ret_str.append(' versa_application: %s' % self.versa_application) if self.logging: ret_str.append(' logging: %s' % self.logging) if self.log_limit: @@ -929,6 +934,10 @@ def __eq__(self, other): if sorted(self.pan_application) != sorted(other.pan_application): return False + # versa-application + if sorted(self.versa_application) != sorted(other.versa_application): + return False + # verbatim if self.verbatim != other.verbatim: return False @@ -962,6 +971,8 @@ def __eq__(self, other): return False if sorted(self.pan_application) != sorted(other.pan_application): return False + if sorted(self.versa_application) != sorted(other.versa_application): + return False if self.packet_length != other.packet_length: return False if self.fragment_offset != other.fragment_offset: @@ -1223,6 +1234,8 @@ def AddObject(self, obj): self.forwarding_class_except.append(x.value) elif x.var_type is VarType.PAN_APPLICATION: self.pan_application.append(x.value) + elif x.var_type is VarType.VERSA_APPLICATION: + self.versa_application.append(x.value) elif x.var_type is VarType.NEXT_IP: self.next_ip = DEFINITIONS.GetNetAddr(x.value) elif x.var_type is VarType.PLATFORM: @@ -1276,6 +1289,8 @@ def AddObject(self, obj): self.forwarding_class_except.append(obj.value) elif obj.var_type is VarType.PAN_APPLICATION: self.pan_application.append(obj.value) + elif obj.var_type is VarType.VERSA_APPLICATION: + self.versa_application.append(obj.value) elif obj.var_type is VarType.NEXT_IP: self.next_ip = DEFINITIONS.GetNetAddr(obj.value) elif obj.var_type is VarType.VERBATIM: @@ -1694,6 +1709,7 @@ class VarType: DZONE = 66 DECAPSULATE = 67 SOURCE_SERVICE_ACCOUNTS = 68 + VERSA_APPLICATION = 69 def __init__(self, var_type, value): self.var_type = var_type @@ -1931,6 +1947,7 @@ def __ne__(self, other): 'TRAFFIC_TYPE', 'TTL', 'VERBATIM', + 'VERSA_APPLICATION', 'VPN', ) @@ -2013,6 +2030,7 @@ def __ne__(self, other): 'traffic-type': 'TRAFFIC_TYPE', 'ttl': 'TTL', 'verbatim': 'VERBATIM', + 'versa-application': 'VERSA_APPLICATION', 'vpn': 'VPN', } @@ -2194,6 +2212,7 @@ def p_term_spec(p): | term_spec ttl_spec | term_spec traffic_type_spec | term_spec verbatim_spec + | term_spec versa_application_spec | term_spec vpn_spec | """ @@ -2607,6 +2626,13 @@ def p_vpn_spec(p): p[0] = VarType(VarType.VPN, [p[4], '']) +def p_versa_application_spec(p): + """versa_application_spec : VERSA_APPLICATION ':' ':' one_or_more_strings""" + p[0] = [] + for apps in p[4]: + p[0].append(VarType(VarType.VERSA_APPLICATION, apps)) + + def p_qos_spec(p): """qos_spec : QOS ':' ':' STRING""" p[0] = VarType(VarType.QOS, p[4]) diff --git a/capirca/lib/policy_simple.py b/capirca/lib/policy_simple.py index 40272a06..3e848130 100644 --- a/capirca/lib/policy_simple.py +++ b/capirca/lib/policy_simple.py @@ -325,6 +325,8 @@ class TrafficClassCount(Field): class Verbatim(Field): """A verbatim field.""" +class VersaApplication(Field): + """A VersaApplication field.""" class Vpn(Field): """A vpn field.""" @@ -385,6 +387,7 @@ class Vpn(Field): 'traffic-class-count': TrafficClassCount, 'traffic-type': TrafficType, 'verbatim': Verbatim, + 'versa-application': VersaApplication, 'vpn': Vpn, 'encapsulate': Encapsulate, 'decapsulate': Decapsulate, diff --git a/capirca/lib/versa.py b/capirca/lib/versa.py new file mode 100644 index 00000000..0dc18a05 --- /dev/null +++ b/capirca/lib/versa.py @@ -0,0 +1,901 @@ +# Copyright 2023 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Versa generator.""" +# pylint: disable=super-init-not-called + + +import collections +import copy +import datetime +import itertools + +from absl import logging +from capirca.lib import aclgenerator +from capirca.lib import nacaddr +#import six + + + +class Error(Exception): + """generic error class.""" + + +class UnsupportedFilterError(Error): + """generic error class.""" + pass + + +class UnsupportedHeaderError(Error): + """generic error class.""" + pass + + +class VersaDuplicateTermError(Error): + """generic error class.""" + pass + +class VersaUnsupportedTerm(Error): + """generic error class.""" + pass + +class VersaVerbatimError(Error): + """generic error class.""" + pass + + +class VersaOptionError(Error): + """generic error class.""" + pass + + +class MixedAddrBookTypesError(Error): + """generic error class.""" + pass + + +class ConflictingTargetOptionsError(Error): + """generic error class.""" + pass + + +class ConflictingApplicationSetsError(Error): + """generic error class.""" + pass + +class Tree: + """Creates a Tree Object.""" + target=[] + INDENT = ' ' + + def __init__(self, name='root',typ=None): + """The init function""" + self.children = [] + self.name = name + self.typ = typ + + + def AddParent(self, parent=None): + """Add a Node to Parent""" + if isinstance(parent, Tree): + parent.AddNode(self) + + def __repr__(self): + """repr for a Node """ + return self.name + + def AddNode(self, node): + """add a Node """ + assert isinstance(node, Tree) + self.children.append(node) + + def FindNode(self, nodename): + """find a Node """ + if self.name == nodename: + return self + if self.children: + for child in self.children: + ret = child.FindNode(nodename) + if isinstance(ret, Tree): + return ret + return None + + # Print the tree + def PrintTree(self,num=0): + """Prints the tree. It returns the target """ + self.ResetTarget() + self.PrintTreeInt(num) + return self.target + + def PrintTreeInt(self,num=0): + """Internal function to print the tree. Does recursion""" + if self.name: + self.target.append(f'{self.INDENT*num}{self.name}' + ' {') + if self.typ is not None: + if isinstance(self.typ, str): + if self.name: + self.target.append(f'{self.INDENT*(num+1)}{self.typ}') + else: + self.target.append(f'{self.INDENT*num}{self.typ}') + elif isinstance(self.typ, list): + for item in self.typ: + self.target.append(f'{self.INDENT*(num+1)}{item}') + if self.children: + for child in self.children: + child.PrintTreeInt(num+1) + if self.name: + self.target.append(f'{self.INDENT*num}' + '}') + + def ResetTarget(self): + """Reset the target """ + self.target.clear() + +class Term(aclgenerator.Term): + """Representation of an individual Versa term. + + This is mostly useful for the __str__() method. + + Args: + obj: a policy.Term object + filter_options: list of remaining target options (zones) + """ + + ACTIONS = {'accept': 'permit', + 'deny': 'deny', + 'reject': 'reject', + 'count': 'count', + 'log': 'log', + 'dscp': 'dscp'} + + def __init__(self, term, from_zone, to_zone, addrbook, verbose=True): + super().__init__(term) + self.term = term + self.from_zone = [] + self.to_zone = [] + self.verbose = verbose + self.addrbook = addrbook + self.app = [] + + if term.source_zone: + self.from_zone = term.source_zone + elif from_zone != 'all': + self.from_zone.append(from_zone) + + if term.destination_zone: + self.to_zone = term.destination_zone + elif to_zone != 'all': + self.to_zone.append(to_zone) + + def AddApplication(self,new_app): + """Add new app""" + self.app.append(new_app) + + + def BuildTermZone(self, p_node, zonetype): + + if zonetype == 'src': + mtype = self.from_zone + match_str = 'source' + maddr = self.term.source_address + maddr_ex = self.term.source_address_exclude + addr_str = 'src-' + else: + mtype = self.to_zone + match_str = 'destination' + maddr = self.term.destination_address + maddr_ex = self.term.destination_address_exclude + addr_str = 'dest-' + + + zone_str = '[' + for zone in mtype: + zone_str = zone_str + ' ' + zone + zone_str = zone_str + ' ];' + match_node= Tree(match_str) + match_node.AddParent(p_node) + matchnode_zone = Tree('zone', 'zone-list ' + zone_str) + matchnode_zone.AddParent(match_node) + + if maddr: + if addr_str + self.term.name in self.addrbook: + addr_list = ['address-group-list [ ' + addr_str + self.term.name+ ' ];'] + if maddr_ex: + not_found = 0 + maddr_st = map(str, maddr) + for ip in maddr_ex: + if str(ip) not in maddr_st: + not_found += 1 + if not_found > 0: + # pylint: disable=logging-not-lazy + logging.warning( f'WARNING: Term {self.term.name} in policy '+ + 'has source or destination addresses that does not match ' + + 'address list') + addr_list.append('negate;') + addr_t = Tree('address',addr_list) + addr_t.AddParent(matchnode_zone ) + else: + pass + + + def BuildTermApp(self, p_node): + """Build Term app""" + mstr = [] + if self.app: + apps = list(filter(lambda x: x['name'] == self.term.name, self.app)) + if len(apps) > 0: + slist = '' + for i in range(0,len(apps)): + for j in range(0,len(apps[i]['protocol'])): + slist = slist + ' ' + self.term.name + '-app' + str(j+1) + if len(slist) > 0: + slist = 'services-list [' + slist + ' ];' + mstr.append(slist) + + if self.term.versa_application: + predef_str = 'predefined-services-list [' + for predef in self.term.versa_application: + predef_str = predef_str + ' ' + predef + predef_str = predef_str + ' ];' + mstr.append(predef_str) + if len(mstr) > 0 : + services= Tree('services', mstr) + services.AddParent(p_node) + + def BuildTermDscp(self, p_node): + """Build Term dscp""" + valid_dscp = [] + + for dscp in self.term.dscp_match: + if int(dscp) >= 0 and int(dscp) <= 63: + valid_dscp.append(dscp) + else: + pass + if valid_dscp: + valid_dscp_str = 'dscp [ ' + ' '.join(valid_dscp) + ' ];' + dscp = Tree('', valid_dscp_str) + dscp.AddParent(p_node) + + def BuildTermLogging(self, p_node): + """Build the Term Logging """ + set_term = Tree('set') + set_term.AddParent(p_node) + action_val = self.term.action[0] + if action_val == 'accept': + action_val = 'allow' + action=Tree('', 'action ' + action_val + ';') + action.AddParent(set_term) + log_event = '' + if not self.term.logging: + log_event = 'never' + elif str(self.term.logging[0]).lower() == 'true': + log_event = 'start' + elif str(self.term.logging[0]) == 'log-both': + log_event = 'both' + elif str(self.term.logging[0]) == 'disable': + log_event = 'never' + else: + log_event = 'never' + lef = Tree('lef', 'event '+ log_event + ';') + lef.AddParent(set_term) + + def BuildTerm(self, p_node): + """Build the Term Tree""" + + max_comment_length = 60 + access_pn=Tree('access-policy ' + self.term.name ) + access_pn.AddParent(p_node) + if self.verbose and self.term.comment: + if len(self.term.comment[0]) < max_comment_length: + comm=Tree('', '/* ' + self.term.comment[0] + ' */') + else: + comments = aclgenerator.WrapWords(self.term.comment, 60) + comments.append( '*/') + comments.insert(0, '/*') + comm=Tree('', comments) + comm.AddParent(access_pn) + + rule_match =Tree('match') + rule_match.AddParent(access_pn) + + if self.from_zone: + self.BuildTermZone(rule_match, 'src') + + if self.to_zone: + self.BuildTermZone(rule_match, 'dest') + + if self.term.versa_application or self.app: + self.BuildTermApp(rule_match) + + if self.term.dscp_match: + self.BuildTermDscp(rule_match) + + if self.term.action: + self.BuildTermLogging(access_pn) + + #print("\n".join(set_term.PrintTree())) + + + +class Versa(aclgenerator.ACLGenerator): + """Versa rendering class. + + This class takes a policy object and renders the output into a syntax + which is understood by Versa firewalls. + + Args: + pol: policy.Policy object + """ + + _PLATFORM = 'versa' + SUFFIX = '.vsp' + _SUPPORTED_AF = set(('inet', 'inet6', 'mixed')) + _ZONE_ADDR_BOOK = 'address-book-zone' + _GLOBAL_ADDR_BOOK = 'address-book-global' + _ADDRESSBOOK_TYPES = set((_ZONE_ADDR_BOOK, _GLOBAL_ADDR_BOOK)) + _NOVERBOSE = 'noverbose' + _SUPPORTED_TARGET_OPTIONS = set((_ZONE_ADDR_BOOK, + _GLOBAL_ADDR_BOOK, + _NOVERBOSE)) + _VERSA_SUPPORTED_TARGET_OPTIONS = set(('template', + 'tenant', + 'policy')) + + _AF_MAP = {'inet': (4,), + 'inet6': (6,), + 'mixed': (4, 6)} + _AF_ICMP_MAP = {'icmp': 'inet', + 'icmpv6': 'inet6'} + INDENT = ' ' + _MAX_HEADER_COMMENT_LENGTH = 71 + # The Versa platform is limited in how many IP addresses can be used in + # a single policy. + _ADDRESS_LENGTH_LIMIT = 1023 + # IPv6 are 32 bytes compared to IPv4, this is used as a multiplier. + _IPV6_SIZE = 4 + + def __init__(self, pol, exp_info): + self.versa_policies = [] + self.comment = '' + self.addressbook = collections.OrderedDict() + self.applications = [] + self.ports = [] + self.from_zone = '' + self.to_zone = '' + self.addr_book_type = set() + self.templatename = '_templatename' + self.tenantname = '_tenantname' + self.policyname = '_policyname' + super().__init__(pol, exp_info) + + def _BuildTokens(self): + """Build supported tokens for platform. + + Returns: + tuple containing both supported tokens and sub tokens + """ + supported_tokens, supported_sub_tokens = super()._BuildTokens() + + supported_tokens |= { + 'dscp_match', + 'destination_zone', + 'logging', + 'option', + 'source_zone', + 'versa_application' + } + + supported_sub_tokens.update( + {'action': {'accept', 'deny', 'reject', 'count', 'log', 'dscp'}, + }) + + del supported_sub_tokens['option'] + return supported_tokens, supported_sub_tokens + + + def HeaderParams(self, mstr, val): + """HeaderParams populates the template name and tenant name + and policy name The basic config without the rules looks like this. + + devices { template template_name { + config { orgs { org-services tenantname { security { access-policies { + access-policy-group Default-Policy { rules + ... + } + } } } } } + } } + """ + + if len(val) > 0: + if 'template' in mstr: + self.templatename = val + elif 'tenant' in mstr: + self.tenantname = val + elif 'policy' in mstr: + self.policyname = val + + def _TranslatePolicy(self, pol, exp_info): + """ + # pylint: disable=attribute-defined-outside-init + Transform a policy object into a Versa object. + + Args: + pol: policy.Policy object + exp_info: print a info message when a term is set to expire + in that many weeks + + Raises: + UnsupportedFilterError: An unsupported filter was specified + UnsupportedHeaderError: A header option exists that is not + understood/usable + VersaDuplicateTermError: Two terms were found with same name + in same filter + ConflictingTargetOptionsError: Two target options are + conflicting in the header + MixedAddrBookTypesError: Global and Zone address books in the + same policy + ConflictingApplicationSetsError: When two duplicate named terms + have conflicting application entries + """ + current_date = datetime.datetime.utcnow().date() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + for header, terms in pol.filters: + if self._PLATFORM not in header.platforms: + continue + + + filter_options = header.FilterOptions(self._PLATFORM) + + # TODO(robankeny): Clean up option section. + if (len(filter_options) < 4 or filter_options[0] != 'from-zone' or + filter_options[2] != 'to-zone'): + raise UnsupportedFilterError('Versa filter arguments must specify ' + 'from-zone and to-zone.') + + # check if to-zone is not a supported target option + self.from_zone = filter_options[1] + if filter_options[1] in self._SUPPORTED_TARGET_OPTIONS: + raise UnsupportedFilterError(f'to-zone {filter_options[1]} cannot be '+ + 'the same as any valid Versa target-options') + + # check if from-zone is not a supported target option + self.to_zone = filter_options[3] + if filter_options[3] in self._SUPPORTED_TARGET_OPTIONS: + raise UnsupportedFilterError(f'from-zone {filter_options[1]} cannot'+ + ' be the same as any valid Versa target-options') + + # variables used to collect target-options and set defaults + filter_type = '' + + # parse versa target options + extra_options = filter_options[4:] + if 'address-book' in ''.join(extra_options): + raise UnsupportedFilterError('Unsupported address-book in target') + + address_book_type = {self._ZONE_ADDR_BOOK} + self.addr_book_type.update(address_book_type) + + verbose = True + cnt = -1 + for i in range(0, len(extra_options)): + if cnt == i: + continue # we want to skip this element + if (i+1 <= len(extra_options) and (extra_options[i] in + self._VERSA_SUPPORTED_TARGET_OPTIONS)): + self.HeaderParams(extra_options[i], extra_options[i+1]) + cnt = i+1 + elif extra_options[i] in self._SUPPORTED_AF: + if not filter_type: + filter_type = extra_options[i] + else: + raise ConflictingTargetOptionsError( + 'only one address family can be specified per header') + elif self._NOVERBOSE in extra_options[i]: + verbose = False + else: + raise UnsupportedHeaderError( + 'Versa Generator currently does not support ' + f'{extra_options[i]} as a header option') + + if verbose and header.comment: + self.comment = header.comment[0] + + # if address-family and address-book-type have not been set then default + if not filter_type: + filter_type = 'mixed' + + + term_dup_check = set() + + new_terms = [] + self._FixLargePolices(terms, filter_type) + addr_counter = 0 + for term in terms: + # Only generate the term if it's for the appropriate platform. + if term.platform: + if self._PLATFORM not in term.platform: + continue + if term.platform_exclude: + if self._PLATFORM in term.platform_exclude: + continue + + if term.counter: + raise VersaUnsupportedTerm( + 'Versa Generator currently does not support counter' + '{' '.join(term.counter)}in the protocol field of term') + + if term.icmp_type: + raise VersaUnsupportedTerm( + 'Versa Generator currently does not support icmp-type' + '{' '.join(term.protocol)}in the protocol field of term') + + if term.protocol and 'icmpv6' in ' '.join(term.protocol): + raise VersaUnsupportedTerm( + 'Versa Generator currently does not support icmpv6' + '{' '.join(term.protocol)}in the protocol field of term') + + if term.stateless_reply: + # pylint: disable=logging-not-lazy + logging.warning( f'WARNING: Term {term.name} in policy '+ + f'{self.from_zone}>{self.to_zone} is a stateless reply '+ + 'term and will not be rendered.') + continue + if set(['established', 'tcp-established']).intersection(term.option): + logging.debug('Skipping established term %s because Versa is' + + ' stateful.',term.name) + continue + term.name = self.FixTermLength(term.name) + if term.name in term_dup_check: + raise VersaDuplicateTermError('You have a duplicate term: ' + + f'{term.name}') + term_dup_check.add(term.name) + + if term.expiration: + if term.expiration <= exp_info_date: + logging.info('INFO: Term %s in policy %s>%s expires ' + 'in less than two weeks.', term.name, self.from_zone, + self.to_zone) + if term.expiration <= current_date: + logging.warning('WARNING: Term %s in policy %s>%s is expired.', + term.name, self.from_zone, self.to_zone) + continue + + + # Versa address books leverage network token names for IPs. + # When excluding addresses, we lose those distinct names so we need + # to create a new unique name based off the term name before excluding. + if term.source_address_exclude: + # If we have a naked source_exclude, we need something to exclude from + if not term.source_address: + raise VersaUnsupportedTerm('Versa Generator received source '+ + 'address exclude but no source address') + + if term.destination_address_exclude: + if not term.destination_address: + raise VersaUnsupportedTerm('Versa Generator received destination '+ + 'address but no destination address') + + # Filter source_address based on filter_type & add to address book + if term.source_address: + valid_addrs = [] + for addr in term.source_address: + if addr.version in self._AF_MAP[filter_type]: + valid_addrs.append(addr) + if not valid_addrs: + logging.warning( + 'WARNING: Term %s has 0 valid source IPs, skipping.', term.name) + continue + term.source_address = valid_addrs + for addr in term.source_address: + addr_counter=self._BuildAddressBook('src-'+term.name, + addr_counter, addr) + + # Filter destination_address based on filter_type & add to address book + if term.destination_address: + valid_addrs = [] + for addr in term.destination_address: + if addr.version in self._AF_MAP[filter_type]: + valid_addrs.append(addr) + if not valid_addrs: + logging.warning( + 'WARNING: Term %s has 0 valid destination IPs, skipping.', + term.name) + continue + term.destination_address = valid_addrs + for addr in term.destination_address: + addr_counter=self._BuildAddressBook('dest-'+term.name, + addr_counter, addr) + + new_term = Term(term, self.from_zone, self.to_zone, + self.addressbook, verbose) + new_terms.append(new_term) + + if term.protocol and 'icmp' in ' '.join(term.protocol): + term.protocol.remove('icmp') + term.versa_application.append('ICMP') + # Because Versa terms can contain inet and inet6 addresses. We have to + # have ability to recover proper AF for ICMP type we need. + # If protocol is empty or we cannot map to inet or inet6 we insert bogus + # af_type name which will cause new_term.NormalizeIcmpTypes to fail. + + # NormalizeIcmpTypes returns [''] for empty, convert to [] for eval + #normalized_icmptype = tmp_icmptype if tmp_icmptype != [''] else [] + # rewrites the protocol icmpv6 to icmp6 + if 'icmpv6' in term.protocol: + protocol = list(term.protocol) + protocol[protocol.index('icmpv6')] = 'icmp6' + else: + protocol = term.protocol + new_application_set = {'sport': self.BuildPort(term.source_port), + 'dport': self.BuildPort(term.destination_port), + 'protocol': protocol } + + # add this only of one of the parameters is not None + if ( new_application_set['sport'] or new_application_set['dport'] or + new_application_set['protocol'] ): + for application_set in self.applications: + if all(item in list(application_set.items()) for item in + new_application_set.items()): + new_application_set = '' + term.replacement_application_name = application_set['name'] + break + if (term.name == application_set['name'] and + new_application_set != application_set): + raise ConflictingApplicationSetsError( + f'Application set {term.name} has a conflicting entry') + + if new_application_set: + new_application_set['name'] = term.name + self.applications.append(new_application_set) + new_term.AddApplication(new_application_set) + + self.versa_policies.append((header, new_terms, filter_options)) + + def _FixLargePolices(self, terms, address_family): + """Loops over all terms finding terms exceeding Versas policy limit. + + Args: + terms: List of terms from a policy. + address_family: Tuple containing address family versions. + + See the following URL for more information + http://www.juniper.net/techpubs/en_US/junos12.1x44/topics/reference/ + general/address-address-sets-limitations.html + """ + + def Chunks(l): + """Splits a list of IP addresses into smaller lists based on byte size.""" + return_list = [[]] + counter = 0 + index = 0 + for i in l: + # Size is split in half due to the max size being a sum of src and dst. + if counter > (self._ADDRESS_LENGTH_LIMIT/2): + counter = 0 + index += 1 + return_list.append([]) + if i.version == 6: + counter += self._IPV6_SIZE + else: + counter += 1 + return_list[index].append(i) + return return_list + + expanded_terms = [] + for term in terms: + if (term.AddressesByteLength( + self._AF_MAP[address_family]) > self._ADDRESS_LENGTH_LIMIT): + logging.warning('LARGE TERM ENCOUNTERED') + src_chunks = Chunks(term.source_address) + counter = 0 + for chunk in src_chunks: + for ip in chunk: + ip.parent_token = 'src_' + term.name + str(counter) + counter += 1 + dst_chunks = Chunks(term.destination_address) + counter = 0 + for chunk in dst_chunks: + for ip in chunk: + ip.parent_token = 'dst_' + term.name + str(counter) + counter += 1 + + src_dst_products = itertools.product(src_chunks, dst_chunks) + counter = 0 + for src_dst_list in src_dst_products: + new_term = copy.copy(term) + new_term.source_address = src_dst_list[0] + new_term.destination_address = src_dst_list[1] + new_term.name = new_term.name + '_' + str(counter) + expanded_terms.append(new_term) + counter += 1 + else: + expanded_terms.append(term) + if expanded_terms: + del terms[:] + terms.extend(expanded_terms) + + def _BuildAddressBook(self, zone, counter, address): + """Create the address book configuration entries. + + Args: + zone: the zone these objects will reside in + address: a naming library address object + """ + if zone not in self.addressbook: + self.addressbook[zone] = collections.defaultdict(list) + if str(counter) in self.addressbook[zone]: + return counter + self.addressbook[zone][str(counter)].append(address) + return counter + 1 + + + def BuildPort(self, ports): + """Transform specified ports into list and ranges. + + Args: + ports: a policy terms list of ports + + Returns: + port_list: list of ports and port ranges + """ + port_list = [] + for i in ports: + if i[0] == i[1]: + port_list.append(str(i[0])) + else: + port_list.append(f'{str(i[0])}-{str(i[1])}') + return port_list + + def GenerateAddressBook(self, node): + """Generate Address Book into the Tree Structure + + Args: + node: the Parent node to attach too + + Returns: + None + """ + if not self.addressbook: + return + addrs = Tree('addresses') + addrs.AddParent(node) + index=0 + for zone in self.addressbook: + # building individual addresses + groups = sorted(self.addressbook[zone]) + for group in groups: + ips = nacaddr.SortAddrList(self.addressbook[zone][group]) + ips = nacaddr.CollapseAddrList(ips) + self.addressbook[zone][group] = ips + count = index + 0 + for address in self.addressbook[zone][group]: + prefix_type = 'ipv4-prefix ' + if isinstance( address, nacaddr.IPv6): + prefix_type = 'ipv6-prefix ' + addr_list = Tree('address'+' _' + group, prefix_type + + ' ' + str(address) + ';') + addr_list.AddParent(addrs) + count += 1 + index += count + + addr_groups=Tree('address-groups') + addr_groups.AddParent(node) + for zone in self.addressbook: + # building address-sets + addrlist = '' + for group in self.addressbook[zone]: + addrlist = addrlist + '_' + group + ' ' + group_t=Tree('group ' + zone, 'address-list [ ' + addrlist + '];') + group_t.AddParent(addr_groups) + + + + def GenerateApplications(self, node): + """Generate Application into the Tree Structure + + Args: + node: the Parent node to attach too + + Returns: + None + """ + if len(self.applications) == 0: + return + srvcs= Tree('services') + srvcs.AddParent(node) + i=1 + for app in sorted(self.applications, key=lambda x: x['name']): + for proto in app['protocol'] or ['']: + mstr = [] + # Protocol + mstr.append('protocol '+ proto.upper() + ';') + + # Source Port + if app['sport'] : + sport_str = 'source-port \"' + j=0 + for sport in app['sport']: + sport_str = sport_str + sport + if j < len(app['sport']) - 1: + sport_str = sport_str + ', ' + j += 1 + sport_str = sport_str + '";' + mstr.append(sport_str) + + # Destination Port + if app['dport'] : + dport_str = 'destination-port \"' + j=0 + for dport in app['dport']: + dport_str = dport_str + dport + if j < len(app['dport']) - 1: + dport_str = dport_str + ', ' + j += 1 + dport_str = dport_str + '";' + mstr.append(dport_str) + + srv= Tree('service '+app['name'] + '-app' + str(i),mstr) + srv.AddParent(srvcs) + i += 1 + + def __str__(self): + """Render the output of the Versa policy into config.""" + root=Tree(name='devices') + + tmplt=Tree(name='template ' + self.templatename) + tmplt.AddParent(root) + + config=Tree('config') + config.AddParent(tmplt) + + if self.comment: + comm=Tree('', '/* ' + self.comment + ' */') + comm.AddParent(config) + + orgs=Tree('orgs') + orgs.AddParent(config) + + org_services=Tree('org-services ' + self.tenantname ) + org_services.AddParent(orgs) + + sec=Tree('security') + sec.AddParent(org_services) + + access_p=Tree('access-policies') + access_p.AddParent(sec) + + access_pg=Tree('access-policy-group ' + self.policyname) + access_pg.AddParent(access_p) + + rules=Tree('rules') + rules.AddParent(access_pg) + + + # pylint: disable=unused-variable + for (header, terms, filter_options) in self.versa_policies: + for term in terms: + term.BuildTerm(rules) + + if self.addressbook: + objects = Tree('objects') + objects.AddParent(org_services) + # AddressBook + self.GenerateAddressBook(objects) + # APPLICATIONS + self.GenerateApplications(objects) + + target = root.PrintTree() + + return '\n'.join(target) diff --git a/doc/generators/versa.md b/doc/generators/versa.md new file mode 100644 index 00000000..a90bc2b3 --- /dev/null +++ b/doc/generators/versa.md @@ -0,0 +1,43 @@ + +## Versa +Note: The Versa generator is currently in beta testing. +``` +target:: srx from-zone [zone name] to-zone [zone name] {template templatename } {tenant tenantname} {policy policyname} { inet} +``` + * _from-zone_: static keyword, followed by user specified zone + * _to-zone_: static keyword, followed by user specified zone + * _template_: static keyword, followed by user specified template name + * _tenant_: static keyword, followed by user specified tenant name + * _policy: static keyword, followed by user specified policy name + * _inet_: Address family (only IPv4 tested at this time) + +### Term Format +* _action::_ The action to take when matched. See Actions section for valid options. +* _comment::_ A text comment enclosed in double-quotes. The comment can extend over multiple lines if desired, until a closing quote is encountered. +* _destination-address::_ One or more destination address tokens +* _destination-exclude::_ Exclude one or more address tokens from the specified destination-address +* _destination-port::_ One or more service definition tokens +* _destination-zone::_ one or more destination zones tokens. Only supported by global policy +* _dscp_match::_ Match a DSCP number. +* _logging::_ Specify that these packets should be logged. + * Based on the input value the resulting logging actions will follow this logic: + * _action_ is 'accept': + * _logging_ is 'true': resulting output will be 'event start;' + * _logging_ is 'log-both': resulting output will be 'event both;' +* _name::_ Name of the term. +* _option::_ See platforms supported Options section. +* _owner::_ Owner of the term, used for organizational purposes. +* _platform::_ one or more target platforms for which this term should ONLY be rendered. +* _protocol::_ the network protocols this term will match, such as tcp, udp, icmp, or a numeric value. +* _source-address::_ one or more source address tokens. +* _source-exclude::_ exclude one or more address tokens from the specified source-address. +* _source-port::_ one or more service definition tokens. +* _source-zone::_ one or more source zones tokens. Only supported by global policy +### Sub Tokens +#### Actions +* _accept_ +* _deny_ +* _dscp_ +* _log_ +* _reject_ + diff --git a/policies/pol/sample_versa.pol b/policies/pol/sample_versa.pol new file mode 100644 index 00000000..13b63279 --- /dev/null +++ b/policies/pol/sample_versa.pol @@ -0,0 +1,32 @@ +header { + comment:: "this is a sample policy to generate Versa filter" + target:: versa from-zone trust to-zone untrust template test tenant tenant1 policy Default-Policy +} + +term test-tcp { + destination-address:: WEB_SERVERS + source-address:: INTERNAL + destination-port:: SMTP + protocol:: tcp + logging:: log-both + action:: accept +} + +term test-icmp { + destination-address:: RFC1918 + protocol:: icmp + action:: accept + logging:: true +} +term good-term-5 { + destination-address:: WEB_SERVERS + source-address:: INTERNAL + action:: accept + versa-application:: ssh who +} + + +term default-deny { + action:: deny + logging:: true +} diff --git a/tests/lib/versa_test.py b/tests/lib/versa_test.py new file mode 100644 index 00000000..20810213 --- /dev/null +++ b/tests/lib/versa_test.py @@ -0,0 +1,739 @@ +# Copyright 2023 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit test for Versa acl rendering module.""" + +import copy +import datetime +import re +from absl.testing import absltest +from unittest import mock +from capirca.lib import aclgenerator +from capirca.lib import versa +from capirca.lib import nacaddr +from capirca.lib import naming +from capirca.lib import policy + + +GOOD_HEADER = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust template test tenant tenant1 +} +""" +GOOD_HEADER_1 = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust +} +""" +GOOD_HEADER_2 = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust template test +} +""" + +GOOD_HEADER_3 = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust template test tenant tenant1 +} +""" + +GOOD_HEADER_4 = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust template test tenant tenant1 policy Default-Policy +} +""" + +GOOD_HEADER_NOVERBOSE = """ +header { + comment:: "This is a test acl with a comment" + target:: versa from-zone trust to-zone untrust template test tenant tenant1 policy Default-Policy noverbose +} +""" + +BAD_HEADER = """ +header { + target:: versa something +} +""" + +BAD_HEADER_1 = """ +header { + comment:: "This header has two address families" + target:: versa from-zone trust to-zone untrust inet6 mixed +} +""" + +BAD_HEADER_3 = """ +header { + comment:: "This is a test acl with a global policy" + target:: versa from-zone all to-zone all address-book-zone +} +""" + +BAD_HEADER_4 = """ +header { + comment:: "This is a test acl with a global policy" + target:: versa from-zone test to-zone all +} +""" + +BAD_HEADER_5 = """ +header { + comment:: "This header has address-book-global in from zone" + target:: versa from-zone address-book-global to-zone any +} +""" + +GOOD_TERM_1 = """ +term good-term-1 { + comment:: "Term allow source dest" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept +} +""" + +GOOD_TERM_2 = """ +term good-term-2 { + comment:: "Term reject source dest" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: reject +} +""" + +GOOD_TERM_3 = """ +term good-term-3 { + comment:: "Term deny source dest" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: deny +} +""" + +GOOD_TERM_4 = """ +term good-term-4 { + comment:: "Add a service" + destination-address:: SOME_HOST + source-address:: INTERNAL + destination-port:: SMTP + protocol:: tcp udp + action:: accept +} +""" + +GOOD_TERM_5 = """ +term good-term-5 { + comment:: "Add a pre-defined service" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + versa-application:: ssh +} +""" + +GOOD_TERM_6 = """ +term good-term-6 { + comment:: "Add both service and pre-defined service" + destination-address:: SOME_HOST + source-address:: INTERNAL + destination-port:: SMTP + protocol:: tcp udp + action:: accept + versa-application:: ssh who +} +""" + +GOOD_TERM_7 = """ +term good-term-7 { + comment:: "Add a source zone in term" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + source-zone:: gen +} +""" + +GOOD_TERM_8 = """ +term good-term-8 { + comment:: "Add a dest zone in term" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + destination-zone:: gen +} +""" + +GOOD_TERM_9 = """ +term good-term-9 { + comment:: "Add source and dest zone in term" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + source-zone:: gen + destination-zone:: untrust +} +""" + +GOOD_TERM_10 = """ +term good-term-10 { + comment:: "Add dscp match" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + source-zone:: gen + destination-zone:: untrust + dscp-match:: 40 41 +} +""" + +GOOD_TERM_11 = """ +term good-term-11 { + comment:: "This header is very very very very very very very very very very very very very very very very very very very very large" + destination-address:: SOME_HOST + source-address:: INTERNAL + action:: accept + source-zone:: gen + destination-zone:: untrust + dscp-match:: 40 41 +} +""" + +GOOD_TERM_12 = """ +term good-term-12 { + comment:: "Source address exclude" + source-address:: INCLUDES + source-exclude:: EXCLUDES + protocol:: tcp + action:: accept +} +""" + +GOOD_TERM_13 = """ +term good-term-13 { + comment:: "Destination address exclude" + destination-address:: INCLUDES + destination-exclude:: EXCLUDES + protocol:: tcp + action:: accept +} +""" + +GOOD_TERM_14 = """ +term good-term-14 { + destination-address:: DSTADDRS + source-address:: SRCADDRS + protocol:: tcp + action:: accept +} +""" + +GOOD_TERM_LOG_1 = """ +term good-term-log-1 { + action:: accept + logging:: log-both +} +""" + +GOOD_TERM_LOG_2 = """ +term good-term-log-2 { + action:: accept + logging:: true +} +""" + +GOOD_TERM_LOG_3 = """ +term good-term-log-3 { + action:: deny + logging:: disable +} +""" + +ICMP_TYPE_TERM_0 = """ +term test-icmp { + comment:: "Add icmp " + protocol:: icmp + action:: accept +} +""" + +ICMP_TYPE_TERM_1 = """ +term test-icmp-1 { + comment:: "Add icmp type not supported" + protocol:: icmp + icmp-type:: echo-request echo-reply + action:: accept +} +""" + +ICMP_TYPE_TERM_2 = """ +term test-icmp-2 { + comment:: "Add icmpv6 not supported" + protocol:: icmpv6 + action:: accept +} +""" +EXPIRED_TERM_1 = """ +term expired_test { + expiration:: 2000-1-1 + action:: deny +} +""" + +EXPIRING_TERM = """ +term is_expiring { + expiration:: %s + action:: accept + protocol:: icmp +} +""" + +PLATFORM_TERM = """ +term platform-term { + protocol:: tcp udp + platform:: versa + action:: accept +} +""" + +PLATFORM_EXCLUDE_TERM = """ +term platform-exclude-term { + protocol:: tcp udp + platform-exclude:: versa + action:: accept +} +""" + + +BAD_TERM_COUNT_1 = """ +term bad-term-count-1 { + counter:: good-counter + action:: accept +} +""" + +BAD_TERM_DSCP_SET = """ +term bad-term-11 { + destination-address:: SOME_HOST + action:: accept + dscp-set:: af42 +} +""" + +BAD_TERM_DSCP_EXCEPT = """ +term bad-term-11 { + destination-address:: SOME_HOST + action:: accept + dscp-except:: be +} +""" + +SUPPORTED_TOKENS = frozenset({ + 'action', + 'comment', + 'counter', + 'destination_address', + 'destination_address_exclude', + 'destination_port', + 'destination_zone', + 'dscp_except', + 'dscp_match', + 'dscp_set', + 'source_zone', + 'expiration', + 'icmp_type', + 'stateless_reply', + 'logging', + 'name', + 'option', + 'owner', + 'platform', + 'platform_exclude', + 'protocol', + 'source_address', + 'source_address_exclude', + 'source_port', + 'timeout', + 'translated', + 'verbatim', + 'vpn' +}) + +SUPPORTED_SUB_TOKENS = { + 'action': {'accept', 'deny', 'reject', 'count', 'log', 'dscp'}, + 'icmp_type': { + 'alternate-address', + 'certification-path-advertisement', + 'certification-path-solicitation', + 'conversion-error', + 'destination-unreachable', + 'echo-reply', + 'echo-request', + 'mobile-redirect', + 'home-agent-address-discovery-reply', + 'home-agent-address-discovery-request', + 'icmp-node-information-query', + 'icmp-node-information-response', + 'information-request', + 'inverse-neighbor-discovery-advertisement', + 'inverse-neighbor-discovery-solicitation', + 'mask-reply', + 'mask-request', + 'information-reply', + 'mobile-prefix-advertisement', + 'mobile-prefix-solicitation', + 'multicast-listener-done', + 'multicast-listener-query', + 'multicast-listener-report', + 'multicast-router-advertisement', + 'multicast-router-solicitation', + 'multicast-router-termination', + 'neighbor-advertisement', + 'neighbor-solicit', + 'packet-too-big', + 'parameter-problem', + 'redirect', + 'redirect-message', + 'router-advertisement', + 'router-renumbering', + 'router-solicit', + 'router-solicitation', + 'source-quench', + 'time-exceeded', + 'timestamp-reply', + 'timestamp-request', + 'unreachable', + 'version-2-multicast-listener-report', + }, +} + +# Print a info message when a term is set to expire in that many weeks. +# This is normally passed from command line. +EXP_INFO = 2 + +_IPSET = [nacaddr.IP('10.0.0.0/8'), + nacaddr.IP('2001:4860:8000::/33')] +_IPSET2 = [nacaddr.IP('10.23.0.0/22'), nacaddr.IP('10.23.0.6/23', strict=False)] +_IPSET3 = [nacaddr.IP('10.23.0.0/23')] +_IPSET4 = [nacaddr.IP('10.0.0.0/20')] +_IPSET5 = [nacaddr.IP('10.0.0.0/24')] + + +class VersaTest(absltest.TestCase): + + def setUp(self): + super().setUp() + self.naming = mock.create_autospec(naming.Naming) + + def testHeaderComment(self): + pol = policy.ParsePolicy(GOOD_HEADER + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('This is a test acl with a comment', output, output) + + def testLongComment(self): + out1 = 'This header is very very very very very very very very very' + out2 = 'very very very very very very very very very very very large' + + pol = policy.ParsePolicy(GOOD_HEADER + GOOD_TERM_11, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + + + self.assertIn(out1, output, output) + self.assertIn(out2, output, output) + + + def testHeaderWithoutVersaHeader(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('template _templatename', output, output) + self.assertIn('org-services _tenantname', output, output) + self.assertIn('access-policy-group _policyname', output, output) + + def testHeaderWithoutVersaHeaderTemplate(self): + pol = policy.ParsePolicy(GOOD_HEADER_2 + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('template test', output, output) + + def testHeaderWithoutVersaHeaderTenant(self): + pol = policy.ParsePolicy(GOOD_HEADER_3 + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('org-services tenant1', output, output) + + def testHeaderWithNoVerbose(self): + pol = policy.ParsePolicy(GOOD_HEADER_NOVERBOSE+ ICMP_TYPE_TERM_0, + self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertNotIn('/*', output, output) + self.assertNotIn('*/', output, output) + + def testHeaderWithoutVersaHeaderPolicy(self): + pol = policy.ParsePolicy(GOOD_HEADER_4 + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('access-policy-group Default-Policy', output, output) + + def testIcmpV4(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + ICMP_TYPE_TERM_0 , self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn(' predefined-services-list [ ICMP ]', output, output) + + def testIcmpType(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + ICMP_TYPE_TERM_1 , self.naming) + self.assertRaises(versa.VersaUnsupportedTerm, versa.Versa, pol, EXP_INFO) + + def testIcmpV6(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + ICMP_TYPE_TERM_2 , self.naming) + self.assertRaises(versa.VersaUnsupportedTerm, versa.Versa, pol, EXP_INFO) + + def testLoggingBoth(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_LOG_1, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('event both', output, output) + + def testLoggingTrue(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_LOG_2, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('event start', output, output) + + def testLoggingNever(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_LOG_3, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('event never', output, output) + + def testSourceDestAllow(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_1, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('action allow', output, output) + + def testSourceDestReject(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_2, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('action reject', output, output) + + def testSourceDestDeny(self): + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_3, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('action deny', output, output) + + def testAddingService(self): + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_4, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('services-list [ good-term-4-app1 good-term-4-app2 ]', + output, output) + + + def testAddingApplication(self): + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_5, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn(' predefined-services-list [ ssh ]', output, output) + + + def testAddingServiceApplication(self): + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_6, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('services-list [ good-term-6-app1 good-term-6-app2 ]', + output, output) + self.assertIn(' predefined-services-list [ ssh who ]', output, output) + + def testSourceZone(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_7, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('zone-list [ gen ]', output, output) + + def testDestZone(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_8, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('zone-list [ gen ]', output, output) + + def testSourceDestZone(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_9, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('zone-list [ untrust ]', output, output) + self.assertIn('zone-list [ gen ]', output, output) + + def testDscpMatch(self): + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_10, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('dscp [ 40 41 ]', output, output) + + def testBadHeaderSupportedTargetOption(self): + pol = policy.ParsePolicy(BAD_HEADER_5 + GOOD_TERM_10, self.naming) + self.assertRaises(versa.UnsupportedFilterError, versa.Versa, pol, EXP_INFO) + + + @mock.patch.object(versa.logging, 'warning') + def testExpiredTerm(self, mock_warn): + _ = versa.Versa(policy.ParsePolicy(GOOD_HEADER + EXPIRED_TERM_1, + self.naming), EXP_INFO) + + mock_warn.assert_called_once_with( + 'WARNING: Term %s in policy %s>%s is expired.', + 'expired_test', 'trust', 'untrust') + + @mock.patch.object(versa.logging, 'info') + def testExpiringTerm(self, mock_info): + exp_date = datetime.date.today() + datetime.timedelta(weeks=EXP_INFO) + pol = policy.ParsePolicy(GOOD_HEADER + EXPIRING_TERM % + exp_date.strftime('%Y-%m-%d'), + self.naming) + + _ = str(versa.Versa(pol, EXP_INFO)) + mock_info.assert_called_once_with( + 'INFO: Term %s in policy %s>%s expires in ' + 'less than two weeks.', 'is_expiring', + 'trust', 'untrust') + + def testCounterAccept(self): + pol = policy.ParsePolicy(GOOD_HEADER + BAD_TERM_COUNT_1, + self.naming) + self.assertRaises(versa.VersaUnsupportedTerm, + versa.Versa, pol, EXP_INFO) + + def testDscpSet(self): + pol = policy.ParsePolicy(GOOD_HEADER + BAD_TERM_DSCP_SET, + self.naming) + self.assertRaises(aclgenerator.UnsupportedFilterError, + versa.Versa, pol, EXP_INFO) + + + def testDscpExcept(self): + pol = policy.ParsePolicy(GOOD_HEADER + BAD_TERM_DSCP_EXCEPT, + self.naming) + self.assertRaises(aclgenerator.UnsupportedFilterError, + versa.Versa, pol, EXP_INFO) + + def testSourceAddressExclude(self): + includes = ['1.0.0.0/8'] + excludes = ['1.0.0.0/8'] + self.naming.GetNetAddr.side_effect = [[nacaddr.IPv4(ip) for ip in includes], + [nacaddr.IPv4(ip) for ip in excludes]] + + pol = policy.ParsePolicy(GOOD_HEADER + GOOD_TERM_12, + self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('negate', output, output) + + def testDestinationAddressExclude(self): + includes = ['1.0.0.0/8', '2.0.0.0/16' ] + excludes = ['1.0.0.0/8', '2.0.0.0/16' ] + self.naming.GetNetAddr.side_effect = [[nacaddr.IPv4(ip) for ip in includes], + [nacaddr.IPv4(ip) for ip in excludes]] + + pol = policy.ParsePolicy(GOOD_HEADER + GOOD_TERM_13, + self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('negate', output, output) + + @mock.patch.object(versa.logging, 'warning') + def testDestinationAddresssNotMatching(self, mock_warn): + includes = ['1.0.0.0/8', '2.0.0.0/16' ] + excludes = ['1.0.0.0/8', '2.0.0.1/32' ] + self.naming.GetNetAddr.side_effect = [[nacaddr.IPv4(ip) for ip in includes], + [nacaddr.IPv4(ip) for ip in excludes]] + + _ = str(versa.Versa(policy.ParsePolicy(GOOD_HEADER + GOOD_TERM_13, + self.naming), EXP_INFO)) + mock_warn.assert_called_once_with( + 'WARNING: Term good-term-13 in policy ' + + 'has source or destination addresses that does not match '+ + 'address list') + + def testAdressBookIPv4(self): + srcaddrs = ['10.23.0.0/24', '10.24.0.0/24' ] + dstaddrs = ['10.25.0.0/24', '10.26.0.0/24' ] + self.naming.GetNetAddr.side_effect = [ + [nacaddr.IPv4(ip) for ip in srcaddrs ], + [nacaddr.IPv4(ip) for ip in dstaddrs ]] + + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_14, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('10.23.0.0/24', output, output) + self.assertIn('10.24.0.0/24', output, output) + self.assertIn('10.25.0.0/24', output, output) + self.assertIn('10.26.0.0/24', output, output) + + def testAdressBookIPv6(self): + srcaddrs = ['2620:15c:2c4:202:b0e7:158f:6a7a:3188/128', + '2620:15c:2c4:202:b0e7:158a:6a7a:3188/128' ] + dstaddrs = ['2620:15c:2c4:202:b0e7:158b:6a7a:3188/128', + '2620:15c:2c4:202:b0e7:158c:6a7a:3188/128' ] + self.naming.GetNetAddr.side_effect =[ + [nacaddr.IPv6(ip) for ip in srcaddrs ], + [nacaddr.IPv6(ip) for ip in dstaddrs ]] + + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_14, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('2620:15c:2c4:202:b0e7:158f:6a7a:3188/128', output, output) + self.assertIn('2620:15c:2c4:202:b0e7:158a:6a7a:3188/128', output, output) + self.assertIn('2620:15c:2c4:202:b0e7:158b:6a7a:3188/128', output, output) + self.assertIn('2620:15c:2c4:202:b0e7:158c:6a7a:3188/128', output, output) + + def testPlatformTerm(self): + srcaddrs = ['10.23.0.0/24', '10.24.0.0/24' ] + dstaddrs = ['10.25.0.0/24', '10.26.0.0/24' ] + self.naming.GetNetAddr.side_effect = [ + [nacaddr.IPv4(ip) for ip in srcaddrs ], + [nacaddr.IPv4(ip) for ip in dstaddrs ]] + + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_14 + PLATFORM_TERM, + self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('good-term-14', output, output) + self.assertIn('platform-term', output, output) + + def testPlatformExcludeTerm(self): + srcaddrs = ['10.23.0.0/24', '10.24.0.0/24' ] + dstaddrs = ['10.25.0.0/24', '10.26.0.0/24' ] + self.naming.GetNetAddr.side_effect = [ + [nacaddr.IPv4(ip) for ip in srcaddrs ], + [nacaddr.IPv4(ip) for ip in dstaddrs ]] + + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_14, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('good-term-14', output, output) + self.assertNotIn('platform-exclude-term', output) + + def testAdressBookMixedIPs(self): + srcaddrs = [nacaddr.IPv4('10.23.0.0/24'), + nacaddr.IPv6('2620:15c:2c4:202:b0e7:158a:6a7a:3188/128') ] + dstaddrs = [nacaddr.IPv6('2620:15c:2c4:202:b0e7:158b:6a7a:3188/128'), + nacaddr.IPv4('10.24.0.0/24') ] + self.naming.GetNetAddr.side_effect = [srcaddrs ,dstaddrs ] + + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_14, self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('10.23.0.0/24', output, output) + self.assertIn('2620:15c:2c4:202:b0e7:158a:6a7a:3188/128', output, output) + self.assertIn('2620:15c:2c4:202:b0e7:158b:6a7a:3188/128', output, output) + self.assertIn('10.24.0.0/24', output, output) + + def testMultipleTerms1(self): + self.naming.GetServiceByProto.return_value = ['25'] + pol = policy.ParsePolicy(GOOD_HEADER_1 + GOOD_TERM_10 + GOOD_TERM_6, + self.naming) + output = str(versa.Versa(pol, EXP_INFO)) + self.assertIn('access-policy good-term-10', output, output) + self.assertIn('access-policy good-term-6', output, output) + +if __name__ == '__main__': + absltest.main()