diff --git a/.vscode/settings.json b/.vscode/settings.json index 48fb5ea..8870c9a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,5 @@ { - "rust-analyzer.checkOnSave.allTargets": false, + "rust-analyzer.checkOnSave.allTargets": true, "rust-analyzer.cargo.features": ["log"], "rust-analyzer.cargo.target": "x86_64-unknown-linux-gnu" } \ No newline at end of file diff --git a/src/defender_metrics/aws_types.rs b/src/defender_metrics/aws_types.rs new file mode 100644 index 0000000..903254d --- /dev/null +++ b/src/defender_metrics/aws_types.rs @@ -0,0 +1,84 @@ +use core::str::FromStr; + +use heapless::{LinearMap, String, Vec}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct TcpConnections { + #[serde(rename = "ec")] + pub established_connections: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EstablishedConnections { + #[serde(rename = "cs")] + pub connections: Option>, + + #[serde(rename = "t")] + pub total: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Connection { + #[serde(rename = "rad")] + pub remote_addr: String, + + /// Port number, must be >= 0 + #[serde(rename = "lp")] + pub local_port: Option, + + /// Interface name + #[serde(rename = "li")] + pub local_interface: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListeningTcpPorts { + #[serde(rename = "pts")] + pub ports: Option>, + + #[serde(rename = "t")] + pub total: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TcpPort { + #[serde(rename = "pt")] + pub port: u16, + + #[serde(rename = "if")] + pub interface: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListeningUdpPorts { + #[serde(rename = "pts")] + pub ports: Option>, + + #[serde(rename = "t")] + pub total: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UdpPort { + #[serde(rename = "pt")] + pub port: u16, + + #[serde(rename = "if")] + pub interface: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NetworkStats { + #[serde(rename = "bi")] + pub bytes_in: Option, + + #[serde(rename = "bo")] + pub bytes_out: Option, + + #[serde(rename = "pi")] + pub packets_in: Option, + + #[serde(rename = "po")] + pub packets_out: Option, +} diff --git a/src/defender_metrics/data_types.rs b/src/defender_metrics/data_types.rs new file mode 100644 index 0000000..8d3dab1 --- /dev/null +++ b/src/defender_metrics/data_types.rs @@ -0,0 +1,120 @@ +use core::str::FromStr; + +use heapless::{LinearMap, String, Vec}; +use serde::{Deserialize, Serialize}; + +// Constants for heapless container sizes +pub const HEADER_VERSION_SIZE: usize = 6; +pub const REMOTE_ADDR_SIZE: usize = 64; +pub const LOCAL_INTERFACE_SIZE: usize = 32; +pub const MAX_METRICS: usize = 8; +pub const MAX_CUSTOM_METRICS: usize = 16; +pub const MAX_CUSTOM_METRICS_NAME: usize = 32; + +pub enum MetricError { + Malformed, + Throttled, + MissingHeader, + Other, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Metric { + #[serde(rename = "hed")] + pub header: Header, + + #[serde(rename = "cmet")] + pub custom_metrics: Option< + LinearMap, Vec, MAX_CUSTOM_METRICS>, + >, +} + +impl Metric { + pub fn new( + custom_metrics: Option< + LinearMap, Vec, MAX_CUSTOM_METRICS>, + >, + timestamp: i64, + ) -> Self { + let header = Header { + report_id: timestamp, + version: String::::from_str("1.0").unwrap(), //FIXME: Don't + }; + + Self { + header, + custom_metrics, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Header { + /// Monotonically increasing value. Epoch timestamp recommended. + #[serde(rename = "rid")] + pub report_id: i64, + + /// Version in Major.Minor format. + #[serde(rename = "v")] + pub version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CustomMetric { + Number(u64), + NumberList(Vec), + StringList(Vec, MAX_METRICS>), + IpList(Vec, MAX_METRICS>), +} + +impl CustomMetric { + pub fn new_number(value: u64) -> heapless::Vec { + let mut custom_metric_map = Vec::new(); + + custom_metric_map.push(CustomMetric::Number(value)).unwrap(); + + custom_metric_map + } + + pub fn new_number_list(values: &[u64]) -> heapless::Vec { + let mut custom_metric_map = Vec::new(); + + let mut vec = Vec::new(); + for &v in values { + vec.push(v).unwrap(); + } + + custom_metric_map + .push(CustomMetric::NumberList(vec)) + .unwrap(); + + custom_metric_map + } + + pub fn new_string_list(values: &[&str]) -> heapless::Vec { + let mut custom_metric_map = Vec::new(); + + let mut vec = Vec::new(); + for &v in values { + vec.push(String::from_str(v).unwrap()).unwrap(); + } + custom_metric_map + .push(CustomMetric::StringList(vec)) + .unwrap(); + + custom_metric_map + } + + pub fn new_ip_list(values: &[&str]) -> heapless::Vec { + let mut custom_metric_map = Vec::new(); + + let mut vec = Vec::new(); + for &v in values { + vec.push(String::from_str(v).unwrap()).unwrap(); + } + custom_metric_map.push(CustomMetric::IpList(vec)).unwrap(); + + custom_metric_map + } +} diff --git a/src/defender_metrics/mod.rs b/src/defender_metrics/mod.rs index afe4e4d..1789f17 100644 --- a/src/defender_metrics/mod.rs +++ b/src/defender_metrics/mod.rs @@ -1,9 +1,17 @@ +use data_types::{ + CustomMetric, Metric, MetricError, HEADER_VERSION_SIZE, MAX_CUSTOM_METRICS, + MAX_CUSTOM_METRICS_NAME, +}; use embassy_sync::blocking_mutex::raw::RawMutex; -use embedded_mqtt::{Publish, Subscribe, SubscribeTopic, ToPayload}; +use embedded_mqtt::{DeferredPayload, Publish, Subscribe, SubscribeTopic, ToPayload}; +use futures::StreamExt; +use heapless::LinearMap; use topics::Topic; use crate::shadows::Error; +// pub mod aws_types; +pub mod data_types; pub mod topics; pub struct MetricHandler<'a, 'm, M: RawMutex> { @@ -11,6 +19,45 @@ pub struct MetricHandler<'a, 'm, M: RawMutex> { } impl<'a, 'm, M: RawMutex> MetricHandler<'a, 'm, M> { + pub fn new(mqtt: &'m embedded_mqtt::MqttClient<'a, M>) -> Self { + Self { mqtt } + } + + pub async fn publish_metric( + &self, + custom_metric: LinearMap< + heapless::String<{ MAX_CUSTOM_METRICS_NAME }>, + heapless::Vec, + { MAX_CUSTOM_METRICS }, + >, + metric_name: &str, + timestamp: i64, + ) -> Result<(), MetricError> { + let metric = Metric::new(Some(custom_metric), timestamp); + + let payload = DeferredPayload::new( + |buf: &mut [u8]| { + serde_json_core::to_slice(&metric, buf) + .map_err(|_| embedded_mqtt::EncodingError::BufferSize) + }, + 4000, + ); + + let mut subsctiption = self + .publish_and_subscribe(payload, metric_name) + .await + .map_err(|_| MetricError::Other)?; + + loop { + let message = subsctiption.next().await.ok_or(MetricError::Malformed)?; + + match Topic::from_str(message.topic_name()) { + Some(Topic::Accepted) => return Ok(()), + Some(Topic::Rejected) => return Err(MetricError::Other), + _ => (), + }; + } + } async fn publish_and_subscribe( &self, payload: impl ToPayload, @@ -56,3 +103,116 @@ impl<'a, 'm, M: RawMutex> MetricHandler<'a, 'm, M> { Ok(sub) } } + +#[cfg(test)] +mod tests { + use core::str::FromStr; + + use super::data_types::*; + + use heapless::{LinearMap, String}; + + #[test] + fn number() { + let mut custom_metrics = LinearMap::new(); + + let name_of_metric = String::from_str("myMetric").unwrap(); + + custom_metrics + .insert(name_of_metric, CustomMetric::new_number(123)) + .unwrap(); + + let metric = Metric::new(Some(custom_metrics), 123123123123); + + let payload: String<4000> = serde_json_core::to_string(&metric).unwrap(); + + println!("buffer = {}", payload); + + assert!(true) + } + + #[test] + fn number_list() { + let mut custom_metrics = LinearMap::new(); + + // NUMBER LIST + let my_number_list = String::from_str("my_number_list").unwrap(); + + custom_metrics + .insert( + my_number_list, + CustomMetric::new_number_list(&[123, 456, 789]), + ) + .unwrap(); + + let metric = Metric::new(Some(custom_metrics), 123123123123); + + let payload: String<4000> = serde_json_core::to_string(&metric).unwrap(); + + println!("buffer = {}", payload); + + assert!(true) + } + + #[test] + fn string_list() { + let mut custom_metrics = LinearMap::new(); + + // STRING LIST + let my_string_list = String::from_str("my_string_list").unwrap(); + + custom_metrics + .insert( + my_string_list, + CustomMetric::new_string_list(&["value_1", "value_2"]), + ) + .unwrap(); + + let metric = Metric::new(Some(custom_metrics), 123123123123); + + let payload: String<4000> = serde_json_core::to_string(&metric).unwrap(); + + println!("buffer = {}", payload); + + assert!(true) + } + + #[test] + fn all_types() { + let mut custom_metrics = LinearMap::new(); + + let my_number = String::from_str("MyMetricOfType_Number").unwrap(); + custom_metrics + .insert(my_number, CustomMetric::new_number(1)) + .unwrap(); + + let my_number_list = String::from_str("MyMetricOfType_NumberList").unwrap(); + custom_metrics + .insert(my_number_list, CustomMetric::new_number_list(&[1, 2, 3])) + .unwrap(); + + let my_string_list = String::from_str("MyMetricOfType_StringList").unwrap(); + custom_metrics + .insert( + my_string_list, + CustomMetric::new_string_list(&["value_1", "value_2"]), + ) + .unwrap(); + + let my_ip_list = String::from_str("MyMetricOfType_IpList").unwrap(); + custom_metrics + .insert( + my_ip_list, + CustomMetric::new_ip_list(&["172.0.0.0", "172.0.0.10"]), + ) + .unwrap(); + + let metric = Metric::new(Some(custom_metrics), 123123123123); + + let payload: String<4000> = serde_json_core::to_string(&metric).unwrap(); + + println!("buffer = {}", payload); + + assert!(true) + } +} diff --git a/src/defender_metrics/topics.rs b/src/defender_metrics/topics.rs index 41d545a..bca9e66 100644 --- a/src/defender_metrics/topics.rs +++ b/src/defender_metrics/topics.rs @@ -17,7 +17,7 @@ impl Topic { const PREFIX: &'static str = "$aws/things"; const NAME: &'static str = "defender/metrics"; //TODO: Feature gate json or cbor - const PAYLOAD_FORMAT: &'static str = "cbor"; + const PAYLOAD_FORMAT: &'static str = "json"; pub fn format( &self, @@ -55,6 +55,22 @@ impl Topic { Ok(topic_path) } + + pub fn from_str(s: &str) -> Option { + let tt = s.splitn(7, '/').collect::>(); + match (tt.get(0), tt.get(1), tt.get(3), tt.get(4)) { + (Some(&"$aws"), Some(&"things"), Some(&"defender"), Some(&"metrics")) => { + // This is a defender metric topic, now figure out which one. + + match tt.get(6) { + Some(&"accepted") => Some(Topic::Accepted), + Some(&"rejected") => Some(Topic::Rejected), + _ => return None, + } + } + _ => None, + } + } } #[derive(Default)] diff --git a/src/shadows/mod.rs b/src/shadows/mod.rs index 9c728f2..958637f 100644 --- a/src/shadows/mod.rs +++ b/src/shadows/mod.rs @@ -123,7 +123,7 @@ where }; let payload = DeferredPayload::new( - |buf| { + |buf: &mut [u8]| { serde_json_core::to_slice(&request, buf) .map_err(|_| embedded_mqtt::EncodingError::BufferSize) },