From 113d468487881588e7388014c9637c5ced37f304 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 11 Sep 2015 17:32:29 +0200 Subject: [PATCH 01/21] filter SRM endpoints too --- bin/topology-gocdb-connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index bfaab321..973a34b4 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -301,6 +301,7 @@ def main(): getags = confcust.get_gocdb_getags(job) if getags: group_endpoints = filter_by_tags(getags, group_endpoints) + gelegmap = filter_by_tags(getags, gelegmap) filename = jobdir+globopts['OutputTopologyGroupOfEndpoints'.lower()] % timestamp avro = AvroWriter(globopts['AvroSchemasTopologyGroupOfEndpoints'.lower()], filename, group_endpoints + gelegmap, os.path.basename(sys.argv[0]), logger) From ae1ae45ef2f70cabc62e29ee7cf19a65318c9389 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 16 Sep 2015 10:27:07 +0200 Subject: [PATCH 02/21] refactored use of logging --- bin/downtimes-gocdb-connector.py | 5 +++-- bin/poem-connector.py | 15 ++++++--------- bin/topology-gocdb-connector.py | 7 ++++--- bin/topology-vo-connector.py | 7 ++++--- bin/weights-gstat-connector.py | 7 ++++--- modules/writers.py | 26 +++++++++++++++----------- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index 2d736920..83cecfe9 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -32,7 +32,8 @@ import xml.dom.minidom import copy -from argo_egi_connectors.writers import AvroWriter, Logger +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf logger = None @@ -144,7 +145,7 @@ def main(): filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp avro = 
AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename, - dts + dtslegmap, os.path.basename(sys.argv[0]), logger) + dts + dtslegmap, os.path.basename(sys.argv[0])) avro.write() logger.info('Fetched Date:%s Endpoints:%d' % (args.date[0], len(dts + dtslegmap))) diff --git a/bin/poem-connector.py b/bin/poem-connector.py index 1310de55..64d6eba2 100755 --- a/bin/poem-connector.py +++ b/bin/poem-connector.py @@ -33,11 +33,10 @@ import urlparse import socket import re -from argo_egi_connectors.writers import AvroWriter, Logger +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import CustomerConf, PoemConf, Global -writers = ['file', 'avro'] - logger = None globopts, poemopts = {}, {} cpoem = None @@ -206,7 +205,7 @@ def createProfileEntries(self, server, ngi, profile): entries.append(entry) return entries -class FileWriter: +class PrefilterPoem: def __init__(self, outdir): self.outputDir = outdir self.outputFileTemplate = 'poem_sync_%s.out' @@ -271,10 +270,8 @@ def main(): for cust in confcust.get_customers(): # write profiles - for writer in writers: - if writer == 'file': - writerInstance = FileWriter(confcust.get_custdir(cust)) - writerInstance.writeProfiles(ps, timestamp) + poempref = PrefilterPoem(confcust.get_custdir(cust)) + poempref.writeProfiles(ps, timestamp) for job in confcust.get_jobs(cust): jobdir = confcust.get_fulldir(cust, job) @@ -284,7 +281,7 @@ def main(): filename = jobdir + globopts['OutputPoem'.lower()]% timestamp avro = AvroWriter(globopts['AvroSchemasPoem'.lower()], filename, - lfprofiles, os.path.basename(sys.argv[0]), logger) + lfprofiles, os.path.basename(sys.argv[0])) avro.write() logger.info('Job:'+job+' Profiles:%s Tuples:%d' % (','.join(profiles), len(lfprofiles))) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 973a34b4..29724530 100755 --- a/bin/topology-gocdb-connector.py +++ 
b/bin/topology-gocdb-connector.py @@ -34,7 +34,8 @@ import copy from exceptions import AssertionError -from argo_egi_connectors.writers import AvroWriter, Logger +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf @@ -290,7 +291,7 @@ def main(): group_groups = filter_by_tags(ggtags, group_groups) filename = jobdir+globopts['OutputTopologyGroupOfGroups'.lower()] % timestamp avro = AvroWriter(globopts['AvroSchemasTopologyGroupOfGroups'.lower()], filename, - group_groups, os.path.basename(sys.argv[0]), logger) + group_groups, os.path.basename(sys.argv[0])) avro.write() gelegmap = [] @@ -304,7 +305,7 @@ def main(): gelegmap = filter_by_tags(getags, gelegmap) filename = jobdir+globopts['OutputTopologyGroupOfEndpoints'.lower()] % timestamp avro = AvroWriter(globopts['AvroSchemasTopologyGroupOfEndpoints'.lower()], filename, - group_endpoints + gelegmap, os.path.basename(sys.argv[0]), logger) + group_endpoints + gelegmap, os.path.basename(sys.argv[0])) avro.write() logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + len(gelegmap))+' Groups(%s):%d' % (fetchtype, numgg)) diff --git a/bin/topology-vo-connector.py b/bin/topology-vo-connector.py index 539d36f7..b0d656c3 100755 --- a/bin/topology-vo-connector.py +++ b/bin/topology-vo-connector.py @@ -24,7 +24,8 @@ # the EGI-InSPIRE project through the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) -from argo_egi_connectors.writers import AvroWriter, Logger +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf from exceptions import AssertionError import datetime @@ -148,7 +149,7 @@ def ismatch(elem): filename = jobdir + globopts['OutputTopologyGroupOfGroups'.lower()] % timestamp avro = 
AvroWriter(globopts['AvroSchemasTopologyGroupOfGroups'.lower()], filename, filtlgroups, - os.path.basename(sys.argv[0]), logger) + os.path.basename(sys.argv[0])) avro.write() filename = jobdir + globopts['OutputTopologyGroupOfEndpoints'.lower()] % timestamp @@ -160,7 +161,7 @@ def ismatch(elem): gelegmap.append(copy.copy(g)) gelegmap[-1]['service'] = LegMapServType[g['service']] avro = AvroWriter(globopts['AvroSchemasTopologyGroupOfEndpoints'.lower()], filename, group_endpoints + gelegmap, - os.path.basename(sys.argv[0]), logger) + os.path.basename(sys.argv[0])) avro.write() logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + len(gelegmap))+' Groups:%d' % (numgg)) diff --git a/bin/weights-gstat-connector.py b/bin/weights-gstat-connector.py index a34f724b..2131c515 100755 --- a/bin/weights-gstat-connector.py +++ b/bin/weights-gstat-connector.py @@ -33,7 +33,8 @@ from avro.datafile import DataFileReader from avro.io import DatumReader -from argo_egi_connectors.writers import AvroWriter, Logger +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf globopts = {} @@ -141,12 +142,12 @@ def main(): filename = jobdir + globopts['OutputWeights'.lower()] % timestamp datawr = gen_outdict(newData) - avro = AvroWriter(globopts['AvroSchemasWeights'.lower()], filename, datawr, os.path.basename(sys.argv[0]), logger) + avro = AvroWriter(globopts['AvroSchemasWeights'.lower()], filename, datawr, os.path.basename(sys.argv[0])) avro.write() if oldDataExists: datawr = gen_outdict(oldData) - avro = AvroWriter(globopts['AvroSchemasWeights'.lower()], filename, datawr, os.path.basename(sys.argv[0]), logger) + avro = AvroWriter(globopts['AvroSchemasWeights'.lower()], filename, datawr, os.path.basename(sys.argv[0])) avro.write() logger.info('Jobs:%d Sites:%d' % (len(jobcust), len(datawr))) diff --git a/modules/writers.py b/modules/writers.py index 
f173fb7c..882068c5 100644 --- a/modules/writers.py +++ b/modules/writers.py @@ -5,6 +5,7 @@ import sys from exceptions import IOError + class Logger: def __init__(self, connector): lfs = '%(name)s[%(process)s]: %(levelname)s %(message)s' @@ -19,25 +20,28 @@ def __init__(self, connector): sh.setLevel(lv) self.logger.addHandler(sh) - def warn(self, msg): - self.logger.warn(msg) - - def error(self, msg): - self.logger.error(msg) + for func in ['warn', 'error', 'critical', 'info']: + code = """def %s(self, msg): + self.logger.%s(msg)""" % (func, func) + exec code - def critical(self, msg): - self.logger.critical(msg) +class SingletonLogger: + def __init__(self, connector): + if not getattr(self.__class__, 'shared_object', None): + self.__class__.shared_object = Logger(connector) - def info(self, msg): - self.logger.info(msg) + for func in ['warn', 'error', 'critical', 'info']: + code = """def %s(self, msg): + self.__class__.shared_object.%s(msg)""" % (func, func) + exec code class AvroWriter: """ AvroWriter """ - def __init__(self, schema, outfile, listdata, name, logger): + def __init__(self, schema, outfile, listdata, name): + self.logger = SingletonLogger(name) self.schema = schema self.listdata = listdata self.outfile = outfile - self.logger = logger def write(self): try: From 3aa103a30e4550fdfe3fc93b5b0785a438e225ed Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 17 Sep 2015 13:08:11 +0200 Subject: [PATCH 03/21] connectors can verify server certificate --- argo-egi-connectors.spec | 1 + bin/downtimes-gocdb-connector.py | 24 ++++++------- bin/poem-connector.py | 47 ++++++------------------ bin/topology-gocdb-connector.py | 32 ++++++++++------- bin/topology-vo-connector.py | 40 ++++++++++----------- bin/weights-gstat-connector.py | 43 ++++++++++++++++------ etc/global.conf | 2 ++ modules/tools.py | 62 ++++++++++++++++++++++++++++++++ 8 files changed, 161 insertions(+), 90 deletions(-) create mode 100644 modules/tools.py diff --git a/argo-egi-connectors.spec 
b/argo-egi-connectors.spec index 1b14d891..bd15a624 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -11,6 +11,7 @@ Vendor: SRCE Obsoletes: ar-sync Prefix: %{_prefix} Requires: avro +Requires: pyOpenSSL Source0: %{name}-%{version}.tar.gz BuildArch: noarch diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index 83cecfe9..4588caa5 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -31,10 +31,14 @@ import sys import xml.dom.minidom import copy +import socket +from urlparse import urlparse from argo_egi_connectors.writers import AvroWriter from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf +from argo_egi_connectors.tools import verify_cert, errmsg_from_excp +from OpenSSL.SSL import Error as SSLError logger = None @@ -44,26 +48,19 @@ class GOCDBReader(object): def __init__(self, feed): - self.gocdbUrl = feed - self.gocdbHost = self._getHostFeed(feed) + self.gocdbHost = urlparse(feed).netloc self.hostKey = globopts['AuthenticationHostKey'.lower()] self.hostCert = globopts['AuthenticationHostCert'.lower()] self.argDateFormat = "%Y-%m-%d" self.WSDateFormat = "%Y-%m-%d %H:%M" - def _getHostFeed(self, feed): - host = feed - if "https://" in feed: - host = feed.split("https://")[1] - if "/" in host: - host = host.split('/')[0] - return host - def getDowntimes(self, start, end): filteredDowntimes = list() try: + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) - conn.request('GET', '/gocdbpi/private/' + '?method=get_downtime&windowstart=%s&windowend=%s' % (start.strftime(self.argDateFormat), end.strftime(self.argDateFormat))) + conn.request('GET', '/gocdbpi/private/?method=get_downtime&windowstart=%s&windowend=%s' % 
(start.strftime(self.argDateFormat), end.strftime(self.argDateFormat))) res = conn.getresponse() if res.status == 200: doc = xml.dom.minidom.parseString(res.read()) @@ -98,13 +95,16 @@ def getDowntimes(self, start, end): except AssertionError: logger.error("GOCDBReader.getDowntimes():", "Error parsing feed") raise SystemExit(1) + except(SSLError, socket.error, socket.timeout) as e: + logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) + raise SystemExit(1) return filteredDowntimes def main(): global logger logger = Logger(os.path.basename(sys.argv[0])) - certs = {'Authentication': ['HostKey', 'HostCert']} + certs = {'Authentication': ['HostKey', 'HostCert', 'CAPath', 'VerifyServerCert']} schemas = {'AvroSchemas': ['Downtimes']} output = {'Output': ['Downtimes']} cglob = Global(certs, schemas, output) diff --git a/bin/poem-connector.py b/bin/poem-connector.py index 64d6eba2..d16ffee5 100755 --- a/bin/poem-connector.py +++ b/bin/poem-connector.py @@ -33,40 +33,17 @@ import urlparse import socket import re + from argo_egi_connectors.writers import AvroWriter from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import CustomerConf, PoemConf, Global +from argo_egi_connectors.tools import verify_cert, errmsg_from_excp +from OpenSSL.SSL import Error as SSLError logger = None globopts, poemopts = {}, {} cpoem = None -def resolve_http_redirect(url, depth=0): - if depth > 10: - raise Exception("Redirected "+depth+" times, giving up.") - - o = urlparse.urlparse(url,allow_fragments=True) - conn = httplib.HTTPSConnection(o.netloc, 443, - globopts['AuthenticationHostKey'.lower()], - globopts['AuthenticationHostCert'.lower()]) - path = o.path - if o.query: - path +='?'+o.query - - try: - conn.request("HEAD", path) - res = conn.getresponse() - headers = dict(res.getheaders()) - if headers.has_key('location') and headers['location'] != url: - return resolve_http_redirect(headers['location'], - 
globopts['AuthenticationHostKey'.lower()], - globopts['AuthenticationHostCert'.lower()], - depth+1) - else: - return url - except: - return url; - class PoemReader: def __init__(self): self.poemRequest = '%s/poem/api/0.2/json/metrics_in_profiles?vo_name=%s' @@ -159,15 +136,13 @@ def loadProfilesFromServer(self, server, vo, filterProfiles): if len(filterProfiles) > 0: doFilterProfiles = True - if 'https://' not in server: - server = 'https://' + server - - logger.info('Server:%s VO:%s' % (server, vo)) - - url = resolve_http_redirect(self.poemRequest % (server,vo)) + url = self.poemRequest % (server, vo) + o = urlparse.urlparse(url, allow_fragments=True) + logger.info('Server:%s VO:%s' % (o.netloc, vo)) - o = urlparse.urlparse(url,allow_fragments=True) try: + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(o.netloc, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(o.netloc, 443, globopts['AuthenticationHostKey'.lower()], globopts['AuthenticationHostCert'.lower()]) @@ -185,8 +160,8 @@ def loadProfilesFromServer(self, server, vo, filterProfiles): else: logger.error('POEMReader.loadProfilesFromServer(): HTTP response: %s %s' % (str(res.status), res.reason)) raise SystemExit(1) - except (socket.error, httplib.HTTPException) as e: - logger.error('Connection to %s failed: ' % (server) + str(e)) + except(SSLError, socket.error, socket.timeout, httplib.HTTPException) as e: + logger.error('Connection error %s - %s' % (server, errmsg_from_excp(e))) raise SystemExit(1) return validProfiles @@ -246,7 +221,7 @@ def main(): global logger logger = Logger(os.path.basename(sys.argv[0])) - certs = {'Authentication': ['HostKey', 'HostCert']} + certs = {'Authentication': ['HostKey', 'HostCert', 'VerifyServerCert', 'CAPath']} schemas = {'AvroSchemas': ['Poem']} output = {'Output': ['Poem']} cglob = Global(certs, schemas, output) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 29724530..a84dbf06 
100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -24,7 +24,6 @@ # the EGI-InSPIRE project through the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) -import urllib import datetime import xml.dom.minidom import httplib @@ -32,11 +31,14 @@ import os import socket import copy +from urlparse import urlparse from exceptions import AssertionError from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.tools import verify_cert, errmsg_from_excp from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf +from OpenSSL.SSL import Error as SSLError LegMapServType = {'SRM' : 'SRMv2'} @@ -47,22 +49,13 @@ class GOCDBReader: def __init__(self, feed): - self.gocdbUrl = feed - self.gocdbHost = self._getHostFeed(feed) + self.gocdbHost = urlparse(feed).netloc self.hostKey = globopts['AuthenticationHostKey'.lower()] self.hostCert = globopts['AuthenticationHostCert'.lower()] self.siteListEGI, self.siteListLocal = dict(), dict() self.serviceListEGI, self.serviceListLocal = dict(), dict() self.groupListEGI, self.groupListLocal = dict(), dict() - def _getHostFeed(self, feed): - host = feed - if "https://" in feed: - host = feed.split("https://")[1] - if "/" in host: - host = host.split('/')[0] - return host - def getGroupOfServices(self): self.loadDataIfNeeded() @@ -154,6 +147,8 @@ def loadDataIfNeeded(self): def getServiceEndpoints(self, serviceList, scope): try: + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) conn.request('GET', '/gocdbpi/private/?method=get_service_endpoint&scope=' + scope) res = conn.getresponse() @@ -181,9 +176,14 @@ def getServiceEndpoints(self, serviceList, scope): except AssertionError: logger.error("GOCDBReader.getServiceEndpoints():", 
"Error parsing feed") raise SystemExit(1) + except(SSLError, socket.error, socket.timeout) as e: + logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) + raise SystemExit(1) def getSitesInternal(self, siteList, scope): try: + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) conn.request('GET', '/gocdbpi/private/?method=get_site&scope=' + scope) res = conn.getresponse() @@ -205,9 +205,14 @@ def getSitesInternal(self, siteList, scope): except AssertionError: logger.error("GOCDBReader.getSitesInternal():", "Error parsing feed") raise SystemExit(1) + except(SSLError, socket.error, socket.timeout) as e: + logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) + raise SystemExit(1) def getServiceGroups(self, groupList, scope): try: + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) conn.request('GET', '/gocdbpi/private/?method=get_service_group&scope=' + scope) res = conn.getresponse() @@ -237,6 +242,9 @@ def getServiceGroups(self, groupList, scope): except AssertionError: logger.error("GOCDBReader.getServiceGroups():", "Error parsing feed") raise SystemExit(1) + except(SSLError, socket.error, socket.timeout) as e: + logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) + raise SystemExit(1) def filter_by_tags(tags, listofelem): for attr in tags.keys(): @@ -255,7 +263,7 @@ def main(): global logger logger = Logger(os.path.basename(sys.argv[0])) - certs = {'Authentication': ['HostKey', 'HostCert']} + certs = {'Authentication': ['HostKey', 'HostCert', 'CAPath', 'VerifyServerCert']} schemas = {'AvroSchemas': ['TopologyGroupOfEndpoints', 
'TopologyGroupOfGroups']} output = {'Output': ['TopologyGroupOfEndpoints', 'TopologyGroupOfGroups']} cglob = Global(certs, schemas, output) diff --git a/bin/topology-vo-connector.py b/bin/topology-vo-connector.py index b0d656c3..c6cc589d 100755 --- a/bin/topology-vo-connector.py +++ b/bin/topology-vo-connector.py @@ -24,9 +24,11 @@ # the EGI-InSPIRE project through the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) +from OpenSSL.SSL import Error as SSLError from argo_egi_connectors.writers import AvroWriter from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf +from argo_egi_connectors.tools import verify_cert, errmsg_from_excp from exceptions import AssertionError import datetime import httplib @@ -34,9 +36,11 @@ import sys import socket import os +from urlparse import urlparse import xml.dom.minidom import copy + LegMapServType = {'SRM' : 'SRMv2', 'SRMv2': 'SRM'} globopts = {} logger = None @@ -46,29 +50,25 @@ class VOReader: lgroups = [] def __init__(self, feed): - host, path = self._host_path(feed) - self._parse(host, path) - - def _host_path(self, feed): - urlsplit = re.split('/*', feed) - host = (urlsplit[0], urlsplit[1]) - path = '/'+'/'.join(urlsplit[2:]) - return host, path + self.feed = feed + self._parse() - def _parse(self, host, path): + def _parse(self): + o = urlparse(self.feed) try: - if 'https' in host[0]: - conn = httplib.HTTPSConnection(host[1], 443, + if o.scheme == 'https': + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(os.path.basename(sys.argv[0]), o.netloc, globopts['AuthenticationCAPath'.lower()], 180) + conn = httplib.HTTPSConnection(o.netloc, 443, globopts['AuthenticationHostKey'.lower()], globopts['AuthenticationHostCert'.lower()]) - elif 'http' in host[0]: - conn = httplib.HTTPConnection(host[1]) - - except (socket.error, httplib.HTTPException) as e: - logger.error('Connection to %s failed: ' % (host) + 
str(e)) + elif o.scheme == 'http': + conn = httplib.HTTPConnection(o.netloc) + except(SSLError, socket.error, socket.timeout, httplib.HTTPConnection) as e: + logger.error('Connection error %s - %s' % (o.netloc, errmsg_from_excp(e))) raise SystemExit(1) - conn.request('GET', path) + conn.request('GET', o.path) res = conn.getresponse() try: if res.status == 200: @@ -98,10 +98,10 @@ def _parse(self, host, path): ge['type'] = 'SITES' self.lendpoints.append(ge) else: - logger.error('VOReader._parse(): Connection failed %s, HTTP response: %s %s' % (host[1], str(res.status), res.reason)) + logger.error('VOReader._parse(): Connection failed %s, HTTP response: %s %s' % (o.netloc, str(res.status), res.reason)) raise SystemExit(1) except AssertionError: - logger.error("Error parsing VO-feed %s" % ('//'.join(host)+path)) + logger.error("Error parsing VO-feed %s" % (o.netloc + o.path)) raise SystemExit(1) def get_groupgroups(self): @@ -115,7 +115,7 @@ def main(): global logger logger = Logger(os.path.basename(sys.argv[0])) - certs = {'Authentication': ['HostKey', 'HostCert']} + certs = {'Authentication': ['HostKey', 'HostCert', 'CAPath', 'VerifyServerCert']} schemas = {'AvroSchemas': ['TopologyGroupOfEndpoints', 'TopologyGroupOfGroups']} output = {'Output': ['TopologyGroupOfEndpoints', 'TopologyGroupOfGroups']} cglob = Global(certs, schemas, output) diff --git a/bin/weights-gstat-connector.py b/bin/weights-gstat-connector.py index 2131c515..0d6d9f31 100755 --- a/bin/weights-gstat-connector.py +++ b/bin/weights-gstat-connector.py @@ -26,9 +26,12 @@ import urllib2 import os +import httplib import json import datetime import sys +import socket +from urlparse import urlparse from avro.datafile import DataFileReader from avro.io import DatumReader @@ -36,6 +39,8 @@ from argo_egi_connectors.writers import AvroWriter from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf +from argo_egi_connectors.tools import 
verify_cert, errmsg_from_excp +from OpenSSL.SSL import Error as SSLError globopts = {} logger = None @@ -43,18 +48,35 @@ class GstatReader: def __init__(self, feed): self.GstatRequest = feed + self.hostKey = globopts['AuthenticationHostKey'.lower()] + self.hostCert = globopts['AuthenticationHostCert'.lower()] def getWeights(self): # load server data - urlFile = urllib2.urlopen(self.GstatRequest) - json_data = json.load(urlFile) - urlFile.close(); - weights = dict() - for site in json_data: - key = site['Sitename'] - val = site['HEPSPEC06'] - weights[key] = val - return weights + o = urlparse(self.GstatRequest) + + try: + if o.scheme == 'https': + if eval(globopts['AuthenticationVerifyServerCert'.lower()]): + verify_cert(o.netloc, globopts['AuthenticationCAPath'.lower()], 180) + conn = httplib.HTTPSConnection(o.netloc, 443, self.hostKey, self.hostCert) + else: + conn = httplib.HTTPConnection(o.netloc) + + except(SSLError, socket.error, socket.timeout) as e: + logger.error('Connection error %s - %s' % (o.netloc, errmsg_from_excp(e))) + raise SystemExit(1) + + conn.request('GET', o.path) + res = conn.getresponse() + if res.status == 200: + json_data = json.loads(res.read()) + weights = dict() + for site in json_data: + key = site['Sitename'] + val = site['HEPSPEC06'] + weights[key] = val + return weights def gen_outdict(data): datawr = [] @@ -82,9 +104,10 @@ def main(): global logger logger = Logger(os.path.basename(sys.argv[0])) + certs = {'Authentication': ['HostKey', 'HostCert', 'CAPath', 'VerifyServerCert']} schemas = {'AvroSchemas': ['Weights']} output = {'Output': ['Weights']} - cglob = Global(schemas, output) + cglob = Global(schemas, output, certs) global globopts globopts = cglob.parse() diff --git a/etc/global.conf b/etc/global.conf index 796073da..ddd573b5 100644 --- a/etc/global.conf +++ b/etc/global.conf @@ -2,6 +2,8 @@ SchemaDir = /etc/argo-egi-connectors/schemas/ [Authentication] +VerifyServerCert = False +CAPath = /etc/grid-security/certificates 
HostKey = /etc/grid-security/hostkey.pem HostCert = /etc/grid-security/hostcert.pem diff --git a/modules/tools.py b/modules/tools.py new file mode 100644 index 00000000..6d896f40 --- /dev/null +++ b/modules/tools.py @@ -0,0 +1,62 @@ +import logging, logging.handlers +import sys +import re +import socket +import signal + +from OpenSSL.SSL import TLSv1_METHOD, Context, Connection +from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT +from OpenSSL.SSL import Error as SSLError +from OpenSSL.SSL import OP_NO_SSLv3 + +def errmsg_from_excp(e): + if getattr(e, 'message', False): + retstr = '' + if isinstance(e.message, list) or isinstance(e.message, tuple) \ + or isinstance(e.message, dict): + for s in e.message: + if isinstance(s, str): + retstr += s + ' ' + if isinstance(s, tuple) or isinstance(s, tuple): + retstr += ' '.join(s) + return retstr + elif isinstance(e.message, str): + return e.message + else: + for s in e.message: + retstr += str(s) + ' ' + return retstr + else: + return str(e) + +def verify_cert(host, capath, timeout): + server_ctx = Context(TLSv1_METHOD) + server_ctx.load_verify_locations(None, capath) + + def verify_cb(conn, cert, errnum, depth, ok): + return ok + server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, 443)) + + server_conn = Connection(server_ctx, sock) + server_conn.set_connect_state() + + def handler(signum, frame): + raise socket.error([('Timeout', 'after', str(timeout) + 's')]) + signal.signal(signal.SIGALRM, handler) + signal.alarm(timeout) + signal.alarm(0) + try: + server_conn.do_handshake() + except SSLError as e: + if 'sslv3 alert handshake failure' in errmsg_from_excp(e): + pass + else: + raise SSLError(e.message) + + server_conn.shutdown() + server_conn.close() + + return True From 2bc3ebfb1ed0fd6ce742ed4edd6ce9036872d5e9 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 17 Sep 2015 15:16:00 +0200 Subject: 
[PATCH 04/21] report correct number of fetched endpoints even if SRM endpoints were being filtered --- bin/topology-gocdb-connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index a84dbf06..1d01d124 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -308,6 +308,7 @@ def main(): gelegmap.append(copy.copy(g)) gelegmap[-1]['service'] = LegMapServType[g['service']] getags = confcust.get_gocdb_getags(job) + numgeleg = len(gelegmap) if getags: group_endpoints = filter_by_tags(getags, group_endpoints) gelegmap = filter_by_tags(getags, gelegmap) @@ -316,7 +317,7 @@ def main(): group_endpoints + gelegmap, os.path.basename(sys.argv[0])) avro.write() - logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + len(gelegmap))+' Groups(%s):%d' % (fetchtype, numgg)) + logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + numgeleg) +' Groups(%s):%d' % (fetchtype, numgg)) if getags or ggtags: selstr = 'Job:%s Selected ' % (job) selge, selgg = '', '' From 43b7a5d87e85d9398d96338c3f3c82ed222a3d95 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 17 Sep 2015 22:12:31 +0200 Subject: [PATCH 05/21] connectors handle help argument and describe basic info and usage --- bin/downtimes-gocdb-connector.py | 2 +- bin/prefilter-egi.py | 20 ++++++++++++-------- bin/topology-gocdb-connector.py | 21 +++++++++++++-------- bin/topology-vo-connector.py | 18 ++++++++++++------ bin/weights-gstat-connector.py | 27 +++++++++++++++------------ 5 files changed, 53 insertions(+), 35 deletions(-) diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index 4588caa5..e802118a 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -116,7 +116,7 @@ def main(): confcust.make_dirstruct() feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed='https://goc.egi.eu/gocdbpi/') - parser = argparse.ArgumentParser() + parser = 
argparse.ArgumentParser(description='Fetch downtimes from GOCDB for given date') parser.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY', required=True) args = parser.parse_args() diff --git a/bin/prefilter-egi.py b/bin/prefilter-egi.py index 6566e39f..a4b8f534 100755 --- a/bin/prefilter-egi.py +++ b/bin/prefilter-egi.py @@ -24,17 +24,17 @@ # the EGI-InSPIRE project through the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) -import sys -import os -import datetime -import avro.schema import argparse +import avro.schema +import datetime +import os +import sys import time -from avro.datafile import DataFileReader, DataFileWriter -from avro.io import DatumReader, DatumWriter from argo_egi_connectors.config import Global, CustomerConf from argo_egi_connectors.writers import Logger +from avro.datafile import DataFileReader, DataFileWriter +from avro.io import DatumReader, DatumWriter globopts, confcust = {}, None logger = None @@ -298,7 +298,8 @@ def main(): stats = () - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(description="""Filters consumer logs based on various criteria + (allowed NGIs, service flavours, metrics...)""") group = parser.add_mutually_exclusive_group() group.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY') group.add_argument('-f', dest='cfile', nargs=1, metavar='consumer_log_YEAR-MONTH-DAY.avro') @@ -309,8 +310,11 @@ def main(): date = args.cfile[0].split('_')[-1] date = date.split('.')[0] date = date.split('-') - else: + elif args.date: date = args.date[0].split('-') + else: + parser.print_help() + raise SystemExit(1) if len(date) == 0 or len(date) != 3: logger.error('Consumer file does not end with correctly formatted date') diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 1d01d124..bfab96f2 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -24,21 +24,22 @@ # the EGI-InSPIRE project through 
the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) +import argparse +import copy import datetime -import xml.dom.minidom import httplib -import sys import os import socket -import copy -from urlparse import urlparse -from exceptions import AssertionError +import sys +import xml.dom.minidom -from argo_egi_connectors.writers import AvroWriter +from OpenSSL.SSL import Error as SSLError +from argo_egi_connectors.config import Global, CustomerConf from argo_egi_connectors.tools import verify_cert, errmsg_from_excp +from argo_egi_connectors.writers import AvroWriter from argo_egi_connectors.writers import SingletonLogger as Logger -from argo_egi_connectors.config import Global, CustomerConf -from OpenSSL.SSL import Error as SSLError +from exceptions import AssertionError +from urlparse import urlparse LegMapServType = {'SRM' : 'SRMv2'} @@ -258,6 +259,10 @@ def getit(elem): return listofelem def main(): + parser = argparse.ArgumentParser(description="""Fetch wanted entities (ServiceGroups, Sites, Endpoints) + from GOCDB for every job listed in customer.conf and write them + in an appropriate place""") + args = parser.parse_args() group_endpoints, group_groups = [], [] global logger diff --git a/bin/topology-vo-connector.py b/bin/topology-vo-connector.py index c6cc589d..3edc7c54 100755 --- a/bin/topology-vo-connector.py +++ b/bin/topology-vo-connector.py @@ -25,20 +25,22 @@ # Framework Programme (contract # INFSO-RI-261323) from OpenSSL.SSL import Error as SSLError -from argo_egi_connectors.writers import AvroWriter -from argo_egi_connectors.writers import SingletonLogger as Logger from argo_egi_connectors.config import Global, CustomerConf from argo_egi_connectors.tools import verify_cert, errmsg_from_excp +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger from exceptions import AssertionError +from urlparse import urlparse + +import argparse +import copy import datetime 
import httplib +import os import re -import sys import socket -import os -from urlparse import urlparse +import sys import xml.dom.minidom -import copy LegMapServType = {'SRM' : 'SRMv2', 'SRMv2': 'SRM'} @@ -112,6 +114,10 @@ def get_groupendpoints(self): def main(): + parser = argparse.ArgumentParser(description="""Fetch wanted entities from VO feed provided in customer.conf + and write them in an appropriate place""") + args = parser.parse_args() + global logger logger = Logger(os.path.basename(sys.argv[0])) diff --git a/bin/weights-gstat-connector.py b/bin/weights-gstat-connector.py index 0d6d9f31..b82656d6 100755 --- a/bin/weights-gstat-connector.py +++ b/bin/weights-gstat-connector.py @@ -24,23 +24,23 @@ # the EGI-InSPIRE project through the European Commission's 7th # Framework Programme (contract # INFSO-RI-261323) -import urllib2 -import os +import argparse +import datetime import httplib import json -import datetime -import sys +import os import socket -from urlparse import urlparse - -from avro.datafile import DataFileReader -from avro.io import DatumReader +import sys +import urllib2 -from argo_egi_connectors.writers import AvroWriter -from argo_egi_connectors.writers import SingletonLogger as Logger +from OpenSSL.SSL import Error as SSLError from argo_egi_connectors.config import Global, CustomerConf from argo_egi_connectors.tools import verify_cert, errmsg_from_excp -from OpenSSL.SSL import Error as SSLError +from argo_egi_connectors.writers import AvroWriter +from argo_egi_connectors.writers import SingletonLogger as Logger +from avro.datafile import DataFileReader +from avro.io import DatumReader +from urlparse import urlparse globopts = {} logger = None @@ -99,8 +99,11 @@ def loadOldData(directory, timestamp): return oldDataDict - def main(): + parser = argparse.ArgumentParser(description="""Fetch weights information from Gstat provider + for every job listed in customer.conf""") + args = parser.parse_args() + global logger logger = 
Logger(os.path.basename(sys.argv[0])) From 9d0de25aab309df482c8b9d74021e8417b8a9c81 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 18 Sep 2015 13:12:58 +0200 Subject: [PATCH 06/21] removed hardcoded scopes and grab them dynamically from config --- bin/topology-gocdb-connector.py | 116 +++++++++++++++++++------------- bin/topology-vo-connector.py | 4 +- modules/config.py | 62 ++++++++--------- 3 files changed, 104 insertions(+), 78 deletions(-) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index bfab96f2..695303c6 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -49,28 +49,37 @@ logger = None class GOCDBReader: - def __init__(self, feed): + def __init__(self, feed, scopes): self.gocdbHost = urlparse(feed).netloc + self.scopes = scopes if scopes else set(['EGI']) self.hostKey = globopts['AuthenticationHostKey'.lower()] self.hostCert = globopts['AuthenticationHostCert'.lower()] - self.siteListEGI, self.siteListLocal = dict(), dict() - self.serviceListEGI, self.serviceListLocal = dict(), dict() - self.groupListEGI, self.groupListLocal = dict(), dict() + for scope in self.scopes: + code = "self.serviceList%s = dict(); " % scope + code += "self.groupList%s = dict();" % scope + code += "self.siteList%s = dict()" % scope + exec code + self.fetched = False def getGroupOfServices(self): - self.loadDataIfNeeded() + if not self.fetched: + self.loadDataIfNeeded() - groups = list() - for d in self.groupListEGI, self.groupListLocal: - key, group = d.iteritems() - for service in group['services']: + groups, gl = list(), list() + + for scope in self.scopes: + code = "gl = gl + [value for key, value in self.groupList%s.iteritems()]" % scope + exec code + + for d in gl: + for service in d['services']: g = dict() g['type'] = fetchtype.upper() - g['group'] = group['name'] + g['group'] = d['name'] g['service'] = service['type'] g['hostname'] = service['hostname'] - g['group_monitored'] = group['monitored'] - 
g['tags'] = {'scope' : group['scope'], \ + g['group_monitored'] = d['monitored'] + g['tags'] = {'scope' : d['scope'], \ 'monitored' : 1 if service['monitored'] == "Y" else 0, \ 'production' : 1 if service['production'] == "Y" else 0} groups.append(g) @@ -78,23 +87,28 @@ def getGroupOfServices(self): return groups def getGroupOfGroups(self): - self.loadDataIfNeeded() + if not self.fetched: + self.loadDataIfNeeded() - groupofgroups = list() + groupofgroups, gl = list(), list() if fetchtype == "ServiceGroups": - for d in self.groupListEGI, self.groupListLocal: - key, value = d.iteritems() + for scope in self.scopes: + code = "gl = gl + [value for key, value in self.groupList%s.iteritems()]" % scope + exec code + for d in gl: g = dict() g['type'] = 'PROJECT' g['group'] = 'EGI' - g['subgroup'] = value['name'] - g['tags'] = {'monitored' : 1 if value['monitored'] == 'Y' else 0, - 'scope' : value['scope']} + g['subgroup'] = d['name'] + g['tags'] = {'monitored' : 1 if d['monitored'] == 'Y' else 0, + 'scope' : d['scope']} groupofgroups.append(g) else: - gg = sorted([value for d in self.siteListEGI, self.siteListLocal for key, value in d.iteritems()], - key=lambda s: s['ngi']) + gg = [] + for scope in self.scopes: + code = "gg = gg + sorted([value for key, value in self.siteList%s.iteritems()], key=lambda s: s['ngi'])" % scope + exec code for gr in gg: g = dict() @@ -110,11 +124,13 @@ def getGroupOfGroups(self): return groupofgroups def getGroupOfEndpoints(self): - self.loadDataIfNeeded() + if not self.fetched: + self.loadDataIfNeeded() - groupofendpoints = list() - ge = sorted([value for d in self.serviceListEGI, self.serviceListLocal for key, value in d.iteritems()], - key=lambda s: s['site']) + groupofendpoints, ge = list(), list() + for scope in self.scopes: + code = "ge = ge + sorted([value for key, value in self.serviceList%s.iteritems()], key=lambda s: s['site'])" % scope + exec code for gr in ge: g = dict() @@ -131,17 +147,12 @@ def getGroupOfEndpoints(self): def 
loadDataIfNeeded(self): try: - if len(self.siteListEGI) == 0: - self.getSitesInternal(self.siteListEGI, 'EGI') - self.getSitesInternal(self.siteListLocal, 'Local') - - if len(self.serviceListEGI) == 0: - self.getServiceEndpoints(self.serviceListEGI, 'EGI') - self.getServiceEndpoints(self.serviceListLocal, 'Local') + for scope in self.scopes: + eval("self.getSitesInternal(self.siteList%s, '&scope='+scope)" % scope) + eval("self.getServiceGroups(self.groupList%s, '&scope='+scope)" % scope) + eval("self.getServiceEndpoints(self.serviceList%s, '&scope='+scope)" % scope) + self.fetched = True - if len(self.groupListEGI) == 0: - self.getServiceGroups(self.groupListEGI, 'EGI') - self.getServiceGroups(self.groupListLocal, 'Local') except (socket.error, httplib.HTTPException) as e: logger.error('Connection to GOCDB failed: ' + str(e)) raise SystemExit(1) @@ -151,7 +162,7 @@ def getServiceEndpoints(self, serviceList, scope): if eval(globopts['AuthenticationVerifyServerCert'.lower()]): verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) - conn.request('GET', '/gocdbpi/private/?method=get_service_endpoint&scope=' + scope) + conn.request('GET', '/gocdbpi/private/?method=get_service_endpoint' + scope) res = conn.getresponse() if res.status == 200: doc = xml.dom.minidom.parseString(res.read()) @@ -169,13 +180,13 @@ def getServiceEndpoints(self, serviceList, scope): serviceList[serviceId]['production'] = service.getElementsByTagName('IN_PRODUCTION')[0].childNodes[0].data serviceList[serviceId]['site'] = service.getElementsByTagName('SITENAME')[0].childNodes[0].data serviceList[serviceId]['roc'] = service.getElementsByTagName('ROC_NAME')[0].childNodes[0].data - serviceList[serviceId]['scope'] = scope + serviceList[serviceId]['scope'] = scope.split('=')[1] serviceList[serviceId]['sortId'] = serviceList[serviceId]['hostname'] + '-' + serviceList[serviceId]['type'] + '-' + 
serviceList[serviceId]['site'] else: logger.error('GOCDBReader.getServiceEndpoints(): HTTP response: %s %s' % (str(res.status), res.reason)) raise SystemExit(1) except AssertionError: - logger.error("GOCDBReader.getServiceEndpoints():", "Error parsing feed") + logger.error("GOCDBReader.getServiceEndpoints(): Error parsing feed") raise SystemExit(1) except(SSLError, socket.error, socket.timeout) as e: logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) @@ -186,7 +197,7 @@ def getSitesInternal(self, siteList, scope): if eval(globopts['AuthenticationVerifyServerCert'.lower()]): verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) - conn.request('GET', '/gocdbpi/private/?method=get_site&scope=' + scope) + conn.request('GET', '/gocdbpi/private/?method=get_site'+scope) res = conn.getresponse() if res.status == 200: dom = xml.dom.minidom.parseString(res.read()) @@ -199,12 +210,12 @@ def getSitesInternal(self, siteList, scope): siteList[siteName]['infrastructure'] = site.getElementsByTagName('PRODUCTION_INFRASTRUCTURE')[0].childNodes[0].data siteList[siteName]['certification'] = site.getElementsByTagName('CERTIFICATION_STATUS')[0].childNodes[0].data siteList[siteName]['ngi'] = site.getElementsByTagName('ROC')[0].childNodes[0].data - siteList[siteName]['scope'] = scope + siteList[siteName]['scope'] = scope.split('=')[1] else: logger.error('GOCDBReader.getSitesInternal(): HTTP response: %s %s' % (str(res.status), res.reason)) raise SystemExit(1) except AssertionError: - logger.error("GOCDBReader.getSitesInternal():", "Error parsing feed") + logger.error("GOCDBReader.getSitesInternal(): Error parsing feed") raise SystemExit(1) except(SSLError, socket.error, socket.timeout) as e: logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) @@ -215,7 +226,7 @@ def getServiceGroups(self, groupList, scope): if 
eval(globopts['AuthenticationVerifyServerCert'.lower()]): verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert) - conn.request('GET', '/gocdbpi/private/?method=get_service_group&scope=' + scope) + conn.request('GET', '/gocdbpi/private/?method=get_service_group' + scope) res = conn.getresponse() if res.status == 200: doc = xml.dom.minidom.parseString(res.read()) @@ -227,7 +238,7 @@ def getServiceGroups(self, groupList, scope): groupList[groupId] = {} groupList[groupId]['name'] = group.getElementsByTagName('NAME')[0].childNodes[0].data groupList[groupId]['monitored'] = group.getElementsByTagName('MONITORED')[0].childNodes[0].data - groupList[groupId]['scope'] = scope + groupList[groupId]['scope'] = scope.split('=')[1] groupList[groupId]['services'] = [] services = group.getElementsByTagName('SERVICE_ENDPOINT') for service in services: @@ -241,7 +252,7 @@ def getServiceGroups(self, groupList, scope): logger.error('GOCDBReader.getServiceGroups(): HTTP response: %s %s' % (str(res.status), res.reason)) raise SystemExit(1) except AssertionError: - logger.error("GOCDBReader.getServiceGroups():", "Error parsing feed") + logger.error("GOCDBReader.getServiceGroups(): Error parsing feed") raise SystemExit(1) except(SSLError, socket.error, socket.timeout) as e: logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) @@ -253,8 +264,14 @@ def getit(elem): value = elem['tags'][attr.lower()] if isinstance(value, int): value = 'Y' if value else 'N' - if value.lower() == tags[attr].lower(): - return True + if isinstance(tags[attr], list): + for a in tags[attr]: + if value.lower() == a.lower(): + return True + else: + if value.lower() == tags[attr].lower(): + return True + listofelem = filter(getit, listofelem) return listofelem @@ -278,12 +295,13 @@ def main(): confcust = CustomerConf(sys.argv[0]) confcust.parse() confcust.make_dirstruct() + scopes = 
confcust.get_allspec_scopes(sys.argv[0], 'GOCDB') feeds = confcust.get_mapfeedjobs(sys.argv[0], 'GOCDB', deffeed='https://goc.egi.eu/gocdbpi/') timestamp = datetime.datetime.utcnow().strftime('%Y_%m_%d') for feed, jobcust in feeds.items(): - gocdb = GOCDBReader(feed) + gocdb = GOCDBReader(feed, scopes) for job, cust in jobcust: jobdir = confcust.get_fulldir(cust, job) @@ -328,11 +346,15 @@ def main(): selge, selgg = '', '' if getags: for key, value in getags.items(): + if isinstance(value, list): + value = '['+','.join(value)+']' selge += '%s:%s,' % (key, value) selstr += 'Endpoints(%s):' % selge[:len(selge) - 1] selstr += '%d ' % (len(group_endpoints) + len(gelegmap)) if ggtags: for key, value in ggtags.items(): + if isinstance(value, list): + value = '['+','.join(value)+']' selgg += '%s:%s,' % (key, value) selstr += 'Groups(%s):' % selgg[:len(selgg) - 1] selstr += '%d' % (len(group_groups)) diff --git a/bin/topology-vo-connector.py b/bin/topology-vo-connector.py index 3edc7c54..6c341a88 100755 --- a/bin/topology-vo-connector.py +++ b/bin/topology-vo-connector.py @@ -175,7 +175,9 @@ def ismatch(elem): selstr = 'Job:%s Selected ' % (job) selgg = '' for key, value in tags.items(): - selgg += '%s:%s,' % (key, ','.join(value)) + if isinstance(value, list): + value = ','.join(value) + selgg += '%s:%s,' % (key, value) selstr += 'Groups(%s):' % selgg[:len(selgg) - 1] selstr += '%d' % (len(filtlgroups)) diff --git a/modules/config.py b/modules/config.py index 4be3f154..ddcbffab 100644 --- a/modules/config.py +++ b/modules/config.py @@ -268,46 +268,31 @@ def get_profiles(self, job): def get_gocdb_fetchtype(self, job): return self._jobs[job]['TopoFetchType'] - def _get_gocdb_tags(self, job, option): + def _get_tags(self, job, option): tags = {} if option in self._jobs[job].keys(): tagstr = self._jobs[job][option] - for tag in tagstr.split(','): - mt = re.match('\s*(\w+)\s*:\s*(\w+)\s*', tag) - if mt is not None: - tkey = mt.group(1) - tvalue = mt.group(2) - 
tags.update({tkey: tvalue}) - else: - print self.__class__, "Could not parse option %s: %s" % (option, tag) + match = re.findall("(\w+)\s*:\s*(\(.*?\))", tagstr) + if match is not None: + for m in match: + tags.update({m[0]: [e.strip('() ') for e in m[1].split(',')]}) + match = re.findall('([\w]+)\s*:\s*([\w]+)', tagstr) + if match is not None: + for m in match: + tags.update({m[0]: m[1]}) + else: + print self.__class__, "Could not parse option %s: %s" % (option, tag) + return {} return tags def get_gocdb_ggtags(self, job): - return self._get_gocdb_tags(job, 'TopoSelectGroupOfGroups') + return self._get_tags(job, 'TopoSelectGroupOfGroups') def get_gocdb_getags(self, job): - return self._get_gocdb_tags(job, 'TopoSelectGroupOfEndpoints') + return self._get_tags(job, 'TopoSelectGroupOfEndpoints') def get_vo_ggtags(self, job): - if 'TopoSelectGroupOfGroups' in self._jobs[job].keys(): - t = self._jobs[job]['TopoSelectGroupOfGroups'] - match = re.match("\s*(\w+)\s*:\s*(\(.*\))", t) - if match is not None: - tkey = match.group(1) - tvalue = match.group(2).strip("() ") - tvalue = re.split("\s*,\s*", tvalue) - return {tkey: tvalue} - else: - match = re.match("\s*(\w+)\s*:(.*)", t) - if match is not None: - tkey = match.group(1) - tvalue = match.group(2).strip() - return {tkey: [tvalue]} - else: - print self.__class__, "Could not parse option TopoSelectGroupOfGroups: %s" % t - return {} - else: - return {} + return self._get_tags(job, 'TopoSelectGroupOfGroups') def _get_toponame(self, job): return self._jobs[job]['TopoType'] @@ -326,6 +311,23 @@ def _update_feeds(self, feeds, feedurl, job, cust): feeds[feedurl] = [] feeds[feedurl].append((job, cust)) + def get_allspec_scopes(self, caller, name=None): + distinct_scopes = set() + ggtags, getags = [], [] + for c in self.get_customers(): + for job in self.get_jobs(c): + if self._get_toponame(job) == name: + gg = self._get_tags(job, 'TopoSelectGroupOfGroups') + ge = self._get_tags(job, 'TopoSelectGroupOfEndpoints') + for g in 
gg.items() + ge.items(): + if 'Scope'.lower() == g[0].lower(): + if isinstance(g[1], list): + distinct_scopes.update(g[1]) + else: + distinct_scopes.update([g[1]]) + + return distinct_scopes + def get_mapfeedjobs(self, caller, name=None, deffeed=None): feeds = {} for c in self.get_customers(): From 3367a9d4bae457058218281def0bafbfe29d7dde Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 21 Sep 2015 11:01:23 +0200 Subject: [PATCH 07/21] report config parser errors via logger --- argo-egi-connectors.spec | 2 +- bin/topology-gocdb-connector.py | 2 +- modules/config.py | 51 ++++++++++++++------------------- 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index bd15a624..e55bbd54 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -11,7 +11,7 @@ Vendor: SRCE Obsoletes: ar-sync Prefix: %{_prefix} Requires: avro -Requires: pyOpenSSL +Requires: pyOpenSSL Source0: %{name}-%{version}.tar.gz BuildArch: noarch diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 695303c6..4165b0b5 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -276,7 +276,7 @@ def getit(elem): return listofelem def main(): - parser = argparse.ArgumentParser(description="""Fetch wanted entities (ServiceGroups, Sites, Endpoints) + parser = argparse.ArgumentParser(description="""Fetch entities (ServiceGroups, Sites, Endpoints) from GOCDB for every job listed in customer.conf and write them in an appropriate place""") args = parser.parse_args() diff --git a/modules/config.py b/modules/config.py index ddcbffab..5b3a0067 100644 --- a/modules/config.py +++ b/modules/config.py @@ -1,9 +1,10 @@ import ConfigParser import os, re, errno - +from argo_egi_connectors.writers import SingletonLogger as Logger class Global: def __init__(self, *args, **kwargs): + self.logger = Logger(self.__class__) self._args = args self._filename = 
'/etc/argo-egi-connectors/global.conf' self._checkpath = kwargs['checkpath'] if 'checkpath' in kwargs.keys() else False @@ -11,7 +12,7 @@ def __init__(self, *args, **kwargs): def parse(self): config = ConfigParser.ConfigParser() if not os.path.exists(self._filename): - print self.__class__, 'Could not find %s' % self._filename + self.logger.error('Could not find %s' % self._filename) raise SystemExit(1) config.read(self._filename) options = {} @@ -27,15 +28,13 @@ def parse(self): raise OSError(errno.ENOENT, optget) options.update({(sect+opt).lower(): optget}) except ConfigParser.NoOptionError as e: - # TODO: syslog - print self.__class__, "No option '%s' in section: '%s'" % (e.args[0], e.args[1]) + self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) raise SystemExit(1) except ConfigParser.NoSectionError as e: - # TODO: syslog - print self.__class__, "No section '%s' defined" % (e.args[0]) + self.logger.error("No section '%s' defined" % (e.args[0])) raise SystemExit(1) except OSError as e: - print self.__class__, os.strerror(e.args[0]), e.args[1], optget + self.logger.error('%s %s' % (os.strerror(e.args[0]), e.args[1])) raise SystemExit(1) return options @@ -44,13 +43,14 @@ class PoemConf: options = {} def __init__(self, *args): + self.logger = Logger(self.__class__) self._args = args self._filename = '/etc/argo-egi-connectors/poem-connector.conf' def parse(self): config = ConfigParser.ConfigParser() if not os.path.exists(self._filename): - print self.__class__, 'Could not find %s' % self._filename + self.logger.error('Could not find %s' % self._filename) raise SystemExit(1) config.read(self._filename) @@ -67,12 +67,10 @@ def parse(self): self.options.update({(section+o).lower(): optget}) except ConfigParser.NoOptionError as e: - # TODO: syslog - print self.__class__, "No option '%s' in section: '%s'" % (e.args[0], e.args[1]) + self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) raise SystemExit(1) except 
ConfigParser.NoSectionError as e: - # TODO: syslog - print self.__class__, "No section '%s' defined" % (e.args[0]) + self.logger.error("No section '%s' defined" % (e.args[0])) raise SystemExit(1) return self.options @@ -100,16 +98,14 @@ def get_allngi(self): try: return self._get_ngis('PrefilterDataAllNGI'.lower()) except KeyError as e: - # TODO: syslog - print self.__class__, "No option %s defined" % e + self.logger.error("No option %s defined" % e) raise SystemExit(1) def get_allowedngi(self): try: return self._get_ngis('PrefilterDataAllowedNGI'.lower()) except KeyError as e: - # TODO: syslog - print self.__class__, "No option %s defined" % e + self.logger.error("No option %s defined" % e) raise SystemExit(1) def get_servers(self): @@ -145,6 +141,7 @@ class CustomerConf: tenantdir = '' def __init__(self, caller=None, **kwargs): + self.logger = Logger(self.__class__) self._filename = '/etc/argo-egi-connectors/customer.conf' if not kwargs: self._jobattrs = self._defjobattrs[os.path.basename(caller)] @@ -157,7 +154,7 @@ def __init__(self, caller=None, **kwargs): def parse(self): config = ConfigParser.ConfigParser() if not os.path.exists(self._filename): - print self.__class__, 'Could not find %s' % self._filename + self.logger.error('Could not find %s' % self._filename) raise SystemExit(1) config.read(self._filename) @@ -168,8 +165,7 @@ def parse(self): custjobs = [job.strip() for job in custjobs] custdir = config.get(section, 'OutputDir') except ConfigParser.NoOptionError as e: - # TODO: syslog - print self.__class__, "No option '%s' in section: '%s'" % (e.args[0], e.args[1]) + self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) raise SystemExit(1) self._cust.update({section: {'Jobs': custjobs, 'OutputDir': custdir}}) @@ -186,8 +182,7 @@ def parse(self): profiles = config.get(job, 'Profiles') dirname = config.get(job, 'Dirname') except ConfigParser.NoOptionError as e: - # TODO: syslog - print self.__class__, "No option '%s' in section: 
'%s'" % (e.args[0], e.args[1]) + self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) raise SystemExit(1) self._jobs.update({job: {'Profiles': profiles, 'Dirname': dirname}}) @@ -196,7 +191,7 @@ def parse(self): if config.has_option(job, attr): self._jobs[job].update({attr: config.get(job, attr)}) else: - print self.__class__, "Could not find Jobs: %s for customer: %s" % (job, cust) + self.logger.error("Could not find Jobs: %s for customer: %s" % (job, cust)) raise SystemExit(1) def _sect_to_dir(self, sect): @@ -205,8 +200,7 @@ def _sect_to_dir(self, sect): assert match != None dirname = match.group(1) except (AssertionError, KeyError) as e: - # TODO: syslog - print self.__class__, "Could not get Dirname for %s" % e + self.logger.error("Could not get Dirname for %s" % e) raise SystemExit(1) return dirname @@ -243,7 +237,7 @@ def make_dirstruct(self): os.makedirs(d) except OSError as e: if e.args[0] != errno.EEXIST: - print self.__class__, os.strerror(e.args[0]), e.args[1], d + self.logger.error('%s %s %s' % os.strerror(e.args[0]), e.args[1], d) raise SystemExit(1) def get_jobs(self, cust): @@ -251,8 +245,7 @@ def get_jobs(self, cust): try: jobs = self._cust[cust]['Jobs'] except KeyError: - # TODO: syslog - print self.__class__, "Could not get Jobs for %s" % cust + self.logger.error("Could not get Jobs for %s" % cust) raise SystemExit(1) return jobs @@ -281,7 +274,7 @@ def _get_tags(self, job, option): for m in match: tags.update({m[0]: m[1]}) else: - print self.__class__, "Could not parse option %s: %s" % (option, tag) + self.logger.error("Could not parse option %s: %s" % (option, tagstr)) return {} return tags @@ -338,7 +331,7 @@ def get_mapfeedjobs(self, caller, name=None, deffeed=None): if feedurl: self._update_feeds(feeds, feedurl, job, c) elif not feedurl and name == 'VOFeed': - print self.__class__, "Could not get VO TopoFeed for job %s" % job + self.logger.error("Could not get VO TopoFeed for job %s" % job) raise SystemExit(1) else: 
feedurl = deffeed From 8b7425a7807200839084ce163926fa6e0ad7b147 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 23 Sep 2015 09:35:31 +0200 Subject: [PATCH 08/21] downtimes connector complain if wrong date specified --- bin/downtimes-gocdb-connector.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index e802118a..32b5a3a2 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -125,9 +125,13 @@ def main(): raise SystemExit(1) # calculate start and end times - start = datetime.datetime.strptime(args.date[0], '%Y-%m-%d') - end = datetime.datetime.strptime(args.date[0], '%Y-%m-%d') - timestamp = start.strftime('%Y_%m_%d') + try: + start = datetime.datetime.strptime(args.date[0], '%Y-%m-%d') + end = datetime.datetime.strptime(args.date[0], '%Y-%m-%d') + timestamp = start.strftime('%Y_%m_%d') + except ValueError as e: + logger.error(e) + raise SystemExit(1) start = start.replace(hour=0, minute=0, second=0) end = end.replace(hour=23, minute=59, second=59) From d5a8cec20d2f24407cbb14a0bf2278ba0182f010 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 24 Sep 2015 10:43:56 +0200 Subject: [PATCH 09/21] remove notion of default scope --- bin/topology-gocdb-connector.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 4165b0b5..89edb1e6 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -51,7 +51,7 @@ class GOCDBReader: def __init__(self, feed, scopes): self.gocdbHost = urlparse(feed).netloc - self.scopes = scopes if scopes else set(['EGI']) + self.scopes = scopes if scopes else set(['NoScope']) self.hostKey = globopts['AuthenticationHostKey'.lower()] self.hostCert = globopts['AuthenticationHostCert'.lower()] for scope in self.scopes: @@ -147,10 +147,11 @@ def getGroupOfEndpoints(self): def 
loadDataIfNeeded(self): try: + scopequery = "'&scope='+scope" for scope in self.scopes: - eval("self.getSitesInternal(self.siteList%s, '&scope='+scope)" % scope) - eval("self.getServiceGroups(self.groupList%s, '&scope='+scope)" % scope) - eval("self.getServiceEndpoints(self.serviceList%s, '&scope='+scope)" % scope) + eval("self.getSitesInternal(self.siteList%s, %s)" % (scope, '' if scope == 'NoScope' else scopequery)) + eval("self.getServiceGroups(self.groupList%s, %s)" % (scope, '' if scope == 'NoScope' else scopequery)) + eval("self.getServiceEndpoints(self.serviceList%s, %s)" % (scope, '' if scope == 'NoScope' else scopequery)) self.fetched = True except (socket.error, httplib.HTTPException) as e: From 154446e478c19de7cb7f6040266e9a74003e8e3e Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 2 Oct 2015 09:01:20 +0200 Subject: [PATCH 10/21] doc moved to repo --- doc/sync.md | 453 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 453 insertions(+) create mode 100644 doc/sync.md diff --git a/doc/sync.md b/doc/sync.md new file mode 100644 index 00000000..06e9c73f --- /dev/null +++ b/doc/sync.md @@ -0,0 +1,453 @@ +--- +title: EGI connectors | ARGO +page_title: EGI connectors +font_title: 'fa fa-refresh' +description: This document describes the available connectors for data in EGI infrastructure. +--- + +## Description + +`argo-egi-connectors` is a bundle of connectors/sync components for various data sources established in EGI infrastructure, most notably GOCDB (EGI topology, downtimes), but there's also support for fetching alternative EGI topology via various VO feeds, weights information via GStat service and POEM metric profiles. 
+ +Bundle consists of the following connectors: + + - `topology-gocdb-connector.py` + - `topology-vo-connector.py` + - `downtimes-gocdb-connector.py` + - `weights-gstat-connector.py` + - `poem-connector.py` + - `prefilter-egi.py`: component whose role is to filter out the messages coming from the `argo-egi-consumer`. + + +Connectors are syncing data on a daily basis. They are aware of each customer, its associated jobs and their attributes, and are generating and placing files into appropriate job folders. Data is written in a binary avro formatted file which is suitable for processing at the compute side. Topology, downtimes, weights and POEM profile information, together with the prefiltered metric results (status messages), represent the input for `argo-compute-engine`. + +## Installation + +Installation narrows down to simply installing the package: + + `yum -y install argo-egi-connectors` + +**`Components require avro package to be installed/available.`** + + +| File Types | Destination | +| Configuration files| `/etc/argo-egi-connectors`| +| Components| `/usr/libexec/argo-egi-connectors`| +| Cronjobs (configured to be executed once per day) | `/etc/cron.d` | +| Directory where components will put their files| `/var/lib/argo-connectors/EGI`| + +## Configuration + +Configuration of all components is centered around two configuration files: `global.conf` and `customer.conf`. Those files contain some shared config options and sections and are _read by every component_. There's also a third one, `poem-connector.conf`, specific only to `poem-connector.py` because it needs some special treatment not available in the first two. + +| Configuration file | Description | Shortcut | +| `global.conf` | Config file is read by every component because every component needs to fetch host certificate to authenticate to a peer and to find correct avro schema. 
|Description| +| `customer.conf` | This configuration file lists all EGI jobs, their attributes and also all VOs and their sets of jobs and attributes. | Description| +| `poem-connector.conf` | This configuration file is the central configuration for `poem-connector.py` | Description| + + + + +### global.conf + +Config file is read by _every_ component because every component needs to, at least, fetch a host certificate to authenticate to a peer and to find the correct avro schema. Additionally, some connectors have default sources of data specified that can be overridden in the next configuration file. Config options are case insensitive and the whole config file is split into a few sections: + + [DEFAULT] + SchemaDir = /etc/argo-egi-connectors/schemas/ + +Every component generates an output file in an avro binary format. This section points to a directory that holds all avro schemas. + + [Authentication] + HostKey = /etc/grid-security/hostkey.pem + HostCert = /etc/grid-security/hostcert.pem + +Each component that talks to a GOCDB or POEM peer authenticates itself with a host certificate. + + [AvroSchemas] + Downtimes = %(SchemaDir)s/downtimes.avsc + Poem = %(SchemaDir)s/metric_profiles.avsc + Prefilter = %(SchemaDir)s/metric_data.avsc + TopologyGroupOfEndpoints = %(SchemaDir)s/group_endpoints.avsc + TopologyGroupOfGroups = %(SchemaDir)s/group_groups.avsc + Weights = %(SchemaDir)s/weight_sites.avsc + +This section, together with the `[DEFAULT]` section, constitutes the full path of the avro schema file for each component. Avro schema files define the format of the data that each component is writing. `Topology*` schemas are common to `topology-gocdb-connector.py` and `topology-vo-connector.py` because the compute side needs to make no distinction between the two topologies. `Prefilter` schema is taken from `argo-egi-consumer` since `prefilter-egi.py` filters its metric results and needs to write them in the same format. 
+ + [Output] + Downtimes = downtimes_%s.avro + Poem = poem_sync_%s.avro + Prefilter = prefilter_%s.avro + PrefilterConsumerFilePath = /var/lib/ar-consumer/ar-consumer_log_%s.avro + PrefilterPoem = poem_sync_%s.out + PrefilterPoemNameMapping = poem_name_mapping.cfg + TopologyGroupOfEndpoints = group_endpoints_%s.avro + TopologyGroupOfGroups = group_groups_%s.avro + Weights = weights_%s.avro + +Section lists all the filenames that each component is generating. Directory is purposely omitted because it's implicitly found in next configuration file. Exception is a `PrefilterConsumerFilePath` and `PrefilterPoem` options that tells the `prefilter-egi.py` where to look for its input files. `%s` is a string placeholder that will be replaced by the date timestamp in format `year_month_day`. + + + +### customer.conf + +This configuration file lists all customers, their jobs and appropriate attributes. Job is presented to `argo-compute-engine` as a folder with a set of files that are generated each day and that directs compute engine what metric results to take into account and do calculations upon them. + +#### Directory structure + +Job folders for each customer are placed under the customer's `OutputDir` directory and appropriate directory names are read from the config file. 
Segment of configuration file that reflects the creation of directories is for example: + + [CUSTOMER_C1] + OutputDir = /var/lib/argo-connectors/Customer1 + Jobs = JOB_Test1, JOB_Test2 + + [JOB_Test1] + Dirname = C1Testing1 + + [JOB_Test2] + Dirname = C1Testing2 + + + [CUSTOMER_C2] + OutputDir = /var/lib/argo-connectors/Customer2 + Jobs = JOB_Test3, JOB_Test4 + + [JOB_Test3] + Dirname = C2Testing1 + + [JOB_Test4] + Dirname = C2Testing2 + +This will result in the following jobs directories: + + /var/lib/argo-connectors/Customer1/C1Testing1 + /var/lib/argo-connectors/Customer1/C1Testing2 + /var/lib/argo-connectors/Customer2/C2Testing1 + /var/lib/argo-connectors/Customer2/C2Testing2 + +So there are two customers, C1 and C2, each one identified with its `[CUSTOMER_*]` section. `CUSTOMER_` is a section keyword and must be specified when one wants to define a new customer. Each customer has two mandatory options: `OutputDir` and `Jobs`. With `OutputDir` option, customer defines his directory where he'll write job folders and other data. Customer must also specify set of jobs listed in `Jobs` options since it can not exist without associated jobs. The name of the job folder is specified with `Dirname` option of the certain job so `JOB\_Test1`, identified with `[JOB_Test1]` section, will be named `C1Testing1` and it will be placed under customer's `/var/lib/argo-connectors/Customer1/` directory. Each component will firstly try to create described directory structure if it doesn't exist yet. Only afterward it will write its data. + +Every connector reads this configuration file because it needs to find out how many customers there are and what their customer and job directory names are, where they will lay down their files. So `poem-connector.py`, `downtimes-gocdb-connector.py`, `weights-gstat-connector.py`, all of them are writing their data in each job directory for each customer. 
Topology for EGI (fetched from GOCDB) is different than one for the VO so exceptions to this are `topology-gocdb-connector.py` and `topology-vo-connector.py`. They are writing data for a job based on the job's topology type specified with `TopoType` attribute. + +#### Job attributes + +Besides `Dirname` option that is common for all connectors, some of them have job attributes that are relevant only for them and upon which they are changing their behaviour. Some of those attributes are _mandatory_ like `Profiles` and `TopoType` and the other ones like `TopoSelect*` attributes are optional. Furthermore, as there are two kind of topologies, there are also two set of job attributes and values. + +##### GOCDB topology + + [JOB_EGICloudmon] + Dirname = EGI_Cloudmon + Profiles = CLOUD-MON + TopoType = GOCDB + TopoFeed = https://goc.egi.eu/gocdbpi/ + TopoFetchType = ServiceGroups + TopoSelectGroupOfEndpoints = Monitored:Y, Scope:EGI, Production:Y + TopoSelectGroupOfGroups = Monitored:Y, Scope:EGI + +This is an example of the job that fetchs topology from GOCDB since `TopoType` attribute is set to `GOCDB`. `Profiles` is an attribute relevant to `poem-connector.py` so for this job `poem-connector.py` will write CLOUD-MON profile in EGI_Cloudmon job folder under /EGI directory. `Topo*` attributes are relevant for `topology-gocdb-connector.py`. `TopoFeed` attribute in the context of the GOCDB topology is optional. If it's specified, it will override default source of topology which is https://goc.egi.eu/gocdbpi/ + +Topology is separated in two abstracts: + +- group of groups +- group of service endpoints + +Service endpoints are grouped either by the means of _Sites_ or _Service groups_. Those are listed and represented as an upper level abstract of group of service endpoints - group of groups. Customer can fetch either _Sites_ and their corresponding endpoints or _Service groups_ and their corresponding endpoints per job, but not both of them. 
What is being fetched is specified with `TopoFetchType` option/job attribute. For each abstract there will be written `TopologyGroupOfGroups` and `TopologyGroupOfEndpoints` filenames (specified in `global.conf`) into appropriate job folder. `TopoSelectGroupOfGroups` and `TopoSelectGroupOfEndpoints` options are used for further filtering. Values are set of tags used for picking up matching entity existing in the given abstract. Tags for group of groups are different for Sites and Service groups. On the contrary, the set of tags for groups of endpoints remains the same no matter what type of fetch the customer specified. + +So, in a `TopoFetchType` option customer can either specify: + +- `ServiceGroups` - to fetch Service groups +- `Sites` - to fetch Sites + +###### Tags + +Tags represent a fine-grained control of what is being written in output files. It's a convenient way of selecting only certain entities, be it Sites, Service groups or Service endpoints, based on appropriate criteria. Tags are optional so if a certain tag for a corresponding entity is omitted, then filtering is not done. In that case, it can be considered that entity is fetched for all its values of an omitted tag. + +Group of group tags are different for a different type of fetch. 
Tags and values for a different entities are: + +**Sites** + +* Certification = `{Certified, Uncertified, Closed, Suspended, Candidate}` +* Infrastructure = `{Production, Test}` +* Scope = `{EGI, Local}` + +**ServiceGroups** + +* Monitored = `{Y, N}` +* Scope = `{EGI, Local}` + +Tags for selecting group of endpoints are: + +**Service Endpoints** + +* Production = `{Y, N}` +* Monitored = `{Y, N}` +* Scope = `{EGI, Local}` + +##### VO topology + + [DEFAULT] + BioMed = http://kosjenka.srce.hr/~eimamagi/ops.feed.xml + + [JOB_BioMedCritical] + Dirname = BioMed_Critical + Profiles = ROC_CRITICAL + TopoType = VOFeed + TopoFeed = %(BioMed)s + TopoSelectGroupOfGroups = Type:(OPS_Tier, OPS_Site) + +This is an example of the job that is fetching topology from provided VO feed since `TopoType` attribute is set to `VOFeed`. Again, `Profiles` attribute is mandatory and is relevant to `poem-connector.py` which will write ROC\_CRITICAL profile in BioMed\_Critical job folder. `Topo*` attributes are relevant for `topology-vo-connector.py`. Contrary to GOCDB topology jobs, `TopoFeed` attribute for jobs dealing with the VO topology is _mandatory_ and must be specified. Although same topology feed can be specified across multiple jobs, internally, data from one feed is fetched only once and is further filtered and written for every job. + +VO topology is also separated and written in two abstracts, group of groups and group of service endpoints, but there are no tags since VO itself filters and handles what sites and service endpoints to take into account and defines the VO groups they belong to. With that being said, there is a `TopoSelectGroupOfGroups` option available which is used to pick up VO groups based on their type. VO groups are entities existing in the group of group abstract of topology. In the example above, `topology-vo-connector.py` will pick up VO groups that match `OPS_Site` and `OPS_Tier` types and write them into `TopologyGroupOfGroups` file. 
Endpoints are written in `TopologyGroupOfEndpoints` file. + +##### Data feeds + +Source of the data for other connectors like `weights-gstat-connector.py` and `downtimes-gocdb-connector.py` are optional and can be specified per job. If specified, they will override their default source of data. Example: + + [JOB_BioMedCritical] + Dirname = BioMed_Critical + Profiles = ROC_CRITICAL + TopoType = VOFeed + TopoFeed = %(BioMed)s + TopoSelectGroupOfGroups = Type:(OPS_Tier, OPS_Site) + WeightsFeed = http://gstat2.grid.sinica.edu.tw/gstat/summary/json/ + DowntimesFeed = https://goc.egi.eu/gocdbpi/ + +`WeightsFeed` and `DowntimesFeed` are alternative data feeds for this job for connectors `weights-gstat-connector.py` and `downtimes-gocdb-connector.py`, respectively. + + + +### poem-connector.conf + +This configuration file is central configuration for `poem-connector.py` whose role is: + +- fetch all defined POEM profiles from each POEM server specified +- prepare and layout data needed for `prefilter-egi.py` + +#### POEM profiles fetch + +Config file is splitted into a few sections: + + [PoemServer] + Host = snf-624922.vm.okeanos.grnet.gr + VO = ops + +This section defines the URL where POEM server is located and all VOes for which POEM profiles will be fetched. Multiple POEM servers can be specified by defining multiple POEM server sections: + + [PoemServer1] + Host = poem1 + VO = vo1, vo2 + + [PoemServer2] + Host = poem2 + VO = vo3, vo4 + +Same POEM profile can be defined on multiple POEM servers. Each POEM server can further extend it with a custom combinations of metrics and service flavours. To distinguish POEM profile defined on multiple POEM servers, namespace must be used. 
One must be aware of the namespace that POEM server exposes and specify it in `FetchProfiles` section: + + [FetchProfiles] + List = CH.CERN.SAM.ROC, CH.CERN.SAM.ROC_OPERATORS, CH.CERN.SAM.ROC_CRITICAL, CH.CERN.SAM.OPS_MONITOR, CH.CERN.SAM.OPS_MONITOR_CRITICAL, CH.CERN.SAM.GLEXEC, CH.CERN.SAM.CLOUD-MON + +#### Prefilter data + +`poem-connector.py` also generates plaintext `PrefilterPoem` file (specified in `global.conf`) on a daily basis for each customer and places it under customer's directory. Content of the file is controlled in `[PrefilterData]` section: + + [PrefilterData] + AllowedNGI = http://mon.egi.eu/nagios-roles.conf + AllowedNGIProfiles = ch.cern.sam.ROC, ch.cern.sam.ROC_OPERATORS, ch.cern.sam.ROC_CRITICAL, ch.cern.sam.GLEXEC + AllNGI1 = opsmon.egi.eu + AllNGIProfiles1 = ch.cern.sam.OPS_MONITOR, ch.cern.sam.OPS_MONITOR_CRITICAL + AllNGI2 = cloudmon.egi.eu + AllNGIProfiles2 = ch.cern.sam.CLOUD-MON + +`AllowedNGI` option defines remote config file that states all allowed NGIes and corresponding nagios boxes. All of them will be expanded and listed together with the information from `AllowedNGIProfiles` POEM profiles (metrics, service flavours, VOes). + +`AllNGI1` option is similar in the sense that it will extend the specified nagios box (monitoring instance) with the information from `AllNGIProfiles1` POEM profiles. Multiple `AllNGI*` options can be specified and they must come in pairs, so for every `AllNGI[n]` option, there must exist an `AllNGIProfiles[n]` option that is related to it. 
+ +With all this information written in `PrefilterPoem` file, `prefilter-egi.py` can do its work, so it will filter consumer messages if: + +- the message that enters the broker network doesn't come from an allowed NGI, or the nagios box for a certain NGI is incorrectly specified +- metric result is a response to a metric not found in a fetched service flavour +- metric result's service flavour is not registered in any fetched POEM profile +- metric result is registered for a different VO, not specified in `VO` option of any `[PoemServer]` section + +## Examples + +
+ + + + + +
+
+

 

+customer.conf: +

 

+
+	[DEFAULT]
+	BioMed = http://kosjenka.srce.hr/~eimamagi/ops.feed.xml
+
+	[DIR]
+	OutputDir = /var/lib/argo-connectors/EGI/
+
+	[CUSTOMER_EGI]
+	Jobs = JOB_EGICritical, JOB_EGICloudmon, JOB_BioMedCloudmon, JOB_BioMedCritical
+
+	[JOB_EGICritical]
+	Dirname = EGI_Critical
+	Profiles = ROC_CRITICAL
+	TopoType = GOCDB
+	TopoFetchType = Sites
+	#TopoSelectGroupOfEndpoints = Production:Y, Monitored:Y, Scope:EGI
+	TopoSelectGroupOfGroups = Certification:Uncertified, Infrastructure:Test, Scope:EGI
+
+	[JOB_EGICloudmon]
+	Dirname = EGI_Cloudmon
+	Profiles = CLOUD-MON
+	TopoType = GOCDB
+	TopoFetchType = ServiceGroups
+	TopoSelectGroupOfEndpoints = Monitored:Y, Scope:EGI, Production:N
+	#TopoSelectGroupOfGroups = Monitored:Y, Scope:EGI
+
+	[JOB_BioMedCritical]
+	Dirname = BioMed_Critical
+	Profiles = ROC_CRITICAL
+	TopoType = VOFeed
+	TopoFeed = %(BioMed)s
+	TopoSelectGroupOfGroups = Type:OPS_Site
+
+	[JOB_BioMedCloudmon]
+	Dirname = BioMed_Cloudmon
+	Profiles = CLOUD-MON
+	TopoType = VOFeed
+	TopoFeed = %(BioMed)s
+	#TopoSelectGroupOfGroups = Type:OPS_Tier
+
+ +
+
+ +

 

+Customer jobs: +

 

+
+	/var/lib/argo-connectors/EGI/BioMed_Cloudmon/group_endpoints_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Cloudmon/group_groups_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Cloudmon/poem_sync_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Cloudmon/weights_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Critical/group_endpoints_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Critical/group_groups_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Critical/poem_sync_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Critical/weights_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Cloudmon/group_endpoints_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Cloudmon/group_groups_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Cloudmon/poem_sync_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Cloudmon/weights_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Critical/group_endpoints_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Critical/group_groups_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Critical/poem_sync_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Critical/weights_2015_04_07.avro
+
+ +
+
+ +

 

+ +Prefilter data: + +

 

+
+
+	/var/lib/argo-connectors/EGI/poem_sync_2015_04_07.out
+	/var/lib/argo-connectors/EGI/prefilter_2015_04_07.avro
+
+
+
+
+ +

 

+For customer's job JOB_EGICritical, we are selecting only those sites that match `Certification:Uncertified`, `Infrastructure:Test` and `Scope:EGI`, so in `TopologyGroupOfGroups` file there will be only those sites listed: +

 

+
+	 % avro cat /var/lib/argo-connectors/EGI/EGI_Critical/group_groups_2015_04_07.avro | tail -n 1
+	 {"group": "Russia", "tags": {"scope": "EGI", "infrastructure": "Test", "certification": "Uncertified"}, "type": "NGI", "subgroup": "SU-Protvino-IHEP"}
+
+

 

+
+
+

 

+ For customer's JOB_EGICloudmon, we are selecting only those service endpoints that match `Monitored:Y`, `Scope:EGI`, `Production:N`: + +

 

+ +
	 % avro cat /var/lib/argo-connectors/EGI/EGI_Cloudmon/group_endpoints_2015_04_07.avro
+	 {"group": "ROC_RU_SERVICE", "hostname": "ce.ngc6475.ihep.su", "type": "SERVICEGROUPS", "service": "Top-BDII", "tags": {"scope": "EGI", "production": 0, "monitored": 1}}
+
+ +

 

+
+
+

 

+JOB_BioMedCritical is taking into account only OPS\_Site VO groups: +

 

+
+	% avro cat /var/lib/argo-connectors/EGI/BioMed_Critical/group_groups_2015_04_07.avro | tail -n 5
+	{"group": "SAMPA", "tags": null, "type": "OPS_Site", "subgroup": "SAMPA"}
+	{"group": "UPJS-Kosice", "tags": null, "type": "OPS_Site", "subgroup": "UPJS-Kosice"}
+	{"group": "GR-06-IASA", "tags": null, "type": "OPS_Site", "subgroup": "GR-06-IASA"}
+	{"group": "FI_HIP_T2", "tags": null, "type": "OPS_Site", "subgroup": "FI_HIP_T2"}
+	{"group": "UKI-SOUTHGRID-RALPP", "tags": null, "type": "OPS_Site", "subgroup": "UKI-SOUTHGRID-RALPP"}
+
+
+

 

+
+
+

 

+JOB_BioMedCloudmon requires only CLOUD-MON POEM profile so in `Poem` file you have: +

 

+
+	 % avro cat  /var/lib/argo-connectors/EGI/BioMed_Cloudmon/poem_sync_2015_04_07.avro | tail -n 5
+	 {"profile": "ch.cern.sam.CLOUD-MON", "metric": "eu.egi.cloud.Perun-Check", "service": "egi.Perun", "tags": {"fqan": "", "vo": "ops"}}
+	 {"profile": "ch.cern.sam.CLOUD-MON", "metric": "eu.egi.cloud.APEL-Pub", "service": "eu.egi.cloud.accounting", "tags": {"fqan": "", "vo": "ops"}}
+	 {"profile": "ch.cern.sam.CLOUD-MON", "metric": "org.nagios.Broker-TCP", "service": "eu.egi.cloud.broker.compss", "tags": {"fqan": "", "vo": "ops"}}
+	 {"profile": "ch.cern.sam.CLOUD-MON", "metric": "org.nagios.Broker-TCP", "service": "eu.egi.cloud.broker.proprietary.slipstream", "tags": {"fqan": "", "vo": "ops"}}
+	 {"profile": "ch.cern.sam.CLOUD-MON", "metric": "org.nagios.Broker-TCP", "service": "eu.egi.cloud.broker.vmdirac", "tags": {"fqan": "", "vo": "ops"}}
+
+

 

+
+ +
+

 

+Downtimes: +

 

+
+	% /usr/libexec/argo-egi-connectors/downtimes-gocdb-connector.py -d 2015-04-07
+	% find /var/lib/argo-connectors -name '*downtimes*'
+	/var/lib/argo-connectors/EGI/EGI_Cloudmon/downtimes_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/EGI_Critical/downtimes_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Cloudmon/downtimes_2015_04_07.avro
+	/var/lib/argo-connectors/EGI/BioMed_Critical/downtimes_2015_04_07.avro
+
+
+
+ +
+ +## Links + +Connectors are using following GOCDB PI methods: + +- [GOCDB - get_downtime_method](https://wiki.egi.eu/wiki/GOCDB/PI/get_downtime_method) +- [GOCDB - get_service_endpoint_method](https://wiki.egi.eu/wiki/GOCDB/PI/get_service_endpoint_method) +- [GOCDB - get_service_group](https://wiki.egi.eu/wiki/GOCDB/PI/get_service_group) +- [GOCDB - get_site_method](https://wiki.egi.eu/wiki/GOCDB/PI/get_site_method) + +[Construction of VO feeds](https://twiki.cern.ch/twiki/bin/view/Main/ATPVOFeeds) From 938dabfbca0ea8968b101d68bec7b35c546ee7b8 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 2 Oct 2015 10:52:32 +0200 Subject: [PATCH 11/21] updated doc with server's cert validate options --- doc/sync.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/doc/sync.md b/doc/sync.md index 06e9c73f..22b0f89b 100644 --- a/doc/sync.md +++ b/doc/sync.md @@ -58,10 +58,12 @@ Config file is read by _every_ component because every component needs to, at le Every component generates output file in an avro binary format. This section points to a directory that holds all avro schemas. [Authentication] + VerifyServerCert = False + CAPAth = /etc/grid-security/certificates HostKey = /etc/grid-security/hostkey.pem HostCert = /etc/grid-security/hostcert.pem -Each component that talks to GOCDB or POEM peer authenticates itself with a host certificate. +Each component that talks to GOCDB or POEM peer authenticates itself with a host certificate. `HostKey` indicates the private and `HostCert` indicates the public part of certificate. Additionally, server certificate can be validated rounding up the mutual authentication. `CAPath` contains certificates of authorities from which chain will be tried to be built upon validating. 
[AvroSchemas] Downtimes = %(SchemaDir)s/downtimes.avsc @@ -314,7 +316,7 @@ With all these informations written in `PrefilterPoem` file, `prefilter-egi.py` TopoType = GOCDB TopoFetchType = ServiceGroups TopoSelectGroupOfEndpoints = Monitored:Y, Scope:EGI, Production:N - #TopoSelectGroupOfGroups = Monitored:Y, Scope:EGI + #TopoSelectGroupOfGroups = Monitored:Y, Scope:EGI, Certification:(Certified,Candidate) [JOB_BioMedCritical] Dirname = BioMed_Critical @@ -450,4 +452,7 @@ Connectors are using following GOCDB PI methods: - [GOCDB - get_service_group](https://wiki.egi.eu/wiki/GOCDB/PI/get_service_group) - [GOCDB - get_site_method](https://wiki.egi.eu/wiki/GOCDB/PI/get_site_method) +`poem-connector.py` is using POEM PI method: +- [POEM - metrics_in_profiles](http://argoeu.github.io/guides/poem/) + [Construction of VO feeds](https://twiki.cern.ch/twiki/bin/view/Main/ATPVOFeeds) From 948dddcce23c501e0e80849a7e36395d3fb98f8a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 2 Oct 2015 11:01:08 +0200 Subject: [PATCH 12/21] updated spec --- argo-egi-connectors.spec | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index e55bbd54..6f9c7c4a 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors -Version: 1.4.3 -Release: 3%{?dist} +Version: 1.4.4 +Release: 1%{?dist} Group: EGI/SA4 License: ASL 2.0 @@ -46,6 +46,21 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog +* Fri 02-10-2015 Daniel Vrcic - 1.4.4-1%{?dist} +- filter SRM endpoints too +- refactored use of logging +- connectors can verify server certificate + https://github.com/ARGOeu/ARGO/issues/153 +- report correct number of fetched endpoints even if SRM endpoints were being filtered +- connectors handle help argument and describe basic info and usage + https://github.com/ARGOeu/ARGO/issues/169 +- removed hardcoded 
scopes and grab them dynamically from config + https://github.com/ARGOeu/ARGO/issues/168 +- report config parser errors via logger +- downtimes connector complain if wrong date specified +- remove notion of default scope +- doc moved to repo +- updated doc with server's cert validate options * Wed Aug 19 2015 Daniel Vrcic - 1.4.3-3%{?dist} - fix exception in case of returned HTTP 500 for other connectors * Sat Aug 15 2015 Daniel Vrcic - 1.4.3-2%{?dist} From b336d99fd7b851d26d4c3675d90ef6d466ea1fd0 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 6 Oct 2015 10:00:27 +0200 Subject: [PATCH 13/21] changelog date format corrected --- argo-egi-connectors.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index 6f9c7c4a..d164392b 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -46,7 +46,7 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog -* Fri 02-10-2015 Daniel Vrcic - 1.4.4-1%{?dist} +* Fri Oct 2 2015 Daniel Vrcic - 1.4.4-1%{?dist} - filter SRM endpoints too - refactored use of logging - connectors can verify server certificate From 45e1abfe70dd12621079205d7cd41ead6347e57a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 6 Oct 2015 12:03:49 +0200 Subject: [PATCH 14/21] fix initialization of loggers in config parsers --- modules/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/config.py b/modules/config.py index 5b3a0067..2373c458 100644 --- a/modules/config.py +++ b/modules/config.py @@ -4,7 +4,7 @@ class Global: def __init__(self, *args, **kwargs): - self.logger = Logger(self.__class__) + self.logger = Logger(str(self.__class__)) self._args = args self._filename = '/etc/argo-egi-connectors/global.conf' self._checkpath = kwargs['checkpath'] if 'checkpath' in kwargs.keys() else False @@ -43,7 +43,7 @@ class PoemConf: options = {} def __init__(self, *args): - 
self.logger = Logger(self.__class__) + self.logger = Logger(str(self.__class__)) self._args = args self._filename = '/etc/argo-egi-connectors/poem-connector.conf' @@ -141,7 +141,7 @@ class CustomerConf: tenantdir = '' def __init__(self, caller=None, **kwargs): - self.logger = Logger(self.__class__) + self.logger = Logger(str(self.__class__)) self._filename = '/etc/argo-egi-connectors/customer.conf' if not kwargs: self._jobattrs = self._defjobattrs[os.path.basename(caller)] From 4d815c80cbd320f4b1530381cb7d997a54c81c7c Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 6 Oct 2015 12:27:47 +0200 Subject: [PATCH 15/21] backward compatible exception messages --- argo-egi-connectors.spec | 5 ++++- modules/config.py | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index d164392b..dea9a3b3 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors Version: 1.4.4 -Release: 1%{?dist} +Release: 2%{?dist} Group: EGI/SA4 License: ASL 2.0 @@ -46,6 +46,9 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog +* Fri Oct 6 2015 Daniel Vrcic - 1.4.4-2%{?dist} +- fix initialization of loggers in config parsers +- backward compatible exception messages * Fri Oct 2 2015 Daniel Vrcic - 1.4.4-1%{?dist} - filter SRM endpoints too - refactored use of logging diff --git a/modules/config.py b/modules/config.py index 2373c458..e0e6a4b3 100644 --- a/modules/config.py +++ b/modules/config.py @@ -28,7 +28,7 @@ def parse(self): raise OSError(errno.ENOENT, optget) options.update({(sect+opt).lower(): optget}) except ConfigParser.NoOptionError as e: - self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) + self.logger.error(e.message) raise SystemExit(1) except ConfigParser.NoSectionError as e: self.logger.error("No section '%s' defined" % (e.args[0])) @@ -67,7 +67,7 @@ def parse(self): 
self.options.update({(section+o).lower(): optget}) except ConfigParser.NoOptionError as e: - self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) + self.logger.error(e.message) raise SystemExit(1) except ConfigParser.NoSectionError as e: self.logger.error("No section '%s' defined" % (e.args[0])) @@ -165,7 +165,7 @@ def parse(self): custjobs = [job.strip() for job in custjobs] custdir = config.get(section, 'OutputDir') except ConfigParser.NoOptionError as e: - self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) + self.logger.error(e.message) raise SystemExit(1) self._cust.update({section: {'Jobs': custjobs, 'OutputDir': custdir}}) @@ -182,7 +182,7 @@ def parse(self): profiles = config.get(job, 'Profiles') dirname = config.get(job, 'Dirname') except ConfigParser.NoOptionError as e: - self.logger.error("No option '%s' in section: '%s'" % (e.args[0], e.args[1])) + self.logger.error(e.message) raise SystemExit(1) self._jobs.update({job: {'Profiles': profiles, 'Dirname': dirname}}) From e17431e7ceedbf072517ab9c420d3fbe731f3ec1 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 7 Oct 2015 00:10:46 +0200 Subject: [PATCH 16/21] scopes per feed --- argo-egi-connectors.spec | 4 +++- bin/topology-gocdb-connector.py | 2 +- modules/config.py | 24 +++++++++++------------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index dea9a3b3..56661119 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -46,7 +46,9 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog -* Fri Oct 6 2015 Daniel Vrcic - 1.4.4-2%{?dist} +* Wed Oct 7 2015 Daniel Vrcic - 1.4.4-3%{?dist} +- grab all distinct scopes for feed +* Tue Oct 6 2015 Daniel Vrcic - 1.4.4-2%{?dist} - fix initialization of loggers in config parsers - backward compatible exception messages * Fri Oct 2 2015 Daniel Vrcic - 1.4.4-1%{?dist} diff --git 
a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 89edb1e6..9548b41d 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -296,12 +296,12 @@ def main(): confcust = CustomerConf(sys.argv[0]) confcust.parse() confcust.make_dirstruct() - scopes = confcust.get_allspec_scopes(sys.argv[0], 'GOCDB') feeds = confcust.get_mapfeedjobs(sys.argv[0], 'GOCDB', deffeed='https://goc.egi.eu/gocdbpi/') timestamp = datetime.datetime.utcnow().strftime('%Y_%m_%d') for feed, jobcust in feeds.items(): + scopes = confcust.get_feedscopes(feed, jobcust) gocdb = GOCDBReader(feed, scopes) for job, cust in jobcust: diff --git a/modules/config.py b/modules/config.py index e0e6a4b3..40957914 100644 --- a/modules/config.py +++ b/modules/config.py @@ -304,20 +304,18 @@ def _update_feeds(self, feeds, feedurl, job, cust): feeds[feedurl] = [] feeds[feedurl].append((job, cust)) - def get_allspec_scopes(self, caller, name=None): - distinct_scopes = set() + def get_feedscopes(self, feed, jobcust): ggtags, getags = [], [] - for c in self.get_customers(): - for job in self.get_jobs(c): - if self._get_toponame(job) == name: - gg = self._get_tags(job, 'TopoSelectGroupOfGroups') - ge = self._get_tags(job, 'TopoSelectGroupOfEndpoints') - for g in gg.items() + ge.items(): - if 'Scope'.lower() == g[0].lower(): - if isinstance(g[1], list): - distinct_scopes.update(g[1]) - else: - distinct_scopes.update([g[1]]) + distinct_scopes = set() + for job, cust in jobcust: + gg = self._get_tags(job, 'TopoSelectGroupOfGroups') + ge = self._get_tags(job, 'TopoSelectGroupOfEndpoints') + for g in gg.items() + ge.items(): + if 'Scope'.lower() == g[0].lower(): + if isinstance(g[1], list): + distinct_scopes.update(g[1]) + else: + distinct_scopes.update([g[1]]) return distinct_scopes From 624468dfdab141e7bb75001d28c05f8cbb238bbe Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 7 Oct 2015 09:12:47 +0200 Subject: [PATCH 17/21] spec release updated --- 
argo-egi-connectors.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index 56661119..7e010c36 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors Version: 1.4.4 -Release: 2%{?dist} +Release: 3%{?dist} Group: EGI/SA4 License: ASL 2.0 From 5f943601174930ddffc3ee728c6e4768937075dd Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 7 Oct 2015 15:55:44 +0200 Subject: [PATCH 18/21] poem-connector urlparse bugfix --- argo-egi-connectors.spec | 4 +++- bin/poem-connector.py | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index 7e010c36..a48ba400 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors Version: 1.4.4 -Release: 3%{?dist} +Release: 4%{?dist} Group: EGI/SA4 License: ASL 2.0 @@ -46,6 +46,8 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog +* Wed Oct 7 2015 Daniel Vrcic - 1.4.4-4%{?dist} +- poem-connector urlparse bugfix * Wed Oct 7 2015 Daniel Vrcic - 1.4.4-3%{?dist} - grab all distinct scopes for feed * Tue Oct 6 2015 Daniel Vrcic - 1.4.4-2%{?dist} diff --git a/bin/poem-connector.py b/bin/poem-connector.py index d16ffee5..a189e5ef 100755 --- a/bin/poem-connector.py +++ b/bin/poem-connector.py @@ -136,11 +136,15 @@ def loadProfilesFromServer(self, server, vo, filterProfiles): if len(filterProfiles) > 0: doFilterProfiles = True + if not server.startswith('http'): + server = 'https://' + server + url = self.poemRequest % (server, vo) o = urlparse.urlparse(url, allow_fragments=True) - logger.info('Server:%s VO:%s' % (o.netloc, vo)) try: + assert o.scheme != '' and o.netloc != '' and o.path != '' + logger.info('Server:%s VO:%s' % (o.netloc, vo)) if eval(globopts['AuthenticationVerifyServerCert'.lower()]): verify_cert(o.netloc, 
globopts['AuthenticationCAPath'.lower()], 180) conn = httplib.HTTPSConnection(o.netloc, 443, @@ -163,6 +167,9 @@ def loadProfilesFromServer(self, server, vo, filterProfiles): except(SSLError, socket.error, socket.timeout, httplib.HTTPException) as e: logger.error('Connection error %s - %s' % (server, errmsg_from_excp(e))) raise SystemExit(1) + except AssertionError: + logger.error('Invalid POEM PI URL: %s' % (url)) + raise SystemExit(1) return validProfiles From 0e6eb74bff1689bb4a15f535e60381379a01a0a1 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 8 Oct 2015 18:44:34 +0200 Subject: [PATCH 19/21] bugfix in case of no downtimes defined for given date --- argo-egi-connectors.spec | 5 +++- bin/downtimes-gocdb-connector.py | 44 +++++++++++++++++--------------- modules/config.py | 2 +- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index a48ba400..70dbabd0 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors Version: 1.4.4 -Release: 4%{?dist} +Release: 5%{?dist} Group: EGI/SA4 License: ASL 2.0 @@ -46,6 +46,9 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog +* Thu Oct 8 2015 Daniel Vrcic - 1.4.4-5%{?dist} +- bugfix in case of no downtimes defined for given date + https://github.com/ARGOeu/ARGO/issues/170 * Wed Oct 7 2015 Daniel Vrcic - 1.4.4-4%{?dist} - poem-connector urlparse bugfix * Wed Oct 7 2015 Daniel Vrcic - 1.4.4-3%{?dist} diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index 32b5a3a2..eded11cb 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -30,6 +30,7 @@ import os import sys import xml.dom.minidom +from xml.parsers.expat import ExpatError import copy import socket from urlparse import urlparse @@ -48,7 +49,9 @@ class GOCDBReader(object): def __init__(self, feed): - self.gocdbHost = 
urlparse(feed).netloc + self._o = urlparse(feed) + self._parsed = True + self.gocdbHost = self._o.netloc self.hostKey = globopts['AuthenticationHostKey'.lower()] self.hostCert = globopts['AuthenticationHostCert'.lower()] self.argDateFormat = "%Y-%m-%d" @@ -65,7 +68,6 @@ def getDowntimes(self, start, end): if res.status == 200: doc = xml.dom.minidom.parseString(res.read()) downtimes = doc.getElementsByTagName('DOWNTIME') - assert len(downtimes) > 0 for downtime in downtimes: classification = downtime.getAttributeNode('CLASSIFICATION').nodeValue hostname = downtime.getElementsByTagName('HOSTNAME')[0].childNodes[0].data @@ -92,14 +94,15 @@ def getDowntimes(self, start, end): else: logger.error('GOCDBReader.getDowntimes(): HTTP response: %s %s' % (str(res.status), res.reason)) raise SystemExit(1) - except AssertionError: - logger.error("GOCDBReader.getDowntimes():", "Error parsing feed") - raise SystemExit(1) + except ExpatError as e: + logger.error("GOCDBReader.getDowntimes(): Error parsing feed %s - %s" % + (self._o.scheme + '://' + self._o.netloc + '/gocdbpi/private/?method=get_downtime', e.message)) + self._parsed = False except(SSLError, socket.error, socket.timeout) as e: logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e))) raise SystemExit(1) - return filteredDowntimes + return filteredDowntimes, self._parsed def main(): global logger @@ -137,21 +140,22 @@ def main(): for feed, jobcust in feeds.items(): gocdb = GOCDBReader(feed) - dts = gocdb.getDowntimes(start, end) + dts, parsed = gocdb.getDowntimes(start, end) dtslegmap = [] - for dt in dts: - if dt['service'] in LegMapServType.keys(): - dtslegmap.append(copy.copy(dt)) - dtslegmap[-1]['service'] = LegMapServType[dt['service']] - for job, cust in jobcust: - jobdir = confcust.get_fulldir(cust, job) - - filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp - avro = AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename, - dts + dtslegmap, 
os.path.basename(sys.argv[0])) - avro.write() - - logger.info('Fetched Date:%s Endpoints:%d' % (args.date[0], len(dts + dtslegmap))) + if parsed: + for dt in dts: + if dt['service'] in LegMapServType.keys(): + dtslegmap.append(copy.copy(dt)) + dtslegmap[-1]['service'] = LegMapServType[dt['service']] + for job, cust in jobcust: + jobdir = confcust.get_fulldir(cust, job) + + filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp + avro = AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename, + dts + dtslegmap, os.path.basename(sys.argv[0])) + avro.write() + + logger.info('Jobs:%d Feed:%s Fetched Date:%s Endpoints:%d' % (len(jobcust), feed, args.date[0], len(dts + dtslegmap))) main() diff --git a/modules/config.py b/modules/config.py index 40957914..a9191fc6 100644 --- a/modules/config.py +++ b/modules/config.py @@ -237,7 +237,7 @@ def make_dirstruct(self): os.makedirs(d) except OSError as e: if e.args[0] != errno.EEXIST: - self.logger.error('%s %s %s' % os.strerror(e.args[0]), e.args[1], d) + self.logger.error('%s %s %s' % (os.strerror(e.args[0]), e.args[1], d)) raise SystemExit(1) def get_jobs(self, cust): From b3454b623c6efa7c63b7d9c81b1fdfe6f314b575 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 14 Oct 2015 13:38:45 +0200 Subject: [PATCH 20/21] bugfix handling lowercase defined POEM profiles --- bin/poem-connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/poem-connector.py b/bin/poem-connector.py index a189e5ef..6c3a8998 100755 --- a/bin/poem-connector.py +++ b/bin/poem-connector.py @@ -156,8 +156,8 @@ def loadProfilesFromServer(self, server, vo, filterProfiles): if res.status == 200: json_data = json.loads(res.read()) for profile in json_data[0]['profiles']: - if not doFilterProfiles or (profile['namespace']+'.'+profile['name']).upper() in filterProfiles: - validProfiles[(profile['namespace']+'.'+profile['name']).upper()] = profile + if not doFilterProfiles or 
profile['namespace'].upper()+'.'+profile['name'] in filterProfiles: + validProfiles[profile['namespace'].upper()+'.'+profile['name']] = profile elif res.status in (301, 302): logger.warning('Redirect: ' + urlparse.urljoin(url, res.getheader('location', ''))) From 38ff19f29cc390d4970fba945a84c3c839423619 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 15 Oct 2015 12:32:18 +0200 Subject: [PATCH 21/21] remove hardcoded customer name --- argo-egi-connectors.spec | 7 ++++++- bin/downtimes-gocdb-connector.py | 3 ++- bin/poem-connector.py | 5 ++++- bin/topology-gocdb-connector.py | 9 ++++++--- bin/topology-vo-connector.py | 6 ++++-- bin/weights-gstat-connector.py | 3 ++- doc/sync.md | 8 +++++--- etc/customer.conf | 1 + modules/config.py | 6 +++++- 9 files changed, 35 insertions(+), 13 deletions(-) diff --git a/argo-egi-connectors.spec b/argo-egi-connectors.spec index 70dbabd0..ffad12ff 100644 --- a/argo-egi-connectors.spec +++ b/argo-egi-connectors.spec @@ -1,6 +1,6 @@ Name: argo-egi-connectors Version: 1.4.4 -Release: 5%{?dist} +Release: 6%{?dist} Group: EGI/SA4 License: ASL 2.0 @@ -46,6 +46,11 @@ rm -rf $RPM_BUILD_ROOT %attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/ %changelog +* Thu Oct 15 2015 Daniel Vrcic - 1.4.4-6%{?dist} +- bugfix handling lowercase defined POEM profiles +- remove hardcoded customer name for topology-gocdb-connector + https://github.com/ARGOeu/ARGO/issues/173 +- guide updated with new configuration option for customer * Thu Oct 8 2015 Daniel Vrcic - 1.4.4-5%{?dist} - bugfix in case of no downtimes defined for given date https://github.com/ARGOeu/ARGO/issues/170 diff --git a/bin/downtimes-gocdb-connector.py b/bin/downtimes-gocdb-connector.py index eded11cb..bfc9f970 100755 --- a/bin/downtimes-gocdb-connector.py +++ b/bin/downtimes-gocdb-connector.py @@ -150,12 +150,13 @@ def main(): dtslegmap[-1]['service'] = LegMapServType[dt['service']] for job, cust in jobcust: jobdir = confcust.get_fulldir(cust, job) + custname = 
confcust.get_custname(cust) filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp avro = AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename, dts + dtslegmap, os.path.basename(sys.argv[0])) avro.write() - logger.info('Jobs:%d Feed:%s Fetched Date:%s Endpoints:%d' % (len(jobcust), feed, args.date[0], len(dts + dtslegmap))) + logger.info('Customer:%s Jobs:%d Fetched Date:%s Endpoints:%d' % (custname, len(jobcust), args.date[0], len(dts + dtslegmap))) main() diff --git a/bin/poem-connector.py b/bin/poem-connector.py index 6c3a8998..183bf165 100755 --- a/bin/poem-connector.py +++ b/bin/poem-connector.py @@ -43,6 +43,7 @@ logger = None globopts, poemopts = {}, {} cpoem = None +custname = '' class PoemReader: def __init__(self): @@ -255,6 +256,8 @@ def main(): poempref = PrefilterPoem(confcust.get_custdir(cust)) poempref.writeProfiles(ps, timestamp) + custname = confcust.get_custname(cust) + for job in confcust.get_jobs(cust): jobdir = confcust.get_fulldir(cust, job) @@ -266,6 +269,6 @@ def main(): lfprofiles, os.path.basename(sys.argv[0])) avro.write() - logger.info('Job:'+job+' Profiles:%s Tuples:%d' % (','.join(profiles), len(lfprofiles))) + logger.info('Customer:'+custname+' Job:'+job+' Profiles:%s Tuples:%d' % (','.join(profiles), len(lfprofiles))) main() diff --git a/bin/topology-gocdb-connector.py b/bin/topology-gocdb-connector.py index 9548b41d..7861ff70 100755 --- a/bin/topology-gocdb-connector.py +++ b/bin/topology-gocdb-connector.py @@ -46,6 +46,7 @@ fetchtype = '' globopts = {} +custname = '' logger = None class GOCDBReader: @@ -99,7 +100,7 @@ def getGroupOfGroups(self): for d in gl: g = dict() g['type'] = 'PROJECT' - g['group'] = 'EGI' + g['group'] = custname g['subgroup'] = d['name'] g['tags'] = {'monitored' : 1 if d['monitored'] == 'Y' else 0, 'scope' : d['scope']} @@ -308,6 +309,8 @@ def main(): jobdir = confcust.get_fulldir(cust, job) global fetchtype fetchtype = confcust.get_gocdb_fetchtype(job) + global custname + custname = 
confcust.get_custname(cust) if fetchtype == 'ServiceGroups': group_endpoints = gocdb.getGroupOfServices() @@ -341,9 +344,9 @@ def main(): group_endpoints + gelegmap, os.path.basename(sys.argv[0])) avro.write() - logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + numgeleg) +' Groups(%s):%d' % (fetchtype, numgg)) + logger.info('Customer:'+custname+' Job:'+job+' Fetched Endpoints:%d' % (numge + numgeleg) +' Groups(%s):%d' % (fetchtype, numgg)) if getags or ggtags: - selstr = 'Job:%s Selected ' % (job) + selstr = 'Customer:%s Job:%s Selected ' % (custname, job) selge, selgg = '', '' if getags: for key, value in getags.items(): diff --git a/bin/topology-vo-connector.py b/bin/topology-vo-connector.py index 6c341a88..7acbf5ea 100755 --- a/bin/topology-vo-connector.py +++ b/bin/topology-vo-connector.py @@ -141,6 +141,8 @@ def main(): for job, cust in jobcust: jobdir = confcust.get_fulldir(cust, job) + custname = confcust.get_custname(cust) + filtlgroups = vo.get_groupgroups() numgg = len(filtlgroups) tags = confcust.get_vo_ggtags(job) @@ -170,9 +172,9 @@ def ismatch(elem): os.path.basename(sys.argv[0])) avro.write() - logger.info('Job:'+job+' Fetched Endpoints:%d' % (numge + len(gelegmap))+' Groups:%d' % (numgg)) + logger.info('Customer:'+custname+' Job:'+job+' Fetched Endpoints:%d' % (numge + len(gelegmap))+' Groups:%d' % (numgg)) if tags: - selstr = 'Job:%s Selected ' % (job) + selstr = 'Customer:%s Job:%s Selected ' % (custname, job) selgg = '' for key, value in tags.items(): if isinstance(value, list): diff --git a/bin/weights-gstat-connector.py b/bin/weights-gstat-connector.py index b82656d6..be619978 100755 --- a/bin/weights-gstat-connector.py +++ b/bin/weights-gstat-connector.py @@ -165,6 +165,7 @@ def main(): for job, cust in jobcust: jobdir = confcust.get_fulldir(cust, job) + custname = confcust.get_custname(cust) filename = jobdir + globopts['OutputWeights'.lower()] % timestamp datawr = gen_outdict(newData) @@ -176,6 +177,6 @@ def main(): avro = 
AvroWriter(globopts['AvroSchemasWeights'.lower()], filename, datawr, os.path.basename(sys.argv[0])) avro.write() - logger.info('Jobs:%d Sites:%d' % (len(jobcust), len(datawr))) + logger.info('Customer:%s Jobs:%d Sites:%d' % (custname, len(jobcust), len(datawr))) main() diff --git a/doc/sync.md b/doc/sync.md index 22b0f89b..f4ba5e9c 100644 --- a/doc/sync.md +++ b/doc/sync.md @@ -27,7 +27,7 @@ Installation narrows down to simply installing the package: `yum -y install argo-egi-connectors` -**`Components require avro package to be installed/available.`** +**`Components require avro and pyOpenSSL packages to be installed/available.`** | File Types | Destination | @@ -99,6 +99,7 @@ This configuration file lists all customers, their jobs and appropriate attribut Job folders for each customer are placed under the customer's `OutputDir` directory and appropriate directory names are read from the config file. Segment of configuration file that reflects the creation of directories is for example: [CUSTOMER_C1] + Name = C1Name1 OutputDir = /var/lib/argo-connectors/Customer1 Jobs = JOB_Test1, JOB_Test2 @@ -106,10 +107,11 @@ Job folders for each customer are placed under the customer's `OutputDir` direct Dirname = C1Testing1 [JOB_Test2] - Dirname = C2Testing2 + Dirname = C1Testing2 [CUSTOMER_C2] + Name = C2Name2 OutputDir = /var/lib/argo-connectors/Customer2 Jobs = Job_Test3, JOB_Test4 @@ -126,7 +128,7 @@ This will result in the following jobs directories: /var/lib/argo-connectors/Customer2/C2Testing1 /var/lib/argo-connectors/Customer2/C2Testing2 -So there are two customers, C1 and C2, each one identified with its `[CUSTOMER_*]` section. `CUSTOMER_` is a section keyword and must be specified when one wants to define a new customer. Each customer has two mandatory options: `OutputDir` and `Jobs`. With `OutputDir` option, customer defines his directory where he'll write job folders and other data. 
Customer must also specify set of jobs listed in `Jobs` options since it can not exist without associated jobs. The name of the job folder is specified with `Dirname` option of the certain job so `JOB\_Test1`, identified with `[JOB_Test1]` section, will be named `C1Testing1` and it will be placed under customer's `/var/lib/argo-connectors/Customer1/` directory. Each component will firstly try to create described directory structure if it doesn't exist yet. Only afterward it will write its data. +So there are two customers, C1 and C2, each one identified with its `[CUSTOMER_*]` section. `CUSTOMER_` is a section keyword and must be specified when one wants to define a new customer. Each customer has three mandatory options: `Name`, `OutputDir` and `Jobs`. With `OutputDir` option, customer defines his directory where he'll write job folders and other data. Customer must also specify set of jobs listed in `Jobs` options since it can not exist without associated jobs. The name of the job folder is specified with `Dirname` option of the certain job so `JOB\_Test1`, identified with `[JOB_Test1]` section, will be named `C1Testing1` and it will be placed under customer's `/var/lib/argo-connectors/Customer1/` directory. Each component will firstly try to create described directory structure if it doesn't exist yet. Only afterward it will write its data. Every connector reads this configuration file because it needs to find out how many customers are there and what are theirs customer and job directory names where they will lay down its files. So `poem-connector.py`, `downtimes-gocdb-connector.py`, `weights-gstat-connector.py`, all of them are writing theirs data in each job directory for each customer. Topology for EGI (fetched from GOCDB) is different than one for the VO so exceptions to this are `topology-gocdb-connector.py` and `topology-vo-connector.py`. They are writing data for a job based on the job's topology type specified with `TopoType` attribute. 
diff --git a/etc/customer.conf b/etc/customer.conf index 73179b91..488c2cc7 100644 --- a/etc/customer.conf +++ b/etc/customer.conf @@ -2,6 +2,7 @@ GridPPFeed = http://www-pnp.physics.ox.ac.uk/%7Emohammad/gridpp.feed.xml [CUSTOMER_EGI] +Name = EGI OutputDir = /var/lib/argo-connectors/EGI/ Jobs = JOB_EGICritical, JOB_EGICloudmon, JOB_GridPPTest diff --git a/modules/config.py b/modules/config.py index a9191fc6..1de2163e 100644 --- a/modules/config.py +++ b/modules/config.py @@ -164,11 +164,12 @@ def parse(self): custjobs = config.get(section, 'Jobs').split(',') custjobs = [job.strip() for job in custjobs] custdir = config.get(section, 'OutputDir') + custname = config.get(section, 'Name') except ConfigParser.NoOptionError as e: self.logger.error(e.message) raise SystemExit(1) - self._cust.update({section: {'Jobs': custjobs, 'OutputDir': custdir}}) + self._cust.update({section: {'Jobs': custjobs, 'OutputDir': custdir, 'Name': custname}}) if self._custattrs: for attr in self._custattrs: @@ -227,6 +228,9 @@ def get_fulldir(self, cust, job): def get_custdir(self, cust): return self._dir_from_sect(cust, self._cust) + def get_custname(self, cust): + return self._cust[cust]['Name'] + def make_dirstruct(self): dirs = [] for cust in self._cust.keys():