Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RPPL-2681 thunderBroker migration workaround changes #673

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

use std::time::Duration;

use crate::utils::rpc_utils::extract_tcp_port;
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use ripple_sdk::{
log::{error, info},
tokio::{self, net::TcpStream},
utils::rpc_utils::extract_tcp_port,
};
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

Expand Down
18 changes: 9 additions & 9 deletions core/main/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ pub fn get_base_method(method: &str) -> String {
method_vec.first().unwrap().to_string().to_lowercase()
}

pub fn extract_tcp_port(url: &str) -> String {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
domain_split.first().unwrap().to_string()
} else {
url.to_owned()
}
}
// pub fn extract_tcp_port(url: &str) -> String {
// let url_split: Vec<&str> = url.split("://").collect();
// if let Some(domain) = url_split.get(1) {
// let domain_split: Vec<&str> = domain.split('/').collect();
// domain_split.first().unwrap().to_string()
// } else {
// url.to_owned()
// }
// }
27 changes: 27 additions & 0 deletions core/sdk/src/api/device/device_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ pub enum DeviceChannelRequest {
Unsubscribe(DeviceUnsubscribeRequest),
}

impl DeviceChannelRequest {
pub fn get_callsign_method(&self) -> (String, String) {
match self {
DeviceChannelRequest::Call(c) => {
let mut collection: Vec<&str> = c.method.split('.').collect();
let method = collection.pop().unwrap_or_default();
let callsign = collection.join(".");
(callsign, method.into())
}
DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()),
DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()),
}
}

pub fn is_subscription(&self) -> bool {
!matches!(self, DeviceChannelRequest::Call(_))
}

pub fn is_unsubscribe(&self) -> Option<DeviceUnsubscribeRequest> {
if let DeviceChannelRequest::Unsubscribe(u) = self {
Some(u.clone())
} else {
None
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceCallRequest {
pub method: String,
Expand Down
10 changes: 10 additions & 0 deletions core/sdk/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,13 @@ use jsonrpsee::core::Error;
pub fn rpc_err(msg: impl Into<String>) -> Error {
Error::Custom(msg.into())
}

pub fn extract_tcp_port(url: &str) -> String {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
domain_split.first().unwrap().to_string()
} else {
url.to_owned()
}
}
8 changes: 7 additions & 1 deletion device/thunder_ripple_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ contract_tests = [
"tree_magic_mini",
"rstest"
]
thunderBroker_enabled = []
default = ["thunderBroker_enabled"]

[dependencies]
base64.workspace = true
Expand All @@ -44,6 +46,9 @@ regex.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "ws-client"] }
serde.workspace = true
url.workspace = true
serde_json.workspace = true
futures-channel.workspace = true
futures.workspace = true

strum = { version = "0.24", default-features = false }
strum_macros = "0.24"
Expand All @@ -56,8 +61,9 @@ csv = "1.1" # Allowing minor updates
home = { version = "=0.5.5", optional = true }
tree_magic_mini = { version = "=3.0.3", optional = true }
rstest = { version = "0.18.2", optional = true, default-features = false }
tokio-tungstenite = { workspace = true, features = ["handshake"] }
futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false}

[dev-dependencies]
tokio-tungstenite = { workspace = true, features = ["native-tls"] }
ripple_sdk = { path = "../../core/sdk", features = ["tdk"] }

36 changes: 28 additions & 8 deletions device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use crate::{
bootstrap::setup_thunder_processors::SetupThunderProcessor,
client::plugin_manager::ThunderPluginBootParam, thunder_state::ThunderBootstrapStateWithClient,
};

use crate::client::thunder_client2;
use crate::thunder_state::ThunderState;
use ripple_sdk::{
extn::client::extn_client::ExtnClient,
log::{error, info},
Expand All @@ -31,15 +34,32 @@ pub async fn boot_thunder(
plugin_param: ThunderPluginBootParam,
) -> Option<ThunderBootstrapStateWithClient> {
info!("Booting thunder");
if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await {
if let Ok(state) = ThunderPoolStep::setup(state).await {
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);

#[cfg(feature = "thunderBroker_enabled")]
{
info!("thunderBroker_enabled feature is enabled");
if let Ok(thndr_client) = thunder_client2::ThunderClientBuilder::get_client().await {
let thunder_state = ThunderState::new(state.clone(), thndr_client);
SetupThunderProcessor::setup(thunder_state, state.clone()).await;
} else {
error!("Unable to connect to Thunder, error in ThunderPoolStep");
error!("Unable to connect to Thunder_Broker, error in ThunderClientBuilder");
}
} else {
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
None
}

#[cfg(not(feature = "thunderBroker_enabled"))]
{
info!("thunderBroker_enabled feature is not enabled, go for thunderclient");
if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await {
if let Ok(state) = ThunderPoolStep::setup(state).await {
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);
} else {
error!("Unable to connect to Thunder, error in ThunderPoolStep");
}
} else {
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
};
None
}
None
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProc
use crate::processors::thunder_rfc::ThunderRFCProcessor;
use crate::processors::thunder_telemetry::ThunderTelemetryProcessor;
use crate::thunder_state::ThunderBootstrapStateWithClient;
use crate::thunder_state::ThunderState;
use ripple_sdk::extn::client::extn_client::ExtnClient;

use crate::processors::{
thunder_browser::ThunderBrowserRequestProcessor,
Expand All @@ -40,7 +42,7 @@ impl SetupThunderProcessor {
pub fn get_name() -> String {
"SetupThunderProcessor".into()
}

#[cfg(not(feature = "thunderBroker_enabled"))]
pub async fn setup(state: ThunderBootstrapStateWithClient) {
let mut extn_client = state.state.get_client();
extn_client
Expand Down Expand Up @@ -73,4 +75,41 @@ impl SetupThunderProcessor {
extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state));
let _ = extn_client.event(ExtnStatus::Ready);
}

#[cfg(feature = "thunderBroker_enabled")]
pub async fn setup(thunder_state: ThunderState, thndrextn_client: ExtnClient) {
let mut extn_client = thndrextn_client.clone();
extn_client.add_request_processor(ThunderDeviceInfoRequestProcessor::new(
thunder_state.clone(),
));
extn_client
.add_request_processor(ThunderBrowserRequestProcessor::new(thunder_state.clone()));
extn_client.add_request_processor(ThunderWifiRequestProcessor::new(thunder_state.clone()));
extn_client
.add_request_processor(ThunderStorageRequestProcessor::new(thunder_state.clone()));
extn_client.add_request_processor(ThunderWindowManagerRequestProcessor::new(
thunder_state.clone(),
));
extn_client.add_request_processor(ThunderRemoteAccessoryRequestProcessor::new(
thunder_state.clone(),
));
extn_client.add_request_processor(ThunderOpenEventsProcessor::new(thunder_state.clone()));

let package_manager_processor =
ThunderPackageManagerRequestProcessor::new(thunder_state.clone());
extn_client.add_request_processor(package_manager_processor);

if extn_client.get_bool_config("rdk_telemetry") {
match extn_client
.request(OperationalMetricRequest::Subscribe)
.await
{
Ok(_) => extn_client
.add_event_processor(ThunderTelemetryProcessor::new(thunder_state.clone())),
Err(_) => error!("Telemetry not setup"),
}
}
extn_client.add_request_processor(ThunderRFCProcessor::new(thunder_state.clone()));
let _ = extn_client.event(ExtnStatus::Ready);
}
}
1 change: 1 addition & 0 deletions device/thunder_ripple_sdk/src/client/plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use serde::{Deserialize, Serialize};

use super::thunder_plugin::ThunderPlugin::Controller;
#[cfg(not(feature = "thunderBroker_enabled"))]
use super::{thunder_client::ThunderClient, thunder_plugin::ThunderPlugin};

pub struct ActivationSubscriber {
Expand All @@ -41,7 +42,7 @@
}

pub struct PluginManager {
thunder_client: Box<ThunderClient>,

Check failure on line 45 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

cannot find type `ThunderClient` in this scope

Check failure on line 45 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

cannot find type `ThunderClient` in this scope

Check failure on line 45 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

cannot find type `ThunderClient` in this scope
plugin_states: HashMap<String, PluginState>,
state_subscribers: Vec<ActivationSubscriber>,
//caching the plugin activation param so that we can reactivate the plugins on demand
Expand Down Expand Up @@ -178,7 +179,7 @@

impl PluginManager {
pub async fn start(
thunder_client: Box<ThunderClient>,

Check failure on line 182 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

cannot find type `ThunderClient` in this scope

Check failure on line 182 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

cannot find type `ThunderClient` in this scope

Check failure on line 182 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

cannot find type `ThunderClient` in this scope
plugin_request: ThunderPluginBootParam,
) -> (mpsc::Sender<PluginManagerCommand>, Vec<String>) {
let (sub_tx, mut sub_rx) = mpsc::channel::<DeviceResponseMessage>(32);
Expand All @@ -198,7 +199,7 @@
}
}
ThunderPluginParam::Default => {
for p in ThunderPlugin::expect_activated_plugins() {

Check failure on line 202 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 202 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 202 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

failed to resolve: use of undeclared type `ThunderPlugin`
pm.plugin_states
.insert(String::from(p.callsign()), PluginState::Activated);
}
Expand Down Expand Up @@ -289,7 +290,7 @@
let mut plugins = Vec::new();
match plugin_request.activate_on_boot {
ThunderPluginParam::Default => {
for p in ThunderPlugin::activate_on_boot_plugins() {

Check failure on line 293 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 293 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 293 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

failed to resolve: use of undeclared type `ThunderPlugin`
plugins.push(p.callsign().to_string())
}
}
Expand Down Expand Up @@ -435,7 +436,7 @@
let mut plugins = Vec::new();
match self.plugin_request.activate_on_boot.clone() {
ThunderPluginParam::Default => {
for p in ThunderPlugin::activate_on_boot_plugins() {

Check failure on line 439 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Generate Code Coverage

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 439 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Unit Tests

failed to resolve: use of undeclared type `ThunderPlugin`

Check failure on line 439 in device/thunder_ripple_sdk/src/client/plugin_manager.rs

View workflow job for this annotation

GitHub Actions / Run Clippy

failed to resolve: use of undeclared type `ThunderPlugin`
plugins.push(p.callsign().to_string())
}
}
Expand Down
Loading
Loading