diff --git a/liota/core/offline_database.py b/liota/core/offline_database.py new file mode 100644 index 00000000..79f0261e --- /dev/null +++ b/liota/core/offline_database.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. 
# +# ----------------------------------------------------------------------------# + +import logging +import sqlite3 +import threading +import time +from liota.dcc_comms.dcc_comms import DCCComms +from liota.dcc_comms.check_connection import CheckConnection + +log = logging.getLogger(__name__) + +class OfflineDatabase: + def __init__(self, table_name, comms, conn=None, data_drain_size=1, draining_frequency=0): + """ + :param table_name: table_name in which message will be stored + :param comms: comms instance of DCCComms + :param data_drain_size: how many messages will be drained within each draining_frequency secs defined. + :param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds). + """ + if not isinstance(table_name, basestring): + log.error("Table name should be a string.") + raise TypeError("Table name should be a string.") + if not isinstance(comms, DCCComms): + log.error("DCCComms object is expected.") + raise TypeError("DCCComms object is expected.") + if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int): + log.error("draining_frequency is expected of float or int type.") + raise TypeError("draining_frequency is expected of float or int type.") + try: + assert draining_frequency>=0 + except AssertionError as e: + log.error("draining_frequency can't be negative.") + raise e("draining_frequency can't be negative.") + self.table_name = table_name + if conn is None: + self.internet_conn = CheckConnection() + else: + self.internet_conn = conn + self.draining_frequency = draining_frequency + self.data_drain_size = data_drain_size + self.comms = comms + self.flag_conn_open = False + self.draining_in_progress = False + self._offline_db_lock = threading.Lock() + self._create_table() + + def _create_table(self): + if self.flag_conn_open is False: + self.conn = sqlite3.connect('storage.db') + try: + with self.conn: + if not self.conn.execute("SELECT name FROM 
sqlite_master WHERE TYPE='table' AND name= ? ", (self.table_name,)).fetchone(): + self.conn.text_factory = str + self.flag_conn_open = True + self.cursor = self.conn.cursor() + self.cursor.execute("CREATE TABLE "+self.table_name+" (Message TEXT)") + self.cursor.close() + del self.cursor + else: + log.info("Table already there!!!") + except Exception as e: + raise e + finally: + self.flag_conn_open = False + self.conn.close() + + def add(self, message): + try: + self.conn = sqlite3.connect('storage.db') + self.flag_conn_open = True + with self.conn: + self.cursor = self.conn.cursor() + log.info("Adding data to "+ self.table_name) + self.cursor.execute("INSERT INTO "+self.table_name+"(Message) VALUES (?);", (message,)) + self.cursor.close() + del self.cursor + except sqlite3.OperationalError as e: + raise e + finally: + self.conn.close() + self.flag_conn_open = False + + def _drain(self): + self._offline_db_lock.acquire() + self.conn = sqlite3.connect('storage.db') + self.flag_conn_open = True + self.draining_in_progress = True + self.cursor = self.conn.cursor() + self.del_cursor = self.conn.cursor() + data_drained = 0 + try: + for row in self.cursor.execute("SELECT Message FROM "+self.table_name): + if self.comms is not None and self.internet_conn.check : + try: + self.comms.send(row[0]) + log.info("Data Drain: {}".format(row[0])) + data_drained+=1 + self.del_cursor.execute("Delete from "+self.table_name+" where rowid IN (Select rowid from "+self.table_name+" limit 1);") + self.conn.commit() + except Exception as e: + raise e + else: #internet connectivity breaks while draining + log.warning("Internet broke while draining") + break + if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn. 
+ data_drained=0 + time.sleep(self.draining_frequency) + except Exception as e: + raise e + log.warning("Internet connectivity broke while draining.") + finally: + self.del_cursor.close() + del self.del_cursor + self.conn.close() + self.flag_conn_open = False + self.draining_in_progress = False + self._offline_db_lock.release() + + def start_drain(self): + queueDrain = threading.Thread(target=self._drain) + queueDrain.daemon = True + queueDrain.start() diff --git a/liota/core/offline_queue.py b/liota/core/offline_queue.py new file mode 100644 index 00000000..09f07771 --- /dev/null +++ b/liota/core/offline_queue.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. 
class OfflineQueue:
    """In-memory deque buffer for messages produced while the DCC transport
    has no connectivity; drained (published) once connectivity returns.
    """

    def __init__(self, queue_size, comms, conn=None, data_drain_size=1, drop_oldest=True, draining_frequency=0):
        """
        :param queue_size: maximum queue length; negative means unbounded, zero is rejected
        :param comms: DCCComms instance used to publish drained messages
        :param conn: optional CheckConnection; a new one is created when None
        :param data_drain_size: messages drained per draining_frequency period
        :param drop_oldest: when full, drop the oldest (True) or newest (False) message
        :param draining_frequency: pause (seconds) between drain batches
        """
        if not isinstance(queue_size, int):
            log.error("Size is expected of int type.")
            raise TypeError("Size is expected of int type.")
        if not isinstance(comms, DCCComms):
            log.error("DCCComms object is expected.")
            raise TypeError("DCCComms object is expected.")
        if not isinstance(drop_oldest, bool):
            log.error("drop_oldest/newest is expected of bool type.")
            raise TypeError("drop_oldest is expected of bool type.")
        if not isinstance(draining_frequency, (float, int)):
            log.error("draining_frequency is expected of float or int type.")
            raise TypeError("draining_frequency is expected of float or int type.")
        if queue_size == 0 or draining_frequency < 0:
            # The original *called* the caught AssertionError instance
            # (raise e("...")), producing a confusing TypeError; raise a
            # proper exception directly.
            log.error("Size can't be zero, draining_frequency can't be negative.")
            raise ValueError("Size can't be zero, draining_frequency can't be negative.")
        self.size = queue_size
        self.drop_oldest = drop_oldest
        if self.size > 0 and drop_oldest:
            # A bounded deque evicts from the left automatically on append.
            self.deque = deque(maxlen=self.size)
        else:
            self.deque = deque()
        self.comms = comms
        self.data_drain_size = data_drain_size
        self.conn = CheckConnection() if conn is None else conn
        self.draining_frequency = draining_frequency
        self.draining_in_progress = False
        self._offlineQLock = threading.Lock()

    def append(self, data):
        """Buffer one message, honouring the size limit and drop policy."""
        if self.size < 0:
            # Unbounded queue: always append.
            self.deque.append(data)
        elif self.size > 0 and self.drop_oldest:
            # The original compared `len(self.deque) is self.size`, which only
            # worked by accident for CPython's small-int cache (<= 256); use ==
            # so larger queue sizes behave correctly.
            if len(self.deque) == self.size:
                log.info("Message dropped: {}".format(self.deque[0]))
            self.deque.append(data)
        else:
            # drop_oldest=False: once full, new messages are discarded.
            if len(self.deque) == self.size:
                log.info("Message dropped: {}".format(data))
            else:
                self.deque.append(data)

    def _drain(self):
        """Publish and remove queued messages until the queue empties or
        connectivity drops; runs under a lock so only one drain is active."""
        self._offlineQLock.acquire()
        data_drained = 0
        self.draining_in_progress = True
        try:
            while self.deque:
                if self.conn.check:
                    data = self.deque.popleft()
                    self.comms.send(data)
                    data_drained += 1
                    log.info("Data Drain: {}".format(data))
                else:
                    # Connectivity broke while draining; keep the rest queued.
                    log.warning("Internet broke while draining.")
                    break
                if data_drained == self.data_drain_size:
                    # Batch limit reached: sleep before the next batch.
                    data_drained = 0
                    time.sleep(self.draining_frequency)
        except Exception:
            log.warning("Exception while draining offline queue.")
            raise
        finally:
            self.draining_in_progress = False
            self._offlineQLock.release()

    def start_drain(self):
        """Drain asynchronously on a daemon thread."""
        drain_thread = threading.Thread(target=self._drain)
        drain_thread.daemon = True
        drain_thread.start()


class CheckConnection:
    """Background connectivity probe.

    Pings ``hostname`` every ``interval`` seconds on a daemon thread and
    exposes the latest result via ``check`` (1 = reachable, 0 = not).
    """

    def __init__(self, interval=1, hostname="8.8.8.8"):
        self.interval = interval
        self.hostname = hostname
        # Optimistically assume connectivity until the first probe completes.
        self.check = 1
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def run(self):
        # Daemon loop: refresh the cached status forever.
        while True:
            self.check = self.check_internet()
            time.sleep(self.interval)

    def check_internet(self):
        # NOTE(review): shells out to ping and interpolates hostname into the
        # command line — hostname must come from trusted configuration only.
        response = os.system("ping -c 1 " + self.hostname + " > /dev/null 2>&1")
        return 1 if response == 0 else 0
""" - def __init__(self, con, enclose_metadata=False): + def __init__(self, con, enclose_metadata=False, buffering_params=None): """ :param con: DccComms Object :param enclose_metadata: Include Gateway, Device and Metric names as part of payload or not + :param buffering_params: BufferingParams object, for offline_storage of data """ super(AWSIoT, self).__init__( - comms=con + comms=con, buffering_params=buffering_params ) self.enclose_metadata = enclose_metadata diff --git a/liota/dccs/dcc.py b/liota/dccs/dcc.py index dfa867f1..c206c89a 100644 --- a/liota/dccs/dcc.py +++ b/liota/dccs/dcc.py @@ -31,28 +31,40 @@ # ----------------------------------------------------------------------------# import logging +import json from abc import ABCMeta, abstractmethod from liota.entities.entity import Entity from liota.dcc_comms.dcc_comms import DCCComms from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.dcc_comms.check_connection import CheckConnection +from liota.core.offline_queue import OfflineQueue +from liota.core.offline_database import OfflineDatabase +from liota.lib.utilities.offline_buffering import BufferingParams log = logging.getLogger(__name__) - class DataCenterComponent: - """ Abstract base class for all DCCs. 
""" __metaclass__ = ABCMeta @abstractmethod - def __init__(self, comms): + def __init__(self, comms, buffering_params): if not isinstance(comms, DCCComms): log.error("DCCComms object is expected.") raise TypeError("DCCComms object is expected.") self.comms = comms + self.buffering_params = buffering_params + if self.buffering_params is not None: + self.persistent_storage = self.buffering_params.persistent_storage + self.data_drain_size = self.buffering_params.data_drain_size + self.draining_frequency = self.buffering_params.draining_frequency + self.drop_oldest = self.buffering_params.drop_oldest + self.queue_size = self.buffering_params.queue_size + self.conn = CheckConnection() + self.offline_buffering_enabled = False #False means offline buffering/storage is off else on # ----------------------------------------------------------------------- # Implement this method in subclasses and do actual registration. @@ -80,11 +92,63 @@ def publish(self, reg_metric): if not isinstance(reg_metric, RegisteredMetric): log.error("RegisteredMetric object is expected.") raise TypeError("RegisteredMetric object is expected.") + message = self._format_data(reg_metric) - if hasattr(reg_metric, 'msg_attr'): - self.comms.send(message, reg_metric.msg_attr) + if message is not None: + if self.buffering_params is not None: + if self.conn.check: + if self.offline_buffering_enabled: #checking if buffering is enabled or not, incase internet comes back after disconnectivity + self.offline_buffering_enabled = False + if self.persistent_storage is True: + log.info("Draining starts.") + self.offline_database.start_drain() + else: + self.offlineQ.start_drain() + try: + if hasattr(reg_metric, 'msg_attr'): + self.comms.send(message, reg_metric.msg_attr) + else: + self.comms.send(message, None) + except Exception as e: + raise e + else: #if no internet connectivity + if self.persistent_storage is True: + table_name = self.__class__.__name__ + type(self.comms).__name__ + 
self._start_database_storage(table_name, message) + else: + self._start_queuing(message) + else: + if hasattr(reg_metric, 'msg_attr'): + self.comms.send(message, reg_metric.msg_attr) + else: + self.comms.send(message, None) + + def _start_queuing(self, message): + if self.offline_buffering_enabled is False: + self.offline_buffering_enabled = True + try: + if self.offlineQ.draining_in_progress: + self.offlineQ.append(message) + except Exception as e: + self.offlineQ = OfflineQueue(comms=self.comms, conn=self.conn, queue_size=self.queue_size, + data_drain_size=self.data_drain_size, drop_oldest=self.drop_oldest, + draining_frequency=self.draining_frequency) + log.info("Offline queueing started.") + self.offlineQ.append(message) + + def _start_database_storage(self, table_name, message): + if self.offline_buffering_enabled is False: + self.offline_buffering_enabled = True + try: + if self.offline_database.draining_in_progress: + self.offline_database.add(message) + except Exception as e: + self.offline_database = OfflineDatabase(table_name=table_name, comms=self.comms, conn=self.conn, + data_drain_size=self.data_drain_size, draining_frequency=self.draining_frequency) + log.info("Database created.") + self.offline_database.add(message) else: - self.comms.send(message, None) + self.offline_database.add(message) @abstractmethod def set_properties(self, reg_entity, properties): @@ -95,4 +159,5 @@ def unregister(self, entity_obj): if not isinstance(entity_obj, Entity): raise TypeError -class RegistrationFailure(Exception): pass +class RegistrationFailure(Exception): + pass diff --git a/liota/dccs/graphite.py b/liota/dccs/graphite.py index 24b2ff55..0d835d23 100644 --- a/liota/dccs/graphite.py +++ b/liota/dccs/graphite.py @@ -29,20 +29,22 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # # THE POSSIBILITY OF SUCH DAMAGE. 
# # ----------------------------------------------------------------------------# + import logging from liota.dccs.dcc import DataCenterComponent from liota.entities.metrics.registered_metric import RegisteredMetric from liota.entities.metrics.metric import Metric from liota.entities.registered_entity import RegisteredEntity - log = logging.getLogger(__name__) - class Graphite(DataCenterComponent): - def __init__(self, comms): + def __init__(self, comms, buffering_params=None): + ''' + :param buffering_params: BufferingParams object, for offline_storage of data + ''' super(Graphite, self).__init__( - comms=comms + comms=comms,buffering_params=buffering_params ) def register(self, entity_obj): diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index f885a3c4..740c57bb 100755 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -57,7 +57,13 @@ class IotControlCenter(DataCenterComponent): """ - def __init__(self, con): + def __init__(self, con, buffering_params=None): + ''' + :param buffering_params: BufferingParams object, for offline_storage of data + ''' + super(IotControlCenter, self).__init__( + buffering_params=buffering_params + ) log.info("Logging into DCC") self._version = 20171023 self.comms = con diff --git a/liota/lib/utilities/README.md b/liota/lib/utilities/README.md index 8b137891..5a389e73 100644 --- a/liota/lib/utilities/README.md +++ b/liota/lib/utilities/README.md @@ -1 +1,42 @@ +# Offline data storage +If the client faces network disconnectivity, publish message can be stored as a persistent storage or in a temporary offline queue in which publish data will be added to an internal queue until the number of queued-up requests reaches the size limit of the queue. If the size of the queue is defined as negative integer it will act as a infinite queue. One can also choose the queue behaviour after it reaches it's specified size. 
If drop_oldest behaviour is set to be true, the oldest publish message is dropped, else the newest publish messages are dropped. One should specify the draining frequency in each case, which implies how data which has been stored will be published once the network connectivity is established. +You can also specify data_drain_size which specifies how much data will be drained at once after the internet connectivity is established again. By default both are set to 1. +# Example +By default buffering_params is set to None, i.e. the buffering mechanism is disabled. +Suppose we want to create a persistent storage; while creating an instance of a DCC, we would pass an instance of the BufferingParams class along with it. + +``` +buffering = BufferingParams(persistent_storage=True, data_drain_size=10, draining_frequency=1) +graphite = Graphite(SocketDccComms(ip=config['GraphiteIP'],port=8080), + buffering_params=buffering) +``` +Here data_drain_size is 10 and draining_frequency is 1, which specifies 10 messages will be sent per second. +For persistent storage a database will be created by the name of storage.db which will store all the messages while network connectivity is broken. +Once network connectivity is back messages will be removed from the database as they get published. +In case of ```persistent_storage``` as ```False``` the queueing mechanism will be used by default; you can specify queue_size and other parameters like drop_oldest, data_drain_size and draining_frequency: +``` +buffering = BufferingParams(queue_size=-1, data_drain_size=10, draining_frequency=1) +``` +will create a queueing mechanism with infinite size; drop behaviour by default is true, and data_drain_size and draining_frequency can be any positive integer.
+For queue with size 3 and drop_oldest behaviour set to true, +``` +buffering = Buffering(queue_size=3, drop_oldest=True, draining_frequency=1) +``` +As the publish message arrives the queue will be like this after 3 publish message arrive: +``` +['msg1', 'msg2', 'msg3'] +``` +As the fourth publish message arrives: +``` +['msg2', 'msg3', 'msg4'] +``` +For the fifth publish message: +``` +['msg3', 'msg4', 'msg5'] +``` +Similarly, if the drop_oldest behaviour is set to False: +``` +['msg1', 'msg2', 'msg3'] +``` +After this any new coming publish message will be dropped. diff --git a/liota/lib/utilities/offline_buffering.py b/liota/lib/utilities/offline_buffering.py new file mode 100644 index 00000000..1c4c65c3 --- /dev/null +++ b/liota/lib/utilities/offline_buffering.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. 
class BufferingParams:
    """Configuration container for a DCC's offline-buffering behaviour.

    :param queue_size: in-memory queue capacity; negative means unbounded
        (only used when persistent_storage is False)
    :param persistent_storage: True for sqlite-backed storage, False for the
        in-memory queue
    :param data_drain_size: messages drained per draining_frequency period
    :param drop_oldest: when the queue is full, drop oldest (True) or newest
        (False) messages
    :param draining_frequency: pause (seconds) between drain batches
    """

    def __init__(self, queue_size=0, persistent_storage=False, data_drain_size=10, drop_oldest=True, draining_frequency=1):
        # Plain value object: just record the settings for the DCC to read.
        self.queue_size = queue_size
        self.persistent_storage = persistent_storage
        self.data_drain_size = data_drain_size
        self.drop_oldest = drop_oldest
        self.draining_frequency = draining_frequency
# +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a Graphite DCC object and registers system on + Graphite to acquire "registered edge system", i.e. graphite_edge_system. 
+ """ + + def run(self, registry): + import copy + from liota.dccs.graphite import Graphite + from liota.dcc_comms.socket_comms import SocketDccComms + from liota.lib.utilities.offline_buffering import BufferingParams + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # This is for persistent storage. + offline_buffering = BufferingParams(persistent_storage=True, data_drain_size=10, draining_frequency=1) + # Initialize DCC object with transport + self.graphite = Graphite( + SocketDccComms(ip=config['GraphiteIP'], + port=config['GraphitePort']), buffering_params=offline_buffering + ) + + # Register gateway system + graphite_edge_system = self.graphite.register(edge_system) + + registry.register("graphite", self.graphite) + registry.register("graphite_edge_system", graphite_edge_system) + + def clean_up(self): + self.graphite.comms.client.close()