Skip to content

Commit

Permalink
feat(node): Revert iota-proxy and update it to the latest upstream (#…
Browse files Browse the repository at this point in the history
…4391)

* Revert "feat(crates): remove iota-proxy crate (#1319)"

This reverts commit 80a89a0.

* chores: Update dependency

* Revert "feat(iota-node, iota-config, iota-swarm-config): remove metrics client push (#1377)"

This reverts commit b9d1ea5.

* cherry pick: Update to latest iota-proxy

* chores: Format codes

* cherry pick: Update iota-node/metrics to latest upstream

* chores: sort Cargo.toml

* feat(docker): Add docker build script for iota-proxy

* chores: Fix typo

* chores: Add newline after headers

* chores: Remove ipnetwork from dependency

* chores: Add newline after headers in iota-node metrics.rs

* fix: Adapt version from workspace an update authors

* refactor: Use Self instead of BridgeSummary in default()

* chore: update Cargo.lock

* Update jemalloc setup in  iota-proxy docker file

* chores: Update Cargo.lock

* fix: use correct push-url values for testnet and mainnet

* fix: rename header check func

* feat: remove hard-coded default remote-write url

* fix: remove the unnecessary inventory hostname label and env variable since it is only used for a debug log anyway

---------

Co-authored-by: Alexander Sporn <[email protected]>
Co-authored-by: Thibault Martinez <[email protected]>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent bf68fbb commit e6bb362
Show file tree
Hide file tree
Showing 29 changed files with 3,890 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .github/crates-filters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ iota-protocol-config:
- "crates/iota-protocol-config/**"
iota-protocol-config-macros:
- "crates/iota-protocol-config-macros/**"
iota-proxy:
- "crates/iota-proxy/**"
iota-replay:
- "crates/iota-replay/**"
iota-rest-api:
Expand Down
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ members = [
"crates/iota-proc-macros",
"crates/iota-protocol-config",
"crates/iota-protocol-config-macros",
"crates/iota-proxy",
"crates/iota-replay",
"crates/iota-rest-api",
"crates/iota-rosetta",
Expand Down Expand Up @@ -217,6 +218,7 @@ async-trait = "0.1.61"
aws-config = "0.56"
axum = { version = "0.7", default-features = false, features = ["tokio", "http1", "http2", "json", "matched-path", "original-uri", "form", "query", "ws"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
axum-server = { git = "https://github.com/bmwill/axum-server.git", rev = "f44323e271afdd1365fd0c8b0a4c0bbdf4956cb7", version = "0.6", default-features = false, features = ["tls-rustls"] }
backoff = { version = "0.4.0", features = ["futures", "futures-core", "pin-project-lite", "tokio", "tokio_1"] }
base64 = "0.21.2"
base64-url = "2"
Expand Down Expand Up @@ -295,6 +297,7 @@ prometheus = "0.13.3"
proptest = "1.1.0"
proptest-derive = "0.3.0"
prost = "0.13"
protobuf = { version = "2.28", features = ["with-bytes"] }
quinn-proto = "0.11.6"
quote = "1.0.23"
rand = "0.8.5"
Expand Down
12 changes: 12 additions & 0 deletions crates/iota-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ pub struct NodeConfig {
#[serde(default)]
pub checkpoint_executor_config: CheckpointExecutorConfig,

#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<MetricsConfig>,

/// In a `iota-node` binary, this is set to
/// SupportedProtocolVersions::SYSTEM_DEFAULT in iota-node/src/main.rs.
/// It is present in the config so that it can be changed by tests in
Expand Down Expand Up @@ -757,6 +760,15 @@ impl AuthorityStorePruningConfig {
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct MetricsConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub push_interval_seconds: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub push_url: Option<String>,
}

#[derive(Default, Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct DBCheckpointConfig {
Expand Down
7 changes: 6 additions & 1 deletion crates/iota-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use clap::{ArgGroup, Parser};
use iota_common::sync::async_once_cell::AsyncOnceCell;
use iota_config::{Config, NodeConfig, node::RunWithRange};
use iota_core::runtime::IotaRuntimes;
use iota_node::IotaNode;
use iota_node::{IotaNode, metrics};
use iota_types::{
committee::EpochId, messages_checkpoint::CheckpointSequenceNumber, multiaddr::Multiaddr,
supported_protocol_versions::SupportedProtocolVersions,
Expand Down Expand Up @@ -92,6 +92,11 @@ fn main() {
config.metrics_address
);

{
let _enter = runtimes.metrics.enter();
metrics::start_metrics_push_task(&config, registry_service.clone());
}

if let Some(listen_address) = args.listen_address {
config.network_address = listen_address;
}
Expand Down
156 changes: 153 additions & 3 deletions crates/iota-node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,164 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use axum::http::header;
use iota_metrics::RegistryService;
use iota_network::tonic::Code;
use iota_network_stack::metrics::MetricsCallbackProvider;
use prometheus::{
HistogramVec, IntCounterVec, IntGaugeVec, Registry, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
Encoder, HistogramVec, IntCounterVec, IntGaugeVec, PROTOBUF_FORMAT, Registry,
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry,
};
use tracing::error;

const METRICS_PUSH_TIMEOUT: Duration = Duration::from_secs(45);

pub struct MetricsPushClient {
certificate: std::sync::Arc<iota_tls::SelfSignedCertificate>,
client: reqwest::Client,
}

impl MetricsPushClient {
pub fn new(network_key: iota_types::crypto::NetworkKeyPair) -> Self {
use fastcrypto::traits::KeyPair;
let certificate = std::sync::Arc::new(iota_tls::SelfSignedCertificate::new(
network_key.private(),
iota_tls::IOTA_VALIDATOR_SERVER_NAME,
));
let identity = certificate.reqwest_identity();
let client = reqwest::Client::builder()
.identity(identity)
.build()
.unwrap();

Self {
certificate,
client,
}
}

pub fn certificate(&self) -> &iota_tls::SelfSignedCertificate {
&self.certificate
}

pub fn client(&self) -> &reqwest::Client {
&self.client
}
}

/// Starts a task to periodically push metrics to a configured endpoint if a
/// metrics push endpoint is configured.
pub fn start_metrics_push_task(config: &iota_config::NodeConfig, registry: RegistryService) {
use fastcrypto::traits::KeyPair;
use iota_config::node::MetricsConfig;

const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);

let (interval, url) = match &config.metrics {
Some(MetricsConfig {
push_interval_seconds,
push_url: Some(url),
}) => {
let interval = push_interval_seconds
.map(Duration::from_secs)
.unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
(interval, url)
}
_ => return,
};

// make a copy so we can make a new client later when we hit errors posting
// metrics
let config_copy = config.clone();
let mut client = MetricsPushClient::new(config_copy.network_key_pair().copy());

async fn push_metrics(
client: &MetricsPushClient,
url: &reqwest::Url,
registry: &RegistryService,
) -> Result<(), anyhow::Error> {
// now represents a collection timestamp for all of the metrics we send to the
// proxy
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;

let mut metric_families = registry.gather_all();
for mf in metric_families.iter_mut() {
for m in mf.mut_metric() {
m.set_timestamp_ms(now);
}
}

let mut buf: Vec<u8> = vec![];
let encoder = prometheus::ProtobufEncoder::new();
encoder.encode(&metric_families, &mut buf)?;

let mut s = snap::raw::Encoder::new();
let compressed = s.compress_vec(&buf).map_err(|err| {
error!("unable to snappy encode; {err}");
err
})?;

let response = client
.client()
.post(url.to_owned())
.header(reqwest::header::CONTENT_ENCODING, "snappy")
.header(header::CONTENT_TYPE, PROTOBUF_FORMAT)
.body(compressed)
.timeout(METRICS_PUSH_TIMEOUT)
.send()
.await?;

if !response.status().is_success() {
let status = response.status();
let body = match response.text().await {
Ok(body) => body,
Err(error) => format!("couldn't decode response body; {error}"),
};
return Err(anyhow::anyhow!(
"metrics push failed: [{}]:{}",
status,
body
));
}

tracing::debug!("successfully pushed metrics to {url}");

Ok(())
}

tokio::spawn(async move {
tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");

let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut errors = 0;
loop {
interval.tick().await;

if let Err(error) = push_metrics(&client, &url, &registry).await {
errors += 1;
if errors >= 10 {
// If we hit 10 failures in a row, start logging errors.
tracing::error!("unable to push metrics: {error}; new client will be created");
} else {
tracing::warn!("unable to push metrics: {error}; new client will be created");
}
// aggressively recreate our client connection if we hit an error
client = MetricsPushClient::new(config_copy.network_key_pair().copy());
} else {
errors = 0;
}
}
});
}

pub struct IotaNodeMetrics {
pub jwk_requests: IntCounterVec,
Expand Down
61 changes: 61 additions & 0 deletions crates/iota-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[package]
name = "iota-proxy"
version.workspace = true
authors = ["IOTA Foundation <[email protected]>"]
edition = "2021"
license = "Apache-2.0"
publish = false

[dependencies]
# external dependencies
anyhow.workspace = true
axum.workspace = true
axum-extra.workspace = true
axum-server.workspace = true
bytes.workspace = true
clap.workspace = true
const-str.workspace = true
fastcrypto.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
itertools.workspace = true
multiaddr = "0.17.0"
once_cell.workspace = true
prometheus.workspace = true
prost.workspace = true
protobuf.workspace = true
rand.workspace = true
reqwest.workspace = true
rustls.workspace = true
rustls-pemfile = "2.1.2"
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
serde_yaml.workspace = true
snap.workspace = true
tokio = { workspace = true, features = ["full"] }
tower.workspace = true
tower-http.workspace = true
tracing.workspace = true
url.workspace = true

# internal dependencies
iota-metrics.workspace = true
iota-tls.workspace = true
iota-types.workspace = true
telemetry-subscribers.workspace = true

[dev-dependencies]
# external dependencies
axum-server.workspace = true
mime = "0.3"
serde_json.workspace = true
tower.workspace = true

# internal dependencies
iota-types = { workspace = true, features = ["test-utils"] }

[build-dependencies]
prost-build = "0.13.1"
18 changes: 18 additions & 0 deletions crates/iota-proxy/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::io::Result;
fn main() -> Result<()> {
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-env-changed=BUILD_REMOTE_WRITE");

// add this env var to build. you'll need protoc installed locally and a copy of
// the proto files
if option_env!("BUILD_REMOTE_WRITE").is_some() {
prost_build::compile_protos(&["protobufs/remote.proto", "protobufs/types.proto"], &[
"protobufs/",
])?;
}
Ok(())
}
Loading

0 comments on commit e6bb362

Please sign in to comment.