Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JQ rule - event handling #667

Merged
merged 8 commits into from
Nov 5, 2024
Merged
51 changes: 48 additions & 3 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
// SPDX-License-Identifier: Apache-2.0
//

use std::time::Duration;

use crate::utils::rpc_utils::extract_tcp_port;
use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port};
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use jsonrpsee::{core::RpcResult, types::error::CallError};
use ripple_sdk::{
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest, RpcStats},
extn::extn_client_message::ExtnResponse,
log::{error, info},
tokio::{self, net::TcpStream},
uuid::Uuid,
};
use serde_json::Value;
use std::time::Duration;
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

pub struct BrokerUtils;
Expand Down Expand Up @@ -66,4 +70,45 @@ impl BrokerUtils {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

pub async fn process_internal_main_request<'a>(
state: &'a PlatformState,
method: &'a str,
) -> RpcResult<Value> {
let ctx = CallContext::new(
Uuid::new_v4().to_string(),
Uuid::new_v4().to_string(),
"internal".into(),
1,
ApiProtocol::Extn,
method.to_string(),
None,
false,
);
let rpc_request = RpcRequest {
ctx: ctx.clone(),
method: method.to_string(),
params_json: RpcRequest::prepend_ctx(None, &ctx),
stats: RpcStats::default(),
};

let resp = state
.get_client()
.get_extn_client()
.main_internal_request(rpc_request.clone())
.await;

if let Ok(res) = resp {
if let Some(ExtnResponse::Value(val)) = res.payload.extract::<ExtnResponse>() {
return Ok(val);
}
}

// TODO: Update error handling
Err(jsonrpsee::core::Error::Call(CallError::Custom {
code: -32100,
message: format!("failed to get {}", method),
data: None,
}))
}
}
56 changes: 56 additions & 0 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
};

use crate::{
broker::broker_utils::BrokerUtils,
firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError},
service::extn::ripple_client::RippleClient,
state::platform_state::PlatformState,
Expand Down Expand Up @@ -601,6 +602,7 @@ impl BrokerOutputForwarder {

if let Some(id) = id {
if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) {
let trigger_event_handling = broker_request.rule.event_handler.is_some();
let sub_processed = broker_request.is_subscription_processed();
let rpc_request = broker_request.rpc.clone();
let session_id = rpc_request.ctx.get_id();
Expand All @@ -619,6 +621,24 @@ impl BrokerOutputForwarder {
if !apply_filter(&broker_request, &result, &rpc_request) {
continue;
}

// TODO: Refactor code in the future to apply rule-based filtering and transformations as required.
if trigger_event_handling {
if let Some(method) = broker_request.rule.event_handler.clone()
{
let platform_state_c = platform_state.clone();
let rpc_request_c = rpc_request.clone();
let response_c = response.clone();
tokio::spawn(Self::handle_event(
platform_state_c,
method,
rpc_request_c,
response_c,
));
continue;
}
}

// check if the request transform has event_decorator_method
if let Some(decorator_method) =
broker_request.rule.transform.event_decorator_method.clone()
Expand Down Expand Up @@ -699,6 +719,7 @@ impl BrokerOutputForwarder {
let request_id = rpc_request.ctx.call_id;
response.id = Some(request_id);
let tm_str = get_rpc_header(&rpc_request);

// Step 2: Create the message
let mut message = ApiMessage::new(
rpc_request.ctx.protocol.clone(),
Expand Down Expand Up @@ -759,6 +780,38 @@ impl BrokerOutputForwarder {
});
}

async fn handle_event(
platform_state: PlatformState,
method: String,
rpc_request: RpcRequest,
mut response: JsonRpcApiResponse,
) {
let session_id = rpc_request.ctx.get_id();
let request_id = rpc_request.ctx.call_id;
let protocol = rpc_request.ctx.protocol.clone();
let platform_state_c = &platform_state;

if let Ok(res) =
BrokerUtils::process_internal_main_request(platform_state_c, method.as_str()).await
{
response.result = Some(serde_json::to_value(res.clone()).unwrap());
}
response.id = Some(request_id);

let message = ApiMessage::new(
protocol,
serde_json::to_string(&response).unwrap(),
request_id.to_string(),
);

if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
{
return_api_message_for_transport(session, message, platform_state.clone()).await;
}
}

pub fn handle_non_jsonrpc_response(
data: &[u8],
callback: BrokerCallback,
Expand Down Expand Up @@ -911,6 +964,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
},
Expand Down Expand Up @@ -979,6 +1033,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
None,
);
Expand All @@ -990,6 +1045,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
None,
);
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/broker/rules_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct Rule {
#[serde(skip_serializing_if = "Option::is_none")]
pub filter: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_handler: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
}

Expand Down
3 changes: 3 additions & 0 deletions core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
}
Expand Down Expand Up @@ -621,6 +622,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: Some(false),
};
Expand Down Expand Up @@ -673,6 +675,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: Some(true),
};
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/broker/websocket_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
};
Expand Down Expand Up @@ -289,6 +290,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
},
subscription_processed: None,
};
Expand Down
Loading