Skip to content

Commit

Permalink
Adding support for subsetting of the lidar frames and outputting comp…
Browse files Browse the repository at this point in the history
…ressed frames onto mqtt.
  • Loading branch information
freol35241 committed May 10, 2022
1 parent 1b8497d commit afb0cc5
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Main entrypoint for this application"""
import logging
from typing import Any
import warnings
import threading
from contextlib import closing
from functools import partial
from datetime import datetime, timezone
import zlib

import numpy as np
from scipy.spatial.transform import Rotation
Expand All @@ -28,6 +30,7 @@
MQTT_PASSWORD: str = env("MQTT_PASSWORD", None)

MQTT_TOPIC_POINTCLOUD: str = env("MQTT_TOPIC_POINTCLOUD")
MQTT_TOPIC_POINTCLOUD_COMPRESSED: str = env("MQTT_TOPIC_POINTCLOUD_COMPRESSED", None)

OUSTER_HOSTNAME: str = env("OUSTER_HOSTNAME")

Expand All @@ -36,7 +39,8 @@
OUSTER_ATTITUDE: list = env.list(
"OUSTER_ATTITUDE", [0, 0, 0], subcast=float, validate=lambda x: len(x) == 3
)
POINTCLOUD_FREQUENCY = env.float("POINTCLOUD_FREQUENCY", default=2)
POINTCLOUD_FREQUENCY: float = env.float("POINTCLOUD_FREQUENCY", default=2)
POINTCLOUD_SUBSET_SAMPLING: int = env.int("POINTCLOUD_SUBSET_SAMPLING", default=1)

LOG_LEVEL = env.log_level("LOG_LEVEL", logging.WARNING)

Expand Down Expand Up @@ -67,9 +71,17 @@ def rotate_pcd(pcd: np.ndarray, attitude: list) -> np.ndarray:
np.ndarray: The rotated point cloud (in NED frame)
"""
points = pcd.reshape(-1, pcd.shape[-1])
LOGGER.debug("Rotating %d points using attitude: %s", len(points), attitude)
subset = points[::POINTCLOUD_SUBSET_SAMPLING]

LOGGER.debug(
"Subsetting %d points from a total of %d received points",
len(subset),
len(points),
)

LOGGER.debug("Rotating %d points using attitude: %s", len(subset), attitude)
transform = Rotation.from_euler("zyx", attitude[::-1], degrees=True)
return transform.apply(points)
return transform.apply(subset)


def to_brefv(pcd: np.ndarray) -> Envelope:
Expand All @@ -82,14 +94,16 @@ def to_brefv(pcd: np.ndarray) -> Envelope:

LOGGER.debug("Assembled into brefv envelope: %s", envelope)

return envelope
return envelope.json()


def compress(payload: str) -> bytes:
"""Compress the payload uzing zlib"""
return zlib.compress(payload.encode())

def to_mqtt(envelope: dict):
"""Publish an envelope to a mqtt topic"""

topic = MQTT_TOPIC_POINTCLOUD
payload = envelope.json()
def to_mqtt(payload: Any, topic: str):
"""Publish a payload to a mqtt topic"""

LOGGER.debug("Publishing on %s with payload: %s", topic, payload)
try:
Expand All @@ -106,9 +120,22 @@ def to_mqtt(envelope: dict):
# Build pipeline
LOGGER.info("Building pipeline...")
source = Stream()
source.latest().rate_limit(1 / POINTCLOUD_FREQUENCY).map(
partial(rotate_pcd, attitude=OUSTER_ATTITUDE)
).map(to_brefv).sink(to_mqtt)
pipe_to_brefv = (
source.latest()
.rate_limit(1 / POINTCLOUD_FREQUENCY)
.map(partial(rotate_pcd, attitude=OUSTER_ATTITUDE))
.map(to_brefv)
)
pipe_to_brefv.sink(partial(to_mqtt, topic=MQTT_TOPIC_POINTCLOUD))

if MQTT_TOPIC_POINTCLOUD_COMPRESSED:
LOGGER.info(
"Setting up handling of compressed output on topic %s",
MQTT_TOPIC_POINTCLOUD_COMPRESSED,
)
pipe_to_brefv.map(compress).sink(
partial(to_mqtt, topic=MQTT_TOPIC_POINTCLOUD_COMPRESSED)
)

LOGGER.info("Connecting to MQTT broker...")
mq.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
Expand Down

0 comments on commit afb0cc5

Please sign in to comment.