From a824c4be33e29418e65b39001b306e3a44f93602 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 4 Mar 2023 15:03:22 +0100 Subject: [PATCH] Add decoder for acquiring data in NDJSON format --- CHANGES.rst | 1 + kotori/daq/decoder/__init__.py | 7 ++++++ kotori/daq/decoder/ndjson.py | 40 ++++++++++++++++++++++++++++++++++ setup.py | 4 ++++ test/settings/mqttkit.py | 1 + test/test_daq_mqtt.py | 38 +++++++++++++++++++++++++++++++- test/util.py | 7 ++++++ 7 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 kotori/daq/decoder/ndjson.py diff --git a/CHANGES.rst b/CHANGES.rst index 2e5e4636..5a5e81da 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -8,6 +8,7 @@ incomplete - Add possibility to acquire bulk readings in JSON format - Add possibility to acquire bulk readings in compact JSON format, with timestamps as keys +- Add decoder for acquiring data in NDJSON format in progress diff --git a/kotori/daq/decoder/__init__.py b/kotori/daq/decoder/__init__.py index f9a8a874..a96826ca 100644 --- a/kotori/daq/decoder/__init__.py +++ b/kotori/daq/decoder/__init__.py @@ -2,6 +2,7 @@ # (c) 2019-2021 Andreas Motl from kotori.daq.decoder.airrohr import AirrohrDecoder from kotori.daq.decoder.json import CompactTimestampedJsonDecoder +from kotori.daq.decoder.ndjson import NdJsonDecoder from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder from kotori.daq.decoder.schema import MessageType @@ -24,6 +25,12 @@ def probe(self): if 'slot' not in self.topology: return False + # NDJSON format + if self.topology.slot.endswith('data.ndjson'): + self.info.message_type = MessageType.DATA_CONTAINER + self.info.decoder = NdJsonDecoder + return True + # Compact JSON format, with timestamps as keys if self.topology.slot.endswith('tc.json'): self.info.message_type = MessageType.DATA_CONTAINER diff --git a/kotori/daq/decoder/ndjson.py b/kotori/daq/decoder/ndjson.py new file mode 100644 index 00000000..2e6f0b0f --- /dev/null +++ b/kotori/daq/decoder/ndjson.py @@ 
-0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# (c) 2023 Andreas Motl + + +class NdJsonDecoder: + """ + Decode NDJSON payloads. NDJSON is a newline-delimited JSON format. + It is suitable for submitting multiple JSON records in bulk, or for + streaming them. + + NDJSON has been called LDJSON, and is also known as JSON Lines, see + also JSON streaming. + + - http://ndjson.org/ + - https://jsonlines.org/ + - https://en.wikipedia.org/wiki/JSON_streaming + + Documentation + ============= + - https://getkotori.org/docs/handbook/decoders/ndjson.html (not yet) + + Example + ======= + :: + + {"temperature":21.42,"humidity":41.55} + {"temperature":42.84,"humidity":83.1} + + """ + + @staticmethod + def decode(payload): + """ + Decode an NDJSON payload into a list of records. + + :param payload: NDJSON document, as text or UTF-8 encoded bytes. + :return: List of dictionaries, one per NDJSON line. + """ + + # Import lazily; pandas is only listed in the ``daq_ndjson`` requirements. + import io + import pandas as pd + + # pandas deprecated passing literal JSON strings to ``read_json`` + # in 2.1, so decode bytes and wrap the text into a buffer. + if isinstance(payload, (bytes, bytearray)): + payload = payload.decode("utf-8") + + # Decode from NDJSON, using pandas. + df = pd.read_json(io.StringIO(payload), lines=True) + + # Transform to records again. + return df.to_dict(orient="records") diff --git a/setup.py b/setup.py index b02a4c1b..39397021 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,10 @@ #'txmongo==16.3.0', 'pymongo>=3.11.0,<5', ], + 'daq_ndjson': [ + pandas_spec, + numpy_spec, + ], 'daq_geospatial': [ 'Geohash>=1.0,<2', 'geopy>=1.12.0,<3', diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index e7e4b910..e195eb49 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -28,6 +28,7 @@ class TestSettings: mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json' mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__' mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json' + mqtt_topic_ndjson = 'mqttkit-1/itest/foo/bar/data.ndjson' mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json' # HTTP channel settings.
diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index eb0cb857..0ee6a18b 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -7,7 +7,7 @@ from twisted.internet import threads from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT -from test.util import mqtt_json_sensor, sleep, mqtt_sensor +from test.util import mqtt_json_sensor, sleep, mqtt_sensor, mqtt_ndjson_sensor logger = logging.getLogger(__name__) @@ -105,6 +105,42 @@ def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_in assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1} +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_ndjson_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in NDJSON format to MQTT broker + and prove they are stored in the InfluxDB database. + + TODO: Grafana provisioning failed! + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + }, + ] + yield threads.deferToThread(mqtt_ndjson_sensor, settings.mqtt_topic_ndjson, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Prove that data arrived in InfluxDB.
+ record = influx_sensors.get_record(index=0) + del record['time'] + assert record == {u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + del record['time'] + assert record == {u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy diff --git a/test/util.py b/test/util.py index 6569996d..f43dbdd5 100644 --- a/test/util.py +++ b/test/util.py @@ -7,6 +7,7 @@ import string import sys +import pandas as pd import pytest import requests from influxdb import InfluxDBClient @@ -203,6 +204,12 @@ def mqtt_json_sensor(topic, data): return mqtt_sensor(topic, payload) +def mqtt_ndjson_sensor(topic, data): + df = pd.DataFrame.from_records(data) + payload = df.to_json(orient="records", lines=True) + return mqtt_sensor(topic, payload) + + def mqtt_sensor(topic, payload): logger.info('MQTT: Submitting reading')