diff --git a/core/main/src/bootstrap/start_communication_broker.rs b/core/main/src/bootstrap/start_communication_broker.rs index 01d2a1524..49a0b8101 100644 --- a/core/main/src/bootstrap/start_communication_broker.rs +++ b/core/main/src/bootstrap/start_communication_broker.rs @@ -65,10 +65,9 @@ impl Bootstep for StartOtherBrokers { if let Ok(rx) = state.channels_state.get_broker_receiver() { BrokerOutputForwarder::start_forwarder(ps.clone(), rx) } - let session = ps.session_state.get_account_session(); // Setup the endpoints from the manifests let mut endpoint_state = ps.endpoint_state; - endpoint_state.build_other_endpoints(session); + endpoint_state.build_other_endpoints(ps.session_state.get_account_session()); Ok(()) } } diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 9598a6fd1..17808fa6f 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -19,13 +19,14 @@ use ripple_sdk::{ api::{ firebolt::fb_capabilities::JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, gateway::rpc_gateway_api::{ - ApiMessage, ApiProtocol, ApiStats, CallContext, JsonRpcApiResponse, RpcRequest, + ApiMessage, ApiProtocol, ApiStats, CallContext, JsonRpcApiRequest, JsonRpcApiResponse, + RpcRequest, }, session::AccountSession, }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::{error, trace}, + log::{debug, error, trace}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -58,6 +59,7 @@ use super::{ rules_engine::{jq_compile, Rule, RuleEndpoint, RuleEndpointProtocol, RuleEngine}, thunder_broker::ThunderBroker, websocket_broker::WebsocketBroker, + workflow_broker::WorkflowBroker, }; #[derive(Clone, Debug)] @@ -65,7 +67,7 @@ pub struct BrokerSender { pub sender: Sender, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct BrokerCleaner { pub cleaner: Option>, } @@ -80,11 +82,12 @@ impl BrokerCleaner { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct BrokerRequest { pub rpc: RpcRequest, pub rule: Rule, pub subscription_processed: Option, + pub workflow_callback: Option, } pub type BrokerSubMap = HashMap>; @@ -97,6 +100,39 @@ pub struct BrokerConnectRequest { pub session: Option, pub reconnector: Sender, } +impl Default for BrokerConnectRequest { + fn default() -> Self { + Self { + key: "".to_owned(), + endpoint: RuleEndpoint::default(), + sub_map: HashMap::new(), + session: None, + reconnector: mpsc::channel(2).0, + } + } +} +impl From for JsonRpcApiRequest { + fn from(value: BrokerRequest) -> Self { + Self { + jsonrpc: "2.0".to_owned(), + id: Some(value.rpc.ctx.call_id), + method: value.rpc.ctx.method, + params: serde_json::from_str(&value.rpc.params_json).unwrap_or(None), + } + } +} +impl From for JsonRpcApiResponse { + fn from(value: BrokerRequest) -> Self { + Self { + jsonrpc: "2.0".to_owned(), + id: Some(value.rpc.ctx.call_id), + result: None, + error: None, + method: None, + params: None, + } + } +} impl BrokerConnectRequest { pub fn new( @@ -136,11 +172,16 @@ impl BrokerRequest { } impl BrokerRequest { - pub fn new(rpc_request: &RpcRequest, rule: Rule) -> BrokerRequest { + pub fn new( + rpc_request: &RpcRequest, + rule: Rule, + workflow_callback: Option, + ) -> BrokerRequest { BrokerRequest { rpc: rpc_request.clone(), rule, subscription_processed: None, + workflow_callback, } } @@ -155,6 +196,13 @@ impl BrokerRequest { pub struct BrokerCallback { pub sender: Sender, } +impl Default for BrokerCallback { + fn default() -> Self { + Self { + sender: mpsc::channel(2).0, + } + } +} static ATOMIC_ID: AtomicU64 = AtomicU64::new(0); @@ -187,7 +235,7 @@ pub struct BrokerContext { pub app_id: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct BrokerOutput { pub data: JsonRpcApiResponse, } @@ -208,6 +256,26 @@ impl BrokerOutput { } None } + pub fn is_error(&self) -> bool { + self.data.error.is_some() + } + pub fn is_success(&self) -> bool { + self.data.result.is_some() + } + pub fn get_result(&self) -> Option { + self.data.result.clone() + } + pub fn get_error(&self) -> Option { + self.data.error.clone() + } + pub fn get_error_string(&self) -> String { + if let Some(e) = self.data.error.clone() { + if let Ok(v) = serde_json::to_string(&e) { + return v; + } + } + "unknown".to_string() + } } impl From for BrokerContext { @@ -240,6 +308,19 @@ pub struct EndpointBrokerState { cleaner_list: Arc>>, reconnect_tx: Sender, } +impl Default for EndpointBrokerState { + fn default() -> Self { + Self { + endpoint_map: Arc::new(RwLock::new(HashMap::new())), + callback: BrokerCallback::default(), + request_map: Arc::new(RwLock::new(HashMap::new())), + extension_request_map: Arc::new(RwLock::new(HashMap::new())), + rule_engine: RuleEngine::default(), + cleaner_list: Arc::new(RwLock::new(Vec::new())), + reconnect_tx: mpsc::channel(2).0, + } + } +} impl EndpointBrokerState { pub fn new( @@ -260,6 +341,10 @@ impl EndpointBrokerState { state.reconnect_thread(rec_tr, ripple_client); state } + pub fn with_rules_engine(mut self, rule_engine: RuleEngine) -> Self { + self.rule_engine = rule_engine; + self + } fn reconnect_thread(&self, mut rx: Receiver, client: RippleClient) { let mut state = self.clone(); @@ -328,6 +413,7 @@ impl EndpointBrokerState { rpc_request: &RpcRequest, rule: Rule, extn_message: Option, + workflow_callback: Option, ) -> (u64, BrokerRequest) { let id = Self::get_next_id(); let mut rpc_request_c = rpc_request.clone(); @@ -339,6 +425,7 @@ impl EndpointBrokerState { rpc: rpc_request.clone(), rule: rule.clone(), subscription_processed: None, + workflow_callback: workflow_callback.clone(), }, ); } @@ -349,9 +436,11 @@ impl EndpointBrokerState { } rpc_request_c.ctx.call_id = id; - (id, BrokerRequest::new(&rpc_request_c, rule)) + ( + id, + BrokerRequest::new(&rpc_request_c, rule, workflow_callback), + ) } - pub fn build_thunder_endpoint(&mut self) { if let Some(endpoint) = self.rule_engine.rules.endpoints.get("thunder").cloned() { let request = BrokerConnectRequest::new( @@ -375,31 +464,50 @@ impl EndpointBrokerState { } } + fn add_endpoint(&mut self, key: String, endpoint: BrokerSender) { + let mut endpoint_map = self.endpoint_map.write().unwrap(); + endpoint_map.insert(key, endpoint); + } + pub fn get_endpoints(&self) -> HashMap { + self.endpoint_map.read().unwrap().clone() + } + pub fn get_other_endpoints(&self, me: &str) -> HashMap { + let f = self.endpoint_map.read().unwrap().clone(); + let mut result = HashMap::new(); + for (k, v) in f.iter() { + if k.as_str() != me { + result.insert(k.clone(), v.clone()); + } + } + result + } + fn build_endpoint(&mut self, request: BrokerConnectRequest) { let endpoint = request.endpoint.clone(); let key = request.key.clone(); let (broker, cleaner) = match endpoint.protocol { RuleEndpointProtocol::Http => ( - HttpBroker::get_broker(request, self.callback.clone()).get_sender(), + HttpBroker::get_broker(request, self.callback.clone(), self).get_sender(), None, ), RuleEndpointProtocol::Websocket => { - let ws_broker = WebsocketBroker::get_broker(request, self.callback.clone()); + let ws_broker = WebsocketBroker::get_broker(request, self.callback.clone(), self); (ws_broker.get_sender(), Some(ws_broker.get_cleaner())) } RuleEndpointProtocol::Thunder => { - let thunder_broker = ThunderBroker::get_broker(request, self.callback.clone()); + let thunder_broker = + ThunderBroker::get_broker(request, self.callback.clone(), self); ( thunder_broker.get_sender(), Some(thunder_broker.get_cleaner()), ) } + RuleEndpointProtocol::Workflow => ( + WorkflowBroker::get_broker(request, self.callback.clone(), self).get_sender(), + None, + ), }; - - { - let mut endpoint_map = self.endpoint_map.write().unwrap(); - endpoint_map.insert(key, broker); - } + self.add_endpoint(key, broker); if let Some(cleaner) = cleaner { let mut cleaner_list = self.cleaner_list.write().unwrap(); @@ -413,8 +521,10 @@ impl EndpointBrokerState { extn_message: Option, rule: Rule, callback: BrokerCallback, + workflow_callback: Option, ) { - let (id, _updated_request) = self.update_request(&rpc_request, rule.clone(), extn_message); + let (id, _updated_request) = + self.update_request(&rpc_request, rule.clone(), extn_message, workflow_callback); let mut data = JsonRpcApiResponse::default(); // return empty result and handle the rest with jq rule let jv: Value = "".into(); @@ -434,32 +544,42 @@ impl EndpointBrokerState { &self, rpc_request: RpcRequest, extn_message: Option, + requestor_callback: Option, ) -> bool { let mut handled: bool = true; let callback = self.callback.clone(); let mut broker_sender = None; let mut found_rule = None; if let Some(rule) = self.rule_engine.get_rule(&rpc_request) { - let _ = found_rule.insert(rule.clone()); + found_rule = Some(rule.clone()); if let Some(endpoint) = rule.endpoint { if let Some(endpoint) = self.get_sender(&endpoint) { - let _ = broker_sender.insert(endpoint); + broker_sender = Some(endpoint); } } else if rule.alias != "static" { if let Some(endpoint) = self.get_sender("thunder") { - let _ = broker_sender.insert(endpoint); + broker_sender = Some(endpoint); } } } - + trace!("found rule {:?}", found_rule); if found_rule.is_some() { let rule = found_rule.unwrap(); if rule.alias == "static" { - self.handle_static_request(rpc_request, extn_message, rule, callback); + trace!("handling static request for {:?}", rpc_request); + self.handle_static_request( + rpc_request, + extn_message, + rule, + callback, + requestor_callback, + ); } else if broker_sender.is_some() { + trace!("handling not static request for {:?}", rpc_request); let broker = broker_sender.unwrap(); - let (_, updated_request) = self.update_request(&rpc_request, rule, extn_message); + let (_, updated_request) = + self.update_request(&rpc_request, rule, extn_message, requestor_callback); tokio::spawn(async move { if let Err(e) = broker.send(updated_request.clone()).await { callback.send_error(updated_request, e).await @@ -488,11 +608,17 @@ impl EndpointBrokerState { } // Get Broker Request from rpc_request - pub fn get_broker_request(&self, rpc_request: &RpcRequest, rule: Rule) -> BrokerRequest { + pub fn get_broker_request( + &self, + rpc_request: &RpcRequest, + rule: Rule, + workflow_callback: Option, + ) -> BrokerRequest { BrokerRequest { rpc: rpc_request.clone(), rule, subscription_processed: None, + workflow_callback, } } } @@ -500,7 +626,11 @@ impl EndpointBrokerState { /// Trait which contains all the abstract methods for a Endpoint Broker /// There could be Websocket or HTTP protocol implementations of the given trait pub trait EndpointBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self; + fn get_broker( + request: BrokerConnectRequest, + callback: BrokerCallback, + endpoint_broker: &mut EndpointBrokerState, + ) -> Self; fn get_sender(&self) -> BrokerSender; @@ -574,6 +704,16 @@ pub trait EndpointBroker { } fn get_cleaner(&self) -> BrokerCleaner; + + fn send_broker_success_response( + callback: &BrokerCallback, + success_message: JsonRpcApiResponse, + ) { + BrokerOutputForwarder::send_json_rpc_response_to_broker(success_message, callback.clone()); + } + fn send_broker_failure_response(callback: &BrokerCallback, error_message: JsonRpcApiResponse) { + BrokerOutputForwarder::send_json_rpc_response_to_broker(error_message, callback.clone()); + } } /// Forwarder gets the BrokerOutput and forwards the response to the gateway. @@ -589,7 +729,7 @@ impl BrokerOutputForwarder { tokio::spawn(async move { while let Some(output) = rx.recv().await { let output_c = output.clone(); - let mut response = output.data; + let mut response = output.data.clone(); let mut is_event = false; // First validate the id check if it could be an event let id = if let Some(e) = output_c.get_event() { @@ -601,6 +741,7 @@ impl BrokerOutputForwarder { if let Some(id) = id { if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { + let workflow_callback = broker_request.clone().workflow_callback; let sub_processed = broker_request.is_subscription_processed(); let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); @@ -686,6 +827,7 @@ impl BrokerOutputForwarder { trace!("start_forwarder: no result {:?}", response); apply_response_needed = true; } + if apply_response_needed { if let Some(filter) = broker_request.rule.transform.get_transform_data( super::rules_engine::RuleTransformType::Response, @@ -698,52 +840,67 @@ impl BrokerOutputForwarder { let request_id = rpc_request.ctx.call_id; response.id = Some(request_id); - let tm_str = get_rpc_header(&rpc_request); - // Step 2: Create the message - let mut message = ApiMessage::new( - rpc_request.ctx.protocol.clone(), - serde_json::to_string(&response).unwrap(), - request_id.to_string(), - ); - let mut status_code: i64 = 1; - if let Some(e) = &response.error { - if let Some(Value::Number(n)) = e.get("code") { - if let Some(v) = n.as_i64() { - status_code = v; + + if let Some(workflow_callback) = workflow_callback { + debug!("sending to workflow callback {:?}", response); + let _ = workflow_callback + .sender + .send(BrokerOutput { + data: response.clone(), + }) + .await; + } else { + let tm_str = get_rpc_header(&rpc_request); + // Step 2: Create the message + let mut message = ApiMessage::new( + rpc_request.ctx.protocol.clone(), + serde_json::to_string(&response).unwrap(), + request_id.to_string(), + ); + let mut status_code: i64 = 1; + if let Some(e) = &response.error { + if let Some(Value::Number(n)) = e.get("code") { + if let Some(v) = n.as_i64() { + status_code = v; + } } } - } - message.stats = Some(ApiStats { - stats_ref: add_telemetry_status_code( - &tm_str, - status_code.to_string().as_str(), - ), - stats: rpc_request.stats, - }); - - // Step 3: Handle Non Extension - if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) { - if let Ok(extn_message) = - platform_state.endpoint_state.get_extn_message(id, is_event) - { - if is_event { - forward_extn_event(&extn_message, response, &platform_state) + message.stats = Some(ApiStats { + stats_ref: add_telemetry_status_code( + &tm_str, + status_code.to_string().as_str(), + ), + stats: rpc_request.stats, + }); + + // Step 3: Handle Non Extension + if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) { + if let Ok(extn_message) = + platform_state.endpoint_state.get_extn_message(id, is_event) + { + if is_event { + forward_extn_event( + &extn_message, + response, + &platform_state, + ) .await; - } else { - return_extn_response(message, extn_message) + } else { + return_extn_response(message, extn_message) + } } + } else if let Some(session) = platform_state + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport( + session, + message, + platform_state.clone(), + ) + .await } - } else if let Some(session) = platform_state - .session_state - .get_session_for_connection_id(&session_id) - { - return_api_message_for_transport( - session, - message, - platform_state.clone(), - ) - .await } } else { error!( @@ -753,7 +910,10 @@ impl BrokerOutputForwarder { ); } } else { - error!("Error couldnt broker the event {:?}", output_c) + error!( + "Error couldnt broker the event {:?} due to a missing request id", + output_c + ) } } }); @@ -774,10 +934,12 @@ impl BrokerOutputForwarder { None }; let parse_result = serde_json::from_slice::(data); + error!("parse result {:?}", parse_result); if parse_result.is_err() { return Err(RippleError::ParseError); } let result = Some(parse_result.unwrap()); + error!("result {:?}", result); // build JsonRpcApiResponse let data = JsonRpcApiResponse { jsonrpc: "2.0".to_owned(), @@ -787,10 +949,35 @@ impl BrokerOutputForwarder { error: None, params: None, }; - let output = BrokerOutput { data }; - tokio::spawn(async move { callback.sender.send(output).await }); + BrokerOutputForwarder::send_json_rpc_response_to_broker(data, callback.clone()); Ok(()) } + pub fn send_json_rpc_response_to_broker( + json_rpc_api_response: JsonRpcApiResponse, + callback: BrokerCallback, + ) { + tokio::spawn(async move { + callback + .sender + .send(BrokerOutput { + data: json_rpc_api_response, + }) + .await + }); + } + pub fn send_json_rpc_success_response_to_broker( + json_rpc_api_success_response: JsonRpcApiResponse, + callback: BrokerCallback, + ) { + tokio::spawn(async move { + callback + .sender + .send(BrokerOutput { + data: json_rpc_api_success_response, + }) + .await + }); + } } async fn forward_extn_event( @@ -911,8 +1098,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, subscription_processed: None, + workflow_callback: None, }, RippleError::InvalidInput, ) @@ -979,8 +1168,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, None, + None, ); request.ctx.call_id = 2; state.update_request( @@ -990,8 +1181,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, None, + None, ); // Hardcoding the id here will be a problem as multiple tests uses the atomic id and there is no guarantee diff --git a/core/main/src/broker/http_broker.rs b/core/main/src/broker/http_broker.rs index c5be0a6e0..b76001cd0 100644 --- a/core/main/src/broker/http_broker.rs +++ b/core/main/src/broker/http_broker.rs @@ -19,6 +19,7 @@ use std::vec; use hyper::{client::HttpConnector, Body, Client, Method, Request, Response, Uri}; use ripple_sdk::{ + api::gateway::rpc_gateway_api::JsonRpcApiError, log::{debug, error, trace}, tokio::{self, sync::mpsc}, utils::error::RippleError, @@ -28,7 +29,7 @@ use tokio_tungstenite::tungstenite::http::uri::InvalidUri; use super::endpoint_broker::{ BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest, - BrokerSender, EndpointBroker, + BrokerSender, EndpointBroker, EndpointBrokerState, }; pub struct HttpBroker { @@ -55,9 +56,6 @@ async fn send_http_request( /* mix endpoint url with method */ - /* - TODONT: unwraps are bad, need to handle errors - */ let uri: Uri = format!("{}{}", uri, path) .parse() @@ -116,7 +114,11 @@ async fn body_to_bytes(body: Body) -> Vec { } impl EndpointBroker for HttpBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + fn get_broker( + request: BrokerConnectRequest, + callback: BrokerCallback, + _broker_state: &mut EndpointBrokerState, + ) -> Self { let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let broker = BrokerSender { sender: tx }; @@ -151,7 +153,10 @@ impl EndpointBroker for HttpBroker { } else { let msg = format!("Error in http broker parsing response from http service at {}. status={:?}",uri, parts.status); error!("{}",msg); - send_broker_response(&callback, &request, error_string_to_json(msg.as_str()).to_string().as_bytes()).await; + Self::send_broker_failure_response(&callback, + JsonRpcApiError::default() + .with_id(request.rpc.ctx.call_id) + .with_message(msg.to_string()).into()); } } Err(err) => { diff --git a/core/main/src/broker/mod.rs b/core/main/src/broker/mod.rs index 421ebe141..3e7d2cbd0 100644 --- a/core/main/src/broker/mod.rs +++ b/core/main/src/broker/mod.rs @@ -22,3 +22,4 @@ pub mod rules_engine; pub mod thunder; pub mod thunder_broker; pub mod websocket_broker; +pub mod workflow_broker; diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 6b4222a27..0bc78c131 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -18,10 +18,10 @@ use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val}; use ripple_sdk::api::{ gateway::rpc_gateway_api::RpcRequest, manifest::extn_manifest::ExtnManifest, }; -use ripple_sdk::log::trace; + use ripple_sdk::{ chrono::Utc, - log::{error, info, warn}, + log::{debug, error, info, trace, warn}, serde_json::Value, utils::error::RippleError, }; @@ -50,7 +50,7 @@ impl RuleSet { } } -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone, Default)] pub struct RuleEndpoint { pub protocol: RuleEndpointProtocol, pub url: String, @@ -75,16 +75,25 @@ fn default_autostart() -> bool { true } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Default)] #[serde(rename_all = "lowercase")] #[cfg_attr(test, derive(PartialEq))] pub enum RuleEndpointProtocol { + #[default] Websocket, Http, Thunder, + Workflow, +} +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct JsonDataSource { + // configurable namespace to "stuff" an in individual result payload into + pub namespace: Option, + pub method: String, + pub params: Option, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct Rule { pub alias: String, // Not every rule needs transform @@ -94,6 +103,8 @@ pub struct Rule { pub filter: Option, #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sources: Option>, } #[derive(Debug, Clone, Deserialize, Default, Serialize)] @@ -152,15 +163,34 @@ impl RuleEngine { } } + pub fn load(path: &str) -> Result { + let path = Path::new(path); + if path.exists() { + let contents = fs::read_to_string(path).unwrap(); + Self::load_from_string_literal(contents) + } else { + warn!("path for the rule is invalid {}", path.display()); + Err(RippleError::InvalidInput) + } + } + pub fn load_from_string_literal(contents: String) -> Result { + let (_content, rule_set) = Self::load_from_content(contents)?; + let mut rules_engine = RuleEngine::default(); + rules_engine.rules.append(rule_set); + Ok(rules_engine.clone()) + } + pub fn build(extn_manifest: &ExtnManifest) -> Self { trace!("building rules engine {:?}", extn_manifest.rules_path); let mut engine = RuleEngine::default(); for path in extn_manifest.rules_path.iter() { let path_for_rule = Self::build_path(path, &extn_manifest.default_path); - info!("loading rule {}", path_for_rule); + debug!("loading rules file {}", path_for_rule); if let Some(p) = Path::new(&path_for_rule).to_str() { if let Ok(contents) = fs::read_to_string(p) { - info!("Rule content {}", contents); + info!("Rules content {}", contents); + info!("loading rules from path {}", path); + info!("loading rule {}", path_for_rule); if let Ok((_, rule_set)) = Self::load_from_content(contents) { engine.rules.append(rule_set) } else { @@ -180,7 +210,7 @@ impl RuleEngine { match serde_json::from_str::(&contents) { Ok(manifest) => Ok((contents, manifest)), Err(err) => { - warn!("{:?} could not load rule", err); + error!("{:?} could not load rule", err); Err(RippleError::InvalidInput) } } @@ -215,7 +245,10 @@ impl RuleEngine { } pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result { - info!("Jq rule {} input {:?}", filter, input); + info!( + "Jq rule {} input {:?}, reference {}", + filter, input, reference + ); let start = Utc::now().timestamp_millis(); // start out only from core filters, // which do not include filters in the standard library @@ -254,6 +287,27 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result) -> Value { + if values.len() == 1 { + return values[0].clone(); + } + debug!("Composing values {:?}", values); + + let mut composition_filter = ".[0]".to_string(); + for v in 1..values.len() { + composition_filter = format!("{} * .[{}]", composition_filter, v); + } + match jq_compile(Value::Array(values), &composition_filter, String::new()) { + Ok(composed_value) => composed_value, + Err(err) => { + error!("Failed to compose JSON values with error: {:?}", err); + Value::Null // Return a default value on failure + } + } +} +pub fn make_name_json_safe(name: &str) -> String { + name.replace([' ', '.', ','], "_") +} #[cfg(test)] mod tests { @@ -307,4 +361,58 @@ mod tests { let resp = jq_compile(input, filter, String::new()); assert_eq!(resp.unwrap(), "EN".to_string()); } + #[test] + fn test_composed_jq_compile() { + let a = json!({"asome": "avalue"}); + let b = json!({"bsome": "bvalue"}); + let c = json!({"csome": {"cvalue" : "nested"}}); + let vals = vec![a, b, c]; + let mut composition_filter = ".[0]".to_string(); + for v in 1..vals.len() { + composition_filter = format!("{} * .[{}]", composition_filter, v); + } + + assert!(jq_compile( + jq_compile( + Value::Array(vals.clone()), + &composition_filter, + String::new() + ) + .unwrap(), + ".asome", + String::new() + ) + .unwrap() + .as_str() + .unwrap() + .contains("avalue")); + assert!(jq_compile( + jq_compile( + Value::Array(vals.clone()), + &composition_filter, + String::new() + ) + .unwrap(), + ".bsome", + String::new() + ) + .unwrap() + .as_str() + .unwrap() + .contains("bvalue")); + assert!(jq_compile( + jq_compile( + Value::Array(vals.clone()), + &composition_filter, + String::new() + ) + .unwrap(), + ".csome.cvalue", + String::new() + ) + .unwrap() + .as_str() + .unwrap() + .contains("nested")); + } } diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index fa51d7f18..3af8594d5 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -17,7 +17,7 @@ use super::{ endpoint_broker::{ BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutput, BrokerRequest, - BrokerSender, BrokerSubMap, EndpointBroker, + BrokerSender, BrokerSubMap, EndpointBroker, EndpointBrokerState, }, thunder::thunder_plugins_status_mgr::StatusManager, thunder::user_data_migrator::UserDataMigrator, @@ -346,7 +346,11 @@ impl ThunderBroker { } impl EndpointBroker for ThunderBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + fn get_broker( + request: BrokerConnectRequest, + callback: BrokerCallback, + _broker_state: &mut EndpointBrokerState, + ) -> Self { Self::start(request, callback) } @@ -462,7 +466,7 @@ mod tests { let (tx, _) = mpsc::channel(1); let request = BrokerConnectRequest::new("somekey".to_owned(), endpoint, tx); let callback = BrokerCallback { sender }; - ThunderBroker::get_broker(request, callback) + ThunderBroker::get_broker(request, callback, &mut EndpointBrokerState::default()) } //function to create a BrokerRequest @@ -474,8 +478,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, subscription_processed: None, + workflow_callback: None, } } @@ -621,8 +627,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, subscription_processed: Some(false), + workflow_callback: None, }; thndr_broker.subscribe(&subscribe_request); @@ -673,8 +681,10 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, subscription_processed: Some(true), + workflow_callback: None, }; thndr_broker.subscribe(&unsubscribe_request); diff --git a/core/main/src/broker/websocket_broker.rs b/core/main/src/broker/websocket_broker.rs index b0d02b3bc..683d15b2f 100644 --- a/core/main/src/broker/websocket_broker.rs +++ b/core/main/src/broker/websocket_broker.rs @@ -21,6 +21,7 @@ use super::endpoint_broker::{ BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest, BrokerSender, EndpointBroker, }; +use crate::broker::endpoint_broker::EndpointBrokerState; use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ log::{debug, error}, @@ -30,7 +31,6 @@ use std::{ collections::HashMap, sync::{Arc, RwLock}, }; - pub struct WebsocketBroker { sender: BrokerSender, cleaner: BrokerCleaner, @@ -185,7 +185,11 @@ impl WSNotificationBroker { } impl EndpointBroker for WebsocketBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + fn get_broker( + request: BrokerConnectRequest, + callback: BrokerCallback, + _broker_state: &mut EndpointBrokerState, + ) -> Self { Self::start(request, callback) } @@ -250,7 +254,9 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, + workflow_callback: None, subscription_processed: None, }; @@ -289,7 +295,9 @@ mod tests { transform: RuleTransform::default(), endpoint: None, filter: None, + sources: None, }, + workflow_callback: None, subscription_processed: None, }; let id = request.get_id(); diff --git a/core/main/src/broker/workflow_broker.rs b/core/main/src/broker/workflow_broker.rs new file mode 100644 index 000000000..297e77b73 --- /dev/null +++ b/core/main/src/broker/workflow_broker.rs @@ -0,0 +1,316 @@ +use super::endpoint_broker::{ + BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerRequest, BrokerSender, + EndpointBroker, +}; +use super::rules_engine::JsonDataSource; +use crate::broker::endpoint_broker::{BrokerOutput, EndpointBrokerState}; +use crate::broker::rules_engine::{compose_json_values, make_name_json_safe}; +use futures::future::BoxFuture; +use futures::FutureExt; + +use futures::future::try_join_all; +use ripple_sdk::api::gateway::rpc_gateway_api::{JsonRpcApiError, JsonRpcApiResponse, RpcRequest}; +use ripple_sdk::utils::error::RippleError; +use ripple_sdk::{ + log::{error, trace}, + tokio::{self, sync::mpsc}, +}; +use serde_json::json; +pub struct WorkflowBroker { + sender: BrokerSender, +} + +#[derive(Debug)] +pub enum SubBrokerErr { + RpcError(RippleError), + + JsonRpcApiError(JsonRpcApiError), +} +pub type SubBrokerResult = Result; + +async fn subbroker_call( + endpoint_broker: EndpointBrokerState, + rpc_request: RpcRequest, + source: JsonDataSource, +) -> Result { + let (brokered_tx, mut brokered_rx) = mpsc::channel::(10); + endpoint_broker.handle_brokerage( + rpc_request, + None, + Some(BrokerCallback { + sender: brokered_tx, + }), + ); + + match brokered_rx.recv().await { + Some(msg) => { + if msg.is_error() { + Err(SubBrokerErr::RpcError(RippleError::BrokerError( + msg.get_error_string(), + ))) + } else { + Ok(json!({make_name_json_safe( + &source + .clone() + .namespace + .unwrap_or(source.method.to_string()), + ): msg.data.result.unwrap_or(json!({}))})) + } + } + None => { + error!("Failed to receive message"); + Err(SubBrokerErr::RpcError(RippleError::BrokerError( + "Failed to receive message".to_string(), + ))) + } + } +} + +impl WorkflowBroker { + pub fn create_the_futures( + sources: Vec, + rpc_request: RpcRequest, + endpoint_broker: EndpointBrokerState, + ) -> Vec>> { + let mut futures = vec![]; + + for source in sources.clone() { + trace!("Source {:?}", source.clone()); + + let mut rpc_request = rpc_request.clone(); + rpc_request.method = source.method.clone(); + + // Deserialize the existing params_json + let mut existing_params = + match serde_json::from_str::(&rpc_request.params_json) { + Ok(params) => params, + Err(e) => { + error!("Failed to parse existing params_json: {:?}", e); + continue; + } + }; + + // Handle new params from the rule source + if let Some(ref params) = source.params { + // Ensure params is valid JSON + match serde_json::from_str::(params) { + Ok(new_params) => { + // Merge the new params with existing params + if let Some(existing_array) = existing_params.as_array_mut() { + existing_array.push(new_params); + } else { + error!( + "Existing params_json is not an array: {:?}", + existing_params + ); + } + } + Err(e) => { + error!("Invalid params JSON string: {:?}", e); + continue; + } + } + } + + // Serialize the merged parameters back into params_json + rpc_request.params_json = serde_json::to_string(&existing_params).unwrap(); + let t = subbroker_call(endpoint_broker.clone(), rpc_request, source).boxed(); // source is still usable here + futures.push(t); + } + futures + } + + pub async fn run_workflow( + broker_request: &BrokerRequest, + endpoint_broker: EndpointBrokerState, + ) -> SubBrokerResult { + let mut futures = Self::create_the_futures( + broker_request.rule.sources.clone().unwrap_or_default(), + broker_request.rpc.clone(), + endpoint_broker.clone(), + ); + /* + workflow steps are currently all or nothing/sudden death: if one step fails, the whole workflow fails + */ + + // Define your batch size here + let batch_size = 10; + let mut results = vec![]; + + for chunk in futures.chunks_mut(batch_size) { + match try_join_all(chunk.iter_mut().map(|f| f.as_mut()).collect::>()).await { + Ok(success) => { + results.extend(success); + } + Err(e) => { + error!( + "Error {:?} in subbroker call for workflow: {}", + e, broker_request.rpc.method + ); + return Err(SubBrokerErr::JsonRpcApiError( + JsonRpcApiError::default() + .with_code(-32001) + .with_message(format!( + "workflow error {:?}: for api {}", + e, broker_request.rpc.method + )) + .with_id(broker_request.rpc.ctx.call_id), + )); + } + } + } + + // Return an Ok result if the loop has zero elements to iterate on + let composed: JsonRpcApiResponse = broker_request.clone().into(); + let composed = composed.with_result(Some(compose_json_values(results))); + trace!("Composed {:?}", composed.result); + Ok(composed) + } + pub fn start(callback: BrokerCallback, endpoint_broker: EndpointBrokerState) -> BrokerSender { + let (tx, mut rx) = mpsc::channel::(10); + /* + This is a "meta rule": a rule that composes other rules. + */ + tokio::spawn(async move { + loop { + match rx.recv().await { + Some(broker_request) => { + trace!("Received message {:?}", broker_request); + + match Self::run_workflow(&broker_request, endpoint_broker.clone()).await { + Ok(yay) => { + Self::send_broker_success_response(&callback, yay); + } + Err(boo) => match boo { + SubBrokerErr::JsonRpcApiError(e) => { + Self::send_broker_failure_response(&callback, e.into()); + } + SubBrokerErr::RpcError(ripple_error) => { + let boo = JsonRpcApiError::default() + .with_code(-32001) + .with_message(format!( + "workflow error {:?}: for api {}", + ripple_error, broker_request.rpc.method + )) + .with_id(broker_request.rpc.ctx.call_id) + .into(); + Self::send_broker_failure_response(&callback, boo); + } + }, + } + } + None => { + error!("Failed to receive message"); + break; + } + } + } + }); + BrokerSender { sender: tx } + } +} + +impl EndpointBroker for WorkflowBroker { + fn get_broker( + _request: BrokerConnectRequest, + callback: BrokerCallback, + broker_state: &mut EndpointBrokerState, + ) -> Self { + Self { + sender: Self::start(callback, broker_state.clone()), + } + } + + fn get_sender(&self) -> super::endpoint_broker::BrokerSender { + self.sender.clone() + } + + fn get_cleaner(&self) -> super::endpoint_broker::BrokerCleaner { + BrokerCleaner::default() + } +} +/* +write exhaustive tests for the WorkflowBroker + */ + +#[cfg(test)] +pub mod tests { + + use ripple_sdk::{api::gateway::rpc_gateway_api::RpcRequest, tokio, Mockable}; + use serde_json::json; + + use crate::broker::{ + endpoint_broker::{BrokerCallback, BrokerRequest, EndpointBrokerState}, + rules_engine::{JsonDataSource, Rule, RuleEngine}, + }; + pub fn broker_request(callback: BrokerCallback) -> BrokerRequest { + let mut rule = Rule { + alias: "module.method".to_string(), + ..Default::default() + }; + let source = JsonDataSource { + method: "module.method".to_string(), + namespace: Some("module".to_string()), + ..Default::default() + }; + + rule.sources = Some(vec![source]); + BrokerRequest { + rpc: RpcRequest::mock(), + rule, + subscription_processed: None, + workflow_callback: Some(callback), + } + } + pub fn rule_engine() -> RuleEngine { + let engine = RuleEngine::load_from_string_literal( + json!({ + "endpoints": { + "workflow" : { + "protocol": "workflow", + "url": "http://asdf.com", + "jsonrpc": false + } + }, + "rules": { + "static.rule": { + "alias": "static", + "transform": { + "response": "\"Sky\"" + } + }, + "module.method": { + "alias": "workflow", + "endpoint": "workflow", + "sources": [{ + "method": "static.rule" + }] + } + } + } + ) + .to_string(), + ); + engine.unwrap() + } + pub fn endppoint_broker_state() -> EndpointBrokerState { + EndpointBrokerState::default().with_rules_engine(rule_engine()) + } + + #[tokio::test] + pub async fn test_run_workflow() { + /* + THIS IS A WORK IN PROGRESS... endpoint_broker is highly coupled, making it difficult to test without a full integration test + */ + use super::*; + + let (tx, mut _rx) = mpsc::channel::(10); + let callback = BrokerCallback { sender: tx }; + let request = broker_request(callback); + let broker = endppoint_broker_state(); + + let foo = WorkflowBroker::run_workflow(&request, broker); + let foo = foo.await; + assert!(foo.is_err()); + } +} diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 1fb4913fd..e1602271c 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -236,10 +236,11 @@ impl FireboltGateway { match result { Ok(_) => { - if !platform_state - .endpoint_state - .handle_brokerage(request_c.clone(), extn_msg.clone()) - { + if !platform_state.endpoint_state.handle_brokerage( + request_c.clone(), + extn_msg.clone(), + None, + ) { // Route match request.clone().ctx.protocol { ApiProtocol::Extn => { diff --git a/core/sdk/src/api/gateway/rpc_gateway_api.rs b/core/sdk/src/api/gateway/rpc_gateway_api.rs index 2efcbc1b0..f4c4a9c49 100644 --- a/core/sdk/src/api/gateway/rpc_gateway_api.rs +++ b/core/sdk/src/api/gateway/rpc_gateway_api.rs @@ -54,7 +54,7 @@ impl From for AppIdentification { } } -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] pub struct CallContext { pub session_id: String, pub request_id: String, @@ -114,10 +114,11 @@ impl crate::Mockable for CallContext { } } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Default)] pub enum ApiProtocol { Bridge, Extn, + #[default] JsonRpc, } @@ -197,6 +198,60 @@ impl JsonRpcApiRequest { } } +#[derive(Clone, Default, Debug)] +pub struct JsonRpcApiError { + pub code: i32, + pub id: Option, + pub message: String, + pub method: Option, + pub params: Option, +} +impl JsonRpcApiError { + pub fn new( + code: i32, + id: Option, + message: String, + method: Option, + params: Option, + ) -> Self { + JsonRpcApiError { + code, + id, + message, + method, + params, + } + } + pub fn with_method(mut self, method: String) -> Self { + self.method = Some(method); + self + } + pub fn with_params(mut self, params: Option) -> Self { + self.params = params; + self + } + pub fn with_id(mut self, id: u64) -> Self { + self.id = Some(id); + self + } + pub fn with_message(mut self, message: String) -> Self { + self.message = message; + self + } + pub fn with_code(mut self, code: i32) -> Self { + self.code = code; + self + } + pub fn to_response(&self) -> JsonRpcApiResponse { + JsonRpcApiResponse::error(self) + } +} +impl From for JsonRpcApiResponse { + fn from(error: JsonRpcApiError) -> Self { + JsonRpcApiResponse::error(&error) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JsonRpcApiResponse { pub jsonrpc: String, @@ -224,6 +279,51 @@ impl Default for JsonRpcApiResponse { } } +impl JsonRpcApiResponse { + pub fn error(error: &JsonRpcApiError) -> Self { + JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + id: error.id, + result: None, + error: Some(json!({"code": error.code, "message": error.message})), + method: error.method.clone(), + params: error.params.clone(), + } + } + + pub fn as_bytes(&self) -> Vec { + serde_json::to_string(self).unwrap().as_bytes().to_vec() + } + pub fn with_result(mut self, result: Option) -> Self { + self.result = result; + self.error = None; + self + } + pub fn with_method(mut self, method: Option) -> Self { + self.method = method; + self + } + pub fn with_params(mut self, params: Option) -> Self { + self.params = params; + self + } + pub fn with_id(mut self, id: u64) -> Self { + self.id = Some(id); + self + } + pub fn with_error(mut self, error: Value) -> Self { + self.error = Some(error); + self.result = None; + self + } + pub fn is_error(&self) -> bool { + self.error.is_some() + } + pub fn is_success(&self) -> bool { + self.result.is_some() + } +} + impl crate::Mockable for JsonRpcApiResponse { fn mock() -> Self { JsonRpcApiResponse { @@ -281,7 +381,7 @@ impl RpcStats { } } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Default)] pub struct RpcRequest { pub method: String, pub params_json: String, diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index 70953fcd3..1bf44c6a3 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -92,7 +92,7 @@ use super::{extn_id::ExtnId, ffi::ffi_message::CExtnMessage}; /// /// `callback` |Async Channel [async_channel::Sender] | Usually added by `Main` to the `target` to respond back to the `requestor`| -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ExtnMessage { pub id: String, pub requestor: ExtnId, @@ -184,6 +184,11 @@ pub enum ExtnPayload { Response(ExtnResponse), Event(ExtnEvent), } +impl Default for ExtnPayload { + fn default() -> Self { + ExtnPayload::Request(ExtnRequest::Config(Config::DefaultName)) + } +} impl ExtnPayload { pub fn extract(&self) -> Option { diff --git a/core/sdk/src/extn/extn_id.rs b/core/sdk/src/extn/extn_id.rs index 64932bd97..7c7406ba0 100644 --- a/core/sdk/src/extn/extn_id.rs +++ b/core/sdk/src/extn/extn_id.rs @@ -24,13 +24,14 @@ use serde_json::Value; use super::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest, ExtnResponse}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub enum ExtnClassId { Gateway, Device, DataGovernance, Distributor, Protected, + #[default] Jsonrpsee, Launcher, Internal, @@ -67,8 +68,9 @@ impl ExtnClassId { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub enum ExtnType { + #[default] Main, Channel, Extn, @@ -140,7 +142,7 @@ impl ExtnClassType { /// Below capability means the given plugin offers a JsonRpsee rpc extension for a service named bridge /// /// `ripple:extn:jsonrpsee:bridge` -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Default)] pub struct ExtnId { pub _type: ExtnType, pub class: ExtnClassId, diff --git a/core/sdk/src/framework/ripple_contract.rs b/core/sdk/src/framework/ripple_contract.rs index dea302134..f5bc1932a 100644 --- a/core/sdk/src/framework/ripple_contract.rs +++ b/core/sdk/src/framework/ripple_contract.rs @@ -35,10 +35,11 @@ use serde_json::Value; /// b. Distributor Extn/Channel /// c. Combination of a Device + Distributor Extensions -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)] #[serde(rename_all = "snake_case")] pub enum RippleContract { /// Used by Main application to provide internal contracts for Extensions + #[default] Internal, /// Provided by the distributor useful for adding Governance implementation for handling /// privacy information and other sensitive data. diff --git a/core/sdk/src/utils/error.rs b/core/sdk/src/utils/error.rs index 8d11ae733..718449e7e 100644 --- a/core/sdk/src/utils/error.rs +++ b/core/sdk/src/utils/error.rs @@ -41,6 +41,7 @@ pub enum RippleError { ServiceNotReady, BrokerError(String), } + impl std::fmt::Display for RippleError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self {