Skip to content

Commit

Permalink
Initial structure of defender metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KennethKnudsen97 committed Dec 23, 2024
1 parent 7e7eab8 commit 7fc002b
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 1 deletion.
58 changes: 58 additions & 0 deletions src/defender_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use embassy_sync::blocking_mutex::raw::RawMutex;
use embedded_mqtt::{Publish, Subscribe, SubscribeTopic, ToPayload};
use topics::Topic;

use crate::shadows::Error;

pub mod topics;

pub struct MetricHandler<'a, 'm, M: RawMutex> {
mqtt: &'m embedded_mqtt::MqttClient<'a, M>,
}

impl<'a, 'm, M: RawMutex> MetricHandler<'a, 'm, M> {
async fn publish_and_subscribe(
&self,
payload: impl ToPayload,
metric_name: &str,
) -> Result<embedded_mqtt::Subscription<'a, '_, M, 2>, Error> {
let sub = self
.mqtt
.subscribe::<2>(
Subscribe::builder()
.topics(&[
SubscribeTopic::builder()
.topic_path(
Topic::Accepted
.format::<64>(self.mqtt.client_id(), metric_name)?
.as_str(),
)
.build(),
SubscribeTopic::builder()
.topic_path(
Topic::Rejected
.format::<64>(self.mqtt.client_id(), metric_name)?
.as_str(),
)
.build(),
])
.build(),
)
.await
.map_err(Error::MqttError)?;

//*** PUBLISH REQUEST ***/
let topic_name = Topic::Publish.format::<64>(self.mqtt.client_id(), metric_name)?;
self.mqtt
.publish(
Publish::builder()
.topic_name(topic_name.as_str())
.payload(payload)
.build(),
)
.await
.map_err(Error::MqttError)?;

Ok(sub)
}
}
127 changes: 127 additions & 0 deletions src/defender_metrics/topics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#![allow(dead_code)]
use core::fmt::Write;

use embedded_mqtt::QoS;
use heapless::String;

use crate::{jobs::MAX_THING_NAME_LEN, shadows::Error};

#[derive(PartialEq, Eq, Clone, Copy)]
pub enum Topic {
Accepted,
Rejected,
Publish,
}

impl Topic {
const PREFIX: &'static str = "$aws/things";
const NAME: &'static str = "defender/metrics";
//TODO: Feature gate json or cbor
const PAYLOAD_FORMAT: &'static str = "cbor";

pub fn format<const L: usize>(
&self,
thing_name: &str,
metric_name: &str,
) -> Result<String<L>, Error> {
let mut topic_path = String::new();

match self {
Self::Accepted => topic_path.write_fmt(format_args!(
"{}/{}/{}/{}/{}",
Self::PREFIX,
thing_name,
Self::NAME,
Self::PAYLOAD_FORMAT,
metric_name
)),
Self::Rejected => topic_path.write_fmt(format_args!(
"{}/{}/{}/{}/{}",
Self::PREFIX,
thing_name,
Self::NAME,
Self::PAYLOAD_FORMAT,
metric_name
)),
Self::Publish => topic_path.write_fmt(format_args!(
"{}/{}/{}/{}",
Self::PREFIX,
thing_name,
Self::NAME,
Self::PAYLOAD_FORMAT,
)),
}
.map_err(|_| Error::Overflow)?;

Ok(topic_path)
}
}

#[derive(Default)]
pub struct Subscribe<const N: usize> {
topics: heapless::Vec<(Topic, QoS), N>,
}

impl<const N: usize> Subscribe<N> {
pub fn new() -> Self {
Self::default()
}

pub fn add_topic(self, topic: Topic, qos: QoS) -> Self {
if self.topics.iter().any(|(t, _)| t == &topic) {
return self;
}

let mut topics = self.topics;
topics.push((topic, qos)).ok();

Self { topics }
}

pub fn topics(
self,
thing_name: &str,
metric_name: &str,
) -> Result<heapless::Vec<(heapless::String<N>, QoS), N>, Error> {
assert!(thing_name.len() <= MAX_THING_NAME_LEN);

self.topics
.iter()
.map(|(topic, qos)| Ok(((*topic).format(thing_name, metric_name)?, *qos)))
.collect()
}
}

#[derive(Default)]
pub struct Unsubscribe<const N: usize> {
topics: heapless::Vec<Topic, N>,
}

impl<const N: usize> Unsubscribe<N> {
pub fn new() -> Self {
Self::default()
}

pub fn topic(self, topic: Topic) -> Self {
if self.topics.iter().any(|t| t == &topic) {
return self;
}

let mut topics = self.topics;
topics.push(topic).ok();
Self { topics }
}

pub fn topics(
self,
thing_name: &str,
metric_name: &str,
) -> Result<heapless::Vec<heapless::String<256>, N>, Error> {
assert!(thing_name.len() <= MAX_THING_NAME_LEN);

self.topics
.iter()
.map(|topic| (*topic).format(thing_name, metric_name))
.collect()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// This mod MUST go first, so that the others see its macros.
pub(crate) mod fmt;

pub mod defender_metrics;
pub mod jobs;
#[cfg(any(feature = "ota_mqtt_data", feature = "ota_http_data"))]
pub mod ota;
Expand Down
2 changes: 1 addition & 1 deletion src/shadows/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod dao;
pub mod data_types;
mod error;
pub mod error;
mod shadow_diff;
pub mod topics;

Expand Down

0 comments on commit 7fc002b

Please sign in to comment.