From 12bd330017d6f4434f10f9b7969ae018980d293b Mon Sep 17 00:00:00 2001 From: nnaveen979 Date: Mon, 4 Nov 2024 19:25:39 +0530 Subject: [PATCH] feat: RPPL-2681 thunderBroker migration workaround changes --- core/main/src/broker/broker_utils.rs | 2 +- core/main/src/utils/rpc_utils.rs | 18 +- core/sdk/src/api/device/device_operator.rs | 27 + core/sdk/src/utils/rpc_utils.rs | 10 + device/thunder_ripple_sdk/Cargo.toml | 8 +- .../src/bootstrap/boot_thunder.rs | 36 +- .../src/bootstrap/setup_thunder_processors.rs | 41 +- .../src/client/plugin_manager.rs | 1 + .../src/client/thunder_async_client.rs | 304 ++++++++ .../src/client/thunder_client.rs | 47 +- .../src/client/thunder_client2.rs | 286 +++++++ .../src/client/thunder_client_pool.rs | 28 +- .../src/client/thunder_plugins_status_mgr.rs | 712 ++++++++++++++++++ device/thunder_ripple_sdk/src/lib.rs | 3 + .../thunder_ripple_sdk/src/thunder_state.rs | 2 +- device/thunder_ripple_sdk/src/utils.rs | 12 +- 16 files changed, 1478 insertions(+), 59 deletions(-) create mode 100644 device/thunder_ripple_sdk/src/client/thunder_async_client.rs create mode 100644 device/thunder_ripple_sdk/src/client/thunder_client2.rs create mode 100644 device/thunder_ripple_sdk/src/client/thunder_plugins_status_mgr.rs diff --git a/core/main/src/broker/broker_utils.rs b/core/main/src/broker/broker_utils.rs index 642997fff..11c40bebc 100644 --- a/core/main/src/broker/broker_utils.rs +++ b/core/main/src/broker/broker_utils.rs @@ -17,12 +17,12 @@ use std::time::Duration; -use crate::utils::rpc_utils::extract_tcp_port; use futures::stream::{SplitSink, SplitStream}; use futures_util::StreamExt; use ripple_sdk::{ log::{error, info}, tokio::{self, net::TcpStream}, + utils::rpc_utils::extract_tcp_port, }; use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; diff --git a/core/main/src/utils/rpc_utils.rs b/core/main/src/utils/rpc_utils.rs index 9097a5224..7fe8f36c4 100644 --- a/core/main/src/utils/rpc_utils.rs +++ b/core/main/src/utils/rpc_utils.rs @@ -110,12 +110,12 @@ pub fn get_base_method(method: &str) -> String { method_vec.first().unwrap().to_string().to_lowercase() } -pub fn extract_tcp_port(url: &str) -> String { - let url_split: Vec<&str> = url.split("://").collect(); - if let Some(domain) = url_split.get(1) { - let domain_split: Vec<&str> = domain.split('/').collect(); - domain_split.first().unwrap().to_string() - } else { - url.to_owned() - } -} +// pub fn extract_tcp_port(url: &str) -> String { +// let url_split: Vec<&str> = url.split("://").collect(); +// if let Some(domain) = url_split.get(1) { +// let domain_split: Vec<&str> = domain.split('/').collect(); +// domain_split.first().unwrap().to_string() +// } else { +// url.to_owned() +// } +// } diff --git a/core/sdk/src/api/device/device_operator.rs b/core/sdk/src/api/device/device_operator.rs index 05ac2ba02..13d9c3f6f 100644 --- a/core/sdk/src/api/device/device_operator.rs +++ b/core/sdk/src/api/device/device_operator.rs @@ -44,6 +44,33 @@ pub enum DeviceChannelRequest { Unsubscribe(DeviceUnsubscribeRequest), } +impl DeviceChannelRequest { + pub fn get_callsign_method(&self) -> (String, String) { + match self { + DeviceChannelRequest::Call(c) => { + let mut collection: Vec<&str> = c.method.split('.').collect(); + let method = collection.pop().unwrap_or_default(); + let callsign = collection.join("."); + (callsign, method.into()) + } + DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()), + DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()), + } + } + + pub fn is_subscription(&self) -> bool { + !matches!(self, DeviceChannelRequest::Call(_)) + } + + pub fn is_unsubscribe(&self) -> Option { + if let DeviceChannelRequest::Unsubscribe(u) = self { + Some(u.clone()) + } else { + None + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeviceCallRequest { pub method: String, diff --git a/core/sdk/src/utils/rpc_utils.rs b/core/sdk/src/utils/rpc_utils.rs index c77ea07a9..9d2d79bb4 100644 --- a/core/sdk/src/utils/rpc_utils.rs +++ b/core/sdk/src/utils/rpc_utils.rs @@ -20,3 +20,13 @@ use jsonrpsee::core::Error; pub fn rpc_err(msg: impl Into) -> Error { Error::Custom(msg.into()) } + +pub fn extract_tcp_port(url: &str) -> String { + let url_split: Vec<&str> = url.split("://").collect(); + if let Some(domain) = url_split.get(1) { + let domain_split: Vec<&str> = domain.split('/').collect(); + domain_split.first().unwrap().to_string() + } else { + url.to_owned() + } +} diff --git a/device/thunder_ripple_sdk/Cargo.toml b/device/thunder_ripple_sdk/Cargo.toml index 9fc186ca6..b82b416ba 100644 --- a/device/thunder_ripple_sdk/Cargo.toml +++ b/device/thunder_ripple_sdk/Cargo.toml @@ -35,6 +35,8 @@ contract_tests = [ "tree_magic_mini", "rstest" ] +thunderBroker_enabled = [] +default = ["thunderBroker_enabled"] [dependencies] base64.workspace = true @@ -44,6 +46,9 @@ regex.workspace = true jsonrpsee = { workspace = true, features = ["macros", "ws-client"] } serde.workspace = true url.workspace = true +serde_json.workspace = true +futures-channel.workspace = true +futures.workspace = true strum = { version = "0.24", default-features = false } strum_macros = "0.24" @@ -56,8 +61,9 @@ csv = "1.1" # Allowing minor updates home = { version = "=0.5.5", optional = true } tree_magic_mini = { version = "=3.0.3", optional = true } rstest = { version = "0.18.2", optional = true, default-features = false } +tokio-tungstenite = { workspace = true, features = ["handshake"] } +futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false} [dev-dependencies] -tokio-tungstenite = { workspace = true, features = ["native-tls"] } ripple_sdk = { path = "../../core/sdk", features = ["tdk"] } diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index 61ba1de8b..f29f82156 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -19,6 +19,9 @@ use crate::{ bootstrap::setup_thunder_processors::SetupThunderProcessor, client::plugin_manager::ThunderPluginBootParam, thunder_state::ThunderBootstrapStateWithClient, }; + +use crate::client::thunder_client2; +use crate::thunder_state::ThunderState; use ripple_sdk::{ extn::client::extn_client::ExtnClient, log::{error, info}, @@ -31,15 +34,32 @@ pub async fn boot_thunder( plugin_param: ThunderPluginBootParam, ) -> Option { info!("Booting thunder"); - if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await { - if let Ok(state) = ThunderPoolStep::setup(state).await { - SetupThunderProcessor::setup(state.clone()).await; - return Some(state); + + #[cfg(feature = "thunderBroker_enabled")] + { + info!("thunderBroker_enabled feature is enabled"); + if let Ok(thndr_client) = thunder_client2::ThunderClientBuilder::get_client().await { + let thunder_state = ThunderState::new(state.clone(), thndr_client); + SetupThunderProcessor::setup(thunder_state, state.clone()).await; } else { - error!("Unable to connect to Thunder, error in ThunderPoolStep"); + error!("Unable to connect to Thunder_Broker, error in ThunderClientBuilder"); } - } else { - error!("Unable to connect to Thunder, error in ThunderGetConfigStep"); + None + } + + #[cfg(not(feature = "thunderBroker_enabled"))] + { + info!("thunderBroker_enabled feature is not enabled, go for thunderclient"); + if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await { + if let Ok(state) = ThunderPoolStep::setup(state).await { + SetupThunderProcessor::setup(state.clone()).await; + return Some(state); + } else { + error!("Unable to connect to Thunder, error in ThunderPoolStep"); + } + } else { + error!("Unable to connect to Thunder, error in ThunderGetConfigStep"); + }; + None } - None } diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs index 80e34c7d8..6d9131137 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs @@ -23,6 +23,8 @@ use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProc use crate::processors::thunder_rfc::ThunderRFCProcessor; use crate::processors::thunder_telemetry::ThunderTelemetryProcessor; use crate::thunder_state::ThunderBootstrapStateWithClient; +use crate::thunder_state::ThunderState; +use ripple_sdk::extn::client::extn_client::ExtnClient; use crate::processors::{ thunder_browser::ThunderBrowserRequestProcessor, @@ -40,7 +42,7 @@ impl SetupThunderProcessor { pub fn get_name() -> String { "SetupThunderProcessor".into() } - + #[cfg(not(feature = "thunderBroker_enabled"))] pub async fn setup(state: ThunderBootstrapStateWithClient) { let mut extn_client = state.state.get_client(); extn_client @@ -73,4 +75,41 @@ impl SetupThunderProcessor { extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state)); let _ = extn_client.event(ExtnStatus::Ready); } + + #[cfg(feature = "thunderBroker_enabled")] + pub async fn setup(thunder_state: ThunderState, thndrextn_client: ExtnClient) { + let mut extn_client = thndrextn_client.clone(); + extn_client.add_request_processor(ThunderDeviceInfoRequestProcessor::new( + thunder_state.clone(), + )); + extn_client + .add_request_processor(ThunderBrowserRequestProcessor::new(thunder_state.clone())); + extn_client.add_request_processor(ThunderWifiRequestProcessor::new(thunder_state.clone())); + extn_client + .add_request_processor(ThunderStorageRequestProcessor::new(thunder_state.clone())); + extn_client.add_request_processor(ThunderWindowManagerRequestProcessor::new( + thunder_state.clone(), + )); + extn_client.add_request_processor(ThunderRemoteAccessoryRequestProcessor::new( + thunder_state.clone(), + )); + extn_client.add_request_processor(ThunderOpenEventsProcessor::new(thunder_state.clone())); + + let package_manager_processor = + ThunderPackageManagerRequestProcessor::new(thunder_state.clone()); + extn_client.add_request_processor(package_manager_processor); + + if extn_client.get_bool_config("rdk_telemetry") { + match extn_client + .request(OperationalMetricRequest::Subscribe) + .await + { + Ok(_) => extn_client + .add_event_processor(ThunderTelemetryProcessor::new(thunder_state.clone())), + Err(_) => error!("Telemetry not setup"), + } + } + extn_client.add_request_processor(ThunderRFCProcessor::new(thunder_state.clone())); + let _ = extn_client.event(ExtnStatus::Ready); + } } diff --git a/device/thunder_ripple_sdk/src/client/plugin_manager.rs b/device/thunder_ripple_sdk/src/client/plugin_manager.rs index cd25b823b..873007ca9 100644 --- a/device/thunder_ripple_sdk/src/client/plugin_manager.rs +++ b/device/thunder_ripple_sdk/src/client/plugin_manager.rs @@ -33,6 +33,7 @@ use ripple_sdk::{ use serde::{Deserialize, Serialize}; use super::thunder_plugin::ThunderPlugin::Controller; +#[cfg(not(feature = "thunderBroker_enabled"))] use super::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin}; pub struct ActivationSubscriber { diff --git a/device/thunder_ripple_sdk/src/client/thunder_async_client.rs b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs new file mode 100644 index 000000000..5c512dd8e --- /dev/null +++ b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs @@ -0,0 +1,304 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::time::Duration; + +use futures::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; +use ripple_sdk::{ + api::{ + device::device_operator::DeviceChannelRequest, gateway::rpc_gateway_api::JsonRpcApiResponse, + }, + log::{debug, error, info}, + tokio::{ + self, + net::TcpStream, + sync::mpsc::{Receiver, Sender}, + }, + utils::{error::RippleError, rpc_utils::extract_tcp_port}, +}; + +use super::thunder_plugins_status_mgr::{BrokerCallback, BrokerSender, StatusManager}; +use crate::utils::get_next_id; +use serde_json::json; +use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream}; + +#[derive(Clone, Debug)] +pub struct ThunderAsyncClient { + status_manager: StatusManager, + sender: BrokerSender, + callback: BrokerCallback, +} + +#[derive(Clone, Debug)] +pub struct ThunderAsyncRequest { + pub id: u64, + request: DeviceChannelRequest, +} + +impl ThunderAsyncRequest { + pub fn new(request: DeviceChannelRequest) -> Self { + Self { + id: get_next_id(), + request, + } + } +} + +#[derive(Clone, Debug)] +pub struct ThunderAsyncResponse { + pub id: Option, + pub result: Result, +} + +impl ThunderAsyncResponse { + fn new_response(response: JsonRpcApiResponse) -> Self { + Self { + id: None, + result: Ok(response), + } + } + + fn new_error(id: u64, e: RippleError) -> Self { + Self { + id: Some(id), + result: Err(e), + } + } +} + +impl ThunderAsyncClient { + pub fn get_sender(&self) -> BrokerSender { + self.sender.clone() + } + + async fn create_ws( + endpoint: &str, + ) -> ( + SplitSink, Message>, + SplitStream>, + ) { + info!("Broker Endpoint url {}", endpoint); + + let url = url::Url::parse(endpoint).unwrap(); + let port = extract_tcp_port(endpoint); + info!("Url host str {}", url.host_str().unwrap()); + let mut index = 0; + + loop { + // Try connecting to the tcp port first + if let Ok(v) = TcpStream::connect(&port).await { + // Setup handshake for websocket with the tcp port + // Some WS servers lock on to the Port but not setup handshake till they are fully setup + if let Ok((stream, _)) = client_async(endpoint, v).await { + break stream.split(); + } + } + if (index % 10).eq(&0) { + error!( + "Broker with {} failed with retry for last {} secs in {}", + endpoint, index, port + ); + } + index += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + fn prepare_request(&self, request: &ThunderAsyncRequest) -> Result, RippleError> { + let mut requests = Vec::new(); + let id = request.id; + let (callsign, method) = request.request.get_callsign_method(); + if method.is_empty() { + return Err(RippleError::InvalidInput); + } + // check if the plugin is activated. + let status = match self.status_manager.get_status(callsign.clone()) { + Some(v) => v.clone(), + None => { + self.status_manager + .add_broker_request_to_pending_list(callsign.clone(), request.clone()); + // PluginState is not available with StateManager, create an internal thunder request to activate the plugin + let request = self + .status_manager + .generate_plugin_status_request(callsign.clone()); + requests.push(request.to_string()); + return Ok(requests); + } + }; + + if status.state.is_missing() { + error!("Plugin {} is missing", callsign); + return Err(RippleError::ServiceError); + } + + if status.state.is_activating() { + info!("Plugin {} is activating", callsign); + return Err(RippleError::ServiceNotReady); + } + + if !status.state.is_activated() { + // add the broker request to pending list + self.status_manager + .add_broker_request_to_pending_list(callsign.clone(), request.clone()); + // create an internal thunder request to activate the plugin + let request = self + .status_manager + .generate_plugin_activation_request(callsign.clone()); + requests.push(request.to_string()); + return Ok(requests); + } + + match &request.request { + DeviceChannelRequest::Call(c) => { + // Simple request and response handling + requests.push( + json!({ + "jsonrpc": "2.0", + "id": id, + "method": c.method, + "params": c.params + }) + .to_string(), + ) + } + DeviceChannelRequest::Unsubscribe(_) => requests.push( + json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}.unregister", callsign), + "params": { + "event": method, + "id": "client.events" + } + }) + .to_string(), + ), + DeviceChannelRequest::Subscribe(_) => requests.push( + json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}.register", callsign), + "params": json!({ + "event": method, + "id": "client.events" + }) + }) + .to_string(), + ), + } + + Ok(requests) + } + + pub fn new(callback: BrokerCallback, tx: Sender) -> Self { + Self { + status_manager: StatusManager::new(), + sender: BrokerSender { sender: tx.clone() }, + callback, + } + } + + pub async fn start( + &self, + url: &str, + mut tr: Receiver, + ) -> Receiver { + let callback = self.callback.clone(); + let (mut ws_tx, mut ws_rx) = Self::create_ws(url).await; + // send the first request to the broker. This is the controller statechange subscription request + let status_request = self + .status_manager + .generate_state_change_subscribe_request(); + + let _feed = ws_tx + .feed(tokio_tungstenite::tungstenite::Message::Text( + status_request.to_string(), + )) + .await; + let _flush = ws_tx.flush().await; + let client_c = self.clone(); + let callback_for_sender = callback.clone(); + tokio::pin! { + let read = ws_rx.next(); + } + loop { + tokio::select! { + Some(value) = &mut read => { + match value { + Ok(v) => { + if let tokio_tungstenite::tungstenite::Message::Text(t) = v { + if client_c.status_manager.is_controller_response(client_c.get_sender(), callback.clone(), t.as_bytes()).await { + client_c.status_manager.handle_controller_response(client_c.get_sender(), callback.clone(), t.as_bytes()).await; + } + else { + // send the incoming text without context back to the sender + Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()).await + } + } + }, + Err(e) => { + error!("Broker Websocket error on read {:?}", e); + // Time to reconnect Thunder with existing subscription + break; + } + } + }, + Some(request) = tr.recv() => { + debug!("Got request from receiver for broker {:?}", request); + match client_c.prepare_request(&request) { + Ok(updated_request) => { + debug!("Sending request to broker {:?}", updated_request); + for r in updated_request { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + Err(e) => { + let response = ThunderAsyncResponse::new_error(request.id,e.clone()); + match e { + RippleError::ServiceNotReady => { + info!("Thunder Service not ready, request is now in pending list {:?}", request); + }, + _ => { + error!("error preparing request {:?}", e) + } + } + callback_for_sender.send(response).await + } + } + } + } + } + // when WS is disconnected return the tr back to caller helps restabilish connection + tr + } + + /// Default handler method for the broker to remove the context and send it back to the + /// client for consumption + async fn handle_jsonrpc_response(result: &[u8], callback: BrokerCallback) { + if let Ok(message) = serde_json::from_slice::(result) { + callback + .send(ThunderAsyncResponse::new_response(message)) + .await + } else { + error!("Invalid JSON RPC message sent by Thunder"); + } + } + + pub async fn send(&self, request: ThunderAsyncRequest) {} +} diff --git a/device/thunder_ripple_sdk/src/client/thunder_client.rs b/device/thunder_ripple_sdk/src/client/thunder_client.rs index 99b1036d4..76fd2a273 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client.rs @@ -62,16 +62,16 @@ use super::{ plugin_manager::{PluginActivatedResult, PluginManagerCommand}, }; use std::{env, process::Command}; - +#[cfg(not(feature = "thunderBroker_enabled"))] pub struct ThunderClientBuilder; - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub struct ThunderCallMessage { pub method: String, pub params: Option, pub callback: OneShotSender, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderCallMessage { pub fn callsign(&self) -> String { JsonRpcMethodLocator::from_str(&self.method) @@ -86,13 +86,13 @@ impl ThunderCallMessage { .method_name } } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug, Serialize, Deserialize)] pub struct ThunderRegisterParams { pub event: String, pub id: String, } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub struct ThunderSubscribeMessage { pub module: String, @@ -102,7 +102,7 @@ pub struct ThunderSubscribeMessage { pub callback: Option>, pub sub_id: Option, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderSubscribeMessage { pub fn resubscribe(&self) -> ThunderSubscribeMessage { ThunderSubscribeMessage { @@ -115,21 +115,21 @@ impl ThunderSubscribeMessage { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug, Clone)] pub struct ThunderUnsubscribeMessage { pub module: String, pub event_name: String, pub subscription_id: Option, } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub enum ThunderMessage { ThunderCallMessage(ThunderCallMessage), ThunderSubscribeMessage(ThunderSubscribeMessage), ThunderUnsubscribeMessage(ThunderUnsubscribeMessage), } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderMessage { pub fn clone(&self, intercept_tx: OneShotSender) -> ThunderMessage { match self { @@ -158,6 +158,7 @@ impl ThunderMessage { } #[derive(Debug, Clone)] +#[cfg(not(feature = "thunderBroker_enabled"))] pub struct ThunderClient { pub sender: Option>, pub pooled_sender: Option>, @@ -165,12 +166,12 @@ pub struct ThunderClient { pub plugin_manager_tx: Option>, pub subscriptions: Option>>>, } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug, Serialize, Deserialize)] pub struct DefaultThunderResult { pub success: bool, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderClient { /// Sends a message to thunder. If this client is pooled /// then it will wrap the message in a pool command before sending @@ -187,7 +188,7 @@ impl ThunderClient { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[async_trait] impl DeviceOperator for ThunderClient { async fn call(&self, request: DeviceCallRequest) -> DeviceResponseMessage { @@ -237,7 +238,7 @@ impl DeviceOperator for ThunderClient { self.send_message(msg).await; } } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub struct ThunderSubscription { handle: JoinHandle<()>, @@ -245,7 +246,7 @@ pub struct ThunderSubscription { listeners: HashMap>, rpc_response: DeviceResponseMessage, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderClient { async fn subscribe( client_id: Uuid, @@ -477,7 +478,7 @@ impl ThunderClient { None } } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderClientBuilder { fn parse_subscribe_method(subscribe_method: &str) -> Option<(String, String)> { if let Some(client_start) = subscribe_method.find("client.") { @@ -669,12 +670,12 @@ impl ThunderClientBuilder { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] pub struct ThunderRawBoolRequest { method: String, v: bool, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderRawBoolRequest { async fn send_request(self: Box) -> Value { let host = { @@ -715,11 +716,11 @@ impl ThunderRawBoolRequest { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] pub struct ThunderNoParamRequest { method: String, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderNoParamRequest { async fn send_request(self: Box, client: &Client) -> Value { let result = client.request(&self.method, None).await; @@ -730,13 +731,13 @@ impl ThunderNoParamRequest { result.unwrap() } } - +#[cfg(not(feature = "thunderBroker_enabled"))] pub struct ThunderParamRequest<'a> { method: &'a str, params: &'a str, json_based: bool, } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl<'a> ThunderParamRequest<'a> { async fn send_request(self: Box, client: &Client) -> Value { let result = client.request(self.method, self.get_params()).await; @@ -762,12 +763,12 @@ impl<'a> ThunderParamRequest<'a> { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] fn return_message(callback: OneShotSender, response: Value) { let msg = DeviceResponseMessage::call(response); oneshot_send_and_log(callback, msg, "returning message"); } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[cfg(test)] mod tests { use super::*; diff --git a/device/thunder_ripple_sdk/src/client/thunder_client2.rs b/device/thunder_ripple_sdk/src/client/thunder_client2.rs new file mode 100644 index 000000000..624634db3 --- /dev/null +++ b/device/thunder_ripple_sdk/src/client/thunder_client2.rs @@ -0,0 +1,286 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::collections::{BTreeMap, HashMap}; +use std::str::FromStr; +use std::sync::{Arc, RwLock}; + +use jsonrpsee::core::client::{Client, ClientT, SubscriptionClientT}; +use jsonrpsee::ws_client::WsClientBuilder; + +use jsonrpsee::core::{async_trait, error::Error as JsonRpcError}; +use jsonrpsee::types::ParamsSer; +use regex::Regex; +use ripple_sdk::api::device::device_operator::DeviceChannelRequest; +use ripple_sdk::serde_json::json; +use ripple_sdk::tokio::sync::mpsc::Receiver; +use ripple_sdk::{ + api::device::device_operator::DeviceResponseMessage, + tokio::sync::mpsc::{self, Sender as MpscSender}, + tokio::{sync::Mutex, task::JoinHandle, time::sleep}, +}; +use ripple_sdk::{ + api::device::device_operator::{ + DeviceCallRequest, DeviceSubscribeRequest, DeviceUnsubscribeRequest, + }, + serde_json::{self, Value}, + tokio, +}; +use ripple_sdk::{ + api::device::device_operator::{DeviceChannelParams, DeviceOperator}, + uuid::Uuid, +}; +use ripple_sdk::{ + log::{error, info, warn}, + utils::channel_utils::{mpsc_send_and_log, oneshot_send_and_log}, +}; +use ripple_sdk::{ + tokio::sync::oneshot::{self, Sender as OneShotSender}, + utils::error::RippleError, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::thunder_state::ThunderConnectionState; +use crate::utils::{get_error_value, get_next_id}; + +use super::thunder_async_client::{ThunderAsyncClient, ThunderAsyncRequest, ThunderAsyncResponse}; +use super::thunder_client_pool::ThunderPoolCommand; + +use super::thunder_plugins_status_mgr::{BrokerCallback, BrokerSender}; +use super::{ + jsonrpc_method_locator::JsonRpcMethodLocator, + plugin_manager::{PluginActivatedResult, PluginManagerCommand}, +}; +use std::{env, process::Command}; + +#[derive(Debug)] +// pub enum ThunderMessage { +// ThunderCallMessage(ThunderCallMessage), +// ThunderSubscribeMessage(ThunderSubscribeMessage), +// ThunderUnsubscribeMessage(ThunderUnsubscribeMessage), +// } + +// impl ThunderMessage { +// pub fn clone(&self, intercept_tx: OneShotSender) -> ThunderMessage { +// match self { +// ThunderMessage::ThunderCallMessage(m) => { +// ThunderMessage::ThunderCallMessage(ThunderCallMessage { +// method: m.method.clone(), +// params: m.params.clone(), +// callback: intercept_tx, +// }) +// } +// ThunderMessage::ThunderSubscribeMessage(m) => { +// ThunderMessage::ThunderSubscribeMessage(ThunderSubscribeMessage { +// params: m.params.clone(), +// callback: Some(intercept_tx), +// module: m.module.clone(), +// event_name: m.event_name.clone(), +// handler: m.handler.clone(), +// sub_id: m.sub_id.clone(), +// }) +// } +// ThunderMessage::ThunderUnsubscribeMessage(m) => { +// ThunderMessage::ThunderUnsubscribeMessage(m.clone()) +// } +// } +// } +// } + +// #[derive(Debug)] +// pub struct ThunderCallMessage { +// pub method: String, +// pub params: Option, +// pub callback: OneShotSender, +// } + +// impl ThunderCallMessage { +// pub fn callsign(&self) -> String { +// JsonRpcMethodLocator::from_str(&self.method) +// .unwrap() +// .module +// .unwrap() +// } + +// pub fn method_name(&self) -> String { +// JsonRpcMethodLocator::from_str(&self.method) +// .unwrap() +// .method_name +// } +// } + +// #[derive(Debug)] +// pub struct ThunderSubscribeMessage { +// pub module: String, +// pub event_name: String, +// pub params: Option, +// pub handler: MpscSender, +// pub callback: Option>, +// pub sub_id: Option, +// } + +// impl ThunderSubscribeMessage { +// pub fn resubscribe(&self) -> ThunderSubscribeMessage { +// ThunderSubscribeMessage { +// module: self.module.clone(), +// event_name: self.event_name.clone(), +// params: self.params.clone(), +// handler: self.handler.clone(), +// callback: None, +// sub_id: self.sub_id.clone(), +// } +// } +// } + +// #[derive(Debug, Clone)] +// pub struct ThunderUnsubscribeMessage { +// pub module: String, +// pub event_name: String, +// pub subscription_id: Option, +// } + +pub struct ThunderClientBuilder; + +pub type BrokerSubMap = HashMap>>; + +#[derive(Debug, Clone)] +pub struct ThunderClient { + client: ThunderAsyncClient, + subscriptions: Arc>, + callbacks: Arc>>>>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DefaultThunderResult { + pub success: bool, +} + +impl ThunderClient { + fn add_to_callback(&self, request: &ThunderAsyncRequest) { + let mut callbacks = self.callbacks.write().unwrap(); + callbacks.insert(request.id, None); + } + + // if already subscibed updated handlers + fn check_sub( + &self, + request: &DeviceSubscribeRequest, + handler: MpscSender, + ) -> Option { + let mut subscriptions = self.subscriptions.write().unwrap(); + let key = format!("{}_{}", request.module, request.event_name); + if let Some(handlers) = subscriptions.get_mut(&key) { + handlers.push(handler); + return None; + } + let id = get_next_id(); + let async_request = + ThunderAsyncRequest::new(DeviceChannelRequest::Subscribe(request.clone())); + subscriptions.insert(key, vec![handler]); + Some(async_request) + } + + // if only one handler cleanup + fn check_unsub(&self, request: &DeviceUnsubscribeRequest) -> Option { + None + } +} + +#[async_trait] +impl DeviceOperator for ThunderClient { + async fn call(&self, request: DeviceCallRequest) -> DeviceResponseMessage { + let (tx, rx) = oneshot::channel::(); + let async_request = ThunderAsyncRequest::new(DeviceChannelRequest::Call(request)); + self.add_to_callback(&async_request); + self.client.send(async_request).await; + match rx.await { + Ok(response) => response, + Err(_) => DeviceResponseMessage { + message: Value::Null, + sub_id: None, + }, + } + } + + async fn subscribe( + &self, + request: DeviceSubscribeRequest, + handler: mpsc::Sender, + ) -> DeviceResponseMessage { + if let Some(subscribe_request) = self.check_sub(&request, handler.clone()) { + let (tx, rx) = oneshot::channel::(); + self.add_to_callback(&subscribe_request); + self.client.send(subscribe_request).await; + rx.await.unwrap() + } else { + warn!( + "Already subscribed to module: {}, event: {}", + request.module, request.event_name + ); + DeviceResponseMessage { + message: Value::Null, + sub_id: None, + } + } + } + + async fn unsubscribe(&self, request: DeviceUnsubscribeRequest) { + // deprecate + } +} + +#[derive(Debug)] +pub struct ThunderClientManager; + +impl ThunderClientManager { + fn manage( + client: ThunderClient, + mut request_tr: Receiver, + mut response_tr: Receiver, + ) { + let client_c = client.clone(); + tokio::spawn(async move { + loop { + request_tr = client_c.client.start("", request_tr).await; + error!("Thunder disconnected so reconnecting") + } + }); + + tokio::spawn(async move { + while let Some(v) = response_tr.recv().await { + // check with thunder client callbacks and subscriptions + } + }); + } +} + +impl ThunderClientBuilder { + pub async fn get_client() -> Result { + let (sender, tr) = mpsc::channel(10); + let callback = BrokerCallback { sender }; + let (broker_tx, broker_rx) = mpsc::channel(10); + let client = ThunderAsyncClient::new(callback, broker_tx); + let thunder_client = ThunderClient { + client, + subscriptions: Arc::new(RwLock::new(HashMap::new())), + callbacks: Arc::new(RwLock::new(HashMap::new())), + }; + ThunderClientManager::manage(thunder_client.clone(), broker_rx, tr); + Ok(thunder_client) + } +} diff --git a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs b/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs index 1f7c4ff15..c324cb4de 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs @@ -15,11 +15,12 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, +#[cfg(not(feature = "thunderBroker_enabled"))] +use super::{ + plugin_manager::PluginManagerCommand, + thunder_client::{ThunderClient, ThunderMessage}, }; - +#[cfg(not(feature = "thunderBroker_enabled"))] use crate::{client::thunder_client::ThunderClientBuilder, thunder_state::ThunderConnectionState}; use ripple_sdk::{ api::device::device_operator::DeviceResponseMessage, @@ -29,30 +30,29 @@ use ripple_sdk::{ uuid::Uuid, }; use ripple_sdk::{tokio, utils::error::RippleError}; -use url::Url; - -use super::{ - plugin_manager::PluginManagerCommand, - thunder_client::{ThunderClient, ThunderMessage}, +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }; - +use url::Url; +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub struct ThunderClientPool { clients: Vec, } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] struct PooledThunderClient { in_use: Arc, client: ThunderClient, } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[derive(Debug)] pub enum ThunderPoolCommand { ThunderMessage(ThunderMessage), ResetThunderClient(Uuid), } - +#[cfg(not(feature = "thunderBroker_enabled"))] impl ThunderClientPool { pub async fn start( url: Url, @@ -179,7 +179,7 @@ impl ThunderClientPool { } } } - +#[cfg(not(feature = "thunderBroker_enabled"))] #[cfg(test)] mod tests { use super::*; diff --git a/device/thunder_ripple_sdk/src/client/thunder_plugins_status_mgr.rs b/device/thunder_ripple_sdk/src/client/thunder_plugins_status_mgr.rs new file mode 100644 index 000000000..8a2e2b486 --- /dev/null +++ b/device/thunder_ripple_sdk/src/client/thunder_plugins_status_mgr.rs @@ -0,0 +1,712 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, +}; + +use ripple_sdk::{ + api::gateway::rpc_gateway_api::JsonRpcApiResponse, + chrono::{DateTime, Duration, Utc}, + framework::RippleResponse, + log::{error, info, warn}, + tokio::sync::mpsc::Sender, + utils::error::RippleError, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::utils::get_next_id; + +use super::thunder_async_client::{ThunderAsyncRequest, ThunderAsyncResponse}; + +#[derive(Clone, Debug)] +pub struct BrokerSender { + pub sender: Sender, +} + +impl BrokerSender { + // Method to send the request to the underlying broker for handling. + pub async fn send(&self, request: ThunderAsyncRequest) -> RippleResponse { + if let Err(e) = self.sender.send(request).await { + error!("Error sending to broker {:?}", e); + Err(RippleError::SendFailure) + } else { + Ok(()) + } + } +} + +/// BrokerCallback will be used by the communication broker to send the firebolt response +/// back to the gateway for client consumption +#[derive(Clone, Debug)] +pub struct BrokerCallback { + pub sender: Sender, +} + +impl BrokerCallback { + pub async fn send(&self, response: ThunderAsyncResponse) { + if let Err(_) = self.sender.send(response).await { + error!("error returning callback for request") + } + } +} + +// defautl timeout for plugin activation in seconds +const DEFAULT_PLUGIN_ACTIVATION_TIMEOUT: i64 = 8; + +// As per thunder 4_4 documentation, the statechange event is published under the method "client.events.1.statechange" +// But it didn't work, most probably a documentation issue. +// const STATE_CHANGE_EVENT_METHOD: &str = "client.events.1.statechange"; + +const STATE_CHANGE_EVENT_METHOD: &str = "thunder.Broker.Controller.events.statechange"; + +#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)] +pub struct Status { + pub callsign: String, + pub state: String, +} + +#[derive(Debug, Deserialize)] +pub struct ThunderError { + pub code: i32, + pub message: String, +} + +impl ThunderError { + pub fn get_state(&self) -> State { + match self.message.as_str() { + "ERROR_INPROGRESS" | "ERROR_PENDING_CONDITIONS" => State::InProgress, + "ERROR_UNKNOWN_KEY" => State::Missing, + _ => State::Unknown, + } + } +} + +impl Status { + pub fn to_state(&self) -> State { + match self.state.as_str() { + "activated" | "resumed" | "suspended" => State::Activated, + "deactivated" => State::Deactivated, + "deactivation" => State::Deactivation, + "activation" | "precondition" => State::Activation, + _ => State::Unavailable, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct StateChangeEvent { + pub callsign: String, + pub state: State, +} + +#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)] +pub enum State { + Activated, + Activation, + Deactivated, + Deactivation, + Unavailable, + Precondition, + Suspended, + Resumed, + Missing, + Error, + InProgress, + Unknown, +} + +impl State { + pub fn is_activated(&self) -> bool { + matches!(self, State::Activated) + } + pub fn is_activating(&self) -> bool { + matches!(self, State::Activation) + } + pub fn is_missing(&self) -> bool { + matches!(self, State::Missing) + } + pub fn is_unavailable(&self) -> bool { + matches!(self, State::Unavailable | State::Unknown | State::Missing) + } +} + +#[derive(Debug, Clone)] +pub struct ThunderPluginState { + pub state: State, + pub activation_timestamp: DateTime, + pub pending_requests: Vec, +} +#[derive(Debug, Clone)] +pub struct StatusManager { + pub status: Arc>>, + pub inprogress_plugins_request: Arc>>, +} + +impl Default for StatusManager { + fn default() -> Self { + Self::new() + } +} + +impl StatusManager { + pub fn new() -> Self { + Self { + status: Arc::new(RwLock::new(HashMap::new())), + inprogress_plugins_request: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn get_controller_call_sign() -> String { + "Controller.1.".to_string() + } + + pub fn update_status(&self, plugin_name: String, state: State) { + info!( + "Updating the status of the plugin: {:?} to state: {:?}", + plugin_name, state + ); + let mut status = self.status.write().unwrap(); + // get the current plugin state from hashmap and update the State + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.state = state; + } else { + // if the plugin is not present in the hashmap, add it + status.insert( + plugin_name, + ThunderPluginState { + state, + activation_timestamp: Utc::now(), + pending_requests: Vec::new(), + }, + ); + } + } + + pub fn add_broker_request_to_pending_list( + &self, + plugin_name: String, + request: ThunderAsyncRequest, + ) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.pending_requests.push(request); + } else { + status.insert( + plugin_name.clone(), + ThunderPluginState { + state: State::Unknown, + activation_timestamp: Utc::now(), + pending_requests: vec![request], + }, + ); + } + // update the time stamp + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.activation_timestamp = Utc::now(); + } + } + + // clear all pending requests for the given plugin and return the list of requests to the caller + // Also return a flag to indicate if activation time has expired. + pub fn retrive_pending_broker_requests( + &self, + plugin_name: String, + ) -> (Vec, bool) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + let pending_requests = plugin_state.pending_requests.clone(); + plugin_state.pending_requests.clear(); + // check if the activation time has expired. + let now = Utc::now(); + if now - plugin_state.activation_timestamp + > Duration::seconds(DEFAULT_PLUGIN_ACTIVATION_TIMEOUT) + { + return (pending_requests, true); + } else { + return (pending_requests, false); + } + } + (Vec::new(), false) + } + + pub fn get_all_pending_broker_requests(&self, plugin_name: String) -> Vec { + let status = self.status.read().unwrap(); + if let Some(plugin_state) = status.get(&plugin_name) { + plugin_state.pending_requests.clone() + } else { + Vec::new() + } + } + + pub fn clear_all_pending_broker_requests(&self, plugin_name: String) { + let mut status = self.status.write().unwrap(); + if let Some(plugin_state) = status.get_mut(&plugin_name) { + plugin_state.pending_requests.clear(); + } + } + + pub fn get_status(&self, plugin_name: String) -> Option { + let status = self.status.read().unwrap(); + status.get(&plugin_name).cloned() + } + + pub fn generate_plugin_activation_request(&self, plugin_name: String) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}activate", controller_call_sign), + "params": json!({ + "callsign": plugin_name, + }) + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + pub fn generate_plugin_status_request(&self, plugin_name: String) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}status@{}", controller_call_sign, plugin_name), + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + pub fn generate_state_change_subscribe_request(&self) -> String { + let id = get_next_id(); + let controller_call_sign = Self::get_controller_call_sign(); + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": format!("{}register", controller_call_sign), + "params": json!({ + "event": "statechange", + "id": "thunder.Broker.Controller.events" + }) + }) + .to_string(); + // Add this request to the inprogress_plugins_request + self.add_thunder_request_to_inprogress_list(id, request.clone()); + request + } + + fn add_thunder_request_to_inprogress_list(&self, id: u64, request: String) { + let mut inprogress_plugins_request = self.inprogress_plugins_request.write().unwrap(); + inprogress_plugins_request.insert(id, request); + } + + pub async fn is_controller_response( + &self, + sender: BrokerSender, + callback: BrokerCallback, + result: &[u8], + ) -> bool { + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return false, + }; + + if let Some(method) = data.method { + info!("is_controller_response Method: {:?}", method); + if method == STATE_CHANGE_EVENT_METHOD { + // intercept the statechange event and update plugin status. + let params = match data.params { + Some(params) => params, + None => return false, + }; + + let event: StateChangeEvent = match serde_json::from_value(params) { + Ok(event) => event, + Err(_) => return false, + }; + + self.update_status(event.callsign.clone(), event.state.clone()); + + if event.state.is_activated() { + // get the pending ThunderAsyncRequest and process. + let (pending_requests, expired) = + self.retrive_pending_broker_requests(event.callsign); + if !pending_requests.is_empty() { + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } + } + + return true; + } + } + + if let Some(id) = data.id { + let inprogress_plugins_request = self.inprogress_plugins_request.read().unwrap(); + return inprogress_plugins_request.contains_key(&id); + } + + false + } + async fn on_activate_response( + &self, + sender: BrokerSender, + callback: BrokerCallback, + data: &JsonRpcApiResponse, + request: &str, + ) { + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split("callsign\":").last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let (pending_requests, expired) = + self.retrive_pending_broker_requests(callsign.to_string()); + + if result.is_null() { + self.update_status(callsign.to_string(), State::Activated); + + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } else if let Some(_e) = &data.error { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; + } + } + + async fn on_status_response( + &self, + sender: BrokerSender, + callback: BrokerCallback, + data: &JsonRpcApiResponse, + request: &str, + ) { + let result = match &data.result { + Some(result) => result, + None => return, + }; + + let callsign = match request.split('@').last() { + Some(callsign) => callsign.trim_matches(|c| c == '"' || c == '}'), + None => return, + }; + + let status_res: Vec = match serde_json::from_value(result.clone()) { + Ok(status_res) => status_res, + Err(_) => { + Self::on_thunder_error_response(self, callback, data, &callsign.to_string()).await; + return; + } + }; + + for status in status_res { + if status.callsign != callsign { + // it's not required to enforce callsign matching. But it's good to log a warning. + // Already chekced the id in the request, so it's safe to ignore this. + warn!( + "Call Sign not matching callsign from response: {:?} callsign : {:?}", + status.callsign, callsign + ); + } + self.update_status(callsign.to_string(), status.to_state()); + + let (pending_requests, expired) = + self.retrive_pending_broker_requests(callsign.to_string()); + + for pending_request in pending_requests { + if expired { + error!("Expired request: {:?}", pending_request); + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } else { + let _ = sender.send(pending_request).await; + } + } + } + } + + async fn on_thunder_error_response( + &self, + callback: BrokerCallback, + data: &JsonRpcApiResponse, + plugin_name: &String, + ) { + let error = match &data.error { + Some(error) => error, + None => return, + }; + + error!( + "Error Received from Thunder on getting the status of the plugin: {:?}", + error + ); + + let thunder_error: ThunderError = match serde_json::from_value(error.clone()) { + Ok(error) => error, + Err(_) => return, + }; + + let state = thunder_error.get_state(); + self.update_status(plugin_name.to_string(), state.clone()); + + if state.is_unavailable() { + let (pending_requests, _) = + self.retrive_pending_broker_requests(plugin_name.to_string()); + + for pending_request in pending_requests { + callback + .send_error(pending_request, RippleError::ServiceError) + .await; + } + } + } + + pub fn get_from_inprogress_plugins_request_list(&self, id: u64) -> Option { + let inprogress_plugins_request = self.inprogress_plugins_request.read().unwrap(); + inprogress_plugins_request.get(&id).cloned() + } + + pub async fn handle_controller_response( + &self, + sender: BrokerSender, + callback: BrokerCallback, + result: &[u8], + ) { + let data = match serde_json::from_slice::(result) { + Ok(data) => data, + Err(_) => return, + }; + + let id = match data.id { + Some(id) => id, + None => return, + }; + + let request = match self.get_from_inprogress_plugins_request_list(id) { + Some(request) => request, + None => return, + }; + + if request.contains("Controller.1.activate") { + // handle activate response + self.on_activate_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.status@") { + // handle status response + self.on_status_response(sender, callback, &data, &request) + .await; + } else if request.contains("Controller.1.register") { + // nothing to do here + info!("StatusManger Received response for register request"); + } + + let mut inprogress_plugins_request = self.inprogress_plugins_request.write().unwrap(); + inprogress_plugins_request.remove(&id); + } +} + +impl BrokerCallback { + /// Default method used for sending errors via the BrokerCallback + pub async fn send_error(&self, request: ThunderAsyncRequest, error: RippleError) { + // TODO + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ripple_sdk::tokio::{ + self, + sync::mpsc::{self, channel}, + }; + + #[test] + fn test_generate_state_change_subscribe_request() { + let status_manager = StatusManager::new(); + let request = status_manager.generate_state_change_subscribe_request(); + assert!(request.contains("register")); + assert!(request.contains("statechange")); + } + + #[tokio::test] + async fn test_on_activate_response() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = BrokerSender { sender: tx }; + + let (tx_1, _tr_1) = channel(2); + let callback = BrokerCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!(null)), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.activate","params":{"callsign":"TestPlugin"}}"#; + status_manager + .on_activate_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + } + + #[tokio::test] + async fn test_on_status_response() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = BrokerSender { sender: tx }; + + let (tx_1, _tr_1) = channel(2); + let callback = BrokerCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!([{"callsign":"TestPlugin","state":"activated"}])), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.status@TestPlugin"}"#; + status_manager + .on_status_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + } + + #[tokio::test] + async fn test_on_thunder_error_response() { + let status_manager = StatusManager::new(); + + let (tx_1, _tr_1) = channel(2); + let callback = BrokerCallback { sender: tx_1 }; + + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: None, + error: Some(serde_json::json!({"code":1,"message":"ERROR_UNKNOWN_KEY"})), + method: None, + params: None, + }; + let plugin_name = "TestPlugin".to_string(); + status_manager + .on_thunder_error_response(callback, &data, &plugin_name) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Missing); + } + + // Uncomment and use the following unit test only for local testing. Not use as part of the CI/CD pipeline. + /* + use ripple_sdk::{ + api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest}, + }; + use crate::broker::rules_engine::{Rule, RuleTransform}; + #[tokio::test] + async fn test_expired_broker_request() { + let status_manager = StatusManager::new(); + let (tx, _tr) = mpsc::channel(10); + let broker = BrokerSender { sender: tx }; + let (tx_1, _tr_1) = channel(2); + let callback = BrokerCallback { sender: tx_1 }; + let data = JsonRpcApiResponse { + id: Some(1), + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!(null)), + error: None, + method: None, + params: None, + }; + let request = r#"{"jsonrpc":"2.0","id":1,"method":"Controller.1.activate","params":{"callsign":"TestPlugin"}}"#; + status_manager + .on_activate_response(broker, callback, &data, request) + .await; + let status = status_manager.get_status("TestPlugin".to_string()); + assert_eq!(status.unwrap().state, State::Activated); + let ctx = CallContext::new( + "session_id".to_string(), + "request_id".to_string(), + "app_id".to_string(), + 1, + ApiProtocol::Bridge, + "method".to_string(), + Some("cid".to_string()), + true, + ); + // Add a request to the pending list + let request = DeviceChannelRequest { + rpc: RpcRequest { + ctx, + params_json: "".to_string(), + method: "TestPlugin".to_string(), + }, + rule: Rule { + alias: "TestPlugin".to_string(), + transform: RuleTransform::default(), + endpoint: None, + }, + subscription_processed: None, + }; + status_manager.add_broker_request_to_pending_list("TestPlugin".to_string(), request); + // Sleep for 10 seconds to expire the request + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + // Check if the request is expired + let (pending_requests, expired) = + status_manager.retrive_pending_broker_requests("TestPlugin".to_string()); + assert_eq!(expired, true); + assert_eq!(pending_requests.len(), 1); + } + */ +} diff --git a/device/thunder_ripple_sdk/src/lib.rs b/device/thunder_ripple_sdk/src/lib.rs index d1be27775..46694778c 100644 --- a/device/thunder_ripple_sdk/src/lib.rs +++ b/device/thunder_ripple_sdk/src/lib.rs @@ -18,9 +18,12 @@ pub mod client { pub mod jsonrpc_method_locator; pub mod plugin_manager; + pub mod thunder_async_client; pub mod thunder_client; + pub mod thunder_client2; pub mod thunder_client_pool; pub mod thunder_plugin; + pub mod thunder_plugins_status_mgr; } pub mod bootstrap { diff --git a/device/thunder_ripple_sdk/src/thunder_state.rs b/device/thunder_ripple_sdk/src/thunder_state.rs index 700b01d64..59fa57381 100644 --- a/device/thunder_ripple_sdk/src/thunder_state.rs +++ b/device/thunder_ripple_sdk/src/thunder_state.rs @@ -31,7 +31,7 @@ use ripple_sdk::{ utils::error::RippleError, }; use url::Url; - +#[cfg(not(feature = "thunderBroker_enabled"))] use crate::{ client::{plugin_manager::ThunderPluginBootParam, thunder_client::ThunderClient}, events::thunder_event_processor::{ThunderEventHandler, ThunderEventProcessor}, diff --git a/device/thunder_ripple_sdk/src/utils.rs b/device/thunder_ripple_sdk/src/utils.rs index a9d38a7ef..e5de2aef4 100644 --- a/device/thunder_ripple_sdk/src/utils.rs +++ b/device/thunder_ripple_sdk/src/utils.rs @@ -15,7 +15,10 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU64, Ordering}, +}; use jsonrpsee::core::Error; use ripple_sdk::{ @@ -109,3 +112,10 @@ pub fn get_error_value(error: &Error) -> Value { } Value::Null } + +static ATOMIC_ID: AtomicU64 = AtomicU64::new(0); + +pub fn get_next_id() -> u64 { + ATOMIC_ID.fetch_add(1, Ordering::Relaxed); + ATOMIC_ID.load(Ordering::Relaxed) +}