
Commit

Merge pull request #63 from ARGOeu/devel
Version 1.4.4-6
skanct committed Oct 29, 2015
2 parents f8055ec + 153d843 commit 4f8ca5b
Showing 13 changed files with 913 additions and 279 deletions.
35 changes: 33 additions & 2 deletions argo-egi-connectors.spec
@@ -1,6 +1,6 @@
Name: argo-egi-connectors
Version: 1.4.3
Release: 3%{?dist}
Version: 1.4.4
Release: 6%{?dist}

Group: EGI/SA4
License: ASL 2.0
@@ -11,6 +11,7 @@ Vendor: SRCE <[email protected], [email protected]>
Obsoletes: ar-sync
Prefix: %{_prefix}
Requires: avro
Requires: pyOpenSSL
Source0: %{name}-%{version}.tar.gz

BuildArch: noarch
@@ -45,6 +46,36 @@ rm -rf $RPM_BUILD_ROOT
%attr(0750,root,root) %dir %{_localstatedir}/log/argo-egi-connectors/

%changelog
* Thu Oct 15 2015 Daniel Vrcic <[email protected]> - 1.4.4-6%{?dist}
- bugfix handling lowercase defined POEM profiles
- remove hardcoded customer name for topology-gocdb-connector
https://github.com/ARGOeu/ARGO/issues/173
- guide updated with new configuration option for customer
* Thu Oct 8 2015 Daniel Vrcic <[email protected]> - 1.4.4-5%{?dist}
- bugfix in case of no downtimes defined for given date
https://github.com/ARGOeu/ARGO/issues/170
* Wed Oct 7 2015 Daniel Vrcic <[email protected]> - 1.4.4-4%{?dist}
- poem-connector urlparse bugfix
* Wed Oct 7 2015 Daniel Vrcic <[email protected]> - 1.4.4-3%{?dist}
- grab all distinct scopes for feed
* Tue Oct 6 2015 Daniel Vrcic <[email protected]> - 1.4.4-2%{?dist}
- fix initialization of loggers in config parsers
- backward compatible exception messages
* Fri Oct 2 2015 Daniel Vrcic <[email protected]> - 1.4.4-1%{?dist}
- filter SRM endpoints too
- refactored use of logging
- connectors can verify server certificate
https://github.com/ARGOeu/ARGO/issues/153
- report correct number of fetched endpoints even if SRM endpoints were being filtered
- connectors handle help argument and describe basic info and usage
https://github.com/ARGOeu/ARGO/issues/169
- removed hardcoded scopes and grab them dynamically from config
https://github.com/ARGOeu/ARGO/issues/168
- report config parser errors via logger
- downtimes connector complains if wrong date specified
- remove notion of default scope
- doc moved to repo
- updated doc with server cert validation options
* Wed Aug 19 2015 Daniel Vrcic <[email protected]> - 1.4.3-3%{?dist}
- fix exception in case of returned HTTP 500 for other connectors
* Sat Aug 15 2015 Daniel Vrcic <[email protected]> - 1.4.3-2%{?dist}
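The changelog's server-certificate items (and the new Requires: pyOpenSSL above) are wired to the verify_cert helper that the connectors below import from argo_egi_connectors.tools and call as verify_cert(host, capath, 180). tools.py itself is not part of this diff, so the following is only a minimal sketch of such a check built on pyOpenSSL, under the assumption that it performs a TLS handshake verified against a CA directory; the real implementation may differ.

# Hypothetical sketch of verify_cert(host, capath, timeout); the signature
# matches the calls visible in this diff, the body is an assumption.
import socket

from OpenSSL import SSL


def verify_cert(host, capath, timeout):
    """Handshake with host:443 and verify its certificate chain against the
    CA certificates found under capath; raises OpenSSL.SSL.Error on failure."""
    def verify_cb(conn, cert, errnum, depth, ok):
        # ok is non-zero when OpenSSL accepted this certificate in the chain
        return ok

    ctx = SSL.Context(SSL.TLSv1_METHOD)
    ctx.set_verify(SSL.VERIFY_PEER, verify_cb)
    ctx.load_verify_locations(None, capath)

    sock = socket.create_connection((host, 443), timeout)
    # pyOpenSSL handshakes are simplest on a blocking socket, so the timeout
    # above only guards the TCP connect in this sketch
    sock.setblocking(True)

    conn = SSL.Connection(ctx, sock)
    conn.set_connect_state()
    conn.do_handshake()
    conn.shutdown()
    conn.close()
    return True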
80 changes: 45 additions & 35 deletions bin/downtimes-gocdb-connector.py
@@ -30,10 +30,16 @@
import os
import sys
import xml.dom.minidom
from xml.parsers.expat import ExpatError
import copy
import socket
from urlparse import urlparse

from argo_egi_connectors.writers import AvroWriter, Logger
from argo_egi_connectors.writers import AvroWriter
from argo_egi_connectors.writers import SingletonLogger as Logger
from argo_egi_connectors.config import Global, CustomerConf
from argo_egi_connectors.tools import verify_cert, errmsg_from_excp
from OpenSSL.SSL import Error as SSLError

logger = None

@@ -43,31 +49,25 @@

class GOCDBReader(object):
def __init__(self, feed):
self.gocdbUrl = feed
self.gocdbHost = self._getHostFeed(feed)
self._o = urlparse(feed)
self._parsed = True
self.gocdbHost = self._o.netloc
self.hostKey = globopts['AuthenticationHostKey'.lower()]
self.hostCert = globopts['AuthenticationHostCert'.lower()]
self.argDateFormat = "%Y-%m-%d"
self.WSDateFormat = "%Y-%m-%d %H:%M"

def _getHostFeed(self, feed):
host = feed
if "https://" in feed:
host = feed.split("https://")[1]
if "/" in host:
host = host.split('/')[0]
return host

def getDowntimes(self, start, end):
filteredDowntimes = list()
try:
if eval(globopts['AuthenticationVerifyServerCert'.lower()]):
verify_cert(self.gocdbHost, globopts['AuthenticationCAPath'.lower()], 180)
conn = httplib.HTTPSConnection(self.gocdbHost, 443, self.hostKey, self.hostCert)
conn.request('GET', '/gocdbpi/private/' + '?method=get_downtime&windowstart=%s&windowend=%s' % (start.strftime(self.argDateFormat), end.strftime(self.argDateFormat)))
conn.request('GET', '/gocdbpi/private/?method=get_downtime&windowstart=%s&windowend=%s' % (start.strftime(self.argDateFormat), end.strftime(self.argDateFormat)))
res = conn.getresponse()
if res.status == 200:
doc = xml.dom.minidom.parseString(res.read())
downtimes = doc.getElementsByTagName('DOWNTIME')
assert len(downtimes) > 0
for downtime in downtimes:
classification = downtime.getAttributeNode('CLASSIFICATION').nodeValue
hostname = downtime.getElementsByTagName('HOSTNAME')[0].childNodes[0].data
@@ -94,16 +94,20 @@ def getDowntimes(self, start, end):
else:
logger.error('GOCDBReader.getDowntimes(): HTTP response: %s %s' % (str(res.status), res.reason))
raise SystemExit(1)
except AssertionError:
logger.error("GOCDBReader.getDowntimes():", "Error parsing feed")
except ExpatError as e:
logger.error("GOCDBReader.getDowntimes(): Error parsing feed %s - %s" %
(self._o.scheme + '://' + self._o.netloc + '/gocdbpi/private/?method=get_downtime', e.message))
self._parsed = False
except(SSLError, socket.error, socket.timeout) as e:
logger.error('Connection error %s - %s' % (self.gocdbHost, errmsg_from_excp(e)))
raise SystemExit(1)

return filteredDowntimes
return filteredDowntimes, self._parsed

def main():
global logger
logger = Logger(os.path.basename(sys.argv[0]))
certs = {'Authentication': ['HostKey', 'HostCert']}
certs = {'Authentication': ['HostKey', 'HostCert', 'CAPath', 'VerifyServerCert']}
schemas = {'AvroSchemas': ['Downtimes']}
output = {'Output': ['Downtimes']}
cglob = Global(certs, schemas, output)
@@ -115,7 +119,7 @@ def main():
confcust.make_dirstruct()
feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed='https://goc.egi.eu/gocdbpi/')

parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description='Fetch downtimes from GOCDB for given date')
parser.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY', required=True)
args = parser.parse_args()

@@ -124,29 +128,35 @@ def main():
raise SystemExit(1)

# calculate start and end times
start = datetime.datetime.strptime(args.date[0], '%Y-%m-%d')
end = datetime.datetime.strptime(args.date[0], '%Y-%m-%d')
timestamp = start.strftime('%Y_%m_%d')
try:
start = datetime.datetime.strptime(args.date[0], '%Y-%m-%d')
end = datetime.datetime.strptime(args.date[0], '%Y-%m-%d')
timestamp = start.strftime('%Y_%m_%d')
except ValueError as e:
logger.error(e)
raise SystemExit(1)
start = start.replace(hour=0, minute=0, second=0)
end = end.replace(hour=23, minute=59, second=59)

for feed, jobcust in feeds.items():
gocdb = GOCDBReader(feed)
dts = gocdb.getDowntimes(start, end)
dts, parsed = gocdb.getDowntimes(start, end)

dtslegmap = []
for dt in dts:
if dt['service'] in LegMapServType.keys():
dtslegmap.append(copy.copy(dt))
dtslegmap[-1]['service'] = LegMapServType[dt['service']]
for job, cust in jobcust:
jobdir = confcust.get_fulldir(cust, job)

filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp
avro = AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename,
dts + dtslegmap, os.path.basename(sys.argv[0]), logger)
avro.write()

logger.info('Fetched Date:%s Endpoints:%d' % (args.date[0], len(dts + dtslegmap)))
if parsed:
for dt in dts:
if dt['service'] in LegMapServType.keys():
dtslegmap.append(copy.copy(dt))
dtslegmap[-1]['service'] = LegMapServType[dt['service']]
for job, cust in jobcust:
jobdir = confcust.get_fulldir(cust, job)
custname = confcust.get_custname(cust)

filename = jobdir + globopts['OutputDowntimes'.lower()] % timestamp
avro = AvroWriter(globopts['AvroSchemasDowntimes'.lower()], filename,
dts + dtslegmap, os.path.basename(sys.argv[0]))
avro.write()

logger.info('Customer:%s Jobs:%d Fetched Date:%s Endpoints:%d' % (custname, len(jobcust), args.date[0], len(dts + dtslegmap)))

main()
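Both this connector and poem-connector.py below switch from Logger to SingletonLogger as Logger and stop passing the logger into AvroWriter, which suggests the writer now obtains the shared instance on its own. The class itself is not shown in this diff; the snippet below is only a hypothetical sketch of such a singleton wrapper, assuming it delegates to the standard logging module (names and format are made up).

# Hypothetical SingletonLogger sketch: every instantiation after the first
# returns the same object, so AvroWriter can simply construct its own
# SingletonLogger() instead of receiving a logger argument.
import logging
import sys


class SingletonLogger(object):
    _instance = None

    def __new__(cls, name=None):
        if cls._instance is None:
            cls._instance = super(SingletonLogger, cls).__new__(cls)
            lgr = logging.getLogger(name or sys.argv[0])
            lgr.setLevel(logging.INFO)
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s %(message)s'))
            lgr.addHandler(handler)
            cls._instance._logger = lgr
        return cls._instance

    def info(self, msg):
        self._logger.info(msg)

    def warning(self, msg):
        self._logger.warning(msg)

    def error(self, msg):
        self._logger.error(msg)


# Usage mirroring the connectors: the instance created in main() and any later
# SingletonLogger() call inside a writer refer to the same logger.
logger = SingletonLogger('downtimes-gocdb-connector.py')
assert SingletonLogger() is logger
logger.info('Fetched Date:2015-10-29 Endpoints:0')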
74 changes: 28 additions & 46 deletions bin/poem-connector.py
@@ -33,40 +33,17 @@
import urlparse
import socket
import re
from argo_egi_connectors.writers import AvroWriter, Logger
from argo_egi_connectors.config import CustomerConf, PoemConf, Global

writers = ['file', 'avro']
from argo_egi_connectors.writers import AvroWriter
from argo_egi_connectors.writers import SingletonLogger as Logger
from argo_egi_connectors.config import CustomerConf, PoemConf, Global
from argo_egi_connectors.tools import verify_cert, errmsg_from_excp
from OpenSSL.SSL import Error as SSLError

logger = None
globopts, poemopts = {}, {}
cpoem = None

def resolve_http_redirect(url, depth=0):
if depth > 10:
raise Exception("Redirected "+depth+" times, giving up.")

o = urlparse.urlparse(url,allow_fragments=True)
conn = httplib.HTTPSConnection(o.netloc, 443,
globopts['AuthenticationHostKey'.lower()],
globopts['AuthenticationHostCert'.lower()])
path = o.path
if o.query:
path +='?'+o.query

try:
conn.request("HEAD", path)
res = conn.getresponse()
headers = dict(res.getheaders())
if headers.has_key('location') and headers['location'] != url:
return resolve_http_redirect(headers['location'],
globopts['AuthenticationHostKey'.lower()],
globopts['AuthenticationHostCert'.lower()],
depth+1)
else:
return url
except:
return url;
custname = ''

class PoemReader:
def __init__(self):
@@ -160,15 +137,17 @@ def loadProfilesFromServer(self, server, vo, filterProfiles):
if len(filterProfiles) > 0:
doFilterProfiles = True

if 'https://' not in server:
if not server.startswith('http'):
server = 'https://' + server

logger.info('Server:%s VO:%s' % (server, vo))
url = self.poemRequest % (server, vo)
o = urlparse.urlparse(url, allow_fragments=True)

url = resolve_http_redirect(self.poemRequest % (server,vo))

o = urlparse.urlparse(url,allow_fragments=True)
try:
assert o.scheme != '' and o.netloc != '' and o.path != ''
logger.info('Server:%s VO:%s' % (o.netloc, vo))
if eval(globopts['AuthenticationVerifyServerCert'.lower()]):
verify_cert(o.netloc, globopts['AuthenticationCAPath'.lower()], 180)
conn = httplib.HTTPSConnection(o.netloc, 443,
globopts['AuthenticationHostKey'.lower()],
globopts['AuthenticationHostCert'.lower()])
@@ -178,16 +157,19 @@ def loadProfilesFromServer(self, server, vo, filterProfiles):
if res.status == 200:
json_data = json.loads(res.read())
for profile in json_data[0]['profiles']:
if not doFilterProfiles or (profile['namespace']+'.'+profile['name']).upper() in filterProfiles:
validProfiles[(profile['namespace']+'.'+profile['name']).upper()] = profile
if not doFilterProfiles or profile['namespace'].upper()+'.'+profile['name'] in filterProfiles:
validProfiles[profile['namespace'].upper()+'.'+profile['name']] = profile
elif res.status in (301, 302):
logger.warning('Redirect: ' + urlparse.urljoin(url, res.getheader('location', '')))

else:
logger.error('POEMReader.loadProfilesFromServer(): HTTP response: %s %s' % (str(res.status), res.reason))
raise SystemExit(1)
except (socket.error, httplib.HTTPException) as e:
logger.error('Connection to %s failed: ' % (server) + str(e))
except(SSLError, socket.error, socket.timeout, httplib.HTTPException) as e:
logger.error('Connection error %s - %s' % (server, errmsg_from_excp(e)))
raise SystemExit(1)
except AssertionError:
logger.error('Invalid POEM PI URL: %s' % (url))
raise SystemExit(1)

return validProfiles
@@ -206,7 +188,7 @@ def createProfileEntries(self, server, ngi, profile):
entries.append(entry)
return entries

class FileWriter:
class PrefilterPoem:
def __init__(self, outdir):
self.outputDir = outdir
self.outputFileTemplate = 'poem_sync_%s.out'
@@ -247,7 +229,7 @@ def main():
global logger
logger = Logger(os.path.basename(sys.argv[0]))

certs = {'Authentication': ['HostKey', 'HostCert']}
certs = {'Authentication': ['HostKey', 'HostCert', 'VerifyServerCert', 'CAPath']}
schemas = {'AvroSchemas': ['Poem']}
output = {'Output': ['Poem']}
cglob = Global(certs, schemas, output)
@@ -271,10 +253,10 @@

for cust in confcust.get_customers():
# write profiles
for writer in writers:
if writer == 'file':
writerInstance = FileWriter(confcust.get_custdir(cust))
writerInstance.writeProfiles(ps, timestamp)
poempref = PrefilterPoem(confcust.get_custdir(cust))
poempref.writeProfiles(ps, timestamp)

custname = confcust.get_custname(cust)

for job in confcust.get_jobs(cust):
jobdir = confcust.get_fulldir(cust, job)
@@ -284,9 +266,9 @@

filename = jobdir + globopts['OutputPoem'.lower()]% timestamp
avro = AvroWriter(globopts['AvroSchemasPoem'.lower()], filename,
lfprofiles, os.path.basename(sys.argv[0]), logger)
lfprofiles, os.path.basename(sys.argv[0]))
avro.write()

logger.info('Job:'+job+' Profiles:%s Tuples:%d' % (','.join(profiles), len(lfprofiles)))
logger.info('Customer:'+custname+' Job:'+job+' Profiles:%s Tuples:%d' % (','.join(profiles), len(lfprofiles)))

main()
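The "bugfix handling lowercase defined POEM profiles" from the changelog corresponds to the filter change above: the key used to match and store profiles was previously the whole namespace.name string uppercased, whereas now only the namespace is normalized and the profile name keeps its original case. A standalone illustration of the difference, using made-up profile data:

# Illustration only (made-up data): a POEM profile whose name is defined in
# lowercase on the server, matched against the profile list coming from the
# job configuration.
profile = {'namespace': 'ch.cern.sam', 'name': 'roc_critical'}
filter_profiles = ['CH.CERN.SAM.roc_critical']

old_key = (profile['namespace'] + '.' + profile['name']).upper()   # 'CH.CERN.SAM.ROC_CRITICAL'
new_key = profile['namespace'].upper() + '.' + profile['name']     # 'CH.CERN.SAM.roc_critical'

print(old_key in filter_profiles)   # False - uppercasing the name broke the match
print(new_key in filter_profiles)   # True  - only the namespace is normalized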
20 changes: 12 additions & 8 deletions bin/prefilter-egi.py
@@ -24,17 +24,17 @@
# the EGI-InSPIRE project through the European Commission's 7th
# Framework Programme (contract # INFSO-RI-261323)

import sys
import os
import datetime
import avro.schema
import argparse
import avro.schema
import datetime
import os
import sys
import time
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

from argo_egi_connectors.config import Global, CustomerConf
from argo_egi_connectors.writers import Logger
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

globopts, confcust = {}, None
logger = None
@@ -298,7 +298,8 @@ def main():

stats = ()

parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description="""Filters consumer logs based on various criteria
(allowed NGIs, service flavours, metrics...)""")
group = parser.add_mutually_exclusive_group()
group.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY')
group.add_argument('-f', dest='cfile', nargs=1, metavar='consumer_log_YEAR-MONTH-DAY.avro')
@@ -309,8 +310,11 @@ def main():
date = args.cfile[0].split('_')[-1]
date = date.split('.')[0]
date = date.split('-')
else:
elif args.date:
date = args.date[0].split('-')
else:
parser.print_help()
raise SystemExit(1)

if len(date) == 0 or len(date) != 3:
logger.error('Consumer file does not end with correctly formatted date')
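The prefilter change above makes -d and -f an explicit either/or: the two options already sat in a mutually exclusive argparse group, and the new elif/else branch prints the usage text and exits when neither is given instead of falling through with an undefined date. A stripped-down, standalone sketch of that argument handling (option names mirror the diff; everything else is illustrative):

# Stand-alone sketch of the prefilter argument handling after this change.
import argparse


def parse_date(argv):
    parser = argparse.ArgumentParser(
        description='Filters consumer logs based on various criteria')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY')
    group.add_argument('-f', dest='cfile', nargs=1,
                       metavar='consumer_log_YEAR-MONTH-DAY.avro')
    args = parser.parse_args(argv)

    if args.cfile:
        # take the date embedded in the consumer log filename
        date = args.cfile[0].split('_')[-1].split('.')[0].split('-')
    elif args.date:
        date = args.date[0].split('-')
    else:
        # neither option given: show usage instead of continuing with no date
        parser.print_help()
        raise SystemExit(1)
    return date


print(parse_date(['-d', '2015-10-29']))                    # ['2015', '10', '29']
print(parse_date(['-f', 'consumer_log_2015-10-29.avro']))  # ['2015', '10', '29']
# passing both -d and -f makes argparse exit with a mutual-exclusion error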