Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hermes): create latest TWAP endpoint #2131

Merged
merged 10 commits into from
Nov 26, 2024
382 changes: 284 additions & 98 deletions apps/hermes/server/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.7.2"
version = "0.8.0"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down Expand Up @@ -34,6 +34,7 @@ pyth-sdk-solana = { version = "0.10.2" }
pythnet-sdk = { path = "../../../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
rand = { version = "0.8.5" }
reqwest = { version = "0.11.14", features = ["blocking", "json"] }
rust_decimal = { version = "1.36.0" }
secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = { version = "1.0.93" }
Expand Down
12 changes: 12 additions & 0 deletions apps/hermes/server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ where
rest::latest_vaas,
rest::price_feed_ids,
rest::latest_price_updates,
rest::latest_twaps,
rest::latest_publisher_stake_caps,
rest::timestamp_price_updates,
rest::price_feeds_metadata,
Expand All @@ -126,6 +127,8 @@ where
types::ParsedPublisherStakeCapsUpdate,
types::ParsedPublisherStakeCap,
types::AssetType,
types::TwapsResponse,
types::ParsedPriceFeedTwap,
)
),
tags(
Expand All @@ -152,6 +155,15 @@ where
get(rest::price_stream_sse_handler),
)
.route("/v2/updates/price/latest", get(rest::latest_price_updates))
.route(
"/v2/updates/twap/:window_seconds/latest",
get(rest::latest_twaps),
)
// TODO(Tejas)
// .route(
// "/v2/updates/twap/:window_seconds/:publish_time",
// get(rest::latest_twaps),
// )
.route(
"/v2/updates/publisher_stake_caps/latest",
get(rest::latest_publisher_stake_caps),
Expand Down
14 changes: 11 additions & 3 deletions apps/hermes/server/src/api/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub use {
price_feed_ids::*,
ready::*,
v2::{
latest_price_updates::*, latest_publisher_stake_caps::*, price_feeds_metadata::*, sse::*,
timestamp_price_updates::*,
latest_price_updates::*, latest_publisher_stake_caps::*, latest_twaps::*,
price_feeds_metadata::*, sse::*, timestamp_price_updates::*,
},
};

Expand Down Expand Up @@ -125,7 +125,7 @@ mod tests {
crate::state::{
aggregate::{
AggregationEvent, PriceFeedsWithUpdateData, PublisherStakeCapsWithUpdateData,
ReadinessMetadata, RequestTime, Update,
ReadinessMetadata, RequestTime, TwapsWithUpdateData, Update,
},
benchmarks::BenchmarksState,
cache::CacheState,
Expand Down Expand Up @@ -198,6 +198,14 @@ mod tests {
) -> Result<PublisherStakeCapsWithUpdateData> {
unimplemented!("Not needed for this test")
}
async fn get_twaps_with_update_data(
&self,
_price_ids: &[PriceIdentifier],
_start_time: RequestTime,
_end_time: RequestTime,
) -> Result<TwapsWithUpdateData> {
unimplemented!("Not needed for this test")
}
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions apps/hermes/server/src/api/rest/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ pub async fn index() -> impl IntoResponse {
"/api/get_price_feed?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>(&verbose=true)(&binary=true)",
"/api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>",
"/api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>",

"/v2/updates/price/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
"/v2/updates/price/stream?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)(&allow_unordered=false)(&benchmarks_only=false)",
"/v2/updates/price/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
"/v2/price_feeds?(query=btc)(&asset_type=crypto|equity|fx|metal|rates)",
"/v2/updates/twap/<window_seconds>/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
"/v2/updates/twap/<window_seconds>/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
])
}
165 changes: 165 additions & 0 deletions apps/hermes/server/src/api/rest/v2/latest_twaps.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use {
crate::{
api::{
rest::{validate_price_ids, RestError},
types::{BinaryUpdate, EncodingType, ParsedPriceFeedTwap, PriceIdInput, TwapsResponse},
ApiState,
},
state::aggregate::{Aggregates, RequestTime},
},
anyhow::Result,
axum::{
extract::{Path, State},
Json,
},
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
pyth_sdk::{DurationInSeconds, PriceIdentifier, UnixTimestamp},
serde::Deserialize,
serde_qs::axum::QsQuery,
utoipa::IntoParams,
};

#[derive(Debug, Deserialize, IntoParams)]
#[into_params(parameter_in=Path)]
pub struct LatestTwapsPathParams {
/// The time window in seconds over which to calculate the TWAP, ending at the current time.
/// For example, a value of 300 would return the most recent 5 minute TWAP.
/// Must be greater than 0 and less than or equal to 600 seconds (10 minutes).
tejasbadadare marked this conversation as resolved.
Show resolved Hide resolved
#[param(example = "300")]
#[serde(deserialize_with = "validate_twap_window")]
window_seconds: u64,
}

#[derive(Debug, Deserialize, IntoParams)]
#[into_params(parameter_in=Query)]
pub struct LatestTwapsQueryParams {
/// Get the most recent TWAP (time weighted average price) for this set of price feed ids.
/// The `binary` data contains the signed start & end cumulative price updates needed to calculate
/// the TWAPs on-chain. The `parsed` data contains the calculated TWAPs.
///
/// This parameter can be provided multiple times to retrieve multiple price updates,
/// for example see the following query string:
///
/// ```
/// ?ids[]=a12...&ids[]=b4c...
/// ```
#[param(rename = "ids[]")]
#[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
ids: Vec<PriceIdInput>,

/// Optional encoding type. If true, return the price update in the encoding specified by the encoding parameter. Default is `hex`.
tejasbadadare marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default)]
encoding: EncodingType,

/// If true, include the calculated TWAP in the `parsed` field of each returned feed. Default is `true`.
#[serde(default = "default_true")]
parsed: bool,

/// If true, invalid price IDs in the `ids` parameter are ignored. Only applicable to the v2 APIs. Default is `false`.
#[serde(default)]
ignore_invalid_price_ids: bool,
}

fn validate_twap_window<'de, D>(deserializer: D) -> Result<DurationInSeconds, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let seconds = DurationInSeconds::deserialize(deserializer)?;
if seconds == 0 || seconds > 600 {
return Err(D::Error::custom(
"twap_window_seconds must be in range (0, 600]",
));
}
Ok(seconds)
}
fn default_true() -> bool {
true
}

/// Get the latest TWAP by price feed id with a custom time window.
///
/// Given a collection of price feed ids, retrieve the latest Pyth TWAP price for each price feed.
#[utoipa::path(
get,
path = "/v2/updates/twap/{window_seconds}/latest",
responses(
(status = 200, description = "TWAPs retrieved successfully", body = TwapsResponse),
(status = 404, description = "Price ids not found", body = String)
),
params(
LatestTwapsPathParams,
LatestTwapsQueryParams
)
)]
pub async fn latest_twaps<S>(
State(state): State<ApiState<S>>,
Path(path_params): Path<LatestTwapsPathParams>,
QsQuery(params): QsQuery<LatestTwapsQueryParams>,
) -> Result<Json<TwapsResponse>, RestError>
where
S: Aggregates,
{
let price_id_inputs: Vec<PriceIdentifier> =
params.ids.into_iter().map(|id| id.into()).collect();
let price_ids: Vec<PriceIdentifier> =
validate_price_ids(&state, &price_id_inputs, params.ignore_invalid_price_ids).await?;

// Collect start and end bounds for the TWAP window
let window_seconds = path_params.window_seconds as i64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you want this to be an i64 in the params to avoid this conversion here?

Copy link
Contributor Author

@tejasbadadare tejasbadadare Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i made window_seconds of type DurationInSeconds from pyth_sdk, which is a u64 (probably to enforce a nonnegative duration.) it seemed like a good fit but i see your point as well -- any preference on i64 vs u64 here?

let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as UnixTimestamp;
let start_time = current_time - window_seconds;

// Calculate the average
let twaps_with_update_data = Aggregates::get_twaps_with_update_data(
&*state.state,
&price_ids,
RequestTime::FirstAfter(start_time),
RequestTime::Latest,
)
.await
.map_err(|e| {
tracing::warn!(
"Error getting TWAPs for price IDs {:?} with update data: {:?}",
price_ids,
e
);
RestError::UpdateDataNotFound
})?;

let twap_update_data = twaps_with_update_data.update_data;
let binary: Vec<BinaryUpdate> = twap_update_data
.into_iter()
.map(|data_vec| {
let encoded_data = data_vec
.into_iter()
.map(|data| match params.encoding {
EncodingType::Base64 => base64_standard_engine.encode(data),
EncodingType::Hex => hex::encode(data),
})
.collect();
BinaryUpdate {
encoding: params.encoding,
data: encoded_data,
}
})
.collect();

let parsed: Option<Vec<ParsedPriceFeedTwap>> = if params.parsed {
Some(
twaps_with_update_data
.twaps
.into_iter()
.map(Into::into)
.collect(),
)
} else {
None
};

let twap_resp = TwapsResponse { binary, parsed };
Ok(Json(twap_resp))
}
1 change: 1 addition & 0 deletions apps/hermes/server/src/api/rest/v2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod latest_price_updates;
pub mod latest_publisher_stake_caps;
pub mod latest_twaps;
pub mod price_feeds_metadata;
pub mod sse;
pub mod timestamp_price_updates;
51 changes: 49 additions & 2 deletions apps/hermes/server/src/api/types.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use {
super::doc_examples,
crate::state::aggregate::{PriceFeedUpdate, PriceFeedsWithUpdateData, Slot, UnixTimestamp},
crate::state::aggregate::{
PriceFeedTwap, PriceFeedUpdate, PriceFeedsWithUpdateData, Slot, UnixTimestamp,
},
anyhow::Result,
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
borsh::{BorshDeserialize, BorshSerialize},
derive_more::{Deref, DerefMut},
pyth_sdk::{Price, PriceFeed, PriceIdentifier},
rust_decimal::Decimal,
serde::{Deserialize, Serialize},
std::{
collections::BTreeMap,
Expand Down Expand Up @@ -140,7 +143,7 @@ pub struct RpcPrice {
pub conf: u64,
/// The exponent associated with both the price and confidence interval. Multiply those values
/// by `10^expo` to get the real value.
#[schema(example=-8)]
#[schema(example = -8)]
pub expo: i32,
/// When the price was published. The `publish_time` is a unix timestamp, i.e., the number of
/// seconds since the Unix epoch (00:00:00 UTC on 1 Jan 1970).
Expand Down Expand Up @@ -244,6 +247,50 @@ impl From<PriceFeedUpdate> for ParsedPriceUpdate {
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct ParsedPriceFeedTwap {
pub id: RpcPriceIdentifier,
/// The start unix timestamp of the window
pub start_timestamp: i64,
/// The end unix timestamp of the window
pub end_timestamp: i64,
/// The calculated time weighted average price over the window
pub twap: RpcPrice,
/// The % of slots where the network was down over the TWAP window.
/// A value of zero indicates no slots were missed over the window, and
/// a value of one indicates that every slot was missed over the window.
/// This is a float value stored as a string to avoid precision loss.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update this comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is actually being returned as a string (utoipa's default behavior for Decimal)

#[serde(with = "pyth_sdk::utils::as_string")]
tejasbadadare marked this conversation as resolved.
Show resolved Hide resolved
#[schema(value_type = String, example="0.00001")]
pub down_slots_ratio: Decimal,
}
impl From<PriceFeedTwap> for ParsedPriceFeedTwap {
fn from(pft: PriceFeedTwap) -> Self {
Self {
id: RpcPriceIdentifier::from(pft.id),
start_timestamp: pft.start_timestamp,
end_timestamp: pft.end_timestamp,
twap: RpcPrice {
price: pft.twap.price,
conf: pft.twap.conf,
expo: pft.twap.expo,
publish_time: pft.twap.publish_time,
Copy link
Contributor

@guibescos guibescos Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publish_time seems redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah fair. i reused RpcPrice here to avoid creating a slightly different struct just for twap prices, since the other fields fit well (price, conf, expo). thoughts?

},
down_slots_ratio: pft.down_slots_ratio,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct TwapsResponse {
/// Each BinaryUpdate contains the start & end cumulative price updates used to
/// calculate a given price feed's TWAP.
pub binary: Vec<BinaryUpdate>,

/// The calculated TWAPs for each price ID
#[serde(skip_serializing_if = "Option::is_none")]
pub parsed: Option<Vec<ParsedPriceFeedTwap>>,
}

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone, ToSchema)]
pub struct ParsedPublisherStakeCapsUpdate {
Expand Down
Loading
Loading