Skip to content

Commit

Permalink
feat: refactored code
Browse files Browse the repository at this point in the history
  • Loading branch information
bsenth200 committed Oct 30, 2024
1 parent c713666 commit a90b0a2
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 42 deletions.
7 changes: 3 additions & 4 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use jsonrpsee::{core::RpcResult, types::error::CallError};
use ripple_sdk::api::firebolt::fb_capabilities::CAPABILITY_NOT_AVAILABLE;
use ripple_sdk::{
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats},
extn::extn_client_message::ExtnResponse,
Expand Down Expand Up @@ -72,7 +71,7 @@ impl BrokerUtils {
}
}

pub async fn handle_main_internal_request<'a>(
pub async fn process_internal_main_request<'a>(
state: &'a PlatformState,
method: &'a str,
) -> RpcResult<Value> {
Expand Down Expand Up @@ -107,8 +106,8 @@ impl BrokerUtils {

// TODO: Update error handling
Err(jsonrpsee::core::Error::Call(CallError::Custom {
code: CAPABILITY_NOT_AVAILABLE,
message: format!("{} is not available", method),
code: -32100,
message: format!("failed to get {}", method),
data: None,
}))
}
Expand Down
82 changes: 44 additions & 38 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ impl BrokerOutputForwarder {

if let Some(id) = id {
if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) {
let requires_event_handling = broker_request.rule.event_handler.is_some();
let trigger_event_handling = broker_request.rule.event_handler.is_some();
let sub_processed = broker_request.is_subscription_processed();
let rpc_request = broker_request.rpc.clone();
let session_id = rpc_request.ctx.get_id();
Expand All @@ -622,46 +622,19 @@ impl BrokerOutputForwarder {
continue;
}

// handle events with internal request if required
if requires_event_handling {
// TODO: Refactor code in the future to apply rule-based filtering and transformations as required.
if trigger_event_handling {
if let Some(method) = broker_request.rule.event_handler.clone()
{
let session_id = rpc_request.ctx.get_id();
let request_id = rpc_request.ctx.call_id;
let protocol = rpc_request.ctx.protocol.clone();
let platform_state_c = platform_state.clone();
tokio::spawn(async move {
if let Ok(res) =
BrokerUtils::handle_main_internal_request(
&platform_state_c,
method.as_str(),
)
.await
{
response.result = Some(
serde_json::to_value(res.clone()).unwrap(),
);
}
response.id = Some(request_id);

let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
);

if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
{
return_api_message_for_transport(
session,
message,
platform_state_c,
)
.await
}
});
let rpc_request_c = rpc_request.clone();
let response_c = response.clone();
tokio::spawn(Self::handle_event(
platform_state_c,
method,
rpc_request_c,
response_c,
));
continue;
}
}
Expand Down Expand Up @@ -746,6 +719,7 @@ impl BrokerOutputForwarder {
let request_id = rpc_request.ctx.call_id;
response.id = Some(request_id);
let tm_str = get_rpc_header(&rpc_request);

// Step 2: Create the message
let mut message = ApiMessage::new(
rpc_request.ctx.protocol.clone(),
Expand Down Expand Up @@ -806,6 +780,38 @@ impl BrokerOutputForwarder {
});
}

async fn handle_event(
platform_state: PlatformState,
method: String,
rpc_request: RpcRequest,
mut response: JsonRpcApiResponse,
) {
let session_id = rpc_request.ctx.get_id();
let request_id = rpc_request.ctx.call_id;
let protocol = rpc_request.ctx.protocol.clone();
let platform_state_c = &platform_state;

if let Ok(res) =
BrokerUtils::process_internal_main_request(platform_state_c, method.as_str()).await
{
response.result = Some(serde_json::to_value(res.clone()).unwrap());
}
response.id = Some(request_id);

let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
);

if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
{
return_api_message_for_transport(session, message, platform_state.clone()).await;
}
}

pub fn handle_non_jsonrpc_response(
data: &[u8],
callback: BrokerCallback,
Expand Down

0 comments on commit a90b0a2

Please sign in to comment.