Skip to content

Commit

Permalink
MQTT publisher improvements
Browse files Browse the repository at this point in the history
Wait for reconnection rather than attempting publishing.
If failed to send a message, don't increase metrics
  • Loading branch information
mkuratczyk committed Sep 11, 2024
1 parent 28bc164 commit d5ea932
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/mqtt_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,22 @@ func (p MqttPublisher) StartRateLimited(ctx context.Context) {
}

func (p MqttPublisher) Send() {
if !p.Connection.IsConnected() {
time.Sleep(1 * time.Second)
return
}
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
startTime := time.Now()
token := p.Connection.Publish(p.Topic, byte(p.Config.MqttPublisher.QoS), false, p.msg)
token.Wait()
latency := time.Since(startTime)
if token.Error() != nil {
log.Error("message sending failure", "id", p.Id, "error", token.Error())
time.Sleep(1 * time.Second)
} else {
metrics.MessagesPublished.Inc()
metrics.PublishingLatency.Update(latency.Seconds())
log.Debug("message sent", "id", p.Id, "destination", p.Topic, "latency", latency)
}
metrics.MessagesPublished.Inc()
metrics.PublishingLatency.Update(latency.Seconds())
log.Debug("message sent", "id", p.Id, "destination", p.Topic, "latency", latency)
}

func (p MqttPublisher) Stop(reason string) {
Expand Down

0 comments on commit d5ea932

Please sign in to comment.