Skip to content

Commit

Permalink
chore: update workflow invoker
Browse files Browse the repository at this point in the history
  • Loading branch information
shanithkk committed Aug 2, 2023
1 parent 908e0cc commit 4b9be54
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 36 deletions.
71 changes: 49 additions & 22 deletions actions/workflow-invoker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use actions_common::Config;
use chesterfield::sync::{Client, Database};
use types::Message;

use types::DbDatas;
// #[cfg(test)]
use types::{Era, Topic};
use crate::types::update_with;
use types::Topic;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Input {
Expand Down Expand Up @@ -60,38 +59,40 @@ impl Action {
self.context.as_mut().expect("Action not Initialized!")
}

pub fn fetch_input(&mut self) -> Result<Vec<DbDatas>, Error> {
pub fn fetch_input(&mut self) -> Result<Vec<Value>, Error> {
let id = self.params.messages.clone()[0].topic.clone();
let data = self.get_context().get_document(&id)?;
println!("{:?}", data);
let parsed = serde_json::from_value::<Topic>(data)?;
Ok(parsed.data)
}

pub fn invoke_trigger(&mut self, payload: Vec<DbDatas>) -> Result<Value, Error> {
pub fn invoke_trigger(&mut self, payload: &mut Vec<Value>) -> Result<Value, Error> {
let mut failed_triggers = vec![];
for message in payload.iter() {
let era = serde_json::from_str::<Era>(&self.params.messages[0].value)?;
for message in payload.iter_mut() {
let data = serde_json::from_str::<Value>(&self.params.messages[0].value).unwrap();
update_with(message, &data);

let url = match message.get("url") {
Some(_x) => message["url"].to_string(),
None => String::new(),
};

let trigger = self.params.polkadot_payout_trigger.clone();
if self
.get_context()
.invoke_trigger(
&trigger,
&serde_json::json!({"allowed_hosts": [message.endpoint.clone()],
"data": {
"address": message.validator,
"era": era.era,
"owner_key": message.key,
"url": message.endpoint
}}),
&serde_json::json!({"allowed_hosts": [url , get_request_host() ],
"data": message}),
)
.is_err()
{
failed_triggers.push(message.validator.clone());
failed_triggers.push(self.params.messages[0].value.clone());
}
}
if !failed_triggers.is_empty() {
return Err(format!("error in triggers {}", failed_triggers.join(", ")))
return Err(format!("error in triggers {:?}", failed_triggers))
.map_err(serde::de::Error::custom);
}
Ok(serde_json::json!({
Expand All @@ -108,9 +109,8 @@ pub fn main(args: Value) -> Result<Value, Error> {
#[cfg(not(test))]
action.init();

let payload = action.fetch_input()?;
println!("22 {:?}", payload);
action.invoke_trigger(payload)
let mut payload = action.fetch_input()?;
action.invoke_trigger(&mut payload)
}

#[cfg(test)]
Expand Down Expand Up @@ -138,19 +138,46 @@ mod tests {
polkadot_payout_trigger: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(),
messages: vec![Message {
topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(),
value: "{ \"era\" :0}".to_string(),
value: serde_json::json!({ "era" :0}).to_string(),
}],
});
action.init(&config);
let workflow_db = action.connect_db(&action.params.db_url, &action.params.db_name);
let workflow_management_db_context = Context::new(workflow_db, None);
let doc = serde_json::json!({
"data": [DbDatas{ endpoint: "todo!()".to_string(), validator: "todo!()".to_string(), key: "todo!()".to_string() }]
"data": [{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }]
});
workflow_management_db_context
let _ = workflow_management_db_context
.insert_document(&doc, Some(action.params.messages[0].topic.clone()));
let res = action.fetch_input();
assert!(res.is_ok());
couchdb.delete().await.expect("Stopping Container Failed");
}

#[test]
fn test_update_value() {
let action = Action::new(Input {
db_url: "url".to_string(),
db_name: "test".to_string(),
polkadot_payout_trigger: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(),
messages: vec![Message {
topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(),
value: serde_json::json!({ "era" :0}).to_string(),
}],
});

let mut doc = serde_json::json!({
"url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }
);
let data = serde_json::from_str::<Value>(&action.params.messages[0].value).unwrap();
update_with(&mut doc, &data);
assert_eq!(
doc,
serde_json::json!({"url":"todo!()","era":0,"owner_key":"todo!()","validator":"todo!()"})
)
}
}

fn get_request_host() -> String {
std::env::var("__OW_API_HOST").unwrap()
}
8 changes: 0 additions & 8 deletions actions/workflow-invoker/src/types/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use super::Topic;
use serde_derive::{Deserialize, Serialize};
use serde_json::{from_str, Error};

// pub type Payload = Vec<(Era, Topic)>;

Expand All @@ -18,9 +16,3 @@ pub struct Era {
#[serde(rename = "era")]
pub era: u32,
}

impl Message {
pub fn parse_value(&self) -> Result<Era, Error> {
from_str(&self.value)
}
}
16 changes: 16 additions & 0 deletions actions/workflow-invoker/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,19 @@ pub use source::Source;
pub use topic::Topic;
mod data;
pub use data::*;

pub fn update_with(dest: &mut serde_json::Value, src: &serde_json::Value) {
use serde_json::Value::{Null, Object};

match (dest, src) {
(&mut Object(ref mut map_dest), &Object(ref map_src)) => {
// map_dest and map_src both are Map<String, Value>
for (key, value) in map_src {
// if key is not in map_dest, create a Null object
// then only, update the value
*map_dest.entry(key.clone()).or_insert(Null) = value.clone();
}
}
(_, _) => panic!("update_with only works with two serde_json::Value::Object s"),
}
}
6 changes: 2 additions & 4 deletions actions/workflow-invoker/src/types/topic.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;

use super::DbDatas;
use serde_json::Value;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Topic {
#[serde(skip_serializing, rename(deserialize = "_id"))]
pub id: String,
#[serde(skip_serializing, rename(deserialize = "_rev"))]
pub rev: String,
pub data: Vec<DbDatas>,
pub data: Vec<Value>,
}

#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
4 changes: 2 additions & 2 deletions actions/workflow_management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ mod tests {
topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(),
token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(),
input: serde_json::json!({
"endpoint": "".to_string(),
"key": "".to_string(),
"url": "".to_string(),
"owner_key": "".to_string(),
"validator": "".to_string(),
}),
});
Expand Down

0 comments on commit 4b9be54

Please sign in to comment.