Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JQ rule - event handling #667

Merged
merged 8 commits into from
Nov 5, 2024
Merged
52 changes: 49 additions & 3 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
// SPDX-License-Identifier: Apache-2.0
//

use std::time::Duration;

use crate::utils::rpc_utils::extract_tcp_port;
use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port};
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use jsonrpsee::{core::RpcResult, types::error::CallError};
use ripple_sdk::api::firebolt::fb_capabilities::CAPABILITY_NOT_AVAILABLE;
use ripple_sdk::{
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats},
extn::extn_client_message::ExtnResponse,
log::{error, info},
tokio::{self, net::TcpStream},
uuid::Uuid,
};
use serde_json::Value;
use std::time::Duration;
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

pub struct BrokerUtils;
Expand Down Expand Up @@ -66,4 +71,45 @@ impl BrokerUtils {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

pub async fn handle_main_internal_request<'a>(
state: &'a PlatformState,
method: &'a str,
) -> RpcResult<Value> {
let ctx = CallContext::new(
Uuid::new_v4().to_string(),
Uuid::new_v4().to_string(),
"internal".into(),
1,
ApiProtocol::Extn,
method.to_string(),
None,
false,
);
let rpc_request = RpcRequest {
ctx: ctx.clone(),
method: method.to_string(),
params_json: RpcRequest::prepend_ctx(None, &ctx),
stats: RpcStats::default(),
};

let resp = state
.get_client()
.get_extn_client()
.main_internal_request(rpc_request.clone())
.await;

if let Ok(res) = resp {
if let Some(ExtnResponse::Value(val)) = res.payload.extract::<ExtnResponse>() {
return Ok(val);
}
}

// TODO: Update error handling
Err(jsonrpsee::core::Error::Call(CallError::Custom {
code: CAPABILITY_NOT_AVAILABLE,
message: format!("{} is not available", method),
data: None,
}))
}
}
50 changes: 50 additions & 0 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
};

use crate::{
broker::broker_utils::BrokerUtils,
firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError},
service::extn::ripple_client::RippleClient,
state::platform_state::PlatformState,
Expand Down Expand Up @@ -601,6 +602,7 @@ impl BrokerOutputForwarder {

if let Some(id) = id {
if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) {
let requires_event_handling = broker_request.rule.event_handler.is_some();
let sub_processed = broker_request.is_subscription_processed();
let rpc_request = broker_request.rpc.clone();
let session_id = rpc_request.ctx.get_id();
Expand All @@ -619,6 +621,51 @@ impl BrokerOutputForwarder {
if !apply_filter(&broker_request, &result, &rpc_request) {
continue;
}

// handle events with internal request if required
if requires_event_handling {
if let Some(method) = broker_request.rule.event_handler.clone()
{
let session_id = rpc_request.ctx.get_id();
let request_id = rpc_request.ctx.call_id;
let protocol = rpc_request.ctx.protocol.clone();
let platform_state_c = platform_state.clone();
tokio::spawn(async move {
if let Ok(res) =
BrokerUtils::handle_main_internal_request(
&platform_state_c,
method.as_str(),
)
.await
{
response.result = Some(
serde_json::to_value(res.clone()).unwrap(),
);
}
response.id = Some(request_id);

let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
bsenth200 marked this conversation as resolved.
Show resolved Hide resolved
request_id.to_string(),
);

if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
{
return_api_message_for_transport(
session,
message,
platform_state_c,
)
.await
}
});
continue;
}
}

// check if the request transform has event_decorator_method
if let Some(decorator_method) =
broker_request.rule.transform.event_decorator_method.clone()
Expand Down Expand Up @@ -911,6 +958,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
},
Expand Down Expand Up @@ -979,6 +1027,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
None,
);
Expand All @@ -990,6 +1039,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
None,
);
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/broker/rules_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct Rule {
#[serde(skip_serializing_if = "Option::is_none")]
pub filter: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_handler: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
}

Expand Down
3 changes: 3 additions & 0 deletions core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
}
Expand Down Expand Up @@ -621,6 +622,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: Some(false),
};
Expand Down Expand Up @@ -673,6 +675,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: Some(true),
};
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/broker/websocket_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
};
Expand Down Expand Up @@ -289,6 +290,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
};
Expand Down
Loading