diff --git a/.github/workflows/sightglass.yml b/.github/workflows/sightglass.yml
index 50fc1332..fb621edc 100644
--- a/.github/workflows/sightglass.yml
+++ b/.github/workflows/sightglass.yml
@@ -8,7 +8,7 @@ on:
 
 env:
   CARGO_TERM_COLOR: always
-  RUST_LOG: info
+  RUST_LOG: debug
 
   # The Wasmtime commit that we build the bench API from for testing. Bumping
   # this will automatically cause us to update our CI cache on the next run.
@@ -73,8 +73,5 @@ jobs:
      if: runner.os == 'Windows'
      run: echo "SIGHTGLASS_TEST_ENGINE=$(pwd)\\wasmtime\\target\\release\\wasmtime_bench_api.dll" >> $env:GITHUB_ENV

-    - name: Build all
-      run: cargo +nightly build --verbose --all
-
    - name: Test all
-      run: cargo +nightly test --verbose --all
+      run: cargo +nightly test --verbose --all -- --nocapture --test-threads 1
diff --git a/crates/cli/src/benchmark.rs b/crates/cli/src/benchmark.rs
index c1a1bcdf..57381c35 100644
--- a/crates/cli/src/benchmark.rs
+++ b/crates/cli/src/benchmark.rs
@@ -5,8 +5,8 @@ use sightglass_data::{Format, Measurement, Phase};
 use sightglass_recorder::measure::Measurements;
 use sightglass_recorder::{bench_api::BenchApi, benchmark::benchmark, measure::MeasureType};
 use std::{
-    fs,
-    io::{self, BufWriter, Write},
+    env, fs,
+    io::{self, BufWriter, Read, Write},
     path::{Path, PathBuf},
     process::Command,
     process::Stdio,
@@ -116,85 +116,123 @@ impl BenchmarkCommand {
             "iterations-per-process must be greater than zero"
         );
 
-        if self.processes == 1 {
-            self.execute_in_current_process()
+        if env::var("__SIGHTGLASS_CHILD").is_ok() {
+            self.execute_child()
         } else {
-            self.execute_in_multiple_processes()
+            self.execute_parent()
         }
     }
 
-    /// Execute benchmark(s) in the provided engine(s) using the current process.
-    pub fn execute_in_current_process(&self) -> Result<()> {
-        let mut output_file: Box<dyn Write> = if let Some(file) = self.output_file.as_ref() {
-            Box::new(BufWriter::new(fs::File::create(file)?))
-        } else {
-            Box::new(io::stdout())
-        };
+    /// Execute a single Wasm benchmark with a single engine in the current
+    /// child process.
+    pub fn execute_child(&self) -> Result<()> {
+        // The parent process is responsible for ensuring that all of these
+        // invariants hold for child processes.
+        assert_eq!(self.processes, 1);
+        assert_eq!(self.engines.len(), 1);
+        assert_eq!(self.wasm_files.len(), 1);
+        assert!(self.output_file.is_none());
+        assert!(self.raw);
+        assert_eq!(self.output_format, Format::Json);
+
+        let engine = &self.engines[0];
+        let engine_path = Path::new(engine);
+        assert!(
+            engine_path.is_file(),
+            "parent should have already built the engine, if necessary"
+        );
 
-        let wasm_files: Vec<_> = self
-            .wasm_files
-            .iter()
-            .map(|f| f.display().to_string())
-            .collect();
-        let mut all_measurements = vec![];
+        log::info!("Using benchmark engine: {}", engine_path.display());
+        let lib = libloading::Library::new(&engine_path)?;
+        let mut bench_api = unsafe { BenchApi::new(&lib)? };
 
-        for engine in &self.engines {
-            let engine_path = get_built_engine(engine)?;
-            log::info!("Using benchmark engine: {}", engine_path.display());
-            let lib = libloading::Library::new(&engine_path)?;
-            let mut bench_api = unsafe { BenchApi::new(&lib)? };
-
-            for wasm_file in &wasm_files {
-                log::info!("Using Wasm benchmark: {}", wasm_file);
-
-                // Use the provided --working-dir, otherwise find the Wasm file's parent directory.
-                let working_dir = self.get_working_directory(&wasm_file)?;
-                log::info!("Using working directory: {}", working_dir.display());
-
-                // Read the Wasm bytes.
-                let bytes = fs::read(&wasm_file).context("Attempting to read Wasm bytes")?;
-                log::debug!("Wasm benchmark size: {} bytes", bytes.len());
-
-                let mut measurements = Measurements::new(this_arch(), engine, wasm_file);
-                let mut measure = self.measure.build();
-
-                // Run the benchmark (compilation, instantiation, and execution) several times in
-                // this process.
-                for i in 0..self.iterations_per_process {
-                    let wasm_hash = {
-                        use std::collections::hash_map::DefaultHasher;
-                        use std::hash::{Hash, Hasher};
-                        let mut hasher = DefaultHasher::new();
-                        wasm_file.hash(&mut hasher);
-                        hasher.finish()
-                    };
-                    let stdout = format!("stdout-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
-                    let stdout = Path::new(&stdout);
-                    let stderr = format!("stderr-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
-                    let stderr = Path::new(&stderr);
-                    let stdin = None;
-
-                    benchmark(
-                        &mut bench_api,
-                        &working_dir,
-                        stdout,
-                        stderr,
-                        stdin,
-                        &bytes,
-                        self.stop_after_phase.clone(),
-                        &mut measure,
-                        &mut measurements,
-                    )?;
-
-                    self.check_output(Path::new(wasm_file), stdout, stderr)?;
-                    measurements.next_iteration();
-                }
+        let wasm_file = self.wasm_files[0].display().to_string();
+        log::info!("Using Wasm benchmark: {}", wasm_file);
+
+        let working_dir = self.get_working_directory(&wasm_file)?;
+        log::info!("Using working directory: {}", working_dir.display());
+
+        let wasm_bytes = fs::read(&wasm_file).context("Attempting to read Wasm bytes")?;
+        log::debug!("Wasm benchmark size: {} bytes", wasm_bytes.len());
+
+        let mut measurements = Measurements::new(this_arch(), engine, &wasm_file);
+        let mut measure = self.measure.build();
 
-                all_measurements.extend(measurements.finish());
+        // Run the benchmark (compilation, instantiation, and execution) several times in
+        // this process.
+        for i in 0..self.iterations_per_process {
+            if i == 0 {
+                // To ensure that the first iteration doesn't coincide with
+                // other child processes' initializations, tell the parent we
+                // are initialized now, so that it can wait on all child
+                // processes' initialization before starting iterations.
+                log::debug!("Notifying parent we are initialized");
+                self.notify_parent()?;
             }
+
+            let wasm_hash = {
+                use std::collections::hash_map::DefaultHasher;
+                use std::hash::{Hash, Hasher};
+                let mut hasher = DefaultHasher::new();
+                wasm_file.hash(&mut hasher);
+                hasher.finish()
+            };
+            let stdout = format!("stdout-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
+            let stdout = Path::new(&stdout);
+            let stderr = format!("stderr-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
+            let stderr = Path::new(&stderr);
+            let stdin = None;
+
+            log::debug!("Waiting for parent to tell us to run an iteration");
+            self.wait_for_parent()?;
+            benchmark(
+                &mut bench_api,
+                &working_dir,
+                stdout,
+                stderr,
+                stdin,
+                &wasm_bytes,
+                self.stop_after_phase.clone(),
+                &mut measure,
+                &mut measurements,
+            )?;
+            log::debug!("Notifying parent we finished an iteration");
+            self.notify_parent()?;
+
+            self.check_output(Path::new(&wasm_file), stdout, stderr)?;
+            measurements.next_iteration();
         }
 
-        self.write_results(&all_measurements, &mut output_file)?;
+        let measurements = measurements.finish();
+        let stdout = io::stdout();
+        let stdout = stdout.lock();
+        serde_json::to_writer(stdout, &measurements)?;
+        Ok(())
+    }
+
+    /// Wait for the parent process to write a byte to our (child process's)
+    /// stdin.
+    fn wait_for_parent(&self) -> Result<()> {
+        debug_assert!(env::var("__SIGHTGLASS_CHILD").is_ok());
+        let stdin = io::stdin();
+        let mut stdin = stdin.lock();
+        let mut buf = [0; 1];
+        stdin
+            .read_exact(&mut buf)
+            .context("failed to read a byte from stdin")?;
+        Ok(())
+    }
+
+    /// Notify the parent process that we (the child process) finished running
+    /// an iteration.
+    fn notify_parent(&self) -> Result<()> {
+        debug_assert!(env::var("__SIGHTGLASS_CHILD").is_ok());
+        let stdout = io::stdout();
+        let mut stdout = stdout.lock();
+        stdout
+            .write_all(&[b'\n'])
+            .context("failed to write a byte to stdout")?;
+        stdout.flush().context("failed to flush stdout")?;
         Ok(())
     }
 
@@ -267,9 +305,9 @@ impl BenchmarkCommand {
         Ok(())
     }
 
-    /// Execute the benchmark(s) by spawning multiple processes. Each of the spawned processes will
-    /// run the `execute_in_current_process` function above.
-    fn execute_in_multiple_processes(&self) -> Result<()> {
+    /// Execute the benchmark(s) by spawning multiple processes. Each of the
+    /// spawned processes will run the `execute_child` function above.
+    fn execute_parent(&self) -> Result<()> {
         let mut output_file: Box<dyn Write> = if let Some(file) = self.output_file.as_ref() {
             Box::new(BufWriter::new(fs::File::create(file)?))
         } else {
@@ -286,17 +324,19 @@
 
         let mut rng = SmallRng::seed_from_u64(0x1337_4242);
 
-        // Worklist that we randomly sample from.
+        // Worklist of benchmarking child processes that we randomly sample
+        // from.
         let mut choices = vec![];
 
         for engine in &self.engines {
             // Ensure that each of our engines is built before we spawn any
-            // child processes (potentially in a different working directory,
-            // and therefore potentially invalidating relative paths used here).
+            // child processes.
             let engine = get_built_engine(engine)?;
 
             for wasm in &self.wasm_files {
-                choices.push((engine.clone(), wasm, self.processes));
+                for _ in 0..self.processes {
+                    choices.push(Child::new(self, &this_exe, &engine, wasm)?);
+                }
             }
         }
 
@@ -305,58 +345,60 @@
 
         while !choices.is_empty() {
             let index = rng.gen_range(0, choices.len());
-            let (engine, wasm, procs_left) = &mut choices[index];
-
-            let mut command = Command::new(&this_exe);
-            command
-                .stdin(Stdio::null())
-                .stdout(Stdio::piped())
-                .stderr(Stdio::inherit())
-                .arg("benchmark")
-                .arg("--processes")
-                .arg("1")
-                .arg("--iterations-per-process")
-                .arg(self.iterations_per_process.to_string())
-                .arg("--engine")
-                .arg(&engine)
-                .arg("--measure")
-                .arg(self.measure.to_string())
-                .arg("--raw")
-                .arg("--output-format")
-                // Always use JSON when privately communicating with a
-                // subprocess.
-                .arg(Format::Json.to_string());
-
-            if self.small_workloads {
-                command.env("WASM_BENCH_USE_SMALL_WORKLOAD", "1");
-            }
-
-            if let Some(phase) = self.stop_after_phase {
-                command.arg("--stop-after").arg(phase.to_string());
-            }
+            let child = &mut choices[index];
 
-            command.arg("--").arg(&wasm);
+            log::info!(
+                "Running benchmark iteration in child process {}",
+                child.process.id()
+            );
+            child.run_one_iteration()?;
 
-            let output = command
-                .output()
-                .context("failed to run benchmark subprocess")?;
+            if child.iterations > 0 {
+                // This child process has more iterations to complete before the
+                // child prints its measurements to `stdout`.
+                continue;
+            }
+
+            // Close the child's `stdin`.
+            //
+            // This isn't strictly necessary, but should help catch bugs where
+            // the child is trying to wait on a notification from the parent to
+            // run another iteration, but the parent thinks that the child
+            // should be done running iterations.
+            drop(child.process.stdin.take().unwrap());
+
+            // Read its results from `stdout`.
+            //
+            // Do this before waiting on the child to exit. Otherwise we could
+            // deadlock: we would be waiting on the child to exit while the
+            // child is blocked writing to its full `stdout` pipe, which we
+            // never drain because we are waiting on the child to exit.
+            let mut child_stdout = child.process.stdout.take().unwrap();
+            let mut child_results = vec![];
+            child_stdout
+                .read_to_end(&mut child_results)
+                .context("failed to read benchmark subprocess's results on stdout")?;
+
+            // Finally, wait on the child to exit.
+            let status = child
+                .process
+                .wait()
+                .context("failed to `wait` on a benchmarking child process")?;
             anyhow::ensure!(
-                output.status.success(),
-                "benchmark subprocess did not exit successfully"
+                status.success(),
+                "benchmarking child process did not exit successfully"
             );
 
-            // Parse the subprocess's output and add its measurements to our
-            // accumulation.
+            // Parse the benchmarking child's stdout and add its measurements to
+            // our accumulation.
             measurements.extend(
-                serde_json::from_slice::<Vec<Measurement<'_>>>(&output.stdout)
-                    .context("failed to read benchmark subprocess's results")?,
+                serde_json::from_slice::<Vec<Measurement<'_>>>(&child_results)
+                    .context("failed to parse benchmark subprocess's results")?,
             );
 
-            *procs_left -= 1;
-            if *procs_left == 0 {
-                choices.swap_remove(index);
-            }
+            // We are all done with this benchmarking child process! Remove it
+            // from our worklist.
+            choices.swap_remove(index);
         }
 
         self.write_results(&measurements, &mut output_file)?;
@@ -394,6 +436,99 @@
     }
 }
 
+/// A benchmarking child process.
+struct Child {
+    /// The child process itself.
+    process: std::process::Child,
+    /// How many iterations it still has left.
+    iterations: usize,
+}
+
+impl Child {
+    fn new(
+        benchmark: &BenchmarkCommand,
+        this_exe: &Path,
+        engine: &Path,
+        wasm: &Path,
+    ) -> Result<Self> {
+        let mut command = Command::new(&this_exe);
+        command
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::inherit())
+            .env("__SIGHTGLASS_CHILD", "1")
+            .arg("benchmark")
+            .arg("--processes")
+            .arg("1")
+            .arg("--iterations-per-process")
+            .arg(benchmark.iterations_per_process.to_string())
+            .arg("--engine")
+            .arg(engine)
+            .arg("--measure")
+            .arg(benchmark.measure.to_string())
+            .arg("--raw")
+            .arg("--output-format")
+            .arg(Format::Json.to_string());
+
+        if benchmark.small_workloads {
+            command.env("WASM_BENCH_USE_SMALL_WORKLOAD", "1");
+        }
+
+        if let Some(phase) = benchmark.stop_after_phase {
+            command.arg("--stop-after").arg(phase.to_string());
+        }
+
+        command.arg("--").arg(&wasm);
+
+        let process = command
+            .spawn()
+            .context("failed to spawn benchmarking child process")?;
+
+        let mut child = Child {
+            process,
+            iterations: benchmark.iterations_per_process,
+        };
+
+        // Wait for the child process to report itself as ready and initialized.
+        child.wait_for_child()?;
+
+        Ok(child)
+    }
+
+    fn notify_child(&mut self) -> Result<()> {
+        let child_stdin = self.process.stdin.as_mut().unwrap();
+        child_stdin
+            .write_all(&[b'\n'])
+            .context("failed to write to benchmarking child process's stdin")?;
+        child_stdin
+            .flush()
+            .context("failed to flush benchmarking child process's stdin")?;
+        Ok(())
+    }
+
+    fn wait_for_child(&mut self) -> Result<()> {
+        let child_stdout = self.process.stdout.as_mut().unwrap();
+        let mut buf = [0; 1];
+        child_stdout
+            .read_exact(&mut buf)
+            .context("failed to read a byte from a benchmarking child process's stdout")?;
+        Ok(())
+    }
+
+    fn run_one_iteration(&mut self) -> Result<()> {
+        assert!(self.iterations > 0);
+
+        log::debug!("Notifying child to run one iteration");
+        self.notify_child()?;
+
+        log::debug!("Waiting for child to finish one iteration");
+        self.wait_for_child()?;
+
+        self.iterations -= 1;
+        Ok(())
+    }
+}
+
 fn this_arch() -> &'static str {
     if cfg!(target_arch = "x86_64") {
         "x86_64"
diff --git a/crates/cli/tests/tests.rs b/crates/cli/tests/tests.rs
index 1f0e6c9c..0d40d7fa 100644
--- a/crates/cli/tests/tests.rs
+++ b/crates/cli/tests/tests.rs
@@ -4,6 +4,31 @@ use sightglass_data::Measurement;
 use std::path::PathBuf;
 use std::process::Command;
 
+trait AssertExt {
+    fn log_output(self) -> Self;
+}
+
+impl AssertExt for assert_cmd::assert::Assert {
+    fn log_output(self) -> Self {
+        let output = self.get_output();
+        eprintln!(
+            "\
+             === stdout =====================================================================\n\
+             {}\n\
+             ================================================================================\n",
+            String::from_utf8_lossy(&output.stdout).trim_end()
+        );
+        eprintln!(
+            "\
+             === stderr =====================================================================\n\
+             {}\n\
+             ================================================================================\n",
+            String::from_utf8_lossy(&output.stderr).trim_end()
+        );
+        self
+    }
+}
+
 /// Get a `Command` for this crate's `sightglass-cli` executable.
 fn sightglass_cli() -> Command {
     drop(env_logger::try_init());
@@ -68,7 +93,7 @@ fn benchmark(benchmark_name: &str) -> String {
 
 #[test]
 fn help() {
-    sightglass_cli().arg("help").assert().success();
+    sightglass_cli().arg("help").assert().log_output().success();
 }
 
 #[test]
@@ -83,6 +108,7 @@ fn benchmark_stop_after_compilation() {
         .arg("compilation")
         .arg(benchmark("noop"))
         .assert()
+        .log_output()
         .success()
         .stdout(
             predicate::str::contains("Compilation")
@@ -103,6 +129,7 @@ fn benchmark_stop_after_instantiation() {
         .arg("instantiation")
         .arg(benchmark("noop"))
         .assert()
+        .log_output()
         .success()
         .stdout(
             predicate::str::contains("Compilation")
@@ -123,10 +150,10 @@ fn benchmark_json() {
         .arg("json")
         .arg("--")
         .arg(benchmark("noop"))
-        .assert();
+        .assert()
+        .log_output();
 
     let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap();
-    eprintln!("=== stdout ===\n{}\n===========", stdout);
     assert!(serde_json::from_str::<serde_json::Value>(stdout).is_ok());
 
     assert
@@ -151,10 +178,10 @@ fn benchmark_csv() {
         .arg("csv")
         .arg("--")
         .arg(benchmark("noop"))
-        .assert();
+        .assert()
+        .log_output();
 
     let stdout = std::str::from_utf8(&assert.get_output().stdout).unwrap();
-    eprintln!("=== stdout ===\n{}\n===========", stdout);
     let mut reader = csv::Reader::from_reader(stdout.as_bytes());
     for measurement in reader.deserialize::<Measurement<'_>>() {
         drop(measurement.unwrap());
     }
@@ -181,6 +208,7 @@ fn benchmark_summary() {
         .arg("--")
         .arg(benchmark("noop"))
         .assert()
+        .log_output()
         .success()
         .stdout(
             predicate::str::contains("compilation")
@@ -215,6 +243,7 @@ fn benchmark_effect_size() -> anyhow::Result<()> {
         .arg("3")
         .arg(benchmark("noop"))
         .assert()
+        .log_output()
         .success()
         .stdout(
             predicate::str::contains(&format!("compilation :: cycles :: {}", benchmark("noop")))
diff --git a/crates/data/Cargo.toml b/crates/data/Cargo.toml
index 77299a3c..3fa5d422 100644
--- a/crates/data/Cargo.toml
+++ b/crates/data/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Sightglass Project Developers"]
 edition = "2018"
 
 [dependencies]
-anyhow = ""
+anyhow = "1.0.40"
 csv = "1.1.5"
 serde = { version = "1.0.118", features = ["derive", "rc"] }
 serde_json = "1.0.60"
diff --git a/crates/data/src/format.rs b/crates/data/src/format.rs
index 35fd9e89..7849d298 100644
--- a/crates/data/src/format.rs
+++ b/crates/data/src/format.rs
@@ -10,7 +10,7 @@ use std::{
 };
 
 /// Describes the input/output formats for the data structures in the `sightglass-data` crate.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Format {
     /// The JSON format.
     Json,
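Note: the parent/child lockstep mechanism introduced in crates/cli/src/benchmark.rs above boils down to a one-byte rendezvous over the child's stdin/stdout pipes. The standalone sketch below is not part of the patch; `RENDEZVOUS_CHILD` and `ITERATIONS` are illustrative stand-ins for `__SIGHTGLASS_CHILD` and `--iterations-per-process`, and the "work" is a stderr print. It shows the same handshake in isolation:

// Standalone sketch of the one-byte rendezvous protocol (illustrative only).
//
// The child signals "initialized" by writing a byte to its stdout. The parent
// then drives each iteration in lockstep: it writes a byte to the child's
// stdin ("run one iteration") and waits for a byte on the child's stdout
// ("iteration done").

use std::env;
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};

const ITERATIONS: usize = 3; // arbitrary, for demonstration

fn child() -> io::Result<()> {
    let stdin = io::stdin();
    let mut stdin = stdin.lock();
    let stdout = io::stdout();
    let mut stdout = stdout.lock();
    let mut buf = [0; 1];

    // Tell the parent we are initialized.
    stdout.write_all(b"\n")?;
    stdout.flush()?;

    for i in 0..ITERATIONS {
        // Wait for the parent to tell us to run an iteration.
        stdin.read_exact(&mut buf)?;
        eprintln!("child: running iteration {}", i); // stand-in for real work

        // Tell the parent we finished the iteration.
        stdout.write_all(b"\n")?;
        stdout.flush()?;
    }
    Ok(())
}

fn parent() -> io::Result<()> {
    let mut child = Command::new(env::current_exe()?)
        .env("RENDEZVOUS_CHILD", "1") // stand-in for __SIGHTGLASS_CHILD
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut child_stdin = child.stdin.take().unwrap();
    let mut child_stdout = child.stdout.take().unwrap();
    let mut buf = [0; 1];

    // Wait for the child to report itself as initialized.
    child_stdout.read_exact(&mut buf)?;

    for i in 0..ITERATIONS {
        eprintln!("parent: running iteration {} in child", i);
        child_stdin.write_all(b"\n")?;
        child_stdin.flush()?;
        child_stdout.read_exact(&mut buf)?;
    }

    // Closing the child's stdin makes any extra read in the child fail
    // loudly instead of hanging forever.
    drop(child_stdin);
    assert!(child.wait()?.success());
    Ok(())
}

fn main() -> io::Result<()> {
    if env::var("RENDEZVOUS_CHILD").is_ok() {
        child()
    } else {
        parent()
    }
}

As in `Child::new` above, the parent consumes the child's one "ready" byte before driving any iterations, so a slow-starting child cannot overlap its initialization with another child's measured iterations.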