Skip to content

Commit

Permalink
Fix OTA & bump dependencies (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
MathiasKoch authored Oct 29, 2024
1 parent c3456cd commit 2378458
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 203 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ bitmaps = { version = "3.1", default-features = false }
heapless = { version = "0.8", features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_cbor = { version = "0.11", default-features = false, optional = true }
serde-json-core = { version = "0.5" }
serde-json-core = { version = "0.6" }
shadow-derive = { path = "shadow_derive", version = "0.2.1" }
embedded-storage-async = "0.4"
embedded-mqtt = { git = "ssh://[email protected]/FactbirdHQ/embedded-mqtt", rev = "5e28a55da737356a3a9c1597ae4ff123e2481b1b" }
embedded-mqtt = { git = "ssh://[email protected]/FactbirdHQ/embedded-mqtt", rev = "74eb53d" }

futures = { version = "0.3.28", default-features = false }

Expand All @@ -42,7 +42,7 @@ defmt = { version = "0.3", optional = true }

[dev-dependencies]
native-tls = { version = "0.2" }
embedded-nal-async = "0.7"
embedded-nal-async = "0.8"
env_logger = "0.11"
sha2 = "0.10.1"
static_cell = { version = "2", features = ["nightly"] }
Expand Down
2 changes: 1 addition & 1 deletion src/jobs/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub struct Jobs {
pub struct ErrorResponse<'a> {
pub code: ErrorCode,
/// An error message string.
message: &'a str,
pub message: &'a str,
/// A client token used to correlate requests and responses. Enter an
/// arbitrary value here and it is reflected in the response.
#[serde(rename = "clientToken")]
Expand Down
2 changes: 1 addition & 1 deletion src/ota/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Default for Config {
Self {
block_size: 1024,
max_request_momentum: 3,
request_wait: Duration::from_secs(8),
request_wait: Duration::from_secs(5),
status_update_frequency: 24,
self_test_timeout: None,
}
Expand Down
2 changes: 0 additions & 2 deletions src/ota/control_interface/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::jobs::data_types::JobStatus;

use super::{
config::Config,
encoding::{json::JobStatusReason, FileContext},
error::OtaError,
ProgressState,
Expand All @@ -16,7 +15,6 @@ pub trait ControlInterface {
&self,
file_ctx: &FileContext,
progress: &mut ProgressState,
config: &Config,
status: JobStatus,
reason: JobStatusReason,
) -> Result<(), OtaError>;
Expand Down
177 changes: 90 additions & 87 deletions src/ota/control_interface/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use core::fmt::Write;

use bitmaps::{Bits, BitsImpl};
use embassy_sync::blocking_mutex::raw::RawMutex;
use embassy_time::with_timeout;
use embedded_mqtt::{DeferredPayload, EncodingError, Publish, QoS, Subscribe, SubscribeTopic};
use futures::StreamExt as _;

use super::ControlInterface;
use crate::jobs::data_types::{ErrorResponse, JobStatus, UpdateJobExecutionResponse};
Expand All @@ -14,10 +13,7 @@ use crate::ota::encoding::{self, FileContext};
use crate::ota::error::OtaError;
use crate::ota::ProgressState;

impl<'a, M: RawMutex, const SUBS: usize> ControlInterface for embedded_mqtt::MqttClient<'a, M, SUBS>
where
BitsImpl<{ SUBS }>: Bits,
{
impl<'a, M: RawMutex> ControlInterface for embedded_mqtt::MqttClient<'a, M> {
/// Check for next available OTA job from the job service by publishing a
/// "get next job" message to the job service.
async fn request_job(&self) -> Result<(), OtaError> {
Expand Down Expand Up @@ -45,7 +41,6 @@ where
&self,
file_ctx: &FileContext,
progress_state: &mut ProgressState,
config: &Config,
status: JobStatus,
reason: JobStatusReason,
) -> Result<(), OtaError> {
Expand All @@ -63,15 +58,6 @@ where
if let JobStatus::InProgress | JobStatus::Succeeded = status {
let received_blocks = progress_state.total_blocks - progress_state.blocks_remaining;

// Output a status update once in a while. Always update first and
// last status
if progress_state.blocks_remaining != 0
&& received_blocks != 0
&& received_blocks % config.status_update_frequency as usize != 0
{
return Ok(());
}

// Don't override the progress on succeeded, nor on self-test
// active. (Cases where progress counter is lost due to device
// restarts)
Expand All @@ -92,40 +78,40 @@ where

// Downgrade progress updates to QOS 0 to avoid overloading MQTT
// buffers during active streaming. But make sure to always send and await ack for first update and last update
if status == JobStatus::InProgress
&& progress_state.blocks_remaining != 0
&& received_blocks != 0
{
qos = QoS::AtMostOnce;
}
// if status == JobStatus::InProgress
// && progress_state.blocks_remaining != 0
// && received_blocks != 0
// {
// qos = QoS::AtMostOnce;
// }
}

let mut sub = self
.subscribe::<2>(
Subscribe::builder()
.topics(&[
SubscribeTopic::builder()
.topic_path(
JobTopic::UpdateAccepted(file_ctx.job_name.as_str())
.format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>(
self.client_id(),
)?
.as_str(),
)
.build(),
SubscribeTopic::builder()
.topic_path(
JobTopic::UpdateRejected(file_ctx.job_name.as_str())
.format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>(
self.client_id(),
)?
.as_str(),
)
.build(),
])
.build(),
)
.await?;
// let mut sub = self
// .subscribe::<2>(
// Subscribe::builder()
// .topics(&[
// SubscribeTopic::builder()
// .topic_path(
// JobTopic::UpdateAccepted(file_ctx.job_name.as_str())
// .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>(
// self.client_id(),
// )?
// .as_str(),
// )
// .build(),
// SubscribeTopic::builder()
// .topic_path(
// JobTopic::UpdateRejected(file_ctx.job_name.as_str())
// .format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 34 }>(
// self.client_id(),
// )?
// .as_str(),
// )
// .build(),
// ])
// .build(),
// )
// .await?;

let topic = JobTopic::Update(file_ctx.job_name.as_str())
.format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 25 }>(self.client_id())?;
Expand All @@ -151,44 +137,61 @@ where
)
.await?;

loop {
let message = sub.next().await.ok_or(JobError::Encoding)?;

// Check if topic is GetAccepted
match crate::jobs::Topic::from_str(message.topic_name()) {
Some(crate::jobs::Topic::UpdateAccepted(_)) => {
// Check client token
let (response, _) = serde_json_core::from_slice::<
UpdateJobExecutionResponse<encoding::json::OtaJob<'_>>,
>(message.payload())
.map_err(|_| JobError::Encoding)?;

if response.client_token != Some(self.client_id()) {
error!(
"Unexpected client token received: {}, expected: {}",
response.client_token.unwrap_or("None"),
self.client_id()
);
continue;
}

return Ok(());
}
Some(crate::jobs::Topic::UpdateRejected(_)) => {
let (error_response, _) =
serde_json_core::from_slice::<ErrorResponse>(message.payload())
.map_err(|_| JobError::Encoding)?;

if error_response.client_token != Some(self.client_id()) {
continue;
}

return Err(OtaError::UpdateRejected(error_response.code));
}
_ => {
error!("Expected Topic name GetRejected or GetAccepted but got something else");
}
}
}
Ok(())

// loop {
// let message = match with_timeout(
// embassy_time::Duration::from_secs(1),
// sub.next_message(),
// )
// .await
// {
// Ok(res) => res.ok_or(JobError::Encoding)?,
// Err(_) => return Err(OtaError::Timeout),
// };

// // Check if topic is GetAccepted
// match crate::jobs::Topic::from_str(message.topic_name()) {
// Some(crate::jobs::Topic::UpdateAccepted(_)) => {
// // Check client token
// let (response, _) = serde_json_core::from_slice::<
// UpdateJobExecutionResponse<encoding::json::OtaJob<'_>>,
// >(message.payload())
// .map_err(|_| JobError::Encoding)?;

// if response.client_token != Some(self.client_id()) {
// error!(
// "Unexpected client token received: {}, expected: {}",
// response.client_token.unwrap_or("None"),
// self.client_id()
// );
// continue;
// }

// return Ok(());
// }
// Some(crate::jobs::Topic::UpdateRejected(_)) => {
// let (error_response, _) =
// serde_json_core::from_slice::<ErrorResponse>(message.payload())
// .map_err(|_| JobError::Encoding)?;

// if error_response.client_token != Some(self.client_id()) {
// error!(
// "Unexpected client token received: {}, expected: {}",
// error_response.client_token.unwrap_or("None"),
// self.client_id()
// );
// continue;
// }

// error!("OTA Update rejected: {:?}", error_response.message);

// return Err(OtaError::UpdateRejected(error_response.code));
// }
// _ => {
// error!("Expected Topic name GetRejected or GetAccepted but got something else");
// }
// }
// }
}
}
13 changes: 3 additions & 10 deletions src/ota/data_interface/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::fmt::{Display, Write};
use core::ops::DerefMut;
use core::str::FromStr;

use bitmaps::{Bits, BitsImpl};
use embassy_sync::blocking_mutex::raw::RawMutex;
use embedded_mqtt::{
DeferredPayload, EncodingError, MqttClient, Publish, Subscribe, SubscribeTopic, Subscription,
Expand Down Expand Up @@ -125,22 +124,16 @@ impl<'a> OtaTopic<'a> {
}
}

impl<'a, 'b, M: RawMutex, const SUBS: usize> BlockTransfer for Subscription<'a, 'b, M, SUBS, 1>
where
BitsImpl<{ SUBS }>: Bits,
{
impl<'a, 'b, M: RawMutex> BlockTransfer for Subscription<'a, 'b, M, 1> {
async fn next_block(&mut self) -> Result<Option<impl DerefMut<Target = [u8]>>, OtaError> {
Ok(self.next().await)
}
}

impl<'a, M: RawMutex, const SUBS: usize> DataInterface for MqttClient<'a, M, SUBS>
where
BitsImpl<{ SUBS }>: Bits,
{
impl<'a, M: RawMutex> DataInterface for MqttClient<'a, M> {
const PROTOCOL: Protocol = Protocol::Mqtt;

type ActiveTransfer<'t> = Subscription<'a, 't, M, SUBS, 1> where Self: 't;
type ActiveTransfer<'t> = Subscription<'a, 't, M, 1> where Self: 't;

/// Init file transfer by subscribing to the OTA data stream topic
async fn init_file_transfer(
Expand Down
1 change: 1 addition & 0 deletions src/ota/encoding/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ mod tests {
(JobStatusReason::Accepted, "accepted"),
(JobStatusReason::Rejected, "rejected"),
(JobStatusReason::Aborted, "aborted"),
(JobStatusReason::Pal(123), "pal err"),
];

for (reason, exp) in reasons {
Expand Down
Loading

0 comments on commit 2378458

Please sign in to comment.