generate Influxdb database #219

Open · wants to merge 3 commits into base: master
4 changes: 2 additions & 2 deletions docs/source/InstallODM2AdminWithDocker.rst
@@ -1,15 +1,15 @@
ODM2 Admin Docker Image Creation
================================

-Requirements to run [docker image](https://cloud.docker.com/repository/docker/miguelcleon/odm2-admin):
+Requirements to run [docker image](https://hub.docker.com/r/lsetiawan/odm2admin):

1. Docker installed on Linux, MacOS, or Windows.

To run:

.. code:: bash

-   $ docker run -d -p 8010:8010 --name odm2admintest miguelcleon/odm2-admin:latest
+   $ docker run -d -p 8010:8010 --name odm2admintest lsetiawan/odm2admin:latest

Next, in order to log in you will need to create a Django superuser.
To do that, first attach bash to the container, then run Django's ``createsuperuser`` management command.
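A minimal sketch of those two steps, assuming the ``odm2admintest`` container name from the run command above (the location of ``manage.py`` inside the image may differ):

.. code:: bash

   $ docker exec -it odm2admintest /bin/bash
   # inside the container (adjust the path to manage.py for this image):
   $ python manage.py createsuperuser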
130 changes: 130 additions & 0 deletions odm2admin/management/commands/generate_influx_data.py
@@ -0,0 +1,130 @@
import os

from datetime import datetime

import pandas as pd
import numpy as np

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "templatesAndSettings.settings")

from django.conf import settings
from django.core.management.base import BaseCommand
from django.db.models import F

from influxdb import DataFrameClient
from influxdb.exceptions import InfluxDBClientError

from odm2admin.models import Timeseriesresultvalues
from odm2admin.models import Timeseriesresults

class InfluxHelper(object):
    class MissingConnectionException(Exception):
        """Client is not defined or connected"""
        pass

    def __init__(self, *args, **kwargs):
        self.client = None
        self.batch_size = 10000
        self.connection_info = settings.INFLUX_CONNECTION

    def connect_to_dataframe_client(self):
        self.client = DataFrameClient(
            host=self.connection_info['host'],
            port=self.connection_info['port'],
            username=self.connection_info['username'],
            password=self.connection_info['password'],
            database=self.connection_info['database']
        )

    def recreate_database(self):
        read_user = self.connection_info['read_username']
        database_name = self.connection_info['database']

        if not self.client:
            raise InfluxHelper.MissingConnectionException('InfluxDB client is not connected.')

        self.client.drop_database(database_name)
        self.client.create_database(database_name)
        self.client.grant_privilege('read', database_name, read_user)

    def write_all_sensor_values(self, sensor):
        self.write_sensor_values(sensor, datetime.min)

    def get_series_last_value(self, identifier):
        # Quote the measurement name so identifiers containing dashes (e.g. UUIDs) parse in InfluxQL.
        query_string = 'select last(DataValue), time from "{identifier}"'.format(identifier=identifier)
        result = self.client.query(query_string, database=self.connection_info['database'])
        if result and len(result) == 1:
            dataframe = result[identifier]  # type: pd.DataFrame
            return dataframe.first_valid_index().to_pydatetime()

    def write_sensor_values(self, sensor, starting_datetime):
        values = Timeseriesresultvalues.objects.filter(valuedatetime__gt=starting_datetime, resultid=sensor.resultid)\
            .annotate(DateTime=F('valuedatetime'))\
            .annotate(UTCOffset=F('valuedatetimeutcoffset'))\
            .annotate(DataValue=F('datavalue'))
        values_dataframe = self.prepare_data_values(values)
        if values_dataframe.empty:
            return 0

        # Use the result UUID as the measurement name, matching what handle() passes to get_series_last_value().
        result = self.add_dataframe_to_database(values_dataframe, str(sensor.resultid.resultuuid))
        del values_dataframe
        return result

    def prepare_data_values(self, values_queryset):
        dataframe = pd.DataFrame.from_records(values_queryset.values('DateTime', 'UTCOffset', 'DataValue'))
        if dataframe.empty:
            return dataframe

        dataframe['DateTime'] = pd.to_datetime(dataframe['DateTime'])
        dataframe.set_index(['DateTime'], inplace=True)
        dataframe['DataValue'] = pd.to_numeric(dataframe['DataValue'], errors='coerce').astype(np.float64)
        dataframe['UTCOffset'] = pd.to_numeric(dataframe['UTCOffset'], errors='coerce').astype(np.float64)
        dataframe.dropna(how='any', inplace=True)
        return dataframe

    def add_dataframe_to_database(self, dataframe, identifier):
        try:
            write_success = self.client.write_points(dataframe, identifier, time_precision='s', batch_size=self.batch_size)
            return len(dataframe) if write_success else 0
        except InfluxDBClientError as e:
            # InfluxDBClientError has no .message attribute on Python 3; format the exception itself.
            print('Error while writing to database {}: {}'.format(identifier, e))
            return 0

class Command(BaseCommand):
    help = 'Copy data values over to the InfluxDB instance.'

    def add_arguments(self, parser):
        parser.add_argument(
            '--clean',
            action='store_true',
            dest='clean',
            help='Drop the influx database before filling up data.',
        )

    def handle(self, *args, **options):
        recreate_database = options.get('clean')
        helper = InfluxHelper()
        helper.connect_to_dataframe_client()
        print('Available InfluxDB databases: {}'.format(helper.client.get_list_database()))
        sensors = Timeseriesresults.objects.all()

        if recreate_database:
            helper.recreate_database()

        for sensor in sensors:
            print('- writing data for sensor {}'.format(sensor.resultid))
            last_value = helper.get_series_last_value(str(sensor.resultid.resultuuid))
            starting_point = last_value.replace(tzinfo=None) if last_value else datetime.min
            result = helper.write_sensor_values(sensor, starting_point)
            print('-- {} points written.'.format(result))
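For reference, a minimal sketch of the settings.INFLUX_CONNECTION entry this command expects; only the key names are taken from the helper code above, and all values are placeholders:

    # settings.py (sketch; values are placeholders)
    INFLUX_CONNECTION = {
        'host': 'localhost',            # InfluxDB host
        'port': 8086,                   # InfluxDB HTTP API port
        'username': 'influx_admin',     # user with write/admin rights
        'password': 'influx_password',
        'database': 'odm2timeseries',
        'read_username': 'influx_reader',  # granted read access by recreate_database()
    }

With an entry like that in place, the command would be run as python manage.py generate_influx_data, adding --clean to drop and recreate the InfluxDB database before writing.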