Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: logger implementation for event feeder like cosmos icon eth substrate #130

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions event_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ dirs = '5.0.1'
envy = '0.4'
anyhow = '1.0.82'
icon-sdk = '1.2.0'
tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]}
tendermint-rpc = { version = "0.35.0", features = [
"http-client",
"websocket-client",
] }
futures = "0.3.30"
base64 = "0.22.0"

[dependencies.ethers]
version = '2.0.8'
default_features = false
features = [
'ws',
'rustls',
]
features = ['ws', 'rustls']

[dependencies.serde]
version = '1.0.198'
Expand All @@ -34,8 +33,4 @@ path = '../runtime/lite'

[dependencies.tokio]
version = '1.36.0'
features = [
'macros',
'time',
'rt-multi-thread',
]
features = ['macros', 'time', 'rt-multi-thread']
36 changes: 32 additions & 4 deletions event_feed/src/cosmos/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use super::*;
use crate::{common, cosmos};
use crate::common::{self, ChainConfig};
use anyhow::*;
use futures::StreamExt;
use runtime::{logger::CoreLogger, Logger};
use serde_json::Value;
use tendermint_rpc::event::{Event, EventData};
use tendermint_rpc::query::EventType;
use tendermint_rpc::{SubscriptionClient, WebSocketClient};

/// The `CosmosFeed` struct in Rust defines a feed for handling Cosmos blockchain events with filtering
/// capabilities.
Expand All @@ -19,6 +23,7 @@ use tendermint_rpc::event::{Event, EventData};
pub struct CosmosFeed {
pub chain_config: ChainConfig,
pub events: Vec<String>,
pub logger: CoreLogger,
}

/// The above Rust code defines an implementation for a `CosmosFeed` struct. Here is a breakdown of what
Expand All @@ -35,16 +40,18 @@ impl CosmosFeed {
/// Returns:
///
/// A `CosmosFeed` struct instance is being returned.
pub fn new(chain_config: common::ChainConfig) -> CosmosFeed {
pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed {
let events = chain_config
.event_filter
.split(',')
.map(|e| e.to_string())
.filter(|e| !e.is_empty())
.collect::<Vec<String>>();
// let logger = CoreLogger::new(Some("./event-feed.log"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code clean up

let cosmos = CosmosFeed {
chain_config,
events,
logger,
};
cosmos
}
Expand All @@ -65,6 +72,10 @@ impl CosmosFeed {
let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url)
.await
.unwrap();
self.logger.info(&format!(
"Following the publisher {}",
self.chain_config.node_url
));
let driver_handle = tokio::spawn(async move { driver.run().await });

let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
Expand All @@ -77,10 +88,12 @@ impl CosmosFeed {
let filter_events = events
.iter()
.filter(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.is_some()
})
.flat_map(|tendermint_event| {
Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap()
Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config)
.unwrap()
})
.collect::<Vec<serde_json::Value>>();

Expand All @@ -91,6 +104,7 @@ impl CosmosFeed {
}

drop(subs);
self.logger.info("Websocket connection closed!!");
let _ = driver_handle.await;
Ok(())
}
Expand All @@ -112,9 +126,12 @@ impl CosmosFeed {
/// The function `convert_to_feeder_event` returns an `Option` containing a `Vec` of
/// `serde_json::Value` elements.
fn convert_to_feeder_event(
&self,
event: &Event,
chain_config: &ChainConfig,
) -> Option<Vec<serde_json::Value>> {
self.logger
.debug(&format!("Processing events : {:?}", event));
match event.data.clone() {
EventData::LegacyNewBlock {
ref block,
Expand All @@ -124,8 +141,16 @@ impl CosmosFeed {
let block = block.as_ref().unwrap();
let block_number = block.header.version.block as usize;
let hash_string = block.header.last_commit_hash.map(|h| h.to_string());
self.logger.info(&format!(
"Processing LegacyNewBlockEvent for block: {}",
block.header.version.block
));

let filtered_events: Vec<Value> = if chain_config.event_filter.is_empty() {
self.logger.info(&format!(
"Processing all events from block : {}",
block_number
));
result_begin_block
.unwrap()
.events
Expand All @@ -141,6 +166,7 @@ impl CosmosFeed {
.map(|e| serde_json::to_value(e).unwrap())
.collect()
} else {
self.logger.info("Filtering events based on the event name");
result_begin_block
.unwrap()
.events
Expand All @@ -161,8 +187,10 @@ impl CosmosFeed {
};

if !filtered_events.is_empty() {
self.logger.info("returning the filtered events");
Some(filtered_events)
} else {
self.logger.info("No events matched the filter");
None
}
}
Expand Down
10 changes: 0 additions & 10 deletions event_feed/src/cosmos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
use crate::common::ChainConfig;
use futures::StreamExt;
use std::fmt::Error;
use std::thread::sleep;
use std::time::Duration;
use tendermint_rpc::event::Event;
use tendermint_rpc::query::{EventType, Query};
use tendermint_rpc::Order::Ascending;
use tendermint_rpc::{Client, SubscriptionClient, WebSocketClient};

mod feeder;
pub use feeder::*;

Expand Down
14 changes: 8 additions & 6 deletions event_feed/src/cosmos/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod tests {

use super::super::*;
use crate::common::FeederEvent;
use crate::common::ChainConfig;
use runtime::{logger::CoreLogger};

#[tokio::test]
async fn test_cosmos_feed_new() {
Expand All @@ -11,8 +11,9 @@ mod tests {
contract_filter: "".to_string(),
chain: "cosmos".to_string(),
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));

let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
assert_eq!(cosmos_feed.chain_config, chain_config);
assert_eq!(cosmos_feed.events.len(), 1);
assert!(cosmos_feed.events.contains(&"transfer".to_string()));
Expand All @@ -30,8 +31,8 @@ mod tests {
let callback = |events: Vec<serde_json::Value>| {
assert!(events.is_empty());
};

let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);
let result = cosmos_feed.event_feed(&callback).await;

assert!(result.is_ok());
Expand All @@ -52,7 +53,8 @@ mod tests {
// Simulate an error condition
assert!(events.is_empty());
};
let cosmos_feed = CosmosFeed::new(chain_config.clone());
let logger = CoreLogger::new(Some("./event-feed.log"));
let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger);

let _ = cosmos_feed.event_feed(&callback).await;
}
Expand Down
16 changes: 15 additions & 1 deletion event_feed/src/eth/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};

pub struct EthFeed {
eth_service: Provider<Ws>,
events: Vec<(String, H256)>,
contracts: Vec<String>,
logger: CoreLogger,
}

impl EthFeed {
pub async fn new(config: ChainConfig) -> Result<EthFeed> {
pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result<EthFeed> {
let events = config
.event_filter
.split(';')
Expand All @@ -27,6 +30,7 @@ impl EthFeed {
eth_service: client,
events,
contracts,
logger,
};

Ok(eth_feed)
Expand All @@ -47,6 +51,10 @@ impl EthFeed {
let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;
self.logger.info(&format!(
"Subscribed to events with the filter : {:?}",
filter
));

while let Some(log) = stream.next().await {
if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address))
Expand All @@ -72,6 +80,10 @@ impl EthFeed {
let tx_receipt = client.get_transaction_receipt(tx_hash).await;

if let Ok(Some(tx_receipt)) = tx_receipt {
self.logger.info(&format!(
"Received trabsaction receipt for the tx_hash : {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typographical error "transaction"

tx_hash
));
let mut logs = Vec::<Value>::new();

for log in tx_receipt.logs.iter() {
Expand All @@ -80,6 +92,8 @@ impl EthFeed {
{
for evt in self.events.iter() {
if log.topics[0] == evt.1 {
self.logger
.info(&format!("Matched event : {:?}", evt.0.clone()));
logs.push(
serde_json::to_value(
FeederEvent {
Expand Down
37 changes: 33 additions & 4 deletions event_feed/src/icon/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::substrate::polkadot::runtime_apis::core::Core;

use super::*;
use runtime::{logger::CoreLogger, Logger};

pub struct IconFeed {
icon_service: IconService,
events: Vec<String>,
score: Vec<String>,
logger: CoreLogger,
}

impl IconFeed {
pub fn new(config: ChainConfig) -> Result<IconFeed> {
pub fn new(config: ChainConfig, logger: CoreLogger) -> Result<IconFeed> {
let events = config
.event_filter
.split(',')
Expand All @@ -28,6 +32,7 @@ impl IconFeed {
icon_service,
events,
score,
logger,
};

Ok(icon_feed)
Expand All @@ -38,20 +43,30 @@ impl IconFeed {
let mut score_filter = false;

if !self.events.is_empty() || !self.score.is_empty() {
self.logger
.info("Checking the evnt filters or score filters");
let tx_hash: String =
serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap();
self.logger.info(&format!(
"Filtering the events with the tx_hash : {:?}",
tx_hash
));

let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?;

for event_log in event_logs {
if !&self.score.is_empty() {
for filter_score in &self.score {
if filter_score == &event_log.score_address {
self.logger
.info(&format!("Matched the score filter : {:?}", filter_score));
score_filter = true;
break;
}
}
} else {
self.logger
.info("No score filter found, allowing all the transactions");
score_filter = true;
}

Expand All @@ -64,6 +79,8 @@ impl IconFeed {
}
} else {
events_filter = true;
self.logger
.info("No event filter found, allowing all the transactions");
}

if events_filter && score_filter {
Expand All @@ -75,14 +92,19 @@ impl IconFeed {
events_filter = true;
score_filter = true;
}

Ok(events_filter & score_filter)
let result = events_filter && score_filter;
self.logger
.info(&format!("Filtering the result : {:?}", result));
Ok(result)
}

pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let mut latest_height = get_icon_block_height(&self.icon_service).await?;
let mut old_height = latest_height - 1;

self.logger.info(&format!(
"Event feed started at {:?}, {:?}",
latest_height, old_height
));
loop {
if old_height < latest_height {
let block = match self
Expand Down Expand Up @@ -113,12 +135,19 @@ impl IconFeed {
}
}

self.logger.info(&format!(
"Filtered the {:?} transactions",
filtered_tx.len()
));

if !filtered_tx.is_empty() {
cb(filtered_tx)
}

old_height += 1;
} else {
self.logger
.info("No new blocks got detected, sleeping for 1 second");
sleep(Duration::from_secs(1));
}

Expand Down
Loading