Skip to content

Commit

Permalink
Merge pull request #240 from ARGOeu/devel
Browse files Browse the repository at this point in the history
PR to Release V2.2.0
  • Loading branch information
themiszamani authored Jul 28, 2022
2 parents db646f1 + bff23eb commit b703a65
Show file tree
Hide file tree
Showing 12 changed files with 470 additions and 21 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [2.2.0] - 2022-07-28

### Added

* ARGO-3695 [NEANIAS] Use ARGO for downtimes

## [2.1.0] - 2022-06-07

### Added
Expand Down
2 changes: 1 addition & 1 deletion argo-connectors.spec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%global __python /usr/bin/python3

Name: argo-connectors
Version: 2.1.0
Version: 2.2.0
Release: 1%{?dist}
Group: EGI/SA4
License: ASL 2.0
Expand Down
92 changes: 92 additions & 0 deletions exec/downtimes-csv-connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/python3

import argparse
import datetime
import os
import sys

import asyncio
import uvloop

from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError
from argo_connectors.log import Logger
from argo_connectors.tasks.flat_downtimes import TaskCsvDowntimes
from argo_connectors.tasks.common import write_state

from argo_connectors.config import Global, CustomerConf

logger = None
globopts = {}


def get_webapi_opts(cglob, confcust):
    """Return the merged WebAPI options for this customer.

    Customer-level overrides are merged on top of the global 'webapi'
    section.  If any mandatory option is missing, the problem is logged
    and the connector exits with status 1.
    """
    custopts = confcust.get_webapiopts()
    merged = cglob.merge_opts(custopts, 'webapi')
    complete, missing = cglob.is_complete(merged, 'webapi')
    if complete:
        return merged
    logger.error('Customer:%s %s options incomplete, missing %s' % (logger.customer, 'webapi', ' '.join(missing)))
    raise SystemExit(1)


def main():
    """Entry point: fetch CSV downtimes for a single target date.

    Parses CLI arguments (-d date, -c customer conf, -g global conf),
    loads global and customer configuration, computes the [00:00, 23:59]
    window for the requested day and runs TaskCsvDowntimes on an uvloop
    event loop.  Exits with status 1 on bad date or incomplete config.
    """
    global logger, globopts
    parser = argparse.ArgumentParser(description='Fetch downtimes from CSV for given date')
    parser.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY', required=True)
    parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
    parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
    args = parser.parse_args()

    logger = Logger(os.path.basename(sys.argv[0]))
    confpath = args.gloconf[0] if args.gloconf else None
    cglob = Global(sys.argv[0], confpath)
    globopts = cglob.parse()

    confpath = args.custconf[0] if args.custconf else None
    confcust = CustomerConf(sys.argv[0], confpath)
    confcust.parse()
    confcust.make_dirstruct()
    confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()])
    feed = confcust.get_downfeed()
    logger.customer = confcust.get_custname()

    # defensive: argparse required=True should prevent this, but keep the
    # guard in case argument handling changes
    if len(args.date) == 0:
        # print_help() writes to stdout itself and returns None; wrapping
        # it in print() emitted a spurious "None" line
        parser.print_help()
        raise SystemExit(1)

    # calculate start and end times for the requested day
    try:
        current_date = datetime.datetime.strptime(args.date[0], '%Y-%m-%d')
        timestamp = current_date.strftime('%Y_%m_%d')
        current_date = current_date.replace(hour=0, minute=0, second=0)

    except ValueError as exc:
        logger.error(exc)
        raise SystemExit(1)

    uidservtype = confcust.get_uidserviceendpoints()

    webapi_opts = get_webapi_opts(cglob, confcust)

    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        # a customer conf holds a single tenant, so take the first entry
        cust = list(confcust.get_customers())[0]
        task = TaskCsvDowntimes(loop, logger, sys.argv[0], globopts,
                                webapi_opts, confcust,
                                confcust.get_custname(cust), feed,
                                current_date, uidservtype, args.date[0],
                                timestamp)
        loop.run_until_complete(task.run())

    except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
        logger.error(repr(exc))
        loop.run_until_complete(
            write_state(sys.argv[0], globopts, confcust, timestamp, False)
        )

    finally:
        # close the loop even if an unexpected exception propagates,
        # so resources are not leaked on the error path
        loop.close()

if __name__ == '__main__':
    main()
24 changes: 17 additions & 7 deletions modules/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def __init__(self, caller, confpath=None, **kwargs):
self._merge_dict(self.shared_secopts,
self.conf_topo_schemas,
self.conf_topo_output),
'downtimes-csv-connector.py':
self._merge_dict(self.shared_secopts,
self.conf_downtimes_schemas,
self.conf_downtimes_output),
'downtimes-gocdb-connector.py':
self._merge_dict(self.shared_secopts,
self.conf_downtimes_schemas,
Expand Down Expand Up @@ -209,6 +213,7 @@ class CustomerConf(object):
'topology-provider-connector.py': [''],
'metricprofile-webapi-connector.py': ['MetricProfileNamespace'],
'downtimes-gocdb-connector.py': ['DowntimesFeed', 'TopoUIDServiceEndpoints'],
'downtimes-csv-connector.py': ['DowntimesFeed', 'TopoUIDServiceEndpoints'],
'weights-vapor-connector.py': ['WeightsFeed',
'TopoFetchType'],
'service-types-gocdb-connector.py': ['ServiceTypesFeed'],
Expand Down Expand Up @@ -266,6 +271,7 @@ def parse(self):
topofeedservicegroups = config.get(section, 'TopoFeedServiceGroups', fallback=None)
topofeedpaging = config.get(section, 'TopoFeedPaging', fallback='GOCDB')
servicetypesfeed = config.get(section, 'ServiceTypesFeed', fallback=None)
downtimesfeed = config.get(section, 'DowntimesFeed', fallback=None)

if not custdir.endswith('/'):
custdir = '{}/'.format(custdir)
Expand All @@ -286,17 +292,18 @@ def parse(self):

self._cust.update({section: {'Jobs': custjobs, 'OutputDir':
custdir, 'Name': custname,
'TopoFetchType': topofetchtype,
'TopoFeedPaging': topofeedpaging,
'TopoScope': toposcope,
'DowntimesFeed': downtimesfeed,
'ServiceTypesFeed': servicetypesfeed,
'TopoFeed': topofeed,
'TopoFeedSites': topofeedsites,
'TopoFeedServiceGroups': topofeedservicegroups,
'TopoFeedEndpoints': topofeedendpoints,
'TopoFeedEndpointsExtensions': topofeedendpointsextensions,
'TopoUIDServiceEnpoints': topouidservendpoints,
'TopoFeedPaging': topofeedpaging,
'TopoFeedServiceGroups': topofeedservicegroups,
'TopoFeedSites': topofeedsites,
'TopoFetchType': topofetchtype,
'TopoScope': toposcope,
'TopoType': topotype,
'ServiceTypesFeed': servicetypesfeed
'TopoUIDServiceEnpoints': topouidservendpoints
}})
if optopts:
auth, webapi, empty_data, bdii = {}, {}, {}, {}
Expand Down Expand Up @@ -504,6 +511,9 @@ def _get_cust_options(self, opt):
target_option = options[option]
return target_option

def get_downfeed(self):
    # Return the customer's downtimes feed location (the DowntimesFeed
    # option from the parsed customer configuration); presumably a URL to
    # the flat CSV source — resolved via the shared option lookup helper.
    return self._get_cust_options('DowntimesFeed')

def get_topofeed(self):
return self._get_cust_options('TopoFeed')

Expand Down
1 change: 1 addition & 0 deletions modules/io/webapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

class WebAPI(object):
methods = {
'downtimes-csv-connector.py': 'downtimes',
'downtimes-gocdb-connector.py': 'downtimes',
'topology-gocdb-connector.py': 'topology',
'topology-csv-connector.py': 'topology',
Expand Down
55 changes: 55 additions & 0 deletions modules/parse/flat_downtimes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import datetime
import xml.dom.minidom

from xml.parsers.expat import ExpatError

from argo_connectors.exceptions import ConnectorParseError
from argo_connectors.parse.base import ParseHelpers
from argo_connectors.utils import construct_fqdn
from argo_connectors.utils import module_class_name


class ParseDowntimes(ParseHelpers):
    """Parse a flat CSV downtimes feed into ARGO downtime entries.

    Only rows with a non-empty unique_id and Severity == 'OUTAGE' that
    overlap the target day are kept; their start/end times are clamped
    to that day's [00:00:00, 23:59:59] window.
    """

    def __init__(self, logger, data, current_date, uid=False):
        self.logger = logger
        # csv_to_json (inherited from ParseHelpers) turns raw CSV text
        # into a list of per-row dicts
        self.data = self.csv_to_json(data)
        # window of interest: the whole target day
        self.start = current_date
        self.end = current_date.replace(hour=23, minute=59, second=59)
        self.uid = uid

    def get_data(self):
        """Return the list of downtime dicts for the target day."""
        collected = []
        time_fmt = "%m/%d/%Y %H:%M"

        for row in self.data:
            unique_id = row['unique_id']
            # skip rows without an id and non-outage severities
            if not unique_id or row['Severity'] != 'OUTAGE':
                continue

            fqdn = construct_fqdn(row['url'])
            begin = datetime.datetime.strptime(row['start_time'], time_fmt)
            finish = datetime.datetime.strptime(row['end_time'], time_fmt)

            # compare at day granularity: keep rows whose downtime spans
            # the target day
            begin_day = begin.replace(hour=0, minute=0, second=0)
            finish_day = finish.replace(hour=0, minute=0, second=0)
            if not begin_day <= self.start <= finish_day:
                continue

            collected.append({
                # optionally suffix the hostname with the unique id so
                # endpoints sharing an fqdn stay distinguishable
                'hostname': f'{fqdn}_{unique_id}' if self.uid else fqdn,
                'service': row['service_type'],
                # clamp to the day window before formatting
                'start_time': max(begin, self.start).strftime('%Y-%m-%dT%H:%M:00Z'),
                'end_time': min(finish, self.end).strftime('%Y-%m-%dT%H:%M:00Z'),
            })

        return collected
77 changes: 77 additions & 0 deletions modules/tasks/flat_downtimes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os

from urllib.parse import urlparse

from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError
from argo_connectors.io.http import SessionWithRetry
from argo_connectors.io.webapi import WebAPI
from argo_connectors.parse.flat_downtimes import ParseDowntimes
from argo_connectors.tasks.common import write_state, write_downtimes_avro as write_avro


class TaskCsvDowntimes(object):
    """Connector task: fetch a CSV downtimes feed, parse it, record state
    and optionally publish the result to WebAPI and/or write it as avro.
    """

    def __init__(self, loop, logger, connector_name, globopts, webapi_opts,
                 confcust, custname, feed, current_date,
                 uidservtype, targetdate, timestamp):
        # asyncio event loop the caller drives run() on
        self.event_loop = loop
        self.logger = logger
        # executable name; used as key into per-connector config sections
        self.connector_name = connector_name
        self.globopts = globopts
        self.webapi_opts = webapi_opts
        self.confcust = confcust
        self.custname = custname
        # location of the CSV downtimes feed (customer DowntimesFeed option)
        self.feed = feed
        # start-of-day datetime of the target date
        self.current_date = current_date
        # whether hostnames get the unique-id suffix (TopoUIDServiceEndpoints)
        self.uidservtype = uidservtype
        # target date as the original YYYY-MM-DD string (sent to WebAPI)
        self.targetdate = targetdate
        # target date as YYYY_MM_DD, used for state/avro file naming
        self.timestamp = timestamp

    async def fetch_data(self):
        """Fetch the raw CSV feed content over HTTP with retries."""
        session = SessionWithRetry(self.logger,
                                   os.path.basename(self.connector_name),
                                   self.globopts)
        res = await session.http_get(self.feed)

        return res

    def parse_source(self, res):
        """Parse fetched CSV text into a list of downtime dicts."""
        csv_downtimes = ParseDowntimes(self.logger, res, self.current_date,
                                       self.uidservtype)
        return csv_downtimes.get_data()

    async def send_webapi(self, dts):
        """Publish parsed downtimes to the ARGO WebAPI for the target date."""
        webapi = WebAPI(self.connector_name, self.webapi_opts['webapihost'],
                        self.webapi_opts['webapitoken'], self.logger,
                        int(self.globopts['ConnectionRetry'.lower()]),
                        int(self.globopts['ConnectionTimeout'.lower()]),
                        int(self.globopts['ConnectionSleepRetry'.lower()]),
                        date=self.targetdate)
        await webapi.send(dts, downtimes_component=True)

    async def run(self):
        """Fetch, parse, record state, then publish/serialize as configured.

        On connector errors (HTTP/parse) or interrupt, logs the error and
        records a failed state instead of raising.
        """
        try:
            # send_empty: customer opted to publish an empty dataset
            # without hitting the feed at all
            write_empty = self.confcust.send_empty(self.connector_name)
            if not write_empty:
                res = await self.fetch_data()
                dts = self.parse_source(res)
            else:
                dts = []

            # mark the fetch as successful before publishing
            await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, True)

            # NOTE(review): eval() of a config value — expected to be the
            # literal 'True'/'False'; consider ast.literal_eval instead
            if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
                await self.send_webapi(dts)

            # we don't have multiple tenant definitions in one
            # customer file so we can safely assume one tenant/customer
            if dts or write_empty:
                cust = list(self.confcust.get_customers())[0]
                self.logger.info('Customer:%s Fetched Date:%s Endpoints:%d' %
                                 (self.confcust.get_custname(cust), self.targetdate, len(dts)))

            if eval(self.globopts['GeneralWriteAvro'.lower()]):
                write_avro(self.logger, self.globopts, self.confcust, dts, self.timestamp)

        except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
            self.logger.error(repr(exc))
            # record a failed state so downstream tooling sees the gap
            await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, False)
25 changes: 14 additions & 11 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_ver():
setup(name=NAME,
version=get_ver(),
author='SRCE',
author_email='[email protected], [email protected]',
author_email='[email protected]',
description='Components generate input data for ARGO Compute Engine',
classifiers=[
"Development Status :: 5 - Production/Stable",
Expand All @@ -36,14 +36,17 @@ def get_ver():
'argo_connectors.parse', 'argo_connectors.mesh',
'argo_connectors.tasks'],
data_files=[('/etc/argo-connectors', glob.glob('etc/*.conf.template')),
('/usr/libexec/argo-connectors', ['exec/downtimes-gocdb-connector.py',
'exec/metricprofile-webapi-connector.py',
'exec/topology-provider-connector.py',
'exec/topology-gocdb-connector.py',
'exec/service-types-gocdb-connector.py',
'exec/service-types-json-connector.py',
'exec/service-types-csv-connector.py',
'exec/topology-json-connector.py',
'exec/weights-vapor-connector.py',
'exec/topology-csv-connector.py']),
('/usr/libexec/argo-connectors', [
'exec/downtimes-csv-connector.py',
'exec/downtimes-gocdb-connector.py',
'exec/metricprofile-webapi-connector.py',
'exec/service-types-csv-connector.py',
'exec/service-types-gocdb-connector.py',
'exec/service-types-json-connector.py',
'exec/topology-csv-connector.py',
'exec/topology-gocdb-connector.py',
'exec/topology-json-connector.py',
'exec/topology-provider-connector.py',
'exec/weights-vapor-connector.py',
]),
('/etc/argo-connectors/schemas', glob.glob('etc/schemas/*.avsc'))])
Loading

0 comments on commit b703a65

Please sign in to comment.