
Commit

Adaptive collect interval
fraillt committed Nov 28, 2024
1 parent 7058272 commit 8898ac5
Showing 1 changed file with 64 additions and 25 deletions.
89 changes: 64 additions & 25 deletions stress/src/metrics_latency.rs
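The change below replaces the fixed 0.3 ms sleep between collect cycles with an interval that adapts to how many measurements the previous collect returned. A minimal sketch of that update rule, with illustrative names not taken from the file, and using f64::clamp in place of the diff's explicit if/else:

use std::time::Duration;

// Illustrative helper (not from the diff): compute the next sleep interval
// from how many measurements the last collect cycle returned.
fn next_wait(current: Duration, collected: u64, collect_around: u64) -> Duration {
    // Clamp to [0.5, 2.0] so a single noisy cycle can at most halve or double the interval.
    let ratio = (collected as f64 / collect_around as f64).clamp(0.5, 2.0);
    // More measurements than the target -> shorter interval; fewer -> longer interval.
    Duration::from_micros((current.as_micros() as f64 / ratio) as u64)
}

fn main() {
    // Starting from the diff's initial 500us interval with collect_around = 10_000,
    // assuming a hypothetical constant rate of ~100 measurements per microsecond:
    let w1 = next_wait(Duration::from_micros(500), 40_000, 10_000); // 4.0x target, clamped to 2.0 -> 250us
    let w2 = next_wait(w1, 25_000, 10_000); // 2.5x target, clamped to 2.0 -> 125us
    let w3 = next_wait(w2, 12_500, 10_000); // 1.25x target -> 100us
    println!("{w1:?} -> {w2:?} -> {w3:?}");
}

Under a steady measurement rate this settles where one cycle collects roughly `collect_around` measurements, and the clamp keeps a single idle or overloaded cycle from moving the interval by more than a factor of two.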
@@ -1,10 +1,13 @@
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, Weak,
},
thread::{sleep, JoinHandle},
time::{Duration, Instant},
u64,
};

use opentelemetry::{metrics::MeterProvider, KeyValue};
@@ -16,6 +19,12 @@ use opentelemetry_sdk::{
},
Resource,
};
use rand::{rngs, Rng, SeedableRng};

thread_local! {
/// Store random number generator for each thread
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

// copy/paste from opentelemetry-sdk/benches/metric.rs
#[derive(Clone, Debug)]
@@ -55,14 +64,14 @@ impl MetricReader for SharedReader {
fn main() {
let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());

for threads_count in [available_threads / 3, available_threads * 3] {
for threads_count in [available_threads / 4, available_threads * 4] {
println!("*** updates, using {threads_count} threads ***");
measure_update_latency(&format!("no attribs"), threads_count, |_i, _j| []);
measure_update_latency(&format!("1 attrib"), threads_count, |_i, _j| {
measure_update_latency(&format!("no attribs"), threads_count, 10000000, |_i, _j| []);
measure_update_latency(&format!("1 attrib"), threads_count, 10000000, |_i, _j| {
[KeyValue::new("some_key", 1)]
});

measure_update_latency(&format!("9 attribs"), threads_count, |_i, _j| {
measure_update_latency(&format!("9 attribs"), threads_count, 10000000, |_i, _j| {
// for http.server.request.duration as defined in https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
[
KeyValue::new("http.request.method", "GET"),
@@ -77,11 +86,11 @@ fn main() {
]
});
println!("*** inserts, using {threads_count} threads ***");
measure_update_latency(&format!("1 attrib"), threads_count, |i, j| {
measure_update_latency(&format!("1 attrib"), threads_count, 1500, |i, j| {
[KeyValue::new(format!("some_key{i}"), j as i64)]
});

measure_update_latency(&format!("10 attribs"), threads_count, |i, j| {
measure_update_latency(&format!("10 attribs"), threads_count, 1500, |i, j| {
[
KeyValue::new(format!("random{i}"), j as i64),
KeyValue::new("http.request.method", "GET"),
@@ -95,10 +104,15 @@ fn main() {
KeyValue::new("server.port", 8080),
]
});
println!("*** mix mostly updates (200 attribute-sets), using {threads_count} threads ***");
measure_update_latency(&format!("10 attribs"), threads_count, |_i, j| {
println!("*** mix mostly updates (~10% inserts), using {threads_count} threads ***");
measure_update_latency(&format!("10 attribs"), threads_count, 10000, |_i, j| {
let randomness: i64 = 20
- CURRENT_RNG.with(|rng| {
let mut rng = rng.borrow_mut();
rng.gen_range(0..20)
});
[
KeyValue::new("random", (j % 200) as i64),
KeyValue::new("random", (j / 10) as i64 + randomness),
KeyValue::new("http.request.method", "GET"),
KeyValue::new("url.scheme", "not_found"),
KeyValue::new("error.type", 404),
@@ -116,6 +130,7 @@ fn main() {
fn measure_update_latency<const N: usize>(
msg: &str,
threads_count: usize,
collect_around: u64,
attribs: fn(usize, u64) -> [KeyValue; N],
) {
let reader = SharedReader::new(
@@ -134,7 +149,6 @@ fn measure_update_latency<const N: usize>(
});
let total_iterations = Arc::new(AtomicU64::new(0));
let iterate_flag = Arc::new(AtomicBool::new(true));
let start = Instant::now();
// run multiple threads and measure how much time it takes to update the metric
for thread_idx in 0..threads_count {
let hist = histogram.clone();
@@ -155,25 +169,48 @@ fn measure_update_latency<const N: usize>(
total_iterations.fetch_add(iter_idx, Ordering::AcqRel);
}));
}

let total_measurements = collect_measurements(reader, threads, iterate_flag, collect_around);

assert_eq!(total_measurements, total_iterations.load(Ordering::Acquire));

print_stats(msg, stats, total_measurements);
}

fn collect_measurements(
reader: SharedReader,
threads: Vec<JoinHandle<()>>,
iterate_flag: Arc<AtomicBool>,
collect_around: u64,
) -> u64 {
let start = Instant::now();
let mut total_count = 0;
let mut wait_for_next_collect = Duration::from_micros(500);
while start.elapsed() < Duration::from_secs(1) {
// we should collect frequently enough that inserts don't reach the overflow limit (2000),
// but not so frequently that collection becomes visible in p99 (i.e. affects more than 1% of measurements).
// with a 0.3ms sleep, collect will be called around 1900-2500 times (depending on load),
// so we might get around ~2M/s inserts before they start to overflow,
// and it's infrequent enough that it shouldn't influence 1% of updates (p99).
std::thread::sleep(Duration::from_micros(300));
total_count += collect_and_return_count(&reader);
sleep(wait_for_next_collect);
let collected = collect_and_return_count(&reader);
// calculate the wait interval so that the number of measurements in the next collect cycle is close to `collect_around`
let ratio = collected as f64 / collect_around as f64;
let clamped = if ratio > 2.0 {
2.0
} else if ratio < 0.5 {
0.5
} else {
ratio
};
wait_for_next_collect =
Duration::from_micros((wait_for_next_collect.as_micros() as f64 / clamped) as u64);
total_count += collected;
}
iterate_flag.store(false, Ordering::Release);
threads.into_iter().for_each(|t| {
t.join().unwrap();
});
total_count += collect_and_return_count(&reader);
total_count
}

let total_measurements = total_iterations.load(Ordering::Acquire);
assert_eq!(total_count, total_measurements);

fn print_stats(msg: &str, stats: Vec<Arc<Mutex<HashMap<u64, u64>>>>, total_measurements: u64) {
let stats = stats
.into_iter()
.map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
@@ -189,20 +226,22 @@ fn measure_update_latency<const N: usize>(
});

println!("{msg}");
println!("\titer {}", format_count(total_measurements));
println!("\tavg {}", format_time(sum / total_measurements as u64));
println!("\titer {}", format_count(total_measurements));
println!(
"\tp50 {}",
"\tp50 {}",
format_time(get_percentile_value(total_measurements, &stats, 50))
);
println!(
"\tp95 {}",
"\tp95 {}",
format_time(get_percentile_value(total_measurements, &stats, 95))
);
println!(
"\tp99 {}",
"\tp99 {}",
format_time(get_percentile_value(total_measurements, &stats, 99))
);
println!("\tavg {}", format_time(sum / total_measurements as u64));
println!("\tbest {}", format_time(*stats.iter().next().unwrap().0));
println!("\tworst {}", format_time(*stats.iter().last().unwrap().0));
}

fn collect_and_return_count(reader: &SharedReader) -> u64 {

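The mixed workload in main() ("~10% inserts") leans on the thread-local SmallRng: the "random" attribute value is drawn from a 20-wide window, (j / 10) + 1..=20, that slides forward by one every 10 iterations, so most iterations hit an existing attribute set while roughly one in ten produces a value not seen before. A self-contained sketch of that distribution, assuming rand 0.8 with the small_rng feature; the counting here is illustrative and ignores collection resetting the aggregation state:

use std::collections::HashSet;
use rand::{rngs::SmallRng, Rng, SeedableRng};

fn main() {
    let mut rng = SmallRng::from_entropy();
    let mut seen = HashSet::new();
    let mut first_seen = 0u64;
    let total = 100_000u64;
    for j in 0..total {
        // same shape as the diff: a draw from 1..=20 on top of j / 10
        let randomness: i64 = 20 - rng.gen_range(0..20);
        let value = (j / 10) as i64 + randomness;
        if seen.insert(value) {
            // this "random" attribute value (and thus attribute set) was never used before
            first_seen += 1;
        }
    }
    println!(
        "{:.1}% of iterations used a previously unseen attribute set",
        first_seen as f64 / total as f64 * 100.0
    );
}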