Skip to content

Commit

Permalink
feat: RPPL-2681 thunderBroker migration workaround changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nnaveen979 committed Nov 4, 2024
1 parent ce96f61 commit 12bd330
Show file tree
Hide file tree
Showing 16 changed files with 1,478 additions and 59 deletions.
2 changes: 1 addition & 1 deletion core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

use std::time::Duration;

use crate::utils::rpc_utils::extract_tcp_port;
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use ripple_sdk::{
log::{error, info},
tokio::{self, net::TcpStream},
utils::rpc_utils::extract_tcp_port,
};
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

Expand Down
18 changes: 9 additions & 9 deletions core/main/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ pub fn get_base_method(method: &str) -> String {
method_vec.first().unwrap().to_string().to_lowercase()
}

pub fn extract_tcp_port(url: &str) -> String {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
domain_split.first().unwrap().to_string()
} else {
url.to_owned()
}
}
// pub fn extract_tcp_port(url: &str) -> String {
// let url_split: Vec<&str> = url.split("://").collect();
// if let Some(domain) = url_split.get(1) {
// let domain_split: Vec<&str> = domain.split('/').collect();
// domain_split.first().unwrap().to_string()
// } else {
// url.to_owned()
// }
// }
27 changes: 27 additions & 0 deletions core/sdk/src/api/device/device_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ pub enum DeviceChannelRequest {
Unsubscribe(DeviceUnsubscribeRequest),
}

impl DeviceChannelRequest {
pub fn get_callsign_method(&self) -> (String, String) {
match self {
DeviceChannelRequest::Call(c) => {
let mut collection: Vec<&str> = c.method.split('.').collect();
let method = collection.pop().unwrap_or_default();
let callsign = collection.join(".");
(callsign, method.into())
}
DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()),
DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()),
}
}

pub fn is_subscription(&self) -> bool {
!matches!(self, DeviceChannelRequest::Call(_))
}

pub fn is_unsubscribe(&self) -> Option<DeviceUnsubscribeRequest> {
if let DeviceChannelRequest::Unsubscribe(u) = self {
Some(u.clone())
} else {
None
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceCallRequest {
pub method: String,
Expand Down
10 changes: 10 additions & 0 deletions core/sdk/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,13 @@ use jsonrpsee::core::Error;
pub fn rpc_err(msg: impl Into<String>) -> Error {
Error::Custom(msg.into())
}

pub fn extract_tcp_port(url: &str) -> String {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
domain_split.first().unwrap().to_string()
} else {
url.to_owned()
}
}
8 changes: 7 additions & 1 deletion device/thunder_ripple_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ contract_tests = [
"tree_magic_mini",
"rstest"
]
thunderBroker_enabled = []
default = ["thunderBroker_enabled"]

[dependencies]
base64.workspace = true
Expand All @@ -44,6 +46,9 @@ regex.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "ws-client"] }
serde.workspace = true
url.workspace = true
serde_json.workspace = true
futures-channel.workspace = true
futures.workspace = true

strum = { version = "0.24", default-features = false }
strum_macros = "0.24"
Expand All @@ -56,8 +61,9 @@ csv = "1.1" # Allowing minor updates
home = { version = "=0.5.5", optional = true }
tree_magic_mini = { version = "=3.0.3", optional = true }
rstest = { version = "0.18.2", optional = true, default-features = false }
tokio-tungstenite = { workspace = true, features = ["handshake"] }
futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false}

[dev-dependencies]
tokio-tungstenite = { workspace = true, features = ["native-tls"] }
ripple_sdk = { path = "../../core/sdk", features = ["tdk"] }

36 changes: 28 additions & 8 deletions device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use crate::{
bootstrap::setup_thunder_processors::SetupThunderProcessor,
client::plugin_manager::ThunderPluginBootParam, thunder_state::ThunderBootstrapStateWithClient,
};

use crate::client::thunder_client2;
use crate::thunder_state::ThunderState;
use ripple_sdk::{
extn::client::extn_client::ExtnClient,
log::{error, info},
Expand All @@ -31,15 +34,32 @@ pub async fn boot_thunder(
plugin_param: ThunderPluginBootParam,
) -> Option<ThunderBootstrapStateWithClient> {
info!("Booting thunder");
if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await {
if let Ok(state) = ThunderPoolStep::setup(state).await {
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);

#[cfg(feature = "thunderBroker_enabled")]
{
info!("thunderBroker_enabled feature is enabled");
if let Ok(thndr_client) = thunder_client2::ThunderClientBuilder::get_client().await {
let thunder_state = ThunderState::new(state.clone(), thndr_client);
SetupThunderProcessor::setup(thunder_state, state.clone()).await;
} else {
error!("Unable to connect to Thunder, error in ThunderPoolStep");
error!("Unable to connect to Thunder_Broker, error in ThunderClientBuilder");
}
} else {
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
None
}

#[cfg(not(feature = "thunderBroker_enabled"))]
{
info!("thunderBroker_enabled feature is not enabled, go for thunderclient");
if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await {
if let Ok(state) = ThunderPoolStep::setup(state).await {
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);
} else {
error!("Unable to connect to Thunder, error in ThunderPoolStep");
}
} else {
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
};
None
}
None
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProc
use crate::processors::thunder_rfc::ThunderRFCProcessor;
use crate::processors::thunder_telemetry::ThunderTelemetryProcessor;
use crate::thunder_state::ThunderBootstrapStateWithClient;
use crate::thunder_state::ThunderState;
use ripple_sdk::extn::client::extn_client::ExtnClient;

use crate::processors::{
thunder_browser::ThunderBrowserRequestProcessor,
Expand All @@ -40,7 +42,7 @@ impl SetupThunderProcessor {
pub fn get_name() -> String {
"SetupThunderProcessor".into()
}

#[cfg(not(feature = "thunderBroker_enabled"))]
pub async fn setup(state: ThunderBootstrapStateWithClient) {
let mut extn_client = state.state.get_client();
extn_client
Expand Down Expand Up @@ -73,4 +75,41 @@ impl SetupThunderProcessor {
extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state));
let _ = extn_client.event(ExtnStatus::Ready);
}

#[cfg(feature = "thunderBroker_enabled")]
pub async fn setup(thunder_state: ThunderState, thndrextn_client: ExtnClient) {
let mut extn_client = thndrextn_client.clone();
extn_client.add_request_processor(ThunderDeviceInfoRequestProcessor::new(
thunder_state.clone(),
));
extn_client
.add_request_processor(ThunderBrowserRequestProcessor::new(thunder_state.clone()));
extn_client.add_request_processor(ThunderWifiRequestProcessor::new(thunder_state.clone()));
extn_client
.add_request_processor(ThunderStorageRequestProcessor::new(thunder_state.clone()));
extn_client.add_request_processor(ThunderWindowManagerRequestProcessor::new(
thunder_state.clone(),
));
extn_client.add_request_processor(ThunderRemoteAccessoryRequestProcessor::new(
thunder_state.clone(),
));
extn_client.add_request_processor(ThunderOpenEventsProcessor::new(thunder_state.clone()));

let package_manager_processor =
ThunderPackageManagerRequestProcessor::new(thunder_state.clone());
extn_client.add_request_processor(package_manager_processor);

if extn_client.get_bool_config("rdk_telemetry") {
match extn_client
.request(OperationalMetricRequest::Subscribe)
.await
{
Ok(_) => extn_client
.add_event_processor(ThunderTelemetryProcessor::new(thunder_state.clone())),
Err(_) => error!("Telemetry not setup"),
}
}
extn_client.add_request_processor(ThunderRFCProcessor::new(thunder_state.clone()));
let _ = extn_client.event(ExtnStatus::Ready);
}
}
1 change: 1 addition & 0 deletions device/thunder_ripple_sdk/src/client/plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use ripple_sdk::{
use serde::{Deserialize, Serialize};

use super::thunder_plugin::ThunderPlugin::Controller;
#[cfg(not(feature = "thunderBroker_enabled"))]
use super::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin};

pub struct ActivationSubscriber {
Expand Down
Loading

0 comments on commit 12bd330

Please sign in to comment.