Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
Offline buffering mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 committed Nov 14, 2017
1 parent fb2009d commit 5d4c63c
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 14 deletions.
152 changes: 152 additions & 0 deletions liota/core/offline_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------#
# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. #
# #
# Licensed under the BSD 2-Clause License (the “License”); you may not use #
# this file except in compliance with the License. #
# #
# The BSD 2-Clause License #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met:#
# #
# - Redistributions of source code must retain the above copyright notice, #
# this list of conditions and the following disclaimer. #
# #
# - Redistributions in binary form must reproduce the above copyright #
# notice, this list of conditions and the following disclaimer in the #
# documentation and/or other materials provided with the distribution. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"#
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF #
# THE POSSIBILITY OF SUCH DAMAGE. #
# ----------------------------------------------------------------------------#

import logging
import sqlite3
import threading
import time
from liota.dcc_comms.dcc_comms import DCCComms
from liota.dcc_comms.check_connection import CheckConnection

log = logging.getLogger(__name__)

class OfflineDatabase:
def __init__(self, table_name, comms, conn=None, data_drain_size=1, draining_frequency=0):
"""
:param table_name: table_name in which message will be stored
:param comms: comms instance of DCCComms
:param data_drain_size: how many messages will be drained within each draining_frequency secs defined.
:param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds).
"""
if not isinstance(table_name, basestring):
log.error("Table name should be a string.")
raise TypeError("Table name should be a string.")
if not isinstance(comms, DCCComms):
log.error("DCCComms object is expected.")
raise TypeError("DCCComms object is expected.")
if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int):
log.error("draining_frequency is expected of float or int type.")
raise TypeError("draining_frequency is expected of float or int type.")
try:
assert draining_frequency>=0
except AssertionError as e:
log.error("draining_frequency can't be negative.")
raise e("draining_frequency can't be negative.")
self.table_name = table_name
if conn is None:
self.internet_conn = CheckConnection()
else:
self.internet_conn = conn
self.draining_frequency = draining_frequency
self.data_drain_size = data_drain_size
self.comms = comms
self.flag_conn_open = False
self.draining_in_progress = False
self._offline_db_lock = threading.Lock()
self._create_table()

def _create_table(self):
if self.flag_conn_open is False:
self.conn = sqlite3.connect('storage.db')
try:
with self.conn:
if not self.conn.execute("SELECT name FROM sqlite_master WHERE TYPE='table' AND name= ? ", (self.table_name,)).fetchone():
self.conn.text_factory = str
self.flag_conn_open = True
self.cursor = self.conn.cursor()
self.cursor.execute("CREATE TABLE "+self.table_name+" (Message TEXT)")
self.cursor.close()
del self.cursor
else:
log.info("Table already there!!!")
except Exception as e:
raise e
finally:
self.flag_conn_open = False
self.conn.close()

def add(self, message):
try:
self.conn = sqlite3.connect('storage.db')
self.flag_conn_open = True
with self.conn:
self.cursor = self.conn.cursor()
log.info("Adding data to "+ self.table_name)
self.cursor.execute("INSERT INTO "+self.table_name+"(Message) VALUES (?);", (message,))
self.cursor.close()
del self.cursor
except sqlite3.OperationalError as e:
raise e
finally:
self.conn.close()
self.flag_conn_open = False

def _drain(self):
self._offline_db_lock.acquire()
self.conn = sqlite3.connect('storage.db')
self.flag_conn_open = True
self.draining_in_progress = True
self.cursor = self.conn.cursor()
self.del_cursor = self.conn.cursor()
data_drained = 0
try:
for row in self.cursor.execute("SELECT Message FROM "+self.table_name):
if self.comms is not None and self.internet_conn.check :
try:
self.comms.send(row[0])
log.info("Data Drain: {}".format(row[0]))
data_drained+=1
self.del_cursor.execute("Delete from "+self.table_name+" where rowid IN (Select rowid from "+self.table_name+" limit 1);")
self.conn.commit()
except Exception as e:
raise e
else: #internet connectivity breaks while draining
log.warning("Internet broke while draining")
break
if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn.
data_drained=0
time.sleep(self.draining_frequency)
except Exception as e:
raise e
log.warning("Internet connectivity broke while draining.")
finally:
self.del_cursor.close()
del self.del_cursor
self.conn.close()
self.flag_conn_open = False
self.draining_in_progress = False
self._offline_db_lock.release()

def start_drain(self):
queueDrain = threading.Thread(target=self._drain)
queueDrain.daemon = True
queueDrain.start()
124 changes: 124 additions & 0 deletions liota/core/offline_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------#
# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. #
# #
# Licensed under the BSD 2-Clause License (the “License”); you may not use #
# this file except in compliance with the License. #
# #
# The BSD 2-Clause License #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met:#
# #
# - Redistributions of source code must retain the above copyright notice, #
# this list of conditions and the following disclaimer. #
# #
# - Redistributions in binary form must reproduce the above copyright #
# notice, this list of conditions and the following disclaimer in the #
# documentation and/or other materials provided with the distribution. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"#
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF #
# THE POSSIBILITY OF SUCH DAMAGE. #
# ----------------------------------------------------------------------------#

import logging
from collections import deque
import threading
import time
from liota.dcc_comms.dcc_comms import DCCComms
from liota.dcc_comms.check_connection import CheckConnection

log = logging.getLogger(__name__)

class OfflineQueue:
def __init__(self, queue_size, comms, conn=None, data_drain_size=1, drop_oldest=True, draining_frequency=0):
"""
:param size: size of the offline_queue, if negative implies infinite.
:param drop_oldest: if True oldest data will be dropped after size of queue is exceeded.
:param comms: comms instance of DCCComms
:param data_drain_size: how many messages will be drained within each draining_frequency secs defined.
:param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds).
"""
if not isinstance(queue_size, int):
log.error("Size is expected of int type.")
raise TypeError("Size is expected of int type.")
if not isinstance(comms, DCCComms):
log.error("DCCComms object is expected.")
raise TypeError("DCCComms object is expected.")
if not isinstance(drop_oldest, bool):
log.error("drop_oldest/newest is expected of bool type.")
raise TypeError("drop_oldest is expected of bool type.")
if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int):
log.error("draining_frequency is expected of float or int type.")
raise TypeError("draining_frequency is expected of float or int type.")
try:
assert queue_size!=0 and draining_frequency>=0
except AssertionError as e:
log.error("Size can't be zero, draining_frequency can't be negative.")
raise e("Size can't be zero, draining_frequency can't be negative.")
self.size = queue_size
self.drop_oldest = drop_oldest
if (self.size>0 and drop_oldest):
self.deque = deque(maxlen=self.size)
else:
self.deque = deque()
self.comms = comms
self.data_drain_size = data_drain_size
if conn is None:
self.conn = CheckConnection()
else:
self.conn = conn
self.draining_frequency = draining_frequency
self.draining_in_progress = False
self._offlineQLock = threading.Lock()

def append(self, data):
if (self.size<0): #for infinite length deque
self.deque.append(data)
elif (self.size>0 and self.drop_oldest): #for deque with drop_oldest=True
if len(self.deque) is self.size:
log.info("Message dropped: {}".format(self.deque[0]))
self.deque.append(data)
else: #for deque with drop_oldest=False
if len(self.deque) is self.size:
log.info("Message dropped: {}".format(data))
else:
self.deque.append(data)

def _drain(self):
self._offlineQLock.acquire()
data_drained = 0
self.draining_in_progress = True
try:
while self.deque:
if self.conn.check:
data = self.deque.popleft()
self.comms.send(data)
data_drained+=1
log.info("Data Drain: {}".format(data))
else: #if internet conncetivity breaks while draining
log.warning("Internet broke while draining.")
break
if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn.
data_drained=0
time.sleep(self.draining_frequency)
except Exception as e:
log.warning("Internet connectivity broke while draining.")
raise e
finally:
self.draining_in_progress = False
self._offlineQLock.release()

def start_drain(self):
queueDrain = threading.Thread(target=self._drain)
queueDrain.daemon = True
queueDrain.start()
25 changes: 25 additions & 0 deletions liota/dcc_comms/check_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import threading
import time

class CheckConnection:
def __init__(self, interval=1, hostname = "8.8.8.8"):
self.interval = interval
self.hostname = hostname
self.check = 1
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()

def run(self):
while True:
self.check = self.check_internet()
time.sleep(self.interval)

def check_internet(self):
response = os.system("ping -c 1 " + self.hostname + " > /dev/null 2>&1")
if response == 0:
pingstatus = 1
else:
pingstatus = 0
return pingstatus
5 changes: 3 additions & 2 deletions liota/dccs/aws_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ class AWSIoT(DataCenterComponent):
"""
DCC for AWSIoT Platform.
"""
def __init__(self, con, enclose_metadata=False):
def __init__(self, con, enclose_metadata=False, buffering_params=None):
"""
:param con: DccComms Object
:param enclose_metadata: Include Gateway, Device and Metric names as part of payload or not
:param buffering_params: BufferingParams object, for offline_storage of data
"""
super(AWSIoT, self).__init__(
comms=con
comms=con, buffering_params=buffering_params
)
self.enclose_metadata = enclose_metadata

Expand Down
Loading

0 comments on commit 5d4c63c

Please sign in to comment.