From 2378458ae37d5d8c16081118d5d922ae01becf08 Mon Sep 17 00:00:00 2001 From: Mathias Koch Date: Tue, 29 Oct 2024 13:19:00 +0100 Subject: [PATCH] Fix OTA & bump dependencies (#67) --- Cargo.toml | 6 +- src/jobs/data_types.rs | 2 +- src/ota/config.rs | 2 +- src/ota/control_interface/mod.rs | 2 - src/ota/control_interface/mqtt.rs | 177 +++++++++++++++--------------- src/ota/data_interface/mqtt.rs | 13 +-- src/ota/encoding/json.rs | 1 + src/ota/mod.rs | 40 ++++--- src/provisioning/mod.rs | 59 ++++------ src/shadows/mod.rs | 39 +++---- tests/ota_mqtt.rs | 12 +- tests/provisioning.rs | 8 +- tests/shadows.rs | 14 +-- 13 files changed, 172 insertions(+), 203 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f63eb00..9980820 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,10 +26,10 @@ bitmaps = { version = "3.1", default-features = false } heapless = { version = "0.8", features = ["serde"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_cbor = { version = "0.11", default-features = false, optional = true } -serde-json-core = { version = "0.5" } +serde-json-core = { version = "0.6" } shadow-derive = { path = "shadow_derive", version = "0.2.1" } embedded-storage-async = "0.4" -embedded-mqtt = { git = "ssh://git@github.com/FactbirdHQ/embedded-mqtt", rev = "5e28a55da737356a3a9c1597ae4ff123e2481b1b" } +embedded-mqtt = { git = "ssh://git@github.com/FactbirdHQ/embedded-mqtt", rev = "74eb53d" } futures = { version = "0.3.28", default-features = false } @@ -42,7 +42,7 @@ defmt = { version = "0.3", optional = true } [dev-dependencies] native-tls = { version = "0.2" } -embedded-nal-async = "0.7" +embedded-nal-async = "0.8" env_logger = "0.11" sha2 = "0.10.1" static_cell = { version = "2", features = ["nightly"] } diff --git a/src/jobs/data_types.rs b/src/jobs/data_types.rs index 36449dd..fb6100e 100644 --- a/src/jobs/data_types.rs +++ b/src/jobs/data_types.rs @@ -292,7 +292,7 @@ pub struct Jobs { pub struct ErrorResponse<'a> { pub code: ErrorCode, /// An error message string. - message: &'a str, + pub message: &'a str, /// A client token used to correlate requests and responses. Enter an /// arbitrary value here and it is reflected in the response. #[serde(rename = "clientToken")] diff --git a/src/ota/config.rs b/src/ota/config.rs index ef5cd28..d57cd75 100644 --- a/src/ota/config.rs +++ b/src/ota/config.rs @@ -13,7 +13,7 @@ impl Default for Config { Self { block_size: 1024, max_request_momentum: 3, - request_wait: Duration::from_secs(8), + request_wait: Duration::from_secs(5), status_update_frequency: 24, self_test_timeout: None, } diff --git a/src/ota/control_interface/mod.rs b/src/ota/control_interface/mod.rs index 51e7a7c..ffd8b03 100644 --- a/src/ota/control_interface/mod.rs +++ b/src/ota/control_interface/mod.rs @@ -1,7 +1,6 @@ use crate::jobs::data_types::JobStatus; use super::{ - config::Config, encoding::{json::JobStatusReason, FileContext}, error::OtaError, ProgressState, @@ -16,7 +15,6 @@ pub trait ControlInterface { &self, file_ctx: &FileContext, progress: &mut ProgressState, - config: &Config, status: JobStatus, reason: JobStatusReason, ) -> Result<(), OtaError>; diff --git a/src/ota/control_interface/mqtt.rs b/src/ota/control_interface/mqtt.rs index 826d5b8..601ad68 100644 --- a/src/ota/control_interface/mqtt.rs +++ b/src/ota/control_interface/mqtt.rs @@ -1,9 +1,8 @@ use core::fmt::Write; -use bitmaps::{Bits, BitsImpl}; use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_time::with_timeout; use embedded_mqtt::{DeferredPayload, EncodingError, Publish, QoS, Subscribe, SubscribeTopic}; -use futures::StreamExt as _; use super::ControlInterface; use crate::jobs::data_types::{ErrorResponse, JobStatus, UpdateJobExecutionResponse}; @@ -14,10 +13,7 @@ use crate::ota::encoding::{self, FileContext}; use crate::ota::error::OtaError; use crate::ota::ProgressState; -impl<'a, M: RawMutex, const SUBS: usize> ControlInterface for embedded_mqtt::MqttClient<'a, M, SUBS> -where - BitsImpl<{ SUBS }>: Bits, -{ +impl<'a, M: RawMutex> ControlInterface for embedded_mqtt::MqttClient<'a, M> { /// Check for next available OTA job from the job service by publishing a /// "get next job" message to the job service. async fn request_job(&self) -> Result<(), OtaError> { @@ -45,7 +41,6 @@ where &self, file_ctx: &FileContext, progress_state: &mut ProgressState, - config: &Config, status: JobStatus, reason: JobStatusReason, ) -> Result<(), OtaError> { @@ -63,15 +58,6 @@ where if let JobStatus::InProgress | JobStatus::Succeeded = status { let received_blocks = progress_state.total_blocks - progress_state.blocks_remaining; - // Output a status update once in a while. Always update first and - // last status - if progress_state.blocks_remaining != 0 - && received_blocks != 0 - && received_blocks % config.status_update_frequency as usize != 0 - { - return Ok(()); - } - // Don't override the progress on succeeded, nor on self-test // active. (Cases where progress counter is lost due to device // restarts) @@ -92,40 +78,40 @@ where // Downgrade progress updates to QOS 0 to avoid overloading MQTT // buffers during active streaming. But make sure to always send and await ack for first update and last update - if status == JobStatus::InProgress - && progress_state.blocks_remaining != 0 - && received_blocks != 0 - { - qos = QoS::AtMostOnce; - } + // if status == JobStatus::InProgress + // && progress_state.blocks_remaining != 0 + // && received_blocks != 0 + // { + // qos = QoS::AtMostOnce; + // } } - let mut sub = self - .subscribe::<2>( - Subscribe::builder() - .topics(&[ - SubscribeTopic::builder() - .topic_path( - JobTopic::UpdateAccepted(file_ctx.job_name.as_str()) - .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>( - self.client_id(), - )? - .as_str(), - ) - .build(), - SubscribeTopic::builder() - .topic_path( - JobTopic::UpdateRejected(file_ctx.job_name.as_str()) - .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>( - self.client_id(), - )? - .as_str(), - ) - .build(), - ]) - .build(), - ) - .await?; + // let mut sub = self + // .subscribe::<2>( + // Subscribe::builder() + // .topics(&[ + // SubscribeTopic::builder() + // .topic_path( + // JobTopic::UpdateAccepted(file_ctx.job_name.as_str()) + // .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>( + // self.client_id(), + // )? + // .as_str(), + // ) + // .build(), + // SubscribeTopic::builder() + // .topic_path( + // JobTopic::UpdateRejected(file_ctx.job_name.as_str()) + // .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>( + // self.client_id(), + // )? + // .as_str(), + // ) + // .build(), + // ]) + // .build(), + // ) + // .await?; let topic = JobTopic::Update(file_ctx.job_name.as_str()) .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 25 }>(self.client_id())?; @@ -151,44 +137,61 @@ where ) .await?; - loop { - let message = sub.next().await.ok_or(JobError::Encoding)?; - - // Check if topic is GetAccepted - match crate::jobs::Topic::from_str(message.topic_name()) { - Some(crate::jobs::Topic::UpdateAccepted(_)) => { - // Check client token - let (response, _) = serde_json_core::from_slice::< - UpdateJobExecutionResponse>, - >(message.payload()) - .map_err(|_| JobError::Encoding)?; - - if response.client_token != Some(self.client_id()) { - error!( - "Unexpected client token received: {}, expected: {}", - response.client_token.unwrap_or("None"), - self.client_id() - ); - continue; - } - - return Ok(()); - } - Some(crate::jobs::Topic::UpdateRejected(_)) => { - let (error_response, _) = - serde_json_core::from_slice::(message.payload()) - .map_err(|_| JobError::Encoding)?; - - if error_response.client_token != Some(self.client_id()) { - continue; - } - - return Err(OtaError::UpdateRejected(error_response.code)); - } - _ => { - error!("Expected Topic name GetRejected or GetAccepted but got something else"); - } - } - } + Ok(()) + + // loop { + // let message = match with_timeout( + // embassy_time::Duration::from_secs(1), + // sub.next_message(), + // ) + // .await + // { + // Ok(res) => res.ok_or(JobError::Encoding)?, + // Err(_) => return Err(OtaError::Timeout), + // }; + + // // Check if topic is GetAccepted + // match crate::jobs::Topic::from_str(message.topic_name()) { + // Some(crate::jobs::Topic::UpdateAccepted(_)) => { + // // Check client token + // let (response, _) = serde_json_core::from_slice::< + // UpdateJobExecutionResponse>, + // >(message.payload()) + // .map_err(|_| JobError::Encoding)?; + + // if response.client_token != Some(self.client_id()) { + // error!( + // "Unexpected client token received: {}, expected: {}", + // response.client_token.unwrap_or("None"), + // self.client_id() + // ); + // continue; + // } + + // return Ok(()); + // } + // Some(crate::jobs::Topic::UpdateRejected(_)) => { + // let (error_response, _) = + // serde_json_core::from_slice::(message.payload()) + // .map_err(|_| JobError::Encoding)?; + + // if error_response.client_token != Some(self.client_id()) { + // error!( + // "Unexpected client token received: {}, expected: {}", + // error_response.client_token.unwrap_or("None"), + // self.client_id() + // ); + // continue; + // } + + // error!("OTA Update rejected: {:?}", error_response.message); + + // return Err(OtaError::UpdateRejected(error_response.code)); + // } + // _ => { + // error!("Expected Topic name GetRejected or GetAccepted but got something else"); + // } + // } + // } } } diff --git a/src/ota/data_interface/mqtt.rs b/src/ota/data_interface/mqtt.rs index 1bbaae6..b6516d1 100644 --- a/src/ota/data_interface/mqtt.rs +++ b/src/ota/data_interface/mqtt.rs @@ -2,7 +2,6 @@ use core::fmt::{Display, Write}; use core::ops::DerefMut; use core::str::FromStr; -use bitmaps::{Bits, BitsImpl}; use embassy_sync::blocking_mutex::raw::RawMutex; use embedded_mqtt::{ DeferredPayload, EncodingError, MqttClient, Publish, Subscribe, SubscribeTopic, Subscription, @@ -125,22 +124,16 @@ impl<'a> OtaTopic<'a> { } } -impl<'a, 'b, M: RawMutex, const SUBS: usize> BlockTransfer for Subscription<'a, 'b, M, SUBS, 1> -where - BitsImpl<{ SUBS }>: Bits, -{ +impl<'a, 'b, M: RawMutex> BlockTransfer for Subscription<'a, 'b, M, 1> { async fn next_block(&mut self) -> Result>, OtaError> { Ok(self.next().await) } } -impl<'a, M: RawMutex, const SUBS: usize> DataInterface for MqttClient<'a, M, SUBS> -where - BitsImpl<{ SUBS }>: Bits, -{ +impl<'a, M: RawMutex> DataInterface for MqttClient<'a, M> { const PROTOCOL: Protocol = Protocol::Mqtt; - type ActiveTransfer<'t> = Subscription<'a, 't, M, SUBS, 1> where Self: 't; + type ActiveTransfer<'t> = Subscription<'a, 't, M, 1> where Self: 't; /// Init file transfer by subscribing to the OTA data stream topic async fn init_file_transfer( diff --git a/src/ota/encoding/json.rs b/src/ota/encoding/json.rs index c258942..12fae02 100644 --- a/src/ota/encoding/json.rs +++ b/src/ota/encoding/json.rs @@ -139,6 +139,7 @@ mod tests { (JobStatusReason::Accepted, "accepted"), (JobStatusReason::Rejected, "rejected"), (JobStatusReason::Aborted, "aborted"), + (JobStatusReason::Pal(123), "pal err"), ]; for (reason, exp) in reasons { diff --git a/src/ota/mod.rs b/src/ota/mod.rs index 0a75a2a..008dd2c 100644 --- a/src/ota/mod.rs +++ b/src/ota/mod.rs @@ -126,10 +126,11 @@ impl Updater { // ... (Handle end of file) ... match pal.close_file(&file_ctx).await { Err(e) => { - job_updater.signal_update( - JobStatus::Failed, - JobStatusReason::Pal(0), - ); + // FIXME: This seems like duplicate status update, as it will also report during cleanup + // job_updater.signal_update( + // JobStatus::Failed, + // JobStatusReason::Pal(0), + // ); return Err(e.into()); } @@ -212,6 +213,11 @@ impl Updater { pal::OtaEvent::UpdateComplete }; + info!( + "OTA Download finished! Running complete callback: {:?}", + event + ); + pal.complete_callback(event).await?; Ok(()) @@ -331,15 +337,19 @@ impl Updater { continue; }; - if *request_momentum <= config.max_request_momentum { - // Increment momentum - *request_momentum += 1; + // Increment momentum + *request_momentum += 1; + if *request_momentum == 1 { + continue; + } + + if *request_momentum <= config.max_request_momentum { warn!("Momentum requesting more blocks!"); // Request data blocks - // data.request_file_blocks(file_ctx, &mut progress, config) - // .await?; + data.request_file_blocks(file_ctx, &mut progress, config) + .await?; } else { // Too much momentum, abort return Err(error::OtaError::MomentumAbort); @@ -351,6 +361,7 @@ impl Updater { } #[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ProgressState { pub total_blocks: usize, pub blocks_remaining: usize, @@ -358,7 +369,9 @@ pub struct ProgressState { pub block_offset: u32, pub request_block_remaining: u32, pub request_momentum: Option, + #[cfg_attr(feature = "defmt", defmt(Debug2Format))] pub bitmap: Bitmap, + #[cfg_attr(feature = "defmt", defmt(Debug2Format))] pub status_details: StatusDetailsOwned, } @@ -430,7 +443,6 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { .update_job_status( &self.file_ctx, &mut progress, - self.config, JobStatus::Succeeded, JobStatusReason::Accepted, ) @@ -484,7 +496,7 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { // Update the job status based on the signal let mut progress = self.progress_state.lock().await; self.control - .update_job_status(self.file_ctx, &mut progress, self.config, status, reason) + .update_job_status(self.file_ctx, &mut progress, status, reason) .await?; match status { @@ -531,7 +543,6 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { .update_job_status( &self.file_ctx, &mut progress, - self.config, JobStatus::InProgress, JobStatusReason::SelfTestActive, ) @@ -544,7 +555,6 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { .update_job_status( &self.file_ctx, &mut progress, - self.config, JobStatus::Succeeded, JobStatusReason::Accepted, ) @@ -559,7 +569,6 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { .update_job_status( &self.file_ctx, &mut progress, - self.config, JobStatus::Failed, JobStatusReason::Rejected, ) @@ -574,7 +583,6 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { .update_job_status( &self.file_ctx, &mut progress, - self.config, JobStatus::Failed, JobStatusReason::Aborted, ) @@ -598,7 +606,7 @@ impl<'a, C: ControlInterface> JobUpdater<'a, C> { let mut progress = self.progress_state.lock().await; self.control - .update_job_status(&self.file_ctx, &mut progress, self.config, status, reason) + .update_job_status(&self.file_ctx, &mut progress, status, reason) .await?; Ok(()) } diff --git a/src/provisioning/mod.rs b/src/provisioning/mod.rs index 32181b8..0a27f83 100644 --- a/src/provisioning/mod.rs +++ b/src/provisioning/mod.rs @@ -4,7 +4,6 @@ pub mod topics; use core::future::Future; -use bitmaps::{Bits, BitsImpl}; use embassy_sync::blocking_mutex::raw::RawMutex; use embedded_mqtt::{ BufferProvider, DeferredPayload, EncodingError, Message, Publish, Subscribe, SubscribeTopic, @@ -41,14 +40,13 @@ pub struct Credentials<'a> { pub struct FleetProvisioner; impl FleetProvisioner { - pub async fn provision<'a, C, M: RawMutex, const SUBS: usize>( - mqtt: &embedded_mqtt::MqttClient<'a, M, SUBS>, + pub async fn provision<'a, C, M: RawMutex>( + mqtt: &embedded_mqtt::MqttClient<'a, M>, template_name: &str, parameters: Option, credential_handler: &mut impl CredentialHandler, ) -> Result, Error> where - BitsImpl<{ SUBS }>: Bits, C: DeserializeOwned, { Self::provision_inner( @@ -62,15 +60,14 @@ impl FleetProvisioner { .await } - pub async fn provision_csr<'a, C, M: RawMutex, const SUBS: usize>( - mqtt: &embedded_mqtt::MqttClient<'a, M, SUBS>, + pub async fn provision_csr<'a, C, M: RawMutex>( + mqtt: &embedded_mqtt::MqttClient<'a, M>, template_name: &str, parameters: Option, csr: &str, credential_handler: &mut impl CredentialHandler, ) -> Result, Error> where - BitsImpl<{ SUBS }>: Bits, C: DeserializeOwned, { Self::provision_inner( @@ -85,14 +82,13 @@ impl FleetProvisioner { } #[cfg(feature = "provision_cbor")] - pub async fn provision_cbor<'a, C, M: RawMutex, const SUBS: usize>( - mqtt: &embedded_mqtt::MqttClient<'a, M, SUBS>, + pub async fn provision_cbor<'a, C, M: RawMutex>( + mqtt: &embedded_mqtt::MqttClient<'a, M>, template_name: &str, parameters: Option, credential_handler: &mut impl CredentialHandler, ) -> Result, Error> where - BitsImpl<{ SUBS }>: Bits, C: DeserializeOwned, { Self::provision_inner( @@ -107,15 +103,14 @@ impl FleetProvisioner { } #[cfg(feature = "provision_cbor")] - pub async fn provision_csr_cbor<'a, C, M: RawMutex, const SUBS: usize>( - mqtt: &embedded_mqtt::MqttClient<'a, M, SUBS>, + pub async fn provision_csr_cbor<'a, C, M: RawMutex>( + mqtt: &embedded_mqtt::MqttClient<'a, M>, template_name: &str, parameters: Option, csr: &str, credential_handler: &mut impl CredentialHandler, ) -> Result, Error> where - BitsImpl<{ SUBS }>: Bits, C: DeserializeOwned, { Self::provision_inner( @@ -130,8 +125,8 @@ impl FleetProvisioner { } #[cfg(feature = "provision_cbor")] - async fn provision_inner<'a, C, M: RawMutex, const SUBS: usize>( - mqtt: &embedded_mqtt::MqttClient<'a, M, SUBS>, + async fn provision_inner<'a, C, M: RawMutex>( + mqtt: &embedded_mqtt::MqttClient<'a, M>, template_name: &str, parameters: Option, csr: Option<&str>, @@ -139,7 +134,6 @@ impl FleetProvisioner { payload_format: PayloadFormat, ) -> Result, Error> where - BitsImpl<{ SUBS }>: Bits, C: DeserializeOwned, { use embedded_mqtt::SliceBufferProvider; @@ -154,8 +148,8 @@ impl FleetProvisioner { Some(Topic::CreateKeysAndCertificateAccepted(format)) => { let response = Self::deserialize::< CreateKeysAndCertificateResponse, + M, SliceBufferProvider<'a>, - SUBS, >(format, &mut message)?; credential_handler @@ -172,8 +166,8 @@ impl FleetProvisioner { Some(Topic::CreateCertificateFromCsrAccepted(format)) => { let response = Self::deserialize::< CreateCertificateFromCsrResponse, + M, SliceBufferProvider<'a>, - SUBS, >(format, &mut message)?; credential_handler @@ -266,8 +260,8 @@ impl FleetProvisioner { Some(Topic::RegisterThingAccepted(_, format)) => { let response = Self::deserialize::< RegisterThingResponse<'_, C>, + M, SliceBufferProvider<'a>, - SUBS, >(format, &mut message)?; Ok(response.device_configuration) @@ -286,14 +280,11 @@ impl FleetProvisioner { } } - async fn begin<'a, 'b, M: RawMutex, const SUBS: usize>( - mqtt: &'b embedded_mqtt::MqttClient<'a, M, SUBS>, + async fn begin<'a, 'b, M: RawMutex>( + mqtt: &'b embedded_mqtt::MqttClient<'a, M>, csr: Option<&str>, payload_format: PayloadFormat, - ) -> Result, Error> - where - BitsImpl<{ SUBS }>: Bits, - { + ) -> Result, Error> { if let Some(csr) = csr { let subscription = mqtt .subscribe( @@ -395,13 +386,10 @@ impl FleetProvisioner { } } - fn deserialize<'a, R: Deserialize<'a>, B: BufferProvider, const SUBS: usize>( + fn deserialize<'a, R: Deserialize<'a>, M: RawMutex, B: BufferProvider>( payload_format: PayloadFormat, - message: &'a mut Message<'_, B, SUBS>, - ) -> Result - where - BitsImpl<{ SUBS }>: Bits, - { + message: &'a mut Message<'_, M, B>, + ) -> Result { trace!( "Accepted Topic {:?}. Payload len: {:?}", payload_format, @@ -415,13 +403,10 @@ impl FleetProvisioner { }) } - fn handle_error( + fn handle_error( format: PayloadFormat, - mut message: Message<'_, B, SUBS>, - ) -> Result<(), Error> - where - BitsImpl<{ SUBS }>: Bits, - { + mut message: Message<'_, M, B>, + ) -> Result<(), Error> { error!(">> {:?}", message.topic_name()); let response = match format { diff --git a/src/shadows/mod.rs b/src/shadows/mod.rs index 5d46ac0..f0f3b96 100644 --- a/src/shadows/mod.rs +++ b/src/shadows/mod.rs @@ -6,7 +6,6 @@ pub mod topics; use core::{marker::PhantomData, ops::DerefMut, sync::atomic}; -use bitmaps::{Bits, BitsImpl}; pub use data_types::Patch; use embassy_sync::{ blocking_mutex::raw::{NoopRawMutex, RawMutex}, @@ -34,22 +33,20 @@ pub trait ShadowState: ShadowPatch + Default { const MAX_PAYLOAD_SIZE: usize = 512; } -struct ShadowHandler<'a, 'm, M: RawMutex, S: ShadowState, const SUBS: usize> +struct ShadowHandler<'a, 'm, M: RawMutex, S: ShadowState> where - BitsImpl<{ SUBS }>: Bits, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, { - mqtt: &'m embedded_mqtt::MqttClient<'a, M, SUBS>, - subscription: Mutex>>, + mqtt: &'m embedded_mqtt::MqttClient<'a, M>, + subscription: Mutex>>, _shadow: PhantomData, // request_lock is used to ensure that shadow operations such as subscribing, updating, or // deleting are serialized, preventing multiple concurrent requests to the same MQTT topics. request_lock: Mutex, } -impl<'a, 'm, M: RawMutex, S: ShadowState, const SUBS: usize> ShadowHandler<'a, 'm, M, S, SUBS> +impl<'a, 'm, M: RawMutex, S: ShadowState> ShadowHandler<'a, 'm, M, S> where - BitsImpl<{ SUBS }>: Bits, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, { async fn handle_delta(&self) -> Result, Error> { @@ -344,7 +341,7 @@ where &self, topic: topics::Topic, payload: impl ToPayload, - ) -> Result, Error> { + ) -> Result, Error> { let (accepted, rejected) = match topic { Topic::Get => (Topic::GetAccepted, Topic::GetRejected), Topic::Update => (Topic::UpdateAccepted, Topic::UpdateRejected), @@ -394,18 +391,16 @@ where } } -pub struct PersistedShadow<'a, 'm, S: ShadowState, M: RawMutex, D: ShadowDAO, const SUBS: usize> +pub struct PersistedShadow<'a, 'm, S: ShadowState, M: RawMutex, D: ShadowDAO> where - BitsImpl<{ SUBS }>: Bits, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, { - handler: ShadowHandler<'a, 'm, M, S, SUBS>, + handler: ShadowHandler<'a, 'm, M, S>, pub(crate) dao: Mutex, } -impl<'a, 'm, S, M, D, const SUBS: usize> PersistedShadow<'a, 'm, S, M, D, SUBS> +impl<'a, 'm, S, M, D> PersistedShadow<'a, 'm, S, M, D> where - BitsImpl<{ SUBS }>: Bits, S: ShadowState + Default, M: RawMutex, D: ShadowDAO, @@ -413,7 +408,7 @@ where { /// Instantiate a new shadow that will be automatically persisted to NVM /// based on the passed `DAO`. - pub fn new(mqtt: &'m embedded_mqtt::MqttClient<'a, M, SUBS>, dao: D) -> Self { + pub fn new(mqtt: &'m embedded_mqtt::MqttClient<'a, M>, dao: D) -> Self { let handler = ShadowHandler { mqtt, subscription: Mutex::new(None), @@ -522,24 +517,22 @@ where } } -pub struct Shadow<'a, 'm, S: ShadowState, M: RawMutex, const SUBS: usize> +pub struct Shadow<'a, 'm, S: ShadowState, M: RawMutex> where - BitsImpl<{ SUBS }>: Bits, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, { state: S, - handler: ShadowHandler<'a, 'm, M, S, SUBS>, + handler: ShadowHandler<'a, 'm, M, S>, } -impl<'a, 'm, S, M, const SUBS: usize> Shadow<'a, 'm, S, M, SUBS> +impl<'a, 'm, S, M> Shadow<'a, 'm, S, M> where - BitsImpl<{ SUBS }>: Bits, S: ShadowState, M: RawMutex, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, { /// Instantiate a new non-persisted shadow - pub fn new(state: S, mqtt: &'m embedded_mqtt::MqttClient<'a, M, SUBS>) -> Self { + pub fn new(state: S, mqtt: &'m embedded_mqtt::MqttClient<'a, M>) -> Self { let handler = ShadowHandler { mqtt, subscription: Mutex::new(None), @@ -613,9 +606,8 @@ where } } -impl<'a, 'm, S, M, const SUBS: usize> core::fmt::Debug for Shadow<'a, 'm, S, M, SUBS> +impl<'a, 'm, S, M> core::fmt::Debug for Shadow<'a, 'm, S, M> where - BitsImpl<{ SUBS }>: Bits, S: ShadowState + core::fmt::Debug, M: RawMutex, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, @@ -631,9 +623,8 @@ where } #[cfg(feature = "defmt")] -impl<'a, 'm, S, M, const SUBS: usize> defmt::Format for Shadow<'a, 'm, S, M, SUBS> +impl<'a, 'm, S, M> defmt::Format for Shadow<'a, 'm, S, M> where - BitsImpl<{ SUBS }>: Bits, S: ShadowState + defmt::Format, M: RawMutex, [(); S::MAX_PAYLOAD_SIZE + PARTIAL_REQUEST_OVERHEAD]:, diff --git a/tests/ota_mqtt.rs b/tests/ota_mqtt.rs index b38d10e..ddd7b78 100644 --- a/tests/ota_mqtt.rs +++ b/tests/ota_mqtt.rs @@ -3,7 +3,6 @@ mod common; -use bitmaps::{Bits, BitsImpl}; use common::credentials; use common::file_handler::{FileHandler, State as FileHandlerState}; use common::network::TlsNetwork; @@ -44,13 +43,10 @@ impl<'a> Jobs<'a> { } } -fn handle_ota<'a, const SUBS: usize>( - message: Message<'a, SliceBufferProvider<'a>, SUBS>, +fn handle_ota<'a>( + message: Message<'a, NoopRawMutex, SliceBufferProvider<'a>>, config: &ota::config::Config, -) -> Option -where - BitsImpl: Bits, -{ +) -> Option { let job = match jobs::Topic::from_str(message.topic_name()) { Some(jobs::Topic::NotifyNext) => { let (execution_changed, _) = @@ -103,7 +99,7 @@ async fn test_mqtt_ota() { .keepalive_interval(embassy_time::Duration::from_secs(50)) .build(); - static STATE: StaticCell> = StaticCell::new(); + static STATE: StaticCell> = StaticCell::new(); let state = STATE.init(State::new()); let (mut stack, client) = embedded_mqtt::new(state, config); diff --git a/tests/provisioning.rs b/tests/provisioning.rs index 44e9570..8a51f52 100644 --- a/tests/provisioning.rs +++ b/tests/provisioning.rs @@ -80,8 +80,8 @@ async fn test_provisioning() { .keepalive_interval(embassy_time::Duration::from_secs(50)) .build(); - static STATE: StaticCell> = StaticCell::new(); - let state = STATE.init(State::::new()); + static STATE: StaticCell> = StaticCell::new(); + let state = STATE.init(State::new()); let (mut stack, client) = embedded_mqtt::new(state, config); let signing_key = credentials::signing_key(); @@ -96,14 +96,14 @@ async fn test_provisioning() { let mut credential_handler = CredentialDAO { creds: None }; #[cfg(not(feature = "provision_cbor"))] - let provision_fut = FleetProvisioner::provision::( + let provision_fut = FleetProvisioner::provision::( &client, &template_name, Some(parameters), &mut credential_handler, ); #[cfg(feature = "provision_cbor")] - let provision_fut = FleetProvisioner::provision_cbor::( + let provision_fut = FleetProvisioner::provision_cbor::( &client, &template_name, Some(parameters), diff --git a/tests/shadows.rs b/tests/shadows.rs index 86e2d93..0d9cdb0 100644 --- a/tests/shadows.rs +++ b/tests/shadows.rs @@ -40,8 +40,6 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use static_cell::StaticCell; -const MAX_SUBSCRIBERS: usize = 8; - #[derive(Debug, Default, Serialize, Deserialize, ShadowState, PartialEq)] #[shadow("state")] pub struct TestShadow { @@ -51,7 +49,7 @@ pub struct TestShadow { } /// Helper function to mimic cloud side updates using MQTT client directly -async fn cloud_update(client: &MqttClient<'static, NoopRawMutex, MAX_SUBSCRIBERS>, payload: &[u8]) { +async fn cloud_update(client: &MqttClient<'static, NoopRawMutex>, payload: &[u8]) { client .publish( Publish::builder() @@ -70,10 +68,7 @@ async fn cloud_update(client: &MqttClient<'static, NoopRawMutex, MAX_SUBSCRIBERS } /// Helper function to assert on the current shadow state -async fn assert_shadow( - client: &MqttClient<'static, NoopRawMutex, MAX_SUBSCRIBERS>, - expected: serde_json::Value, -) { +async fn assert_shadow(client: &MqttClient<'static, NoopRawMutex>, expected: serde_json::Value) { let mut get_shadow_sub = client .subscribe::<1>( Subscribe::builder() @@ -143,13 +138,12 @@ async fn test_shadow_update_from_device() { .keepalive_interval(embassy_time::Duration::from_secs(50)) .build(); - static STATE: StaticCell> = - StaticCell::new(); + static STATE: StaticCell> = StaticCell::new(); let state = STATE.init(State::new()); let (mut stack, client) = embedded_mqtt::new(state, config); // Create the shadow - let mut shadow = Shadow::::new(TestShadow::default(), &client); + let mut shadow = Shadow::::new(TestShadow::default(), &client); // let delta_fut = async { // loop {