Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MQTT dispatcher #220

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,14 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

* MQTT: Configure using the config.json file. See implementation in [config/config.go](./config/config.go)
* See detailed MQTT information in the [MQTT README](./datastore/mqtt/README.md)
* Logger: Simple STDOUT logger that serializes protos to JSON

>NOTE: To add a new dispatcher, please provide integration tests and updated documentation. To serialize dispatcher data as json instead of protobufs, add a config `transmit_decoded_records` and set value to `true` as shown [here](config/test_configs_test.go#L186)

## Reliable Acks
Fleet telemetry can send ack messages back to the vehicle. This is useful for applications that need to ensure the data was received and processed. To enable this feature, set `reliable_ack_sources` to one of configured dispatchers (`kafka`,`kinesis`,`pubsub`,`zmq`) in the config file. Reliable acks can only be set to one dispatcher per recordType. See [here](./test/integration/config.json#L8) for sample config.
Fleet telemetry can send ack messages back to the vehicle. This is useful for applications that need to ensure the data was received and processed. To enable this feature, set `reliable_ack_sources` to one of configured dispatchers (`kafka`,`kinesis`,`pubsub`,`zmq`, `mqtt`) in the config file. Reliable acks can only be set to one dispatcher per recordType. See [here](./test/integration/config.json#L8) for sample config.

## Metrics
Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs fleet telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):-
Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/teslamotors/fleet-telemetry/datastore/googlepubsub"
"github.com/teslamotors/fleet-telemetry/datastore/kafka"
"github.com/teslamotors/fleet-telemetry/datastore/kinesis"
"github.com/teslamotors/fleet-telemetry/datastore/mqtt"
"github.com/teslamotors/fleet-telemetry/datastore/simple"
"github.com/teslamotors/fleet-telemetry/datastore/zmq"
logrus "github.com/teslamotors/fleet-telemetry/logger"
Expand Down Expand Up @@ -99,6 +100,9 @@ type Config struct {

// Airbrake config
Airbrake *Airbrake

// MQTT config
MQTT *mqtt.Config `json:"mqtt,omitempty"`
}

type Airbrake struct {
Expand Down Expand Up @@ -312,6 +316,17 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
producers[telemetry.ZMQ] = zmqProducer
}

if _, ok := requiredDispatchers[telemetry.MQTT]; ok {
if c.MQTT == nil {
return nil, errors.New("Expected MQTT to be configured")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

golang errors usually starts with lowercase :-

errors.New("expected MQTT to be configured")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other related errors in the same method use a capital letter:
"Expected ZMQ/Kinesis/PubSub/Kafka/MQTT to be configured"

Maybe it is better if someone creates a separate pull request to fix all error messages?
(and then maybe also have a look at capital letters used in errors.py)

}
mqttProducer, err := mqtt.NewProducer(context.Background(), c.MQTT, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.MQTT], logger)
if err != nil {
return nil, err
}
producers[telemetry.MQTT] = mqttProducer
}

dispatchProducerRules := make(map[string][]telemetry.Producer)
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
Expand Down
87 changes: 87 additions & 0 deletions datastore/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# MQTT Datastore

This package implements an MQTT (Message Queuing Telemetry Transport) producer for the Fleet Telemetry system. MQTT is particularly well-suited for fleet telemetry systems due to its lightweight, publish-subscribe architecture.

## Overview

The MQTT datastore allows the Fleet Telemetry system to publish vehicle data, alerts, and errors to an MQTT broker. It uses the Paho MQTT client library for Go and implements the `telemetry.Producer` interface.

## Key Design Decisions

1. **Separate topics for different data types**: We use distinct topic structures for metrics, alerts, and errors to allow easy filtering and processing by subscribers.

2. **Individual field publishing**: Each metric field is published as a separate MQTT message, allowing for granular updates and subscriptions.

3. **Current state and history for alerts**: We maintain both the current state and history of alerts, supporting both clients that require real-time monitoring and clients that require historical analysis.

4. **Configurable QoS and retention**: The MQTT QoS level and message retention can be configured to balance between performance and reliability.

5. **Reliable acknowledgment support**: The producer supports reliable acknowledgment for specified transaction types. However, it's important to note that the entire packet from the vehicle will be not be acknowledged if any of the related MQTT publish operations fail. This ensures data integrity by preventing partial updates and allows for retrying the complete set of data in case of any publishing issues.

## Configuration

The MQTT producer is configured using a JSON object with the following fields:

- `broker`: (string) The MQTT broker "host:port". (for example "localhost:1883")
- `client_id`: (string) A unique identifier for the MQTT client.
- `username`: (string) The username for MQTT broker authentication. (optional)
- `password`: (string) The password for MQTT broker authentication. (optional)
- `topic_base`: (string) The base topic for all MQTT messages.
- `qos`: (number) The Quality of Service level (0, 1, or 2). Default: 0
- `retained`: (boolean) Whether messages should be retained by the broker. Default: false
- `connect_timeout_ms`: (number) Connection timeout in milliseconds. Default: 30000
- `publish_timeout_ms`: (number) Publish operation timeout in milliseconds. Default: 2500
- `disconnect_timeout_ms`: (number) Disconnection timeout in milliseconds. Default: 250
- `connect_retry_interval_ms`: (number) Interval between connection retry attempts in milliseconds. Default: 10000
- `keep_alive_seconds`: (number) Keep-alive interval in seconds. Default: 30

Example configuration:

```json
{
"mqtt": {
"broker": "localhost:1883",
"client_id": "fleet-telemetry",
"username": "your_username",
"password": "your_password",
"topic_base": "telemetry",
"qos": 1,
"retained": false,
"connect_timeout_ms": 30000,
"publish_timeout_ms": 2500,
"disconnect_timeout_ms": 250,
"connect_retry_interval_ms": 10000,
"keep_alive_seconds": 30
}
}
```

The MQTT producer will use default values for any omitted fields as specified above.

## Topic Structure

- Metrics: `<topic_base>/<VIN>/v/<field_name>`
- Alerts (current state): `<topic_base>/<VIN>/alerts/<alert_name>/current`
- Alerts (history): `<topic_base>/<VIN>/alerts/<alert_name>/history`
- Errors: `<topic_base>/<VIN>/errors/<error_name>`

## Payload Formats

- Metrics: `{"value": <field_value>}`
- Alerts: `{"Name": <string>, "StartedAt": <timestamp>, "EndedAt": <timestamp>, "Audiences": [<string>]}`
- Errors: `{"Name": <string>, "Body": <string>, "Tags": {<string>: <string>}, "CreatedAt": <timestamp>}`

Note: The field contents and type are determined by the car. Fields may have their types updated with different software and vehicle versions to optimize for precision or space. For example, a float value like the vehicle's speed might be received as 12.3 (numeric) in one version and as "12.3" (string) in another version.

## Error Handling and Reliability

- The producer implements reconnection logic with configurable retry intervals.
- Publish operations have a configurable timeout to prevent blocking indefinitely.
- The producer supports reliable acknowledgment for specified transaction types, ensuring critical data is not lost.

## Performance Considerations

- Each field is published as a separate MQTT message, which can increase network traffic but allows for more granular subscriptions.
- QoS levels can be configured to balance between performance and reliability.
- The producer uses goroutines to handle message publishing asynchronously.

246 changes: 246 additions & 0 deletions datastore/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package mqtt

import (
"context"
"sync"
"time"

pahomqtt "github.com/eclipse/paho.mqtt.golang"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/protos"
"github.com/teslamotors/fleet-telemetry/server/airbrake"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

type MQTTProducer struct {
client pahomqtt.Client
config *Config
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
namespace string
ctx context.Context
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
}

type Config struct {
Broker string `json:"broker"`
ClientID string `json:"client_id"`
Username string `json:"username"`
Password string `json:"password"`
TopicBase string `json:"topic_base"`
QoS byte `json:"qos"`
Retained bool `json:"retained"`
ConnectTimeout int `json:"connect_timeout_ms"`
PublishTimeout int `json:"publish_timeout_ms"`
DisconnectTimeout int `json:"disconnect_timeout_ms"`
ConnectRetryInterval int `json:"connect_retry_interval_ms"`
KeepAlive int `json:"keep_alive_seconds"`
}

type Metrics struct {
errorCount adapter.Counter
publishCount adapter.Counter
byteTotal adapter.Counter
reliableAckCount adapter.Counter
}

const (
DefaultPublishTimeout = 2500
DefaultConnectTimeout = 30000
DefaultConnectRetryInterval = 10000
DefaultDisconnectTimeout = 250
DefaultKeepAlive = 30
DefaultQoS = 0
)

var (
metricsRegistry Metrics
metricsOnce sync.Once
)

// Allow us to mock the mqtt.NewClient function for testing
var PahoNewClient = pahomqtt.NewClient

func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.AirbrakeHandler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metrics)

// Set default values
if config.PublishTimeout == 0 {
config.PublishTimeout = DefaultPublishTimeout
}
if config.ConnectTimeout == 0 {
config.ConnectTimeout = DefaultConnectTimeout
}
if config.DisconnectTimeout == 0 {
config.DisconnectTimeout = DefaultDisconnectTimeout
}
if config.ConnectRetryInterval == 0 {
config.ConnectRetryInterval = DefaultConnectRetryInterval
}
if config.QoS == 0 {
config.QoS = DefaultQoS
}
if config.KeepAlive == 0 {
config.KeepAlive = DefaultKeepAlive
}

opts := pahomqtt.NewClientOptions().
AddBroker(config.Broker).
SetClientID(config.ClientID).
SetUsername(config.Username).
SetPassword(config.Password).
SetConnectRetry(true).
SetAutoReconnect(true).
SetConnectRetryInterval(time.Duration(config.ConnectRetryInterval) * time.Millisecond).
SetConnectTimeout(time.Duration(config.ConnectTimeout) * time.Millisecond).
SetOrderMatters(false).
SetKeepAlive(time.Duration(config.KeepAlive) * time.Second)

client := PahoNewClient(opts)
client.Connect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do any error handling here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectRetry is set. So, the client.Connect() might fail, but that's not a problem because the Paho client will keep trying to connect.
This will make sure a telemetry container can start even if a configured MQTT broker is not online.


return &MQTTProducer{
client: client,
config: config,
logger: logger,
airbrakeHandler: airbrakeHandler,
namespace: namespace,
ctx: ctx,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
}, nil
}

func (p *MQTTProducer) Produce(rec *telemetry.Record) {
if p.ctx.Err() != nil {
return
}

payload, err := rec.GetProtoMessage()
if err != nil {
p.ReportError("mqtt_payload_unmarshal_error", err, p.createLogInfo(rec))
return
}

var tokens []pahomqtt.Token

switch payload := payload.(type) {
case *protos.Payload:
tokens, err = p.processVehicleFields(rec, payload)
case *protos.VehicleAlerts:
tokens, err = p.processVehicleAlerts(rec, payload)
case *protos.VehicleErrors:
tokens, err = p.processVehicleErrors(rec, payload)
default:
p.ReportError("mqtt_unknown_payload_type", nil, p.createLogInfo(rec))
return
}
if err != nil {
metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType})
p.ReportError("mqtt_process_payload_error", err, p.createLogInfo(rec))
return
}

// Wait for all topics to be published
var publishError bool
startTime := time.Now()
timeout := time.Duration(p.config.PublishTimeout) * time.Millisecond
for _, token := range tokens {
remainingTimeout := timeout - time.Since(startTime)
if remainingTimeout < 0 {
remainingTimeout = 0
}
if err := waitTokenTimeout(token, remainingTimeout); err != nil {
metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType})
p.ReportError("mqtt_publish_error", err, p.createLogInfo(rec))
publishError = true
}
}

// Only process reliable ACK if no token errors were reported
if !publishError {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this is tricky with the things you have correctly set up. you are asking the vehicle to resend the whole data if partial uploads to mqtt failed. I think we should just attempt to send the whole payload

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above which explains why the MQTT topic structure is important. Topics are not the same as raw packets from the car.

p.ProcessReliableAck(rec)
}
}

// waitTokenTimeout waits for a token to complete or timeout.
// It also handles the edge case where the wait time is 0.
func waitTokenTimeout(t pahomqtt.Token, d time.Duration) error {
if d == 0 {
select {
case <-t.Done():
return t.Error()
default:
return pahomqtt.TimedOut
}
}
if !t.WaitTimeout(d) {
return pahomqtt.TimedOut
}
return t.Error()
}

func (p *MQTTProducer) updateMetrics(txType string, byteCount int) {
metricsRegistry.byteTotal.Add(int64(byteCount), map[string]string{"record_type": txType})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": txType})
}

func (p *MQTTProducer) createLogInfo(rec *telemetry.Record) logrus.LogInfo {
logInfo := logrus.LogInfo{
"topic_name": telemetry.BuildTopicName(p.namespace, rec.TxType),
"txid": rec.Txid,
"vin": rec.Vin,
}
return logInfo
}

func (p *MQTTProducer) ProcessReliableAck(entry *telemetry.Record) {
_, ok := p.reliableAckTxTypes[entry.TxType]
if ok {
p.ackChan <- entry
metricsRegistry.reliableAckCount.Inc(map[string]string{"record_type": entry.TxType})
}
}

func (p *MQTTProducer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

func (p *MQTTProducer) Close() error {
p.client.Disconnect(uint(p.config.DisconnectTimeout))
return nil
}

func registerMetrics(metricsCollector metrics.MetricCollector) {
metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "mqtt_err",
Help: "The number of errors while publishing to MQTT.",
Labels: []string{"record_type"},
})

metricsRegistry.publishCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "mqtt_publish_total",
Help: "The number of values published to MQTT.",
Labels: []string{"record_type"},
})

metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "mqtt_publish_total_bytes",
Help: "The number of JSON bytes published to MQTT.",
Labels: []string{"record_type"},
})

metricsRegistry.reliableAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "mqtt_reliable_ack_total",
Help: "The number of records published to MQTT topics for which we sent a reliable ACK.",
Labels: []string{"record_type"},
})
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}
Loading
Loading