Skip to content

Commit

Permalink
metric structure and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KennethKnudsen97 committed Dec 27, 2024
1 parent 7fc002b commit ae63c48
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"rust-analyzer.checkOnSave.allTargets": false,
"rust-analyzer.checkOnSave.allTargets": true,
"rust-analyzer.cargo.features": ["log"],
"rust-analyzer.cargo.target": "x86_64-unknown-linux-gnu"
}
84 changes: 84 additions & 0 deletions src/defender_metrics/aws_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use core::str::FromStr;

use heapless::{LinearMap, String, Vec};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct TcpConnections {
#[serde(rename = "ec")]
pub established_connections: Option<EstablishedConnections>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct EstablishedConnections {
#[serde(rename = "cs")]
pub connections: Option<Vec<Connection, { MAX_CONNECTIONS }>>,

#[serde(rename = "t")]
pub total: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Connection {
#[serde(rename = "rad")]
pub remote_addr: String<REMOTE_ADDR_SIZE>,

/// Port number, must be >= 0
#[serde(rename = "lp")]
pub local_port: Option<u16>,

/// Interface name
#[serde(rename = "li")]
pub local_interface: Option<String<LOCAL_INTERFACE_SIZE>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListeningTcpPorts {
#[serde(rename = "pts")]
pub ports: Option<Vec<TcpPort, MAX_PORTS>>,

#[serde(rename = "t")]
pub total: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TcpPort {
#[serde(rename = "pt")]
pub port: u16,

#[serde(rename = "if")]
pub interface: Option<String<LOCAL_INTERFACE_SIZE>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListeningUdpPorts {
#[serde(rename = "pts")]
pub ports: Option<Vec<UdpPort, MAX_PORTS>>,

#[serde(rename = "t")]
pub total: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UdpPort {
#[serde(rename = "pt")]
pub port: u16,

#[serde(rename = "if")]
pub interface: Option<String<LOCAL_INTERFACE_SIZE>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct NetworkStats {
#[serde(rename = "bi")]
pub bytes_in: Option<u64>,

#[serde(rename = "bo")]
pub bytes_out: Option<u64>,

#[serde(rename = "pi")]
pub packets_in: Option<u64>,

#[serde(rename = "po")]
pub packets_out: Option<u64>,
}
120 changes: 120 additions & 0 deletions src/defender_metrics/data_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use core::str::FromStr;

use heapless::{LinearMap, String, Vec};
use serde::{Deserialize, Serialize};

// Constants for heapless container sizes
pub const HEADER_VERSION_SIZE: usize = 6;
pub const REMOTE_ADDR_SIZE: usize = 64;
pub const LOCAL_INTERFACE_SIZE: usize = 32;
pub const MAX_METRICS: usize = 8;
pub const MAX_CUSTOM_METRICS: usize = 16;
pub const MAX_CUSTOM_METRICS_NAME: usize = 32;

pub enum MetricError {
Malformed,
Throttled,
MissingHeader,
Other,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Metric {
#[serde(rename = "hed")]
pub header: Header,

#[serde(rename = "cmet")]
pub custom_metrics: Option<
LinearMap<String<MAX_CUSTOM_METRICS_NAME>, Vec<CustomMetric, 1>, MAX_CUSTOM_METRICS>,
>,
}

impl Metric {
pub fn new(
custom_metrics: Option<
LinearMap<String<MAX_CUSTOM_METRICS_NAME>, Vec<CustomMetric, 1>, MAX_CUSTOM_METRICS>,
>,
timestamp: i64,
) -> Self {
let header = Header {
report_id: timestamp,
version: String::<HEADER_VERSION_SIZE>::from_str("1.0").unwrap(), //FIXME: Don't
};

Self {
header,
custom_metrics,
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Header {
/// Monotonically increasing value. Epoch timestamp recommended.
#[serde(rename = "rid")]
pub report_id: i64,

/// Version in Major.Minor format.
#[serde(rename = "v")]
pub version: String<HEADER_VERSION_SIZE>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CustomMetric {
Number(u64),
NumberList(Vec<u64, MAX_METRICS>),
StringList(Vec<String<LOCAL_INTERFACE_SIZE>, MAX_METRICS>),
IpList(Vec<String<REMOTE_ADDR_SIZE>, MAX_METRICS>),
}

impl CustomMetric {
pub fn new_number(value: u64) -> heapless::Vec<Self, 1> {
let mut custom_metric_map = Vec::new();

custom_metric_map.push(CustomMetric::Number(value)).unwrap();

custom_metric_map
}

pub fn new_number_list(values: &[u64]) -> heapless::Vec<Self, 1> {
let mut custom_metric_map = Vec::new();

let mut vec = Vec::new();
for &v in values {
vec.push(v).unwrap();
}

custom_metric_map
.push(CustomMetric::NumberList(vec))
.unwrap();

custom_metric_map
}

pub fn new_string_list(values: &[&str]) -> heapless::Vec<Self, 1> {
let mut custom_metric_map = Vec::new();

let mut vec = Vec::new();
for &v in values {
vec.push(String::from_str(v).unwrap()).unwrap();
}
custom_metric_map
.push(CustomMetric::StringList(vec))
.unwrap();

custom_metric_map
}

pub fn new_ip_list(values: &[&str]) -> heapless::Vec<Self, 1> {
let mut custom_metric_map = Vec::new();

let mut vec = Vec::new();
for &v in values {
vec.push(String::from_str(v).unwrap()).unwrap();
}
custom_metric_map.push(CustomMetric::IpList(vec)).unwrap();

custom_metric_map
}
}
162 changes: 161 additions & 1 deletion src/defender_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,63 @@
use data_types::{
CustomMetric, Metric, MetricError, HEADER_VERSION_SIZE, MAX_CUSTOM_METRICS,
MAX_CUSTOM_METRICS_NAME,
};
use embassy_sync::blocking_mutex::raw::RawMutex;
use embedded_mqtt::{Publish, Subscribe, SubscribeTopic, ToPayload};
use embedded_mqtt::{DeferredPayload, Publish, Subscribe, SubscribeTopic, ToPayload};
use futures::StreamExt;
use heapless::LinearMap;
use topics::Topic;

use crate::shadows::Error;

// pub mod aws_types;
pub mod data_types;
pub mod topics;

pub struct MetricHandler<'a, 'm, M: RawMutex> {
mqtt: &'m embedded_mqtt::MqttClient<'a, M>,
}

impl<'a, 'm, M: RawMutex> MetricHandler<'a, 'm, M> {
pub fn new(mqtt: &'m embedded_mqtt::MqttClient<'a, M>) -> Self {
Self { mqtt }
}

pub async fn publish_metric(
&self,
custom_metric: LinearMap<
heapless::String<{ MAX_CUSTOM_METRICS_NAME }>,
heapless::Vec<CustomMetric, 1>,
{ MAX_CUSTOM_METRICS },
>,
metric_name: &str,
timestamp: i64,
) -> Result<(), MetricError> {
let metric = Metric::new(Some(custom_metric), timestamp);

let payload = DeferredPayload::new(
|buf: &mut [u8]| {
serde_json_core::to_slice(&metric, buf)
.map_err(|_| embedded_mqtt::EncodingError::BufferSize)
},
4000,
);

let mut subsctiption = self
.publish_and_subscribe(payload, metric_name)
.await
.map_err(|_| MetricError::Other)?;

loop {
let message = subsctiption.next().await.ok_or(MetricError::Malformed)?;

match Topic::from_str(message.topic_name()) {
Some(Topic::Accepted) => return Ok(()),
Some(Topic::Rejected) => return Err(MetricError::Other),
_ => (),
};
}
}
async fn publish_and_subscribe(
&self,
payload: impl ToPayload,
Expand Down Expand Up @@ -56,3 +103,116 @@ impl<'a, 'm, M: RawMutex> MetricHandler<'a, 'm, M> {
Ok(sub)
}
}

#[cfg(test)]
mod tests {
use core::str::FromStr;

use super::data_types::*;

use heapless::{LinearMap, String};

#[test]
fn number() {
let mut custom_metrics = LinearMap::new();

let name_of_metric = String::from_str("myMetric").unwrap();

custom_metrics
.insert(name_of_metric, CustomMetric::new_number(123))
.unwrap();

let metric = Metric::new(Some(custom_metrics), 123123123123);

let payload: String<4000> = serde_json_core::to_string(&metric).unwrap();

println!("buffer = {}", payload);

assert!(true)
}

#[test]
fn number_list() {
let mut custom_metrics = LinearMap::new();

// NUMBER LIST
let my_number_list = String::from_str("my_number_list").unwrap();

custom_metrics
.insert(
my_number_list,
CustomMetric::new_number_list(&[123, 456, 789]),
)
.unwrap();

let metric = Metric::new(Some(custom_metrics), 123123123123);

let payload: String<4000> = serde_json_core::to_string(&metric).unwrap();

println!("buffer = {}", payload);

assert!(true)
}

#[test]
fn string_list() {
let mut custom_metrics = LinearMap::new();

// STRING LIST
let my_string_list = String::from_str("my_string_list").unwrap();

custom_metrics
.insert(
my_string_list,
CustomMetric::new_string_list(&["value_1", "value_2"]),
)
.unwrap();

let metric = Metric::new(Some(custom_metrics), 123123123123);

let payload: String<4000> = serde_json_core::to_string(&metric).unwrap();

println!("buffer = {}", payload);

assert!(true)
}

#[test]
fn all_types() {
let mut custom_metrics = LinearMap::new();

let my_number = String::from_str("MyMetricOfType_Number").unwrap();
custom_metrics
.insert(my_number, CustomMetric::new_number(1))
.unwrap();

let my_number_list = String::from_str("MyMetricOfType_NumberList").unwrap();
custom_metrics
.insert(my_number_list, CustomMetric::new_number_list(&[1, 2, 3]))
.unwrap();

let my_string_list = String::from_str("MyMetricOfType_StringList").unwrap();
custom_metrics
.insert(
my_string_list,
CustomMetric::new_string_list(&["value_1", "value_2"]),
)
.unwrap();

let my_ip_list = String::from_str("MyMetricOfType_IpList").unwrap();
custom_metrics
.insert(
my_ip_list,
CustomMetric::new_ip_list(&["172.0.0.0", "172.0.0.10"]),
)
.unwrap();

let metric = Metric::new(Some(custom_metrics), 123123123123);

let payload: String<4000> = serde_json_core::to_string(&metric).unwrap();

println!("buffer = {}", payload);

assert!(true)
}
}
Loading

0 comments on commit ae63c48

Please sign in to comment.