Skip to content

Commit

Permalink
feat: gas market (#2621)
Browse files Browse the repository at this point in the history
* feat: gas market

* cargo fmt

* fix: btc and RGas exchange rates

* fix: test

* cargo clippy

---------

Co-authored-by: mx819812523 <[email protected]>
  • Loading branch information
mx819812523 and mx819812523 authored Sep 13, 2024
1 parent d34e08a commit f9282cc
Show file tree
Hide file tree
Showing 12 changed files with 1,164 additions and 76 deletions.
300 changes: 225 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ members = [
"crates/rooch-open-rpc-macros",
"crates/rooch-open-rpc-spec",
"crates/rooch-open-rpc-spec-builder",
"crates/rooch-oracle",
"crates/rooch-pipeline-processor",
"crates/rooch-proposer",
"crates/rooch-relayer",
Expand All @@ -62,7 +63,8 @@ default-members = [
"moveos/moveos",
"frameworks/framework-release",
"crates/rooch",
"crates/rooch-faucet"
"crates/rooch-faucet",
"crates/rooch-oracle"
]

# All workspace members should inherit these keys
Expand Down Expand Up @@ -169,6 +171,7 @@ ethers = { version = "2.0.7", features = ["legacy"] }
eyre = "0.6.8"
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "56f6223b84ada922b6cb2c672c69db2ea3dc6a13" }
futures = "0.3.28"
futures-util = "0.3.30"
hex = "0.4.3"
rustc-hex = "2.1"
itertools = "0.13.0"
Expand Down Expand Up @@ -215,10 +218,12 @@ tiny-keccak = { version = "2", features = ["keccak", "sha3"] }
tiny-bip39 = "1.0.0"
tokio = { version = "1.40.0", features = ["full"] }
tokio-util = "0.7.12"
tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }
tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15" }
tungstenite = "0.24.0"

codespan-reporting = "0.11.1"
codespan = "0.11.1"
Expand Down
23 changes: 23 additions & 0 deletions crates/rooch-oracle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "rooch-oracle"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
serde_json = { workspace = true }
futures-util = { workspace = true }
tracing-subscriber = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
reqwest = { workspace = true }
anyhow = { workspace = true }
bcs = { workspace = true }

rooch-rpc-api = { workspace = true }
move-core-types = { workspace = true }
moveos-types = { workspace = true }
rooch-types = { workspace = true }
rooch-rpc-client = { workspace = true }
127 changes: 127 additions & 0 deletions crates/rooch-oracle/src/binance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::data_process::{execute_transaction, parse_and_convert, subscribe_websocket, State};
use clap::Parser;
use log::info;
use move_core_types::account_address::AccountAddress;
use move_core_types::identifier::Identifier;
use move_core_types::language_storage::ModuleId;
use moveos_types::move_types::FunctionId;
use moveos_types::transaction::MoveAction;
use rooch_rpc_client::wallet_context::WalletContext;
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::time::Instant;

#[derive(Parser, Debug, Clone)]
pub struct BinanceConfig {
#[arg(
long,
default_value = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
)]
pub binance_url: String,

#[arg(
long,
env = "ROOCH_BINANCE_WALLET_DIR",
default_value = "~/.rooch/rooch_config"
)]
pub binance_wallet_dir: Option<PathBuf>,

#[arg(long, env = "ROOCH_BINANCE_WALLET_PWD")]
pub binance_wallet_pwd: Option<String>,

#[arg(long, default_value = "10")]
pub binance_submit_interval: u64,

#[arg(long, env = "ROOCH_BINANCE_ORACLE_ID")]
pub binance_oracle_id: String,

#[arg(long, env = "ROOCH_BINANCE_ADMIN_ID")]
pub binance_admin_id: String,
}

pub struct Binance {
pub wallet_state: Arc<RwLock<State>>,
binance_config: BinanceConfig,
}

impl Binance {
pub async fn new(config: BinanceConfig) -> Self {
let wallet = WalletContext::new(config.binance_wallet_dir.clone()).unwrap();
let wallet_pwd = config.binance_wallet_pwd.clone();
Self {
wallet_state: Arc::new(RwLock::new(State {
wallet_pwd,
context: wallet,
})),
binance_config: config,
}
}

pub async fn subscribe(&self, package_id: &str) {
let (tx, mut rx) = mpsc::channel(1);
let url = self.binance_config.binance_url.clone();
let handle = tokio::spawn(async move {
subscribe_websocket(url, tx, None).await;
});
let function_id = FunctionId::new(
ModuleId::new(
AccountAddress::from_hex_literal(package_id).unwrap(),
Identifier::new("trusted_oracle").unwrap(),
),
Identifier::new("submit_data").unwrap(),
);
let address_mapping = self
.wallet_state
.read()
.await
.context
.address_mapping
.clone();
let oracle_obj = parse_and_convert(
format!("object_id:{}", self.binance_config.binance_oracle_id).as_str(),
&address_mapping,
);
let ticker = parse_and_convert("string:BTCUSD", &address_mapping);
let identifier = parse_and_convert("string:Binance", &address_mapping);
let admin_obj = parse_and_convert(
format!("object_id:{}", self.binance_config.binance_admin_id).as_str(),
&address_mapping,
);
let mut last_execution = Instant::now() - Duration::from_secs(10); // 初始化为10秒前
while let Some(msg) = rx.recv().await {
let wallet_state = self.wallet_state.write().await;

let msg_value = serde_json::from_str::<Value>(&msg).unwrap();
if msg_value["c"].as_str().is_none()
|| Instant::now().duration_since(last_execution)
< Duration::from_secs(self.binance_config.binance_submit_interval)
{
continue;
}
last_execution = Instant::now();
let price = format!(
"u256:{}",
msg_value["c"].as_str().unwrap().parse::<f64>().unwrap() * 10f64.powi(8)
);
let decimal = "8u8".to_string();
let args = vec![
oracle_obj.clone(),
ticker.clone(),
parse_and_convert(price.as_str(), &address_mapping),
parse_and_convert(decimal.as_str(), &address_mapping),
identifier.clone(),
admin_obj.clone(),
];
let move_action = MoveAction::new_function_call(function_id.clone(), vec![], args);
let _ = execute_transaction(move_action, wallet_state).await;
info!("Received Binance price: {}", msg_value["c"]);
}
handle.await.expect("The task failed");
}
}
129 changes: 129 additions & 0 deletions crates/rooch-oracle/src/data_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn};
use move_core_types::account_address::AccountAddress;
use moveos_types::transaction::MoveAction;
use rooch_rpc_api::jsonrpc_types::KeptVMStatusView;
use rooch_rpc_client::wallet_context::WalletContext;
use rooch_types::address::RoochAddress;
use rooch_types::function_arg::FunctionArg;
use serde_json::Value;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::{mpsc, RwLockWriteGuard};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;

pub async fn subscribe_websocket(
url: String,
tx: mpsc::Sender<String>,
subscribe_msg: Option<Value>,
) {
loop {
let (ws_stream, _) = match connect_async(&url).await {
Ok(stream) => stream,
Err(e) => {
warn!("Failed to connect: {} error:{}", url, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};

info!("Connected to {}", url);

let (mut write, mut read) = ws_stream.split();

if subscribe_msg.is_some() {
if let Err(e) = write
.send(Message::Text(subscribe_msg.clone().unwrap().to_string()))
.await
{
warn!("Failed to send message: {}", e);
continue;
}
}

while let Some(message) = read.next().await {
match message {
Ok(msg) => {
if let Message::Text(text) = msg {
if let Err(e) = tx.send(text).await {
warn!("Failed to send message through channel: {}", e);
break;
}
}
}
Err(e) => {
warn!("Error: {}", e);
break;
}
}
}

warn!("Connection lost or error occurred, restarting...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

pub async fn subscribe_http(url: String, tx: mpsc::Sender<Value>, interval: u64) {
loop {
match reqwest::get(&url).await {
Ok(response) => {
if let Ok(value) = response.json::<Value>().await {
if let Err(e) = tx.send(value).await {
warn!("Failed to send message through channel: {}", e);
}
}
}
Err(e) => {
warn!("Failed to fetch price: {}", e);
}
};

tokio::time::sleep(Duration::from_secs(interval)).await;
}
}

pub struct State {
pub(crate) wallet_pwd: Option<String>,
pub context: WalletContext,
}

#[allow(clippy::needless_lifetimes)]
pub async fn execute_transaction<'a>(
action: MoveAction,
state: RwLockWriteGuard<'a, State>,
) -> Result<()> {
let sender: RoochAddress = state.context.client_config.active_address.unwrap();
let pwd = state.wallet_pwd.clone();
let result = state
.context
.sign_and_execute(sender, action, pwd, None)
.await;
match result {
Ok(tx) => match tx.execution_info.status {
KeptVMStatusView::Executed => {
info!("Executed success tx_has: {}", tx.execution_info.tx_hash);
}
_ => {
error!("Transfer gases failed {:?}", tx.execution_info.status);
}
},
Err(e) => {
error!("Transfer gases failed {}", e);
}
};
Ok(())
}

pub fn parse_and_convert(arg: &str, address_mapping: &BTreeMap<String, AccountAddress>) -> Vec<u8> {
let mapping = |input: &str| -> Option<AccountAddress> { address_mapping.get(input).cloned() };
FunctionArg::from_str(arg)
.unwrap()
.into_bytes(&mapping)
.unwrap()
}
7 changes: 7 additions & 0 deletions crates/rooch-oracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

pub mod binance;
pub mod data_process;
pub mod okx;
pub mod pyth;
62 changes: 62 additions & 0 deletions crates/rooch-oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use rooch_oracle::binance::{Binance, BinanceConfig};
use rooch_oracle::okx::{Okx, OkxConfig};
use rooch_oracle::pyth::{Pyth, PythConfig};

#[derive(Parser, Clone)]
#[clap(
name = "Rooch Oracle",
about = "Oracle backend for BTC tokens price on Rooch",
rename_all = "kebab-case"
)]
pub struct Config {
#[clap(flatten)]
pub okx_config: OkxConfig,
#[clap(flatten)]
pub binance_config: BinanceConfig,
#[clap(flatten)]
pub pyth_config: PythConfig,
#[clap(short, long, env = "ROOCH_ORACLE_PACKAGE")]
pub package_id: String,
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt::try_init();

let config = Config::parse();
let Config {
okx_config,
binance_config,
pyth_config,
package_id,
} = config;
let okx_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let okx = Okx::new(okx_config).await;
okx.subscribe(package_id.as_str()).await;
}
});
let binance_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let binance = Binance::new(binance_config).await;
binance.subscribe(package_id.as_str()).await;
}
});
let pyth_handle = tokio::spawn({
let package_id = package_id.clone();
async move {
let pyth = Pyth::new(pyth_config).await;
pyth.subscribe(package_id.as_str()).await;
}
});

okx_handle.await.expect("okx error");
binance_handle.await.expect("binance error");
pyth_handle.await.expect("binance error")
}
Loading

0 comments on commit f9282cc

Please sign in to comment.