From e3226cb4792674b8e5a54a45dae6b74a970fc010 Mon Sep 17 00:00:00 2001 From: Jonathan Rainer Date: Mon, 20 Jan 2025 17:01:03 +0000 Subject: [PATCH] ROVER-244 Remove dead code and tidy up (#2352) As per title, now we're coming to the end of the refactor we can remove the code we're no longer using and tidy up our dependencies etc. --- .cargo/config.toml | 3 +- .github/workflows/run-smokes-manual.yml | 4 - .github/workflows/smoke-test.yml | 7 +- Cargo.lock | 57 -- Cargo.toml | 9 - crates/rover-std/src/fs.rs | 2 +- src/command/dev/{next/mod.rs => do_dev.rs} | 57 +- src/command/dev/legacy/compose.rs | 158 ----- src/command/dev/legacy/do_dev.rs | 170 ----- src/command/dev/legacy/introspect.rs | 162 ----- src/command/dev/legacy/mod.rs | 26 - src/command/dev/legacy/netstat.rs | 62 -- .../dev/legacy/protocol/follower/message.rs | 211 ------ .../dev/legacy/protocol/follower/messenger.rs | 231 ------- .../dev/legacy/protocol/follower/mod.rs | 21 - src/command/dev/legacy/protocol/leader.rs | 654 ------------------ src/command/dev/legacy/protocol/mod.rs | 15 - src/command/dev/legacy/protocol/socket.rs | 64 -- src/command/dev/legacy/protocol/types.rs | 41 -- src/command/dev/legacy/router/command.rs | 153 ---- src/command/dev/legacy/router/config.rs | 313 --------- src/command/dev/legacy/router/mod.rs | 7 - src/command/dev/legacy/router/runner.rs | 360 ---------- src/command/dev/legacy/schema.rs | 198 ------ src/command/dev/legacy/watcher.rs | 344 --------- src/command/dev/mod.rs | 19 +- src/command/dev/{legacy => }/no_dev.rs | 0 src/command/dev/{next => }/router/binary.rs | 0 .../dev/{next => }/router/config/mod.rs | 8 +- .../dev/{next => }/router/config/parser.rs | 9 +- .../dev/{next => }/router/config/remote.rs | 0 .../dev/{next => }/router/config/state.rs | 0 .../dev/{next => }/router/hot_reload.rs | 11 +- src/command/dev/{next => }/router/install.rs | 18 +- src/command/dev/{next => }/router/mod.rs | 0 src/command/dev/{next => }/router/run.rs | 12 +- .../dev/{next => }/router/watchers/file.rs | 7 +- .../dev/{next => }/router/watchers/mod.rs | 0 .../router/watchers/router_config.rs | 6 +- src/command/graph/mod.rs | 5 +- src/command/subgraph/mod.rs | 6 +- src/command/supergraph/compose/do_compose.rs | 345 +-------- src/options/compose.rs | 15 +- src/options/subgraph.rs | 113 --- src/utils/client.rs | 10 +- 45 files changed, 89 insertions(+), 3824 deletions(-) rename src/command/dev/{next/mod.rs => do_dev.rs} (90%) delete mode 100644 src/command/dev/legacy/compose.rs delete mode 100644 src/command/dev/legacy/do_dev.rs delete mode 100644 src/command/dev/legacy/introspect.rs delete mode 100644 src/command/dev/legacy/mod.rs delete mode 100644 src/command/dev/legacy/netstat.rs delete mode 100644 src/command/dev/legacy/protocol/follower/message.rs delete mode 100644 src/command/dev/legacy/protocol/follower/messenger.rs delete mode 100644 src/command/dev/legacy/protocol/follower/mod.rs delete mode 100644 src/command/dev/legacy/protocol/leader.rs delete mode 100644 src/command/dev/legacy/protocol/mod.rs delete mode 100644 src/command/dev/legacy/protocol/socket.rs delete mode 100644 src/command/dev/legacy/protocol/types.rs delete mode 100644 src/command/dev/legacy/router/command.rs delete mode 100644 src/command/dev/legacy/router/config.rs delete mode 100644 src/command/dev/legacy/router/mod.rs delete mode 100644 src/command/dev/legacy/router/runner.rs delete mode 100644 src/command/dev/legacy/schema.rs delete mode 100644 src/command/dev/legacy/watcher.rs rename src/command/dev/{legacy => }/no_dev.rs (100%) rename src/command/dev/{next => }/router/binary.rs (100%) rename src/command/dev/{next => }/router/config/mod.rs (96%) rename src/command/dev/{next => }/router/config/parser.rs (97%) rename src/command/dev/{next => }/router/config/remote.rs (100%) rename src/command/dev/{next => }/router/config/state.rs (100%) rename src/command/dev/{next => }/router/hot_reload.rs (99%) rename src/command/dev/{next => }/router/install.rs (95%) rename src/command/dev/{next => }/router/mod.rs (100%) rename src/command/dev/{next => }/router/run.rs (98%) rename src/command/dev/{next => }/router/watchers/file.rs (99%) rename src/command/dev/{next => }/router/watchers/mod.rs (100%) rename src/command/dev/{next => }/router/watchers/router_config.rs (87%) diff --git a/.cargo/config.toml b/.cargo/config.toml index d85a669f4..0d1ae6d5d 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,5 +2,4 @@ xtask = "run --package xtask --" rover = "run --package rover --" install-rover = "run --release --package rover -- install --force" -test-next = "test --all-features -- --nocapture" -build-next = "build --features composition-js,dev-next" \ No newline at end of file +test-next = "test --all-features -- --nocapture" \ No newline at end of file diff --git a/.github/workflows/run-smokes-manual.yml b/.github/workflows/run-smokes-manual.yml index bd2599da3..ff9e2d34e 100644 --- a/.github/workflows/run-smokes-manual.yml +++ b/.github/workflows/run-smokes-manual.yml @@ -9,9 +9,6 @@ on: description: 'JSON list of router versions' required: true type: string - dev-next: - description: 'Compile binaries with the dev-next flag' - type: boolean name: "Run Smoke Tests (Manually)" jobs: @@ -20,5 +17,4 @@ jobs: with: composition-versions: ${{ inputs.composition-versions }} router-versions: ${{ inputs.router-versions }} - dev-next: ${{ inputs.dev-next }} secrets: inherit diff --git a/.github/workflows/smoke-test.yml b/.github/workflows/smoke-test.yml index 7cda0c0fb..5745de34e 100644 --- a/.github/workflows/smoke-test.yml +++ b/.github/workflows/smoke-test.yml @@ -9,9 +9,6 @@ on: description: 'JSON list of router versions' required: true type: string - dev-next: - description: 'Compile with the dev-next flag' - type: boolean #TODO: When GitHub Actions supports ARM based Linux images for public repos: https://github.blog/changelog/2024-06-03-actions-arm-based-linux-and-windows-runners-are-now-in-public-beta/ this will need to be added name: Smoke Tests @@ -47,12 +44,12 @@ jobs: yum -y install perl-core gcc openssl-devel openssl git curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y . "$HOME/.cargo/env" - cargo build --features ${{ inputs.dev-next && 'dev-next' || 'default' }} --target ${{ matrix.compile_target.target }} --test e2e + cargo build --target ${{ matrix.compile_target.target }} --test e2e - if: ${{ matrix.compile_target.container == '' }} name: "Build binaries on host" run: | rustup target add ${{ matrix.compile_target.target }} - cargo build --features ${{ inputs.dev-next && 'dev-next' || 'default' }} --target ${{ matrix.compile_target.target }} --test e2e + cargo build --target ${{ matrix.compile_target.target }} --test e2e - uses: actions/upload-artifact@v4 name: "Store built binaries to use later on" with: diff --git a/Cargo.lock b/Cargo.lock index a0a6e8cba..a42e77678 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,12 +1103,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" - [[package]] name = "check-if-email-exists" version = "0.9.1" @@ -1549,16 +1543,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ctrlc" -version = "3.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" -dependencies = [ - "nix", - "windows-sys 0.59.0", -] - [[package]] name = "cynic-parser" version = "0.4.5" @@ -1883,12 +1867,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" -[[package]] -name = "doctest-file" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" - [[package]] name = "downcast" version = "0.11.0" @@ -3173,19 +3151,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "interprocess" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2f4e4a06d42fab3e85ab1b419ad32b09eab58b901d40c57935ff92db3287a13" -dependencies = [ - "doctest-file", - "libc", - "recvmsg", - "widestring", - "windows-sys 0.52.0", -] - [[package]] name = "ip_network" version = "0.4.1" @@ -3888,18 +3853,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "nix" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" -dependencies = [ - "bitflags 2.7.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "nom" version = "7.1.3" @@ -4777,12 +4730,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "recvmsg" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" - [[package]] name = "redox_syscall" version = "0.5.3" @@ -5021,8 +4968,6 @@ dependencies = [ "chrono", "clap", "console", - "crossbeam-channel", - "ctrlc", "derive-getters", "dialoguer", "dircpy", @@ -5037,7 +4982,6 @@ dependencies = [ "http 1.1.0", "httpmock", "indoc", - "interprocess", "itertools 0.13.0", "lazy_static", "lazycell", @@ -5088,7 +5032,6 @@ dependencies = [ "tracing-test", "url", "uuid", - "which 7.0.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 607ec042a..2fdbe2a44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,8 +45,6 @@ default = ["composition-js"] # notably, it is disabled for x86_64-unknown-linux-musl builds # because of this GitHub issue: https://github.com/denoland/deno/issues/3711 composition-js = [] -dev-next = ["composition-rewrite"] -composition-rewrite = [] ### cross-workspace dependencies # these dependencies can be used by any other workspace crate by specifying the dependency like so: @@ -95,8 +93,6 @@ clap = "4" chrono = "0.4" ci_info = "0.14" console = "0.15" -crossbeam-channel = "0.5" -ctrlc = "3" derive-getters = "0.5.0" dialoguer = "0.11" directories-next = "2.0" @@ -112,7 +108,6 @@ http-body = "1.0.1" http-body-util = "0.1.2" httpmock = "0.7" hyper = "1.0" -interprocess = { version = "2", default-features = false } indoc = "2" itertools = "0.13.0" lazycell = "1" @@ -181,8 +176,6 @@ camino = { workspace = true } clap = { workspace = true, features = ["color", "derive", "env"] } chrono = { workspace = true } console = { workspace = true } -crossbeam-channel = { workspace = true } -ctrlc = { workspace = true } derive-getters = { workspace = true } dialoguer = { workspace = true } flate2 = { workspace = true } @@ -191,7 +184,6 @@ graphql_client = { workspace = true } heck = { workspace = true } http = { workspace = true } houston = { workspace = true } -interprocess = { workspace = true } itertools = { workspace = true } prettytable-rs = { workspace = true } lazycell = { workspace = true } @@ -228,7 +220,6 @@ toml = { workspace = true } tower = { workspace = true } tower-lsp = { version = "0.20.0" } tracing = { workspace = true } -which = { workspace = true } uuid = { workspace = true } url = { workspace = true, features = ["serde"] } diff --git a/crates/rover-std/src/fs.rs b/crates/rover-std/src/fs.rs index a4e9df971..9e6fd848b 100644 --- a/crates/rover-std/src/fs.rs +++ b/crates/rover-std/src/fs.rs @@ -322,7 +322,7 @@ impl Fs { })); return; } - Err(err) => { + Err(_err) => { let _ = tx.send(Err(RoverStdError::FileRemoved { file: path.display().to_string(), })); diff --git a/src/command/dev/next/mod.rs b/src/command/dev/do_dev.rs similarity index 90% rename from src/command/dev/next/mod.rs rename to src/command/dev/do_dev.rs index 5bce5331f..73e2e9701 100644 --- a/src/command/dev/next/mod.rs +++ b/src/command/dev/do_dev.rs @@ -1,52 +1,33 @@ -#![warn(missing_docs)] - -use std::{io::stdin, str::FromStr}; +use std::io::stdin; +use std::str::FromStr; use apollo_federation_types::config::{FederationVersion, RouterVersion}; use camino::Utf8PathBuf; use futures::StreamExt; use houston::{Config, Profile}; -use router::{ - hot_reload::HotReloadConfigOverrides, install::InstallRouter, run::RunRouter, - watchers::file::FileWatcher, -}; use rover_client::operations::config::who_am_i::WhoAmI; use rover_std::{errln, infoln, warnln}; use semver::Version; use tower::ServiceExt; -use self::router::config::{RouterAddress, RunRouterConfig}; +use crate::command::dev::router::config::RouterAddress; +use crate::command::dev::router::hot_reload::HotReloadConfigOverrides; +use crate::command::dev::router::run::RunRouter; +use crate::command::dev::{OVERRIDE_DEV_COMPOSITION_VERSION, OVERRIDE_DEV_ROUTER_VERSION}; +use crate::command::Dev; +use crate::composition::pipeline::CompositionPipeline; use crate::composition::supergraph::binary::OutputTarget; +use crate::composition::supergraph::config::full::introspect::MakeResolveIntrospectSubgraph; +use crate::composition::supergraph::config::resolver::fetch_remote_subgraph::MakeFetchRemoteSubgraph; +use crate::composition::supergraph::config::resolver::fetch_remote_subgraphs::MakeFetchRemoteSubgraphs; use crate::composition::supergraph::config::resolver::SubgraphPrompt; use crate::composition::FederationUpdaterConfig; -use crate::{ - command::{ - dev::{OVERRIDE_DEV_COMPOSITION_VERSION, OVERRIDE_DEV_ROUTER_VERSION}, - Dev, - }, - composition::{ - pipeline::CompositionPipeline, - supergraph::config::{ - full::introspect::MakeResolveIntrospectSubgraph, - resolver::{ - fetch_remote_subgraph::MakeFetchRemoteSubgraph, - fetch_remote_subgraphs::MakeFetchRemoteSubgraphs, - }, - }, - }, - utils::{ - client::StudioClientConfig, - effect::{ - exec::{TokioCommand, TokioSpawn}, - read_file::FsReadFile, - write_file::FsWriteFile, - }, - env::RoverEnvKey, - }, - RoverOutput, RoverResult, -}; - -mod router; +use crate::utils::client::StudioClientConfig; +use crate::utils::effect::exec::{TokioCommand, TokioSpawn}; +use crate::utils::effect::read_file::FsReadFile; +use crate::utils::effect::write_file::FsWriteFile; +use crate::utils::env::RoverEnvKey; +use crate::{RoverOutput, RoverResult}; impl Dev { /// Runs rover dev @@ -76,7 +57,6 @@ impl Dev { let service = client_config .get_authenticated_client(profile)? .studio_graphql_service()?; - let who_am_i_service = WhoAmI::new(service); let fetch_remote_subgraphs_factory = MakeFetchRemoteSubgraphs::builder() @@ -208,7 +188,7 @@ impl Dev { ); let run_router = RunRouter::default() - .install::( + .install( router_version, client_config.clone(), override_install_path, @@ -224,7 +204,6 @@ impl Dev { Some(credential.clone()), ) .await; - // This RouterAddress has some logic figuring out _which_ of the potentially multiple // address options we should use (eg, CLI, config, env var, or default). It will be used in // the overrides for the temporary config we set for hot-reloading the router, but also as diff --git a/src/command/dev/legacy/compose.rs b/src/command/dev/legacy/compose.rs deleted file mode 100644 index 7b682c299..000000000 --- a/src/command/dev/legacy/compose.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::fs; -use std::io::prelude::*; - -use anyhow::{Context, Error}; -use apollo_federation_types::config::{FederationVersion, SupergraphConfig}; -use camino::Utf8PathBuf; -use rover_std::{errln, Fs}; - -use crate::command::dev::legacy::do_dev::log_err_and_continue; -use crate::command::supergraph::compose::{Compose, CompositionOutput}; -use crate::options::PluginOpts; -use crate::utils::client::StudioClientConfig; -use crate::{RoverError, RoverResult}; - -#[derive(Debug)] -pub struct ComposeRunner { - compose: Compose, - override_install_path: Option, - client_config: StudioClientConfig, - write_path: Utf8PathBuf, - composition_state: Option>, - plugin_exe: Option, -} - -impl ComposeRunner { - pub fn new( - compose_opts: PluginOpts, - override_install_path: Option, - client_config: StudioClientConfig, - write_path: Utf8PathBuf, - ) -> Self { - Self { - compose: Compose::new(compose_opts), - override_install_path, - client_config, - write_path, - composition_state: None, - plugin_exe: None, - } - } - - pub async fn maybe_install_supergraph( - &mut self, - federation_version: FederationVersion, - ) -> RoverResult { - if let Some(plugin_exe) = &self.plugin_exe { - Ok(plugin_exe.clone()) - } else { - let plugin_exe = self - .compose - .maybe_install_supergraph( - self.override_install_path.clone(), - self.client_config.clone(), - federation_version, - ) - .await?; - self.plugin_exe = Some(plugin_exe.clone()); - Ok(plugin_exe) - } - } - - pub async fn run( - &mut self, - supergraph_config: &mut SupergraphConfig, - ) -> std::result::Result, String> { - let prev_state = self.composition_state(); - self.composition_state = Some( - self.compose - .exec( - self.override_install_path.clone(), - self.client_config.clone(), - supergraph_config, - None, - ) - .await, - ); - let new_state = self.composition_state(); - - match (prev_state, new_state) { - // wasn't composed, now composed - (None, Some(Ok(new_success))) | (Some(Err(_)), Some(Ok(new_success))) => { - let _ = self - .update_supergraph_schema(&new_success.supergraph_sdl) - .map_err(log_err_and_continue); - Ok(Some(new_success)) - } - // had a composition error, now a new composition error - (Some(Err(prev_err)), Some(Err(new_err))) => { - if prev_err != new_err { - let _ = self.remove_supergraph_schema(); - } - Err(new_err) - } - // had a successful composition, now a new successful composition - (Some(Ok(prev_success)), Some(Ok(new_success))) => { - if prev_success != new_success { - let _ = self.update_supergraph_schema(&new_success.supergraph_sdl); - Ok(Some(new_success)) - } else { - Ok(None) - } - } - // not composed (this should be unreachable in practice) - (_, None) => { - let _ = self.remove_supergraph_schema(); - Ok(None) - } - // now has an error - (_, Some(Err(new_err))) => { - let _ = self.remove_supergraph_schema(); - Err(new_err) - } - } - } - - fn remove_supergraph_schema(&self) -> RoverResult<()> { - if Fs::assert_path_exists(&self.write_path).is_ok() { - errln!("composition failed, killing the router"); - Ok(fs::remove_file(&self.write_path) - .with_context(|| format!("could not remove {}", &self.write_path))?) - } else { - Ok(()) - } - } - - fn update_supergraph_schema(&self, sdl: &str) -> RoverResult<()> { - tracing::info!("composition succeeded, updating the supergraph schema..."); - let context = format!("could not write SDL to {}", &self.write_path); - match std::fs::File::create(&self.write_path) { - Ok(mut opened_file) => { - if let Err(e) = opened_file.write_all(sdl.as_bytes()) { - Err(RoverError::new( - Error::new(e) - .context("could not write bytes") - .context(context), - )) - } else if let Err(e) = opened_file.flush() { - Err(RoverError::new( - Error::new(e) - .context("could not flush file") - .context(context), - )) - } else { - tracing::info!("wrote updated supergraph schema to {}", &self.write_path); - Ok(()) - } - } - Err(e) => Err(RoverError::new(Error::new(e).context(context))), - } - } - - pub fn composition_state(&self) -> Option> { - self.composition_state.as_ref().map(|s| match s { - Ok(comp) => Ok(comp.clone()), - Err(err) => Err(err.to_string()), - }) - } -} diff --git a/src/command/dev/legacy/do_dev.rs b/src/command/dev/legacy/do_dev.rs deleted file mode 100644 index 1bc0460c4..000000000 --- a/src/command/dev/legacy/do_dev.rs +++ /dev/null @@ -1,170 +0,0 @@ -use anyhow::{anyhow, Context}; -use camino::Utf8PathBuf; -use futures::channel::mpsc::channel; -use futures::future::join_all; -use futures::stream::StreamExt; -use futures::FutureExt; -use rover_std::warnln; - -use crate::command::dev::{legacy::protocol::FollowerMessage, Dev}; -use crate::utils::client::StudioClientConfig; -use crate::utils::supergraph_config::get_supergraph_config; -use crate::{RoverError, RoverOutput, RoverResult}; - -use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession}; -use super::router::RouterConfigHandler; - -pub fn log_err_and_continue(err: RoverError) -> RoverError { - let _ = err.print(); - err -} - -impl Dev { - pub async fn run( - &self, - override_install_path: Option, - client_config: StudioClientConfig, - ) -> RoverResult { - self.opts - .plugin_opts - .prompt_for_license_accept(&client_config)?; - - let router_config_handler = RouterConfigHandler::try_from(&self.opts.supergraph_opts)?; - let router_address = router_config_handler.get_router_address(); - let raw_socket_name = router_config_handler.get_raw_socket_name(); - let leader_channel = LeaderChannel::new(); - let follower_channel = FollowerChannel::new(); - - let supergraph_config = get_supergraph_config( - &self.opts.supergraph_opts.graph_ref, - &self.opts.supergraph_opts.supergraph_config_path, - self.opts.supergraph_opts.federation_version.as_ref(), - client_config.clone(), - &self.opts.plugin_opts.profile, - false, - ) - .await?; - - if let Some(mut leader_session) = LeaderSession::new( - override_install_path, - &client_config, - leader_channel.clone(), - follower_channel.clone(), - self.opts.plugin_opts.clone(), - &supergraph_config, - router_config_handler, - self.opts.supergraph_opts.license.clone(), - ) - .await? - { - warnln!( - "Do not run this command in production! It is intended for local development only." - ); - let (ready_sender, mut ready_receiver) = channel(1); - let follower_messenger = FollowerMessenger::from_main_session( - follower_channel.clone().sender, - leader_channel.receiver, - ); - - tokio::task::spawn_blocking(move || { - ctrlc::set_handler(move || { - eprintln!( - "\nshutting down the `rover dev` session and all attached processes..." - ); - let _ = follower_channel - .sender - .send(FollowerMessage::shutdown(true)) - .map_err(|e| { - let e = - RoverError::new(anyhow!("could not shut down router").context(e)); - log_err_and_continue(e) - }); - }) - .context("could not set ctrl-c handler for main `rover dev` process") - .unwrap(); - }); - - let subgraph_watcher_handle = tokio::task::spawn(async move { - let _ = leader_session - .listen_for_all_subgraph_updates(ready_sender) - .await - .map_err(log_err_and_continue); - }); - - ready_receiver.next().await.unwrap(); - - let subgraph_watchers = self - .opts - .supergraph_opts - .get_subgraph_watchers( - &client_config, - supergraph_config, - follower_messenger.clone(), - self.opts.subgraph_opts.subgraph_polling_interval, - &self.opts.plugin_opts.profile, - self.opts.subgraph_opts.subgraph_retries, - ) - .await - .transpose() - .unwrap_or_else(|| { - self.opts - .subgraph_opts - .get_subgraph_watcher( - router_address, - &client_config, - follower_messenger.clone(), - ) - .map(|watcher| vec![watcher]) - })?; - - let futs = subgraph_watchers.into_iter().map(|mut watcher| { - let client_config = client_config.clone(); - async move { - let _ = watcher - .watch_subgraph_for_changes(client_config.clone().retry_period()) - .await - .map_err(log_err_and_continue); - } - }); - tokio::join!(join_all(futs), subgraph_watcher_handle.map(|_| ())); - } else { - let follower_messenger = FollowerMessenger::from_attached_session(&raw_socket_name); - let mut subgraph_refresher = self.opts.subgraph_opts.get_subgraph_watcher( - router_address, - &client_config, - follower_messenger.clone(), - )?; - tracing::info!( - "connecting to existing `rover dev` process by communicating via the interprocess socket located at {raw_socket_name}", - ); - - // start the interprocess socket health check in the background - let health_messenger = follower_messenger.clone(); - tokio::task::spawn_blocking(move || { - let _ = health_messenger.health_check().map_err(|_| { - eprintln!("shutting down..."); - std::process::exit(1); - }); - }); - - // set up the ctrl+c handler to notify the main session to remove the killed subgraph - let kill_name = subgraph_refresher.get_name(); - ctrlc::set_handler(move || { - eprintln!("\nshutting down..."); - let _ = follower_messenger - .remove_subgraph(&kill_name) - .map_err(log_err_and_continue); - std::process::exit(1); - }) - .context("could not set ctrl-c handler")?; - - // watch for subgraph changes on the main thread - // it will take care of updating the main `rover dev` session - subgraph_refresher - .watch_subgraph_for_changes(client_config.retry_period()) - .await?; - } - - unreachable!("watch_subgraph_for_changes never returns") - } -} diff --git a/src/command/dev/legacy/introspect.rs b/src/command/dev/legacy/introspect.rs deleted file mode 100644 index 561e5ec74..000000000 --- a/src/command/dev/legacy/introspect.rs +++ /dev/null @@ -1,162 +0,0 @@ -use std::time::Duration; - -use anyhow::anyhow; -use reqwest::Client; -use rover_std::Style; - -use crate::command::dev::legacy::protocol::{SubgraphSdl, SubgraphUrl}; -use crate::command::graph::Introspect as GraphIntrospect; -use crate::command::subgraph::Introspect as SubgraphIntrospect; -use crate::options::IntrospectOpts; -use crate::{RoverError, RoverErrorSuggestion, RoverResult}; - -#[derive(Clone, Debug)] -pub struct UnknownIntrospectRunner { - endpoint: SubgraphUrl, - client: Client, - headers: Option>, -} - -impl UnknownIntrospectRunner { - pub fn new( - endpoint: SubgraphUrl, - client: Client, - headers: Option>, - ) -> Self { - Self { - endpoint, - client, - headers, - } - } - - pub async fn run( - &self, - retry_period: Duration, - ) -> RoverResult<(SubgraphSdl, IntrospectRunnerKind)> { - let subgraph_runner = SubgraphIntrospectRunner { - endpoint: self.endpoint.clone(), - client: self.client.clone(), - headers: self.headers.clone(), - retry_period, - }; - - let graph_runner = GraphIntrospectRunner { - endpoint: self.endpoint.clone(), - client: self.client.clone(), - headers: self.headers.clone(), - retry_period, - }; - - // we _could_ run these in parallel - // but we could run into race conditions where - // the regular introspection query runs a bit after - // the federated introspection query - // in which case we may incorrectly assume - // they do not support federated introspection - // so, run the graph query first and _then_ the subgraph query - let graph_result = graph_runner.run().await; - let subgraph_result = subgraph_runner.run().await; - - match (subgraph_result, graph_result) { - (Ok(s), _) => { - tracing::info!("fetching federated SDL succeeded"); - Ok((s, IntrospectRunnerKind::Subgraph(subgraph_runner))) - } - (Err(_), Ok(s)) => { - let warn_prefix = Style::WarningPrefix.paint("WARN:"); - eprintln!("{} could not fetch federated SDL, using introspection schema without directives. you should convert this monograph to a federated subgraph. see https://www.apollographql.com/docs/federation/subgraphs/ for more information.", warn_prefix); - Ok((s, IntrospectRunnerKind::Graph(graph_runner))) - } - (Err(se), Err(ge)) => { - let message = anyhow!( - "could not run `rover graph introspect {0}` or `rover subgraph introspect {0}`", - &self.endpoint - ); - let mut err = RoverError::new(message); - let (ge, se) = (ge.to_string(), se.to_string()); - if ge == se { - err.set_suggestion(RoverErrorSuggestion::Adhoc(ge)) - } else { - err.set_suggestion(RoverErrorSuggestion::Adhoc(format!("`rover subgraph introspect {0}` failed with:\n{1}\n`rover graph introspect {0}` failed with:\n{2}", &self.endpoint, &se, &ge))); - }; - Err(err) - } - } - } -} - -#[derive(Debug, Clone)] -pub enum IntrospectRunnerKind { - Unknown(UnknownIntrospectRunner), - Subgraph(SubgraphIntrospectRunner), - Graph(GraphIntrospectRunner), -} - -impl IntrospectRunnerKind { - pub fn endpoint(&self) -> SubgraphUrl { - match &self { - Self::Unknown(u) => u.endpoint.clone(), - Self::Subgraph(s) => s.endpoint.clone(), - Self::Graph(g) => g.endpoint.clone(), - } - } -} - -#[derive(Debug, Clone)] -pub struct SubgraphIntrospectRunner { - endpoint: SubgraphUrl, - client: Client, - headers: Option>, - retry_period: Duration, -} - -impl SubgraphIntrospectRunner { - pub async fn run(&self) -> RoverResult { - tracing::debug!( - "running `rover subgraph introspect --endpoint {}`", - &self.endpoint - ); - SubgraphIntrospect { - opts: IntrospectOpts { - endpoint: self.endpoint.clone(), - headers: self.headers.clone(), - watch: false, - // TODO: remove after the composition rewrite; this is the de facto default of the - // polling interval option, here to make compilation work - polling_interval: Duration::from_secs(1), - }, - } - .exec(&self.client, true, self.retry_period) - .await - } -} - -#[derive(Debug, Clone)] -pub struct GraphIntrospectRunner { - endpoint: SubgraphUrl, - client: Client, - headers: Option>, - retry_period: Duration, -} - -impl GraphIntrospectRunner { - pub async fn run(&self) -> RoverResult { - tracing::debug!( - "running `rover graph introspect --endpoint {}`", - &self.endpoint - ); - GraphIntrospect { - opts: IntrospectOpts { - endpoint: self.endpoint.clone(), - headers: self.headers.clone(), - watch: false, - // TODO: remove after the composition rewrite; this is the de facto default of the - // polling interval option, here to make compilation work - polling_interval: Duration::from_secs(1), - }, - } - .exec(&self.client, true, self.retry_period) - .await - } -} diff --git a/src/command/dev/legacy/mod.rs b/src/command/dev/legacy/mod.rs deleted file mode 100644 index 234fc6ddf..000000000 --- a/src/command/dev/legacy/mod.rs +++ /dev/null @@ -1,26 +0,0 @@ -#[cfg(feature = "composition-js")] -mod compose; - -#[cfg(feature = "composition-js")] -mod do_dev; - -#[cfg(feature = "composition-js")] -mod introspect; - -#[cfg(feature = "composition-js")] -mod protocol; - -#[cfg(feature = "composition-js")] -mod router; - -#[cfg(feature = "composition-js")] -mod schema; - -#[cfg(feature = "composition-js")] -mod netstat; - -#[cfg(not(feature = "composition-js"))] -mod no_dev; - -#[cfg(feature = "composition-js")] -mod watcher; diff --git a/src/command/dev/legacy/netstat.rs b/src/command/dev/legacy/netstat.rs deleted file mode 100644 index a34c63ae1..000000000 --- a/src/command/dev/legacy/netstat.rs +++ /dev/null @@ -1,62 +0,0 @@ -use reqwest::Url; -use std::{ - collections::HashSet, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, -}; -use url::Host; - -use crate::command::dev::legacy::protocol::SubgraphUrl; - -pub fn normalize_loopback_urls(url: &SubgraphUrl) -> Vec { - let hosts = match url.host() { - Some(host) => match host { - Host::Ipv4(ip) => { - if &ip.to_string() == "::" { - vec![ - IpAddr::V4(ip).to_string(), - IpAddr::V4(Ipv4Addr::LOCALHOST).to_string(), - ] - } else { - vec![IpAddr::V4(ip).to_string()] - } - } - Host::Ipv6(ip) => { - if &ip.to_string() == "::" || &ip.to_string() == "::1" { - vec![ - IpAddr::V6(ip).to_string(), - IpAddr::V6(Ipv6Addr::LOCALHOST).to_string(), - ] - } else { - vec![IpAddr::V6(ip).to_string()] - } - } - Host::Domain(domain) => { - if domain == "localhost" { - vec![ - IpAddr::V4(Ipv4Addr::LOCALHOST).to_string(), - IpAddr::V6(Ipv6Addr::LOCALHOST).to_string(), - "[::]".to_string(), - "0.0.0.0".to_string(), - ] - } else { - vec![domain.to_string()] - } - } - }, - None => Vec::new(), - }; - if hosts.is_empty() { - vec![url.clone()] - } else { - Vec::from_iter( - hosts - .iter() - .map(|host| { - let mut url = url.clone(); - let _ = url.set_host(Some(host)); - url - }) - .collect::>(), - ) - } -} diff --git a/src/command/dev/legacy/protocol/follower/message.rs b/src/command/dev/legacy/protocol/follower/message.rs deleted file mode 100644 index 46c7a50dc..000000000 --- a/src/command/dev/legacy/protocol/follower/message.rs +++ /dev/null @@ -1,211 +0,0 @@ -use anyhow::anyhow; -use apollo_federation_types::javascript::SubgraphDefinition; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; - -use crate::command::dev::legacy::protocol::{entry_from_definition, SubgraphEntry, SubgraphName}; -use crate::{RoverError, RoverResult, PKG_VERSION}; - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FollowerMessage { - kind: FollowerMessageKind, - is_from_main_session: bool, -} - -impl FollowerMessage { - pub fn get_version(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::get_version(), - is_from_main_session, - } - } - - pub fn get_subgraphs(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::get_subgraphs(), - is_from_main_session, - } - } - - pub fn health_check(is_from_main_session: bool) -> RoverResult { - if is_from_main_session { - Err(RoverError::new(anyhow!( - "You cannot send a health check from the main `rover dev` process" - ))) - } else { - Ok(Self { - kind: FollowerMessageKind::health_check(), - is_from_main_session, - }) - } - } - - pub fn add_subgraph( - is_from_main_session: bool, - subgraph: &SubgraphDefinition, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::add_subgraph(subgraph)?, - is_from_main_session, - }) - } - - pub fn update_subgraph( - is_from_main_session: bool, - subgraph: &SubgraphDefinition, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::update_subgraph(subgraph)?, - is_from_main_session, - }) - } - - pub fn remove_subgraph( - is_from_main_session: bool, - subgraph_name: &SubgraphName, - ) -> RoverResult { - Ok(Self { - kind: FollowerMessageKind::remove_subgraph(subgraph_name), - is_from_main_session, - }) - } - - pub fn shutdown(is_from_main_session: bool) -> Self { - Self { - kind: FollowerMessageKind::shutdown(), - is_from_main_session, - } - } - - pub fn is_from_main_session(&self) -> bool { - self.is_from_main_session - } - - pub fn kind(&self) -> &FollowerMessageKind { - &self.kind - } - - pub fn print(&self) { - if self.is_from_main_session() { - tracing::debug!("sending message to self: {:?}", &self); - } else { - tracing::debug!( - "sending message to the main `rover dev` process: {:?}", - &self - ); - } - match self.kind() { - FollowerMessageKind::AddSubgraph { subgraph_entry } => { - if self.is_from_main_session() { - eprintln!( - "starting a session with the '{}' subgraph", - &subgraph_entry.0 .0 - ); - } else { - eprintln!( - "adding the '{}' subgraph to the session", - &subgraph_entry.0 .0 - ); - } - } - FollowerMessageKind::UpdateSubgraph { subgraph_entry } => { - eprintln!( - "updating the schema for the '{}' subgraph in the session", - &subgraph_entry.0 .0 - ); - } - FollowerMessageKind::RemoveSubgraph { subgraph_name } => { - if self.is_from_main_session() { - eprintln!( - "removing the '{}' subgraph from this session", - &subgraph_name - ); - } else { - tracing::debug!( - "removing the '{}' subgraph from the session", - &subgraph_name - ); - } - } - FollowerMessageKind::Shutdown => { - tracing::debug!("shutting down the router for this session"); - } - FollowerMessageKind::HealthCheck => { - tracing::debug!("sending health check ping to the main process"); - } - FollowerMessageKind::GetVersion { - follower_version: _, - } => { - tracing::debug!("requesting the version of the main process"); - } - FollowerMessageKind::GetSubgraphs => { - tracing::debug!("asking the main process about existing subgraphs"); - } - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum FollowerMessageKind { - GetVersion { follower_version: String }, - GetSubgraphs, - HealthCheck, - Shutdown, - AddSubgraph { subgraph_entry: SubgraphEntry }, - UpdateSubgraph { subgraph_entry: SubgraphEntry }, - RemoveSubgraph { subgraph_name: SubgraphName }, -} - -impl FollowerMessageKind { - fn get_version() -> Self { - Self::GetVersion { - follower_version: PKG_VERSION.to_string(), - } - } - - fn get_subgraphs() -> Self { - Self::GetSubgraphs - } - - fn health_check() -> Self { - Self::HealthCheck - } - - fn shutdown() -> Self { - Self::Shutdown - } - - fn add_subgraph(subgraph: &SubgraphDefinition) -> RoverResult { - Ok(Self::AddSubgraph { - subgraph_entry: entry_from_definition(subgraph)?, - }) - } - - fn update_subgraph(subgraph: &SubgraphDefinition) -> RoverResult { - Ok(Self::UpdateSubgraph { - subgraph_entry: entry_from_definition(subgraph)?, - }) - } - - fn remove_subgraph(subgraph_name: &SubgraphName) -> Self { - Self::RemoveSubgraph { - subgraph_name: subgraph_name.to_string(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn follower_message_can_request_version() { - let message = FollowerMessageKind::get_version(); - let expected_message_json = serde_json::to_string(&message).unwrap(); - assert_eq!( - expected_message_json, - serde_json::json!({"GetVersion": {"follower_version": PKG_VERSION.to_string()}}) - .to_string() - ) - } -} diff --git a/src/command/dev/legacy/protocol/follower/messenger.rs b/src/command/dev/legacy/protocol/follower/messenger.rs deleted file mode 100644 index 2cc8eb917..000000000 --- a/src/command/dev/legacy/protocol/follower/messenger.rs +++ /dev/null @@ -1,231 +0,0 @@ -use std::{fmt::Debug, io::BufReader, time::Duration}; - -use anyhow::anyhow; -use apollo_federation_types::javascript::SubgraphDefinition; -use crossbeam_channel::{Receiver, Sender}; -use interprocess::local_socket::traits::Stream; - -use crate::command::dev::legacy::protocol::{ - create_socket_name, socket_read, socket_write, FollowerMessage, LeaderMessageKind, - SubgraphKeys, SubgraphName, -}; -use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION}; - -#[derive(Clone, Debug)] -pub struct FollowerMessenger { - kind: FollowerMessengerKind, -} - -impl FollowerMessenger { - /// Create a [`FollowerMessenger`] for the main session that can talk to itself via a channel. - pub fn from_main_session( - follower_message_sender: Sender, - leader_message_receiver: Receiver, - ) -> Self { - Self { - kind: FollowerMessengerKind::from_main_session( - follower_message_sender, - leader_message_receiver, - ), - } - } - - /// Create a [`FollowerMessenger`] for an attached session that can talk to the main session via a socket. - pub fn from_attached_session(raw_socket_name: &str) -> Self { - Self { - kind: FollowerMessengerKind::from_attached_session(raw_socket_name.to_string()), - } - } - - /// Send a health check to the main session once every second to make sure it is alive. - /// - /// This is function will block indefinitely and should be run from a separate thread. - pub fn health_check(&self) -> RoverResult<()> { - loop { - if let Err(e) = - self.message_leader(FollowerMessage::health_check(self.is_from_main_session())?) - { - break Err(e); - } - std::thread::sleep(Duration::from_secs(1)); - } - } - - /// Send a version check to the main session - pub fn version_check(&self) -> RoverResult<()> { - self.message_leader(FollowerMessage::get_version(self.is_from_main_session()))?; - Ok(()) - } - - /// Request information about the current subgraphs in a session - pub fn session_subgraphs(&self) -> RoverResult> { - self.message_leader(FollowerMessage::get_subgraphs(self.is_from_main_session())) - } - - /// Add a subgraph to the main session - pub fn add_subgraph(&self, subgraph: &SubgraphDefinition) -> RoverResult<()> { - self.message_leader(FollowerMessage::add_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Update a subgraph in the main session - pub fn update_subgraph(&self, subgraph: &SubgraphDefinition) -> RoverResult<()> { - self.message_leader(FollowerMessage::update_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Remove a subgraph from the main session - pub fn remove_subgraph(&self, subgraph: &SubgraphName) -> RoverResult<()> { - self.message_leader(FollowerMessage::remove_subgraph( - self.is_from_main_session(), - subgraph, - )?)?; - Ok(()) - } - - /// Send a message to the leader - fn message_leader( - &self, - follower_message: FollowerMessage, - ) -> RoverResult> { - self.kind.message_leader(follower_message) - } - - fn is_from_main_session(&self) -> bool { - self.kind.is_from_main_session() - } -} - -#[derive(Clone, Debug)] -enum FollowerMessengerKind { - FromMainSession { - follower_message_sender: Sender, - leader_message_receiver: Receiver, - }, - FromAttachedSession { - raw_socket_name: String, - }, -} - -impl FollowerMessengerKind { - fn from_main_session( - follower_message_sender: Sender, - leader_message_receiver: Receiver, - ) -> Self { - Self::FromMainSession { - follower_message_sender, - leader_message_receiver, - } - } - - fn from_attached_session(raw_socket_name: String) -> Self { - Self::FromAttachedSession { raw_socket_name } - } - - fn message_leader( - &self, - follower_message: FollowerMessage, - ) -> RoverResult> { - use FollowerMessengerKind::*; - follower_message.print(); - let leader_message = match self { - FromMainSession { - follower_message_sender, - leader_message_receiver, - } => { - tracing::trace!("main session sending follower message on channel"); - follower_message_sender.send(follower_message)?; - tracing::trace!("main session reading leader message from channel"); - let leader_message = leader_message_receiver.recv().map_err(|e| { - RoverError::new(anyhow!("the main process failed to update itself").context(e)) - }); - - tracing::trace!("main session received leader message from channel"); - - leader_message - } - FromAttachedSession { raw_socket_name } => { - let socket_name = create_socket_name(raw_socket_name)?; - let stream = Stream::connect(socket_name).map_err(|_| { - let mut err = RoverError::new(anyhow!( - "there is not a main `rover dev` process to report updates to" - )); - err.set_suggestion(RoverErrorSuggestion::SubmitIssue); - err - })?; - - let mut stream = BufReader::new(stream); - - tracing::trace!("attached session sending follower message on socket"); - // send our message over the socket - socket_write(&follower_message, &mut stream)?; - - tracing::trace!("attached session reading leader message from socket"); - // wait for our message to be read by the other socket handler - // then read the response that was written back to the socket - socket_read(&mut stream).map_err(|e| { - RoverError::new( - anyhow!( - "this process did not receive a message from the main process after sending {:?}", - &follower_message - ) - .context(e), - ) - }) - } - }?; - - self.handle_leader_message(&leader_message) - } - - fn handle_leader_message( - &self, - leader_message: &LeaderMessageKind, - ) -> RoverResult> { - leader_message.print(); - match leader_message { - LeaderMessageKind::GetVersion { - leader_version, - follower_version: _, - } => { - self.require_same_version(leader_version)?; - Ok(None) - } - LeaderMessageKind::LeaderSessionInfo { subgraphs } => Ok(Some(subgraphs.to_vec())), - _ => Ok(None), - } - } - - fn require_same_version(&self, leader_version: &str) -> RoverResult<()> { - if leader_version != PKG_VERSION { - let mut err = RoverError::new(anyhow!( - "The main process is running version {}, and this process is running version {}.", - &leader_version, - PKG_VERSION - )); - err.set_suggestion(RoverErrorSuggestion::Adhoc( - "You should use the same version of `rover` to run `rover dev` sessions" - .to_string(), - )); - Err(err) - } else { - Ok(()) - } - } - - fn is_from_main_session(&self) -> bool { - matches!( - self, - Self::FromMainSession { - follower_message_sender: _, - leader_message_receiver: _ - } - ) - } -} diff --git a/src/command/dev/legacy/protocol/follower/mod.rs b/src/command/dev/legacy/protocol/follower/mod.rs deleted file mode 100644 index d2b3685cd..000000000 --- a/src/command/dev/legacy/protocol/follower/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod message; -mod messenger; - -pub use message::*; -pub use messenger::*; - -use crossbeam_channel::{bounded, Receiver, Sender}; - -#[derive(Debug, Clone)] -pub struct FollowerChannel { - pub sender: Sender, - pub receiver: Receiver, -} - -impl FollowerChannel { - pub fn new() -> Self { - let (sender, receiver) = bounded(0); - - Self { sender, receiver } - } -} diff --git a/src/command/dev/legacy/protocol/leader.rs b/src/command/dev/legacy/protocol/leader.rs deleted file mode 100644 index 9729a1f16..000000000 --- a/src/command/dev/legacy/protocol/leader.rs +++ /dev/null @@ -1,654 +0,0 @@ -use std::{ - collections::{hash_map::Entry::Vacant, HashMap}, - fmt::Debug, - io::BufReader, - net::TcpListener, - str::FromStr, -}; - -use anyhow::{anyhow, Context}; -use apollo_federation_types::{ - config::{FederationVersion, SupergraphConfig}, - javascript::SubgraphDefinition, -}; -use camino::Utf8PathBuf; -use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::TryFutureExt; -use interprocess::local_socket::traits::{ListenerExt, Stream}; -use interprocess::local_socket::ListenerOptions; -use serde::{Deserialize, Serialize}; -use tracing::{info, warn}; - -use crate::{ - command::dev::{ - legacy::{ - compose::ComposeRunner, - do_dev::log_err_and_continue, - router::{RouterConfigHandler, RouterRunner}, - }, - OVERRIDE_DEV_COMPOSITION_VERSION, - }, - options::PluginOpts, - utils::client::StudioClientConfig, - RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION, -}; - -use super::{ - create_socket_name, - socket::{handle_socket_error, socket_read, socket_write}, - types::{ - CompositionResult, SubgraphEntry, SubgraphKey, SubgraphKeys, SubgraphName, SubgraphSdl, - }, - FollowerChannel, FollowerMessage, FollowerMessageKind, -}; - -#[derive(Debug)] -pub struct LeaderSession { - subgraphs: HashMap, - raw_socket_name: String, - compose_runner: ComposeRunner, - router_runner: Option, - follower_channel: FollowerChannel, - leader_channel: LeaderChannel, - federation_version: FederationVersion, - supergraph_config: Option, -} - -impl LeaderSession { - /// Create a new [`LeaderSession`] that is responsible for running composition and the router - /// It listens on a socket for incoming messages for subgraph changes, in addition to watching - /// its own subgraph - /// Returns: - /// Ok(Some(Self)) when successfully initiated - /// Ok(None) when a LeaderSession already exists for that address - /// Err(RoverError) when something went wrong. - #[allow(clippy::too_many_arguments)] - pub async fn new( - override_install_path: Option, - client_config: &StudioClientConfig, - leader_channel: LeaderChannel, - follower_channel: FollowerChannel, - plugin_opts: PluginOpts, - supergraph_config: &Option, - router_config_handler: RouterConfigHandler, - license: Option, - ) -> RoverResult> { - let raw_socket_name = router_config_handler.get_raw_socket_name(); - let router_socket_addr = router_config_handler.get_router_address(); - let socket_name = create_socket_name(&raw_socket_name)?; - - if let Ok(stream) = Stream::connect(socket_name.clone()) { - // write to the socket, so we don't make the other session deadlock waiting on a message - let mut stream = BufReader::new(stream); - socket_write(&FollowerMessage::health_check(false)?, &mut stream)?; - let _ = LeaderSession::socket_read(&mut stream); - // return early so an attached session can be created instead - return Ok(None); - } - - tracing::info!("initializing main `rover dev process`"); - // if we can't connect to the socket, we should start it and listen for incoming - // subgraph events - // - // remove the socket file before starting in case it was here from last time - // if we can't connect to it, it's safe to remove - let _ = std::fs::remove_file(&raw_socket_name); - - if TcpListener::bind(router_socket_addr).is_err() { - let mut err = - RoverError::new(anyhow!("You cannot bind the router to '{}' because that address is already in use by another process on this machine.", &router_socket_addr)); - err.set_suggestion(RoverErrorSuggestion::Adhoc( - format!("Try setting a different port for the router to bind to with the `--supergraph-port` argument, or shut down the process bound to '{}'.", &router_socket_addr) - )); - return Err(err); - } - - // create a [`ComposeRunner`] that will be in charge of composing our supergraph - let mut compose_runner = ComposeRunner::new( - plugin_opts.clone(), - override_install_path.clone(), - client_config.clone(), - router_config_handler.get_supergraph_schema_path(), - ); - - // create a [`RouterRunner`] that we will use to spawn the router when we have a successful composition - let mut router_runner = RouterRunner::new( - router_config_handler.get_supergraph_schema_path(), - router_config_handler.get_router_config_path(), - plugin_opts.clone(), - router_socket_addr, - router_config_handler.get_router_listen_path(), - override_install_path, - client_config.clone(), - license, - ); - - let config_fed_version = supergraph_config - .clone() - .and_then(|sc| sc.get_federation_version()); - - let federation_version = Self::get_federation_version( - config_fed_version, - OVERRIDE_DEV_COMPOSITION_VERSION.clone(), - )?; - - // install plugins before proceeding - router_runner.maybe_install_router().await?; - compose_runner - .maybe_install_supergraph(federation_version.clone()) - .await?; - - router_config_handler.start()?; - - Ok(Some(Self { - subgraphs: HashMap::new(), - raw_socket_name, - compose_runner, - router_runner: Some(router_runner), - follower_channel, - leader_channel, - federation_version, - supergraph_config: supergraph_config.clone(), - })) - } - - /// Calculates what the correct version of Federation should be, based on the - /// value of the given environment variable and the supergraph_schema - /// - /// The order of precedence is: - /// Environment Variable -> Schema -> Default (Latest) - fn get_federation_version( - sc_config_version: Option, - env_var: Option, - ) -> RoverResult { - let env_var_version = if let Some(version) = env_var { - match FederationVersion::from_str(&format!("={}", version)) { - Ok(v) => Some(v), - Err(e) => { - warn!("could not parse version from environment variable '{:}'", e); - info!("will check supergraph schema next..."); - None - } - } - } else { - None - }; - - env_var_version.map(Ok).unwrap_or_else(|| { - Ok(sc_config_version.unwrap_or_else(|| { - warn!("federation version not found in supergraph schema"); - info!("using latest version instead"); - FederationVersion::LatestFedTwo - })) - }) - } - - /// Start the session by watching for incoming subgraph updates and re-composing when needed - #[allow(unreachable_code)] - pub async fn listen_for_all_subgraph_updates( - &mut self, - ready_sender: futures::channel::mpsc::Sender<()>, - ) -> RoverResult<()> { - self.receive_messages_from_attached_sessions()?; - self.receive_all_subgraph_updates(ready_sender).await; - Ok(()) - } - - /// Listen for incoming subgraph updates and re-compose the supergraph - async fn receive_all_subgraph_updates( - &mut self, - mut ready_sender: futures::channel::mpsc::Sender<()>, - ) -> ! { - ready_sender.try_send(()).unwrap(); - loop { - tracing::trace!("main session waiting for follower message"); - let follower_message = self.follower_channel.receiver.recv().unwrap(); - let leader_message = self - .handle_follower_message_kind(follower_message.kind()) - .await; - - if !follower_message.is_from_main_session() { - leader_message.print(); - } - let debug_message = format!("could not send message {:?}", &leader_message); - tracing::trace!("main session sending leader message"); - - self.leader_channel - .sender - .send(leader_message) - .expect(&debug_message); - tracing::trace!("main session sent leader message"); - } - } - - /// Listen on the socket for incoming [`FollowerMessageKind`] messages. - fn receive_messages_from_attached_sessions(&self) -> RoverResult<()> { - let socket_name = create_socket_name(&self.raw_socket_name)?; - let listener = ListenerOptions::new() - .name(socket_name) - .create_sync() - .with_context(|| { - format!( - "could not start local socket server at {:?}", - &self.raw_socket_name - ) - })?; - tracing::info!( - "connected to socket {}, waiting for messages", - &self.raw_socket_name - ); - - let follower_message_sender = self.follower_channel.sender.clone(); - let leader_message_receiver = self.leader_channel.receiver.clone(); - tokio::task::spawn_blocking(move || { - listener - .incoming() - .filter_map(handle_socket_error) - .for_each(|stream| { - let mut stream = BufReader::new(stream); - let follower_message = Self::socket_read(&mut stream); - let _ = match follower_message { - Ok(message) => { - let debug_message = format!("{:?}", &message); - tracing::debug!("the main `rover dev` process read a message from the socket, sending an update message on the channel"); - follower_message_sender.send(message).unwrap_or_else(|_| { - panic!("failed to send message on channel: {}", &debug_message) - }); - tracing::debug!("the main `rover dev` process is processing the message from the socket"); - let leader_message = leader_message_receiver.recv().expect("failed to receive message on the channel"); - tracing::debug!("the main `rover dev` process is sending the result on the socket"); - Self::socket_write(leader_message, &mut stream) - } - Err(e) => { - tracing::debug!("the main `rover dev` process could not read incoming socket message, skipping channel update"); - Err(e) - } - }.map_err(log_err_and_continue); - }); - }); - - Ok(()) - } - - /// Adds a subgraph to the internal supergraph representation. - async fn add_subgraph(&mut self, subgraph_entry: &SubgraphEntry) -> LeaderMessageKind { - let is_first_subgraph = self.subgraphs.is_empty(); - let ((name, url), sdl) = subgraph_entry; - - if let Vacant(e) = self.subgraphs.entry((name.to_string(), url.clone())) { - e.insert(sdl.to_string()); - - // Followers add subgraphs, but sometimes those subgraphs depend on each other - // (e.g., through extending a type in another subgraph). When that happens, - // composition fails until _all_ subgraphs are loaded in. This acknowledges the - // follower's message when we haven't loaded in all the subgraphs, deferring - // composition until we have at least the number of subgraphs represented in the - // supergraph.yaml file - // - // This applies only when the supergraph.yaml file is present. Without it, we will - // try composition each time we add a subgraph - if let Some(supergraph_config) = self.supergraph_config.clone() { - let subgraphs_from_config = supergraph_config.into_iter(); - if self.subgraphs.len() < subgraphs_from_config.len() { - return LeaderMessageKind::MessageReceived; - } - } - - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() && !is_first_subgraph { - LeaderMessageKind::add_subgraph_composition_success(name) - } else { - LeaderMessageKind::MessageReceived - } - } else { - LeaderMessageKind::error( - RoverError::new(anyhow!( - "subgraph with name '{}' and url '{}' already exists", - &name, - &url - )) - .to_string(), - ) - } - } - - /// Updates a subgraph in the internal supergraph representation. - async fn update_subgraph(&mut self, subgraph_entry: &SubgraphEntry) -> LeaderMessageKind { - let ((name, url), sdl) = &subgraph_entry; - if let Some(prev_sdl) = self.subgraphs.get_mut(&(name.to_string(), url.clone())) { - if prev_sdl != sdl { - *prev_sdl = sdl.to_string(); - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() { - LeaderMessageKind::update_subgraph_composition_success(name) - } else { - LeaderMessageKind::message_received() - } - } else { - LeaderMessageKind::message_received() - } - } else { - self.add_subgraph(subgraph_entry).await - } - } - - /// Removes a subgraph from the internal subgraph representation. - async fn remove_subgraph(&mut self, subgraph_name: &SubgraphName) -> LeaderMessageKind { - let found = self - .subgraphs - .keys() - .find(|(name, _)| name == subgraph_name) - .cloned(); - - if let Some((name, url)) = found { - self.subgraphs.remove(&(name.to_string(), url)); - let composition_result = self.compose().await; - if let Err(composition_err) = composition_result { - LeaderMessageKind::error(composition_err) - } else if composition_result.transpose().is_some() { - LeaderMessageKind::remove_subgraph_composition_success(&name) - } else { - LeaderMessageKind::message_received() - } - } else { - LeaderMessageKind::message_received() - } - } - - /// Reruns composition, which triggers the router to reload. - async fn compose(&mut self) -> CompositionResult { - match self - .compose_runner - .run(&mut self.supergraph_config_internal_representation()) - .and_then(|maybe_new_schema| async { - if maybe_new_schema.is_some() { - if let Some(runner) = self.router_runner.as_mut() { - if let Err(err) = runner.spawn().await { - return Err(err.to_string()); - } - } - } - Ok(maybe_new_schema) - }) - .await - { - Ok(res) => Ok(res), - Err(e) => { - if let Some(runner) = self.router_runner.as_mut() { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - Err(e) - } - } - } - - /// Reads a [`FollowerMessage`] from an open socket connection. - fn socket_read( - stream: &mut BufReader, - ) -> RoverResult { - socket_read(stream) - .inspect(|message| { - tracing::debug!("leader received message {:?}", &message); - }) - .map_err(|e| { - e.context("the main `rover dev` process did not receive a valid incoming message") - .into() - }) - } - - /// Writes a [`LeaderMessageKind`] to an open socket connection. - fn socket_write( - message: LeaderMessageKind, - stream: &mut BufReader, - ) -> RoverResult<()> { - tracing::debug!("leader sending message {:?}", message); - socket_write(&message, stream) - } - - /// Gets the supergraph configuration from the internal state. This can different from the - /// supergraph.yaml file as it represents intermediate states of composition while adding - /// subgraphs to the internal representation of that file - fn supergraph_config_internal_representation(&self) -> SupergraphConfig { - let mut supergraph_config: SupergraphConfig = self - .subgraphs - .iter() - .map(|((name, url), sdl)| SubgraphDefinition { - name: name.clone(), - url: url.to_string(), - sdl: sdl.clone(), - }) - .collect::>() - .into(); - - supergraph_config.set_federation_version(self.federation_version.clone()); - supergraph_config - } - - /// Gets the list of subgraphs running in this session - fn get_subgraphs(&self) -> SubgraphKeys { - tracing::debug!("notifying new `rover dev` process about existing subgraphs"); - self.subgraphs.keys().cloned().collect() - } - - pub async fn shutdown(&mut self) { - let router_runner = self.router_runner.take(); - let raw_socket_name = self.raw_socket_name.clone(); - if let Some(mut runner) = router_runner { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - let _ = std::fs::remove_file(&raw_socket_name); - std::process::exit(1) - } - - /// Handles a follower message by updating the internal subgraph representation if needed, - /// and returns a [`LeaderMessageKind`] that can be sent over a socket or printed by the main session - async fn handle_follower_message_kind( - &mut self, - follower_message: &FollowerMessageKind, - ) -> LeaderMessageKind { - use FollowerMessageKind::*; - match follower_message { - AddSubgraph { subgraph_entry } => self.add_subgraph(subgraph_entry).await, - - UpdateSubgraph { subgraph_entry } => self.update_subgraph(subgraph_entry).await, - - RemoveSubgraph { subgraph_name } => self.remove_subgraph(subgraph_name).await, - - GetSubgraphs => LeaderMessageKind::current_subgraphs(self.get_subgraphs()), - - Shutdown => { - self.shutdown().await; - LeaderMessageKind::message_received() - } - - HealthCheck => LeaderMessageKind::message_received(), - - GetVersion { follower_version } => LeaderMessageKind::get_version(follower_version), - } - } -} - -impl Drop for LeaderSession { - fn drop(&mut self) { - let router_runner = self.router_runner.take(); - let socket_addr = self.raw_socket_name.clone(); - tokio::task::spawn(async move { - if let Some(mut runner) = router_runner { - let _ = runner.kill().await.map_err(log_err_and_continue); - } - let _ = std::fs::remove_file(&socket_addr); - std::process::exit(1) - }); - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum LeaderMessageKind { - GetVersion { - follower_version: String, - leader_version: String, - }, - LeaderSessionInfo { - subgraphs: SubgraphKeys, - }, - CompositionSuccess { - action: String, - }, - ErrorNotification { - error: String, - }, - MessageReceived, -} - -impl LeaderMessageKind { - pub fn get_version(follower_version: &str) -> Self { - Self::GetVersion { - follower_version: follower_version.to_string(), - leader_version: PKG_VERSION.to_string(), - } - } - - pub fn current_subgraphs(subgraphs: SubgraphKeys) -> Self { - Self::LeaderSessionInfo { subgraphs } - } - - pub fn error(error: String) -> Self { - Self::ErrorNotification { error } - } - - pub fn add_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("adding the '{}' subgraph", subgraph_name), - } - } - - pub fn update_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("updating the '{}' subgraph", subgraph_name), - } - } - - pub fn remove_subgraph_composition_success(subgraph_name: &SubgraphName) -> Self { - Self::CompositionSuccess { - action: format!("removing the '{}' subgraph", subgraph_name), - } - } - - pub fn message_received() -> Self { - Self::MessageReceived - } - - pub fn print(&self) { - match self { - LeaderMessageKind::ErrorNotification { error } => { - eprintln!("{}", error); - } - LeaderMessageKind::CompositionSuccess { action } => { - eprintln!("successfully composed after {}", &action); - } - LeaderMessageKind::LeaderSessionInfo { subgraphs } => { - let subgraphs = match subgraphs.len() { - 0 => "no subgraphs".to_string(), - 1 => "1 subgraph".to_string(), - l => format!("{} subgraphs", l), - }; - tracing::info!("the main `rover dev` process currently has {}", subgraphs); - } - LeaderMessageKind::GetVersion { - leader_version, - follower_version: _, - } => { - tracing::debug!( - "the main `rover dev` process is running version {}", - &leader_version - ); - } - LeaderMessageKind::MessageReceived => { - tracing::debug!( - "the main `rover dev` process acknowledged the message, but did not take an action" - ) - } - } - } -} - -#[derive(Debug, Clone)] -pub struct LeaderChannel { - pub sender: Sender, - pub receiver: Receiver, -} - -impl LeaderChannel { - pub fn new() -> Self { - let (sender, receiver) = bounded(0); - - Self { sender, receiver } - } -} - -#[cfg(test)] -mod tests { - use apollo_federation_types::config::FederationVersion::{ExactFedOne, ExactFedTwo}; - use rstest::rstest; - use semver::Version; - use speculoos::assert_that; - use speculoos::prelude::ResultAssertions; - - use super::*; - - #[rstest] - fn leader_message_can_get_version() { - let follower_version = PKG_VERSION.to_string(); - let message = LeaderMessageKind::get_version(&follower_version); - let expected_message_json = serde_json::to_string(&message).unwrap(); - assert_eq!( - expected_message_json, - serde_json::json!({ - "GetVersion": { - "follower_version": follower_version, - "leader_version": follower_version, - } - }) - .to_string() - ) - } - - #[rstest] - #[case::env_var_no_yaml_fed_two(Some(String::from("2.3.4")), None, ExactFedTwo(Version::parse("2.3.4").unwrap()), false)] - #[case::env_var_no_yaml_fed_one(Some(String::from("0.40.0")), None, ExactFedOne(Version::parse("0.40.0").unwrap()), false)] - #[case::env_var_no_yaml_unsupported_fed_version( - Some(String::from("1.0.1")), - None, - FederationVersion::LatestFedTwo, - false - )] - #[case::nonsense_env_var_no_yaml( - Some(String::from("crackers")), - None, - FederationVersion::LatestFedTwo, - false - )] - #[case::env_var_with_yaml_fed_two(Some(String::from("2.3.4")), Some(ExactFedTwo(Version::parse("2.3.4").unwrap())), ExactFedTwo(Version::parse("2.3.4").unwrap()), false)] - #[case::env_var_with_yaml_fed_one(Some(String::from("0.50.0")), Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedOne(Version::parse("0.50.0").unwrap()), false)] - #[case::nonsense_env_var_with_yaml(Some(String::from("cheese")), Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedTwo(Version::parse("2.3.5").unwrap()), false)] - #[case::yaml_no_env_var_fed_two(None, Some(ExactFedTwo(Version::parse("2.3.5").unwrap())), ExactFedTwo(Version::parse("2.3.5").unwrap()), false)] - #[case::yaml_no_env_var_fed_one(None, Some(ExactFedOne(Version::parse("0.69.0").unwrap())), ExactFedOne(Version::parse("0.69.0").unwrap()), false)] - #[case::nothing_grabs_latest(None, None, FederationVersion::LatestFedTwo, false)] - fn federation_version_respects_precedence_order( - #[case] env_var_value: Option, - #[case] config_value: Option, - #[case] expected_value: FederationVersion, - #[case] error_expected: bool, - ) { - let res = LeaderSession::get_federation_version(config_value, env_var_value); - if error_expected { - assert_that(&res).is_err(); - } else { - assert_that(&res.unwrap()).is_equal_to(expected_value); - } - } -} diff --git a/src/command/dev/legacy/protocol/mod.rs b/src/command/dev/legacy/protocol/mod.rs deleted file mode 100644 index 72eb1572c..000000000 --- a/src/command/dev/legacy/protocol/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -use interprocess::local_socket::{GenericFilePath, Name, ToFsName}; - -pub use follower::*; -pub use leader::*; -pub(crate) use socket::*; -pub use types::*; - -mod follower; -mod leader; -mod socket; -mod types; - -pub(crate) fn create_socket_name(raw_socket_name: &str) -> std::io::Result { - raw_socket_name.to_fs_name::() -} diff --git a/src/command/dev/legacy/protocol/socket.rs b/src/command/dev/legacy/protocol/socket.rs deleted file mode 100644 index ccb1ba2eb..000000000 --- a/src/command/dev/legacy/protocol/socket.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{ - fmt::Debug, - io::{self, BufRead, BufReader, Write}, -}; - -use anyhow::{anyhow, Context, Error}; -use interprocess::local_socket::Stream; -use serde::{de::DeserializeOwned, Serialize}; - -use crate::RoverResult; - -pub(crate) fn handle_socket_error(conn: io::Result) -> Option { - match conn { - Ok(val) => Some(val), - Err(error) => { - eprintln!("incoming connection failed: {}", error); - None - } - } -} - -pub(crate) fn socket_read(stream: &mut BufReader) -> std::result::Result -where - B: Serialize + DeserializeOwned + Debug, -{ - let mut incoming_message = String::new(); - - match stream.read_line(&mut incoming_message) { - Ok(_) => { - if incoming_message.is_empty() { - Err(anyhow!("incoming message was empty")) - } else { - let incoming_message: B = - serde_json::from_str(&incoming_message).with_context(|| { - format!( - "incoming message '{}' was not valid JSON", - &incoming_message - ) - })?; - Ok(incoming_message) - } - } - Err(e) => Err(Error::new(e).context("could not read incoming message")), - } -} - -pub(crate) fn socket_write(message: &A, stream: &mut BufReader) -> RoverResult<()> -where - A: Serialize + DeserializeOwned + Debug, -{ - let outgoing_json = serde_json::to_string(message) - .with_context(|| format!("could not convert outgoing message {:?} to json", &message))?; - let outgoing_string = format!("{}\n", &outgoing_json); - stream - .get_mut() - .write_all(outgoing_string.as_bytes()) - .with_context(|| { - format!( - "could not write outgoing message {:?} to socket", - &outgoing_json - ) - })?; - Ok(()) -} diff --git a/src/command/dev/legacy/protocol/types.rs b/src/command/dev/legacy/protocol/types.rs deleted file mode 100644 index 472b6c470..000000000 --- a/src/command/dev/legacy/protocol/types.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::command::supergraph::compose::CompositionOutput; - -use anyhow::Result; -use apollo_federation_types::javascript::SubgraphDefinition; -use reqwest::Url; - -pub type SubgraphName = String; -pub type SubgraphUrl = Url; -pub type SubgraphSdl = String; -pub type SubgraphKey = (SubgraphName, SubgraphUrl); -pub type SubgraphKeys = Vec; -pub type SubgraphEntry = (SubgraphKey, SubgraphSdl); -pub type CompositionResult = std::result::Result, String>; - -pub(crate) fn sdl_from_definition(subgraph_definition: &SubgraphDefinition) -> SubgraphSdl { - subgraph_definition.sdl.to_string() -} - -pub(crate) fn name_from_definition(subgraph_definition: &SubgraphDefinition) -> SubgraphName { - subgraph_definition.name.to_string() -} - -pub(crate) fn url_from_definition(subgraph_definition: &SubgraphDefinition) -> Result { - Ok(subgraph_definition.url.parse()?) -} - -pub(crate) fn key_from_definition(subgraph_definition: &SubgraphDefinition) -> Result { - Ok(( - name_from_definition(subgraph_definition), - url_from_definition(subgraph_definition)?, - )) -} - -pub(crate) fn entry_from_definition( - subgraph_definition: &SubgraphDefinition, -) -> Result { - Ok(( - key_from_definition(subgraph_definition)?, - sdl_from_definition(subgraph_definition), - )) -} diff --git a/src/command/dev/legacy/router/command.rs b/src/command/dev/legacy/router/command.rs deleted file mode 100644 index 6d1480889..000000000 --- a/src/command/dev/legacy/router/command.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::env::var; -use std::{ - io::{BufRead, BufReader}, - process::{Child, Command, Stdio}, -}; - -use anyhow::{anyhow, Context}; -use crossbeam_channel::Sender; -use rover_client::operations::config::who_am_i::{self, Actor}; -use rover_std::warnln; - -use crate::options::ProfileOpt; -use crate::utils::client::StudioClientConfig; -use crate::{command::dev::legacy::do_dev::log_err_and_continue, RoverError, RoverResult}; - -#[derive(Debug)] -pub struct BackgroundTask { - child: Child, - descriptor: String, -} - -pub enum BackgroundTaskLog { - Stdout(String), - Stderr(String), -} - -impl BackgroundTask { - pub async fn new( - command: String, - log_sender: Sender, - client_config: &StudioClientConfig, - profile_opt: &ProfileOpt, - ) -> RoverResult { - let descriptor = command.clone(); - let args: Vec<&str> = command.split(' ').collect(); - let (bin, args) = match args.len() { - 0 => Err(anyhow!("the command you passed is empty")), - 1 => Ok((args[0], Vec::new())), - _ => Ok((args[0], Vec::from_iter(args[1..].iter()))), - }?; - tracing::info!("starting `{}`", &command); - - if which::which(bin).is_err() { - return Err(anyhow!("{} is not installed on this machine", &bin).into()); - } - - let mut command = Command::new(bin); - command.args(args).env("APOLLO_ROVER", "true"); - - command.stdout(Stdio::piped()).stderr(Stdio::piped()); - command.stdin(Stdio::null()); - - if let Ok(apollo_graph_ref) = var("APOLLO_GRAPH_REF") { - command.env("APOLLO_GRAPH_REF", apollo_graph_ref); - if let Ok(client) = client_config - .get_authenticated_client(profile_opt) - .map_err(|err| { - warnln!( - "APOLLO_GRAPH_REF is set, but credentials could not be loaded. Enterprise features within the router will not function: {err}" - ); - }) - { - if let Some(api_key) = who_am_i::run(&client).await.map_or_else(|err| { - warnln!("Could not determine the type of configured credentials, Router may fail to start if Enterprise features are enabled: {err}"); - Some(client.credential.api_key.clone()) - }, |identity| { - match identity.key_actor_type { - Actor::GRAPH => Some(client.credential.api_key.clone()), - _ => { - warnln!( - "APOLLO_GRAPH_REF is set, but the key provided is not a graph key. \ - Enterprise features within the router will not function. \ - Either select a `--profile` that is configured with a graph-specific \ - key, or provide one via the APOLLO_KEY environment variable. \ - You can configure a graph key by following the instructions at https://www.apollographql.com/docs/graphos/api-keys/#graph-api-keys"); - None - } - } - }) { - command.env("APOLLO_KEY", api_key); - } - } - } - - let mut child = command - .spawn() - .with_context(|| "could not spawn child process")?; - - match child.stdout.take() { - Some(stdout) => { - let log_sender = log_sender.clone(); - tokio::task::spawn_blocking(move || { - let stdout = BufReader::new(stdout); - stdout.lines().for_each(|line| { - if let Ok(line) = line { - log_sender - .send(BackgroundTaskLog::Stdout(line)) - .expect("could not update stdout logs for command"); - } - }); - }); - } - None => { - return Err(anyhow!("Could not take stdout from spawned router").into()); - } - } - - match child.stderr.take() { - Some(stderr) => { - tokio::task::spawn_blocking(move || { - let stderr = BufReader::new(stderr); - stderr.lines().for_each(|line| { - if let Ok(line) = line { - log_sender - .send(BackgroundTaskLog::Stderr(line)) - .expect("could not update stderr logs for command"); - } - }); - }); - } - None => { - return Err(anyhow!("Could not take stderr from spawned router").into()); - } - } - - Ok(Self { child, descriptor }) - } - - pub fn kill(&mut self) { - let pid = self.id(); - tracing::info!("killing child with pid {}", &pid); - let _ = self.child.kill().map_err(|_| { - log_err_and_continue(RoverError::new(anyhow!( - "could not kill child with pid {}", - &pid - ))); - }); - } - - pub fn id(&self) -> u32 { - self.child.id() - } - - pub fn descriptor(&self) -> &str { - &self.descriptor - } -} - -impl Drop for BackgroundTask { - fn drop(&mut self) { - self.kill() - } -} diff --git a/src/command/dev/legacy/router/config.rs b/src/command/dev/legacy/router/config.rs deleted file mode 100644 index 7f0305a09..000000000 --- a/src/command/dev/legacy/router/config.rs +++ /dev/null @@ -1,313 +0,0 @@ -use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use anyhow::{anyhow, Context}; -use camino::Utf8PathBuf; -use crossbeam_channel::{unbounded, Receiver}; -use serde_json::json; - -use rover_std::{warnln, Fs}; - -use crate::utils::expansion::expand; -use crate::{ - command::dev::{legacy::do_dev::log_err_and_continue, SupergraphOpts}, - RoverError, RoverResult, -}; - -const DEFAULT_ROUTER_SOCKET_ADDR: SocketAddr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4000); - -/// [`RouterConfigHandler`] is reponsible for orchestrating the YAML configuration file -/// passed to the router plugin, optionally watching a user's router configuration file for changes -#[derive(Debug, Clone)] -pub struct RouterConfigHandler { - /// the router configuration reader - config_reader: RouterConfigReader, - - /// the temp path to write the patched router config out to - tmp_router_config_path: Utf8PathBuf, - - /// the temp path to write the composed schema out to - tmp_supergraph_schema_path: Utf8PathBuf, - - /// the current state of the router config - config_state: Arc>, -} - -impl TryFrom<&SupergraphOpts> for RouterConfigHandler { - type Error = RoverError; - fn try_from(value: &SupergraphOpts) -> Result { - Self::new( - value.router_config_path.clone(), - value.supergraph_address, - value.supergraph_port, - ) - } -} - -impl RouterConfigHandler { - /// Create a [`RouterConfigHandler`] - pub fn new( - input_config_path: Option, - ip_override: Option, - port_override: Option, - ) -> RoverResult { - let tmp_dir = tempfile::Builder::new().prefix("supergraph").tempdir()?; - let tmp_config_dir_path = Utf8PathBuf::try_from(tmp_dir.into_path())?; - - let tmp_router_config_path = tmp_config_dir_path.join("router.yaml"); - let tmp_supergraph_schema_path = tmp_config_dir_path.join("supergraph.graphql"); - - let config_reader = RouterConfigReader::new(input_config_path, ip_override, port_override); - - let config_state = config_reader.read()?; - - Fs::write_file(&tmp_router_config_path, &config_state.config)?; - - Ok(Self { - config_reader, - config_state: Arc::new(Mutex::new(config_state)), - tmp_router_config_path, - tmp_supergraph_schema_path, - }) - } - - /// Start up the router config handler - pub fn start(self) -> RoverResult<()> { - // if a router config was passed, start watching it in the background for changes - - if let Some(state_receiver) = self.config_reader.watch() { - // Build a Rayon Thread pool - tokio::task::spawn_blocking(move || loop { - let config_state = state_receiver - .recv() - .expect("could not watch router config"); - let _ = Fs::write_file(&self.tmp_router_config_path, &config_state.config) - .map_err(|e| log_err_and_continue(e.into())); - eprintln!("successfully updated router config"); - *self - .config_state - .lock() - .expect("could not acquire lock on router configuration state") = config_state; - }); - } - - Ok(()) - } - - /// The address the router should listen on - pub fn get_router_address(&self) -> SocketAddr { - self.config_state - .lock() - .expect("could not acquire lock on router config state") - .socket_addr - .unwrap_or(DEFAULT_ROUTER_SOCKET_ADDR) - } - - /// The path the router should listen on - pub fn get_router_listen_path(&self) -> String { - self.config_state - .lock() - .expect("could not acquire lock on router config state") - .listen_path - .clone() - } - - /// Get the name of the interprocess socket address to communicate with other rover dev sessions - pub fn get_raw_socket_name(&self) -> String { - let socket_name = format!("supergraph-{}.sock", self.get_router_address()); - #[cfg(windows)] - { - format!("\\\\.\\pipe\\{}", socket_name) - } - #[cfg(unix)] - { - format!("/tmp/{}", socket_name) - } - } - - /// The path to the composed supergraph schema - pub fn get_supergraph_schema_path(&self) -> Utf8PathBuf { - self.tmp_supergraph_schema_path.clone() - } - - /// The path to the patched router config YAML - pub fn get_router_config_path(&self) -> Utf8PathBuf { - self.tmp_router_config_path.clone() - } -} - -#[derive(Debug, Clone)] -pub struct RouterConfigState { - /// Where the router should listen - pub socket_addr: Option, - - /// the resolved YAML content - pub config: String, - - /// the path the router is listening on - pub listen_path: String, -} - -#[derive(Debug, Clone)] -struct RouterConfigReader { - input_config_path: Option, - ip_override: Option, - port_override: Option, -} - -impl RouterConfigReader { - pub fn new( - input_config_path: Option, - ip_override: Option, - port_override: Option, - ) -> Self { - Self { - input_config_path, - ip_override, - port_override, - } - } - - fn read(&self) -> RoverResult { - let mut yaml = self - .input_config_path - .as_ref() - .and_then(|path| { - Fs::assert_path_exists(path).ok().map(|_| { - let input_config_contents = Fs::read_file(path)?; - serde_yaml::from_str(&input_config_contents) - .with_context(|| format!("{} is not valid YAML.", path)) - .map_err(RoverError::from) - .and_then(expand) - .and_then(|value| match value { - serde_yaml::Value::Mapping(mapping) => Ok(mapping), - _ => Err(anyhow!("Router config should be a YAML mapping").into()), - }) - }) - }) - .transpose()? - .unwrap_or_default(); - - let yaml_socket_addr = yaml - .get("supergraph") - .and_then(|s| s.get("listen")) - .and_then(|l| l.as_str()) - .and_then(|s| s.parse::().ok()); - - // resolve the ip and port - // precedence is: - // 1) CLI option - // 2) `supergraph.listen` in `router.yaml` - // 3) Nothing—use router's defaults - let socket_addr = match (&self.ip_override, &self.port_override, yaml_socket_addr) { - (Some(ip), Some(port), _) => Some(SocketAddr::new(*ip, *port)), - (Some(ip), None, yaml) => { - let mut socket_addr = yaml.unwrap_or(DEFAULT_ROUTER_SOCKET_ADDR); - socket_addr.set_ip(*ip); - Some(socket_addr) - } - (None, Some(port), yaml) => { - let mut socket_addr = yaml.unwrap_or(DEFAULT_ROUTER_SOCKET_ADDR); - socket_addr.set_port(*port); - Some(socket_addr) - } - (None, None, Some(yaml)) => Some(yaml), - (None, None, None) => None, - }; - - if let Some(socket_addr) = socket_addr { - // update YAML with the ip and port CLI options - yaml.entry("supergraph".into()) - .or_insert_with(|| serde_yaml::Mapping::new().into()) - .as_mapping_mut() - .ok_or_else(|| anyhow!("`supergraph` key in router YAML must be a mapping"))? - .insert("listen".into(), serde_yaml::to_value(socket_addr)?); - } - - // disable the health check unless they have their own config - if yaml - .get("health_check") - .or_else(|| yaml.get("health-check")) - .and_then(|h| h.as_mapping()) - .is_none() - { - yaml.insert( - serde_yaml::to_value("health_check")?, - serde_yaml::to_value(json!({"enabled": false}))?, - ); - } - let listen_path = yaml - .get("supergraph") - .and_then(|s| s.as_mapping()) - .and_then(|l| l.get("path")) - .and_then(|p| p.as_str()) - .unwrap_or_default() - .to_string(); - - let yaml_string = serde_yaml::to_string(&yaml)?; - - if let Some(path) = &self.input_config_path { - if Fs::assert_path_exists(path).is_err() { - warnln!("{path} does not exist, creating a router config from CLI options."); - Fs::write_file(path, &yaml_string)?; - } - } - - Ok(RouterConfigState { - socket_addr, - config: yaml_string, - listen_path, - }) - } - - pub fn watch(self) -> Option> { - if let Some(input_config_path) = &self.input_config_path { - let (raw_tx, mut raw_rx) = tokio::sync::mpsc::unbounded_channel(); - let (state_tx, state_rx) = unbounded(); - let input_config_path: PathBuf = input_config_path.as_path().into(); - Fs::watch_file(input_config_path, raw_tx); - tokio::spawn(async move { - while let Some(res) = raw_rx.recv().await { - res.expect("could not watch router configuration file"); - if let Ok(results) = self.read().map_err(log_err_and_continue) { - state_tx - .send(results) - .expect("could not update router configuration file"); - } else { - eprintln!("invalid router configuration, continuing to use old config"); - } - } - }); - Some(state_rx) - } else { - None - } - } -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr}; - - use rstest::rstest; - - use crate::command::dev::legacy::router::RouterConfigHandler; - - #[rstest] - #[cfg_attr(windows, case("\\\\.\\pipe\\supergraph-127.0.0.1:4000.sock"))] - #[cfg_attr(unix, case("/tmp/supergraph-127.0.0.1:4000.sock"))] - fn test_socket_types_correctly_detected(#[case] expected_ipc_address: String) { - let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); - let port_number = 4000; - let r_config = RouterConfigHandler::new(None, Some(ip_addr), Some(port_number)) - .expect("failed to create config handler"); - assert_eq!( - r_config.get_raw_socket_name(), - format!("{}", expected_ipc_address) - ); - } -} diff --git a/src/command/dev/legacy/router/mod.rs b/src/command/dev/legacy/router/mod.rs deleted file mode 100644 index d38150e11..000000000 --- a/src/command/dev/legacy/router/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod command; -mod config; -mod runner; - -pub use command::{BackgroundTask, BackgroundTaskLog}; -pub use config::RouterConfigHandler; -pub use runner::RouterRunner; diff --git a/src/command/dev/legacy/router/runner.rs b/src/command/dev/legacy/router/runner.rs deleted file mode 100644 index 63e73feb5..000000000 --- a/src/command/dev/legacy/router/runner.rs +++ /dev/null @@ -1,360 +0,0 @@ -use anyhow::{anyhow, Context}; -use apollo_federation_types::config::RouterVersion; -use camino::Utf8PathBuf; -use crossbeam_channel::bounded; -use reqwest::Client; -use reqwest::Url; -use rover_std::{infoln, Style}; -use semver::Version; - -use std::net::SocketAddr; -use std::time::{Duration, Instant}; - -use crate::command::dev::{ - legacy::{ - do_dev::log_err_and_continue, - router::{BackgroundTask, BackgroundTaskLog}, - }, - OVERRIDE_DEV_ROUTER_VERSION, -}; -use crate::command::install::Plugin; -use crate::command::Install; -use crate::options::PluginOpts; -use crate::utils::client::StudioClientConfig; -use crate::{RoverError, RoverResult}; - -#[derive(Debug)] -pub struct RouterRunner { - supergraph_schema_path: Utf8PathBuf, - router_config_path: Utf8PathBuf, - plugin_opts: PluginOpts, - router_socket_addr: SocketAddr, - router_listen_path: String, - override_install_path: Option, - client_config: StudioClientConfig, - plugin_exe: Option, - router_handle: Option, - license: Option, -} - -impl RouterRunner { - #[allow(clippy::too_many_arguments)] - pub fn new( - supergraph_schema_path: Utf8PathBuf, - router_config_path: Utf8PathBuf, - plugin_opts: PluginOpts, - router_socket_addr: SocketAddr, - router_listen_path: String, - override_install_path: Option, - client_config: StudioClientConfig, - license: Option, - ) -> Self { - Self { - supergraph_schema_path, - router_config_path, - plugin_opts, - router_socket_addr, - router_listen_path, - override_install_path, - client_config, - router_handle: None, - plugin_exe: None, - license, - } - } - - fn install_command(&self) -> RoverResult { - let plugin = match &*OVERRIDE_DEV_ROUTER_VERSION { - Some(version) => Plugin::Router(RouterVersion::Exact(Version::parse(version)?)), - None => Plugin::Router(RouterVersion::Latest), - }; - Ok(Install { - force: false, - plugin: Some(plugin), - elv2_license_accepter: self.plugin_opts.elv2_license_accepter, - }) - } - - pub async fn maybe_install_router(&mut self) -> RoverResult { - if let Some(plugin_exe) = &self.plugin_exe { - Ok(plugin_exe.clone()) - } else { - let install_command = self.install_command()?; - let plugin_exe = install_command - .get_versioned_plugin( - self.override_install_path.clone(), - self.client_config.clone(), - self.plugin_opts.skip_update, - ) - .await?; - self.plugin_exe = Some(plugin_exe.clone()); - Ok(plugin_exe) - } - } - - pub async fn get_command_to_spawn(&mut self) -> RoverResult { - let mut command = format!( - "{plugin_exe} --supergraph {supergraph} --hot-reload --config {config} --log info --dev", - plugin_exe = self.maybe_install_router().await?, - supergraph = self.supergraph_schema_path.as_str(), - config = self.router_config_path.as_str(), - ); - - if let Some(license) = &self.license { - command.push_str(&format!(" --license {}", license)); - } - - Ok(command) - } - - pub async fn wait_for_startup(&mut self, client: Client) -> RoverResult<()> { - let mut ready = false; - let now = Instant::now(); - let seconds = 10; - let base_url = format!( - "http://{}{}/health?ready", - &self.router_socket_addr, &self.router_listen_path - ); - let endpoint = Url::parse(&base_url) - .with_context(|| format!("{base_url} is not a valid URL."))? - .to_string(); - while !ready && now.elapsed() < Duration::from_secs(seconds) { - let _ = client - .get(&endpoint) - .header("Content-Type", "application/json") - .send() - .await - .map(|_| { - ready = true; - }); - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if ready { - infoln!( - "your supergraph is running! head to http://{}{} to query your supergraph", - &self - .router_socket_addr - .to_string() - .replace("127.0.0.1", "localhost") - .replace("0.0.0.0", "localhost") - .replace("[::]", "localhost") - .replace("[::1]", "localhost"), - &self.router_listen_path - ); - Ok(()) - } else { - Err(RoverError::new(anyhow!( - "the router was unable to start up", - ))) - } - } - - pub async fn wait_for_stop(&mut self, client: Client) -> RoverResult<()> { - let mut ready = true; - let now = Instant::now(); - let seconds = 5; - while ready && now.elapsed() < Duration::from_secs(seconds) { - let _ = client - .get(format!("http://{}/health?ready", &self.router_socket_addr)) - .header("Content-Type", "application/json") - .send() - .await - .and_then(|r| r.error_for_status()) - .map_err(|_| { - ready = false; - }); - std::thread::sleep(Duration::from_millis(250)); - } - - if !ready { - tracing::info!("router stopped successfully"); - Ok(()) - } else { - Err(RoverError::new(anyhow!("the router was unable to stop",))) - } - } - - pub async fn spawn(&mut self) -> RoverResult<()> { - if self.router_handle.is_none() { - let client = self.client_config.get_reqwest_client()?; - self.maybe_install_router().await?; - let (router_log_sender, router_log_receiver) = bounded(0); - let router_handle = BackgroundTask::new( - self.get_command_to_spawn().await?, - router_log_sender, - &self.client_config, - &self.plugin_opts.profile, - ) - .await?; - tracing::info!("spawning router with `{}`", router_handle.descriptor()); - - let warn_prefix = Style::WarningPrefix.paint("WARN:"); - let error_prefix = Style::ErrorPrefix.paint("ERROR:"); - let unknown_prefix = Style::ErrorPrefix.paint("UNKNOWN:"); - tokio::task::spawn_blocking(move || loop { - while let Ok(log) = router_log_receiver.recv() { - match log { - BackgroundTaskLog::Stdout(stdout) => { - if let Ok(parsed) = serde_json::from_str::(&stdout) { - let fields = &parsed["fields"]; - let level = parsed["level"].as_str().unwrap_or("UNKNOWN"); - let message = fields["message"] - .as_str() - .or_else(|| { - // Message is in a slightly different location depending on the - // version of Router - parsed["message"].as_str() - }) - .unwrap_or(&stdout); - - match level { - "INFO" => tracing::info!(%message), - "DEBUG" => tracing::debug!(%message), - "TRACE" => tracing::trace!(%message), - "WARN" => eprintln!("{} {}", warn_prefix, &message), - "ERROR" => { - eprintln!("{} {}", error_prefix, &message) - } - "UNKNOWN" => { - eprintln!("{} {}", unknown_prefix, &message) - } - _ => {} - } - } else { - eprintln!("{} {}", warn_prefix, &stdout) - } - } - BackgroundTaskLog::Stderr(stderr) => { - eprintln!("{} {}", error_prefix, &stderr) - } - }; - } - }); - - self.wait_for_startup(client).await?; - self.router_handle = Some(router_handle); - - Ok(()) - } else { - Ok(()) - } - } - - pub async fn kill(&mut self) -> RoverResult<()> { - if self.router_handle.is_some() { - tracing::info!("killing the router"); - self.router_handle = None; - if let Ok(client) = self.client_config.get_reqwest_client() { - let _ = self - .wait_for_stop(client) - .await - .map_err(log_err_and_continue); - } - } - Ok(()) - } -} - -impl Drop for RouterRunner { - fn drop(&mut self) { - let router_handle = self.router_handle.take(); - let client_config = self.client_config.clone(); - let router_socket_addr = self.router_socket_addr; - // copying the kill procedure here to emulate an async drop - tokio::task::spawn(async move { - if router_handle.is_some() { - tracing::info!("killing the router"); - if let Ok(client) = client_config.get_reqwest_client() { - let mut ready = true; - let now = Instant::now(); - let seconds = 5; - while ready && now.elapsed() < Duration::from_secs(seconds) { - let _ = client - .get(format!( - "http://{}/?query={{__typename}}", - &router_socket_addr - )) - .header("Content-Type", "application/json") - .send() - .await - .and_then(|r| r.error_for_status()) - .map_err(|_| { - ready = false; - }); - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if !ready { - tracing::info!("router stopped successfully"); - } else { - log_err_and_continue(RoverError::new(anyhow!( - "the router was unable to stop", - ))); - } - } - } - }); - } -} - -#[cfg(test)] -mod tests { - use httpmock::MockServer; - use rstest::*; - use speculoos::prelude::*; - - use crate::{ - options::{LicenseAccepter, ProfileOpt}, - utils::client::ClientBuilder, - }; - - use super::*; - - #[rstest] - #[tokio::test] - async fn test_wait_for_startup() { - // GIVEN - // * a mock health endpoint that returns 200 - // * a RouterRunner - let server = MockServer::start(); - let health_mock = server.mock(|when, then| { - when.method("GET").path("/health").query_param("ready", ""); - then.status(200); - }); - - let mut router_runner = RouterRunner::new( - Default::default(), - Default::default(), - PluginOpts { - profile: ProfileOpt { - profile_name: Default::default(), - }, - elv2_license_accepter: LicenseAccepter { - elv2_license_accepted: Some(true), - }, - skip_update: true, - }, - *server.address(), - "".to_string(), - None, - StudioClientConfig::new( - None, - houston::Config::new(None::<&Utf8PathBuf>, None).unwrap(), - false, - ClientBuilder::new(), - Duration::from_secs(3).into(), - ), - None, - ); - - // WHEN waiting for router startup - let res = router_runner.wait_for_startup(Client::new()).await; - - // THEN - // * it succeeds - // * it calls the mock endpoint correctly - assert_that!(res).is_ok(); - health_mock.assert(); - } -} diff --git a/src/command/dev/legacy/schema.rs b/src/command/dev/legacy/schema.rs deleted file mode 100644 index f7f1f1937..000000000 --- a/src/command/dev/legacy/schema.rs +++ /dev/null @@ -1,198 +0,0 @@ -use std::{net::SocketAddr, time::Duration}; - -use anyhow::anyhow; -use apollo_federation_types::config::{SchemaSource, SupergraphConfig}; -use reqwest::Url; - -use rover_client::blocking::StudioClient; - -use crate::options::ProfileOpt; -use crate::{ - command::dev::{ - legacy::{ - netstat::normalize_loopback_urls, protocol::FollowerMessenger, - watcher::SubgraphSchemaWatcher, - }, - SupergraphOpts, - }, - options::OptionalSubgraphOpts, - utils::client::StudioClientConfig, - RoverError, RoverErrorSuggestion, RoverResult, -}; - -impl OptionalSubgraphOpts { - pub fn get_subgraph_watcher( - &self, - router_socket_addr: SocketAddr, - client_config: &StudioClientConfig, - follower_messenger: FollowerMessenger, - ) -> RoverResult { - tracing::info!("checking version"); - follower_messenger.version_check()?; - tracing::info!("checking for existing subgraphs"); - let session_subgraphs = follower_messenger.session_subgraphs()?; - let url = self.prompt_for_url()?; - let normalized_user_urls = normalize_loopback_urls(&url); - let normalized_supergraph_urls = normalize_loopback_urls( - &Url::parse(&format!("http://{}", router_socket_addr)).unwrap(), - ); - - for normalized_user_url in &normalized_user_urls { - for normalized_supergraph_url in &normalized_supergraph_urls { - if normalized_supergraph_url == normalized_user_url { - let mut err = RoverError::new(anyhow!("The subgraph argument `--url {}` conflicts with the supergraph argument `--supergraph-port {}`", &url, normalized_supergraph_url.port().unwrap())); - if session_subgraphs.is_none() { - err.set_suggestion(RoverErrorSuggestion::Adhoc("Set the `--supergraph-port` flag to a different port to start the local supergraph.".to_string())) - } else { - err.set_suggestion(RoverErrorSuggestion::Adhoc("Start your subgraph on a different port and re-run this command with the new `--url`.".to_string())) - } - return Err(err); - } - } - } - - let name = self.prompt_for_name()?; - let schema = self.prompt_for_schema()?; - - if let Some(session_subgraphs) = session_subgraphs { - for (session_subgraph_name, session_subgraph_url) in session_subgraphs { - if session_subgraph_name == name { - return Err(RoverError::new(anyhow!( - "subgraph with name '{}' is already running in this `rover dev` session", - &name - ))); - } - let normalized_session_urls = normalize_loopback_urls(&session_subgraph_url); - for normalized_user_url in &normalized_user_urls { - for normalized_session_url in &normalized_session_urls { - if normalized_session_url == normalized_user_url { - return Err(RoverError::new(anyhow!( - "subgraph with url '{}' is already running in this `rover dev` session", - &url - ))); - } - } - } - } - } - - if let Some(schema) = schema { - SubgraphSchemaWatcher::new_from_file_path( - (name, url), - schema, - follower_messenger, - self.subgraph_retries, - ) - } else { - let client = client_config - .get_builder() - .with_timeout(Duration::from_secs(5)) - .build()?; - SubgraphSchemaWatcher::new_from_url( - (name, url.clone()), - client, - follower_messenger, - self.subgraph_polling_interval, - None, - self.subgraph_retries, - url, - ) - } - } -} - -impl SupergraphOpts { - pub async fn get_subgraph_watchers( - &self, - client_config: &StudioClientConfig, - supergraph_config: Option, - follower_messenger: FollowerMessenger, - polling_interval: u64, - profile_opt: &ProfileOpt, - subgraph_retries: u64, - ) -> RoverResult>> { - if supergraph_config.is_none() { - return Ok(None); - } - - tracing::info!("checking version"); - follower_messenger.version_check()?; - - let client = client_config - .get_builder() - .with_timeout(Duration::from_secs(5)) - .build()?; - let mut studio_client: Option = None; - - // WARNING: from here on I took the asynch branch's code; should be validated against main - let mut res = Vec::new(); - for (yaml_subgraph_name, subgraph_config) in supergraph_config.unwrap().into_iter() { - let routing_url = subgraph_config - .routing_url - .map(|url_str| Url::parse(&url_str).map_err(RoverError::from)) - .transpose()?; - let elem = match subgraph_config.schema { - SchemaSource::File { file } => { - let routing_url = routing_url.ok_or_else(|| { - anyhow!("`routing_url` must be set when using a local schema file") - })?; - - SubgraphSchemaWatcher::new_from_file_path( - (yaml_subgraph_name, routing_url), - file, - follower_messenger.clone(), - subgraph_retries, - ) - } - SchemaSource::SubgraphIntrospection { - subgraph_url, - introspection_headers, - } => SubgraphSchemaWatcher::new_from_url( - (yaml_subgraph_name, subgraph_url.clone()), - client.clone(), - follower_messenger.clone(), - polling_interval, - introspection_headers, - subgraph_retries, - subgraph_url, - ), - SchemaSource::Sdl { sdl } => { - let routing_url = routing_url.ok_or_else(|| { - anyhow!("`routing_url` must be set when providing SDL directly") - })?; - SubgraphSchemaWatcher::new_from_sdl( - (yaml_subgraph_name, routing_url), - sdl, - follower_messenger.clone(), - subgraph_retries, - ) - } - SchemaSource::Subgraph { - graphref, - subgraph: graphos_subgraph_name, - } => { - let studio_client = if let Some(studio_client) = studio_client.as_ref() { - studio_client - } else { - let client = client_config.get_authenticated_client(profile_opt)?; - studio_client = Some(client); - studio_client.as_ref().unwrap() - }; - - SubgraphSchemaWatcher::new_from_graph_ref( - &graphref, - graphos_subgraph_name, - routing_url, - yaml_subgraph_name, - follower_messenger.clone(), - studio_client, - subgraph_retries, - ) - .await - } - }; - res.push(elem?); - } - Ok(Some(res)) - } -} diff --git a/src/command/dev/legacy/watcher.rs b/src/command/dev/legacy/watcher.rs deleted file mode 100644 index a9a800ef9..000000000 --- a/src/command/dev/legacy/watcher.rs +++ /dev/null @@ -1,344 +0,0 @@ -use std::path::PathBuf; -use std::str::FromStr; -use std::{collections::HashMap, time::Duration}; - -use anyhow::{anyhow, Context}; -use apollo_federation_types::javascript::SubgraphDefinition; -use camino::{Utf8Path, Utf8PathBuf}; -use reqwest::Client; -use tokio::time::MissedTickBehavior::Delay; -use url::Url; - -use rover_client::blocking::StudioClient; -use rover_client::operations::subgraph::fetch; -use rover_client::operations::subgraph::fetch::SubgraphFetchInput; -use rover_client::shared::GraphRef; -use rover_std::{errln, Fs}; - -use crate::{ - command::dev::legacy::{ - introspect::{IntrospectRunnerKind, UnknownIntrospectRunner}, - protocol::{FollowerMessenger, SubgraphKey}, - }, - RoverError, RoverErrorSuggestion, RoverResult, -}; - -#[derive(Debug)] -pub struct SubgraphSchemaWatcher { - schema_watcher_kind: SubgraphSchemaWatcherKind, - subgraph_key: SubgraphKey, - message_sender: FollowerMessenger, - subgraph_retries: u64, - subgraph_retry_countdown: u64, -} - -impl SubgraphSchemaWatcher { - pub fn new_from_file_path

( - subgraph_key: SubgraphKey, - path: P, - message_sender: FollowerMessenger, - subgraph_retries: u64, - ) -> RoverResult - where - P: AsRef, - { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::File(path.as_ref().to_path_buf()), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub fn new_from_url( - subgraph_key: SubgraphKey, - client: Client, - message_sender: FollowerMessenger, - polling_interval: u64, - headers: Option>, - subgraph_retries: u64, - subgraph_url: Url, - ) -> RoverResult { - let headers = headers.map(|header_map| header_map.into_iter().collect()); - let introspect_runner = IntrospectRunnerKind::Unknown(UnknownIntrospectRunner::new( - subgraph_url, - client, - headers, - )); - Self::new_from_introspect_runner( - subgraph_key, - introspect_runner, - message_sender, - polling_interval, - subgraph_retries, - ) - } - - pub fn new_from_sdl( - subgraph_key: SubgraphKey, - sdl: String, - message_sender: FollowerMessenger, - subgraph_retries: u64, - ) -> RoverResult { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::Once(sdl), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub async fn new_from_graph_ref( - graph_ref: &str, - graphos_subgraph_name: String, - routing_url: Option, - yaml_subgraph_name: String, - message_sender: FollowerMessenger, - client: &StudioClient, - subgraph_retries: u64, - ) -> RoverResult { - // given a graph_ref and subgraph, run subgraph fetch to - // obtain SDL and add it to subgraph_definition. - let response = fetch::run( - SubgraphFetchInput { - graph_ref: GraphRef::from_str(graph_ref)?, - subgraph_name: graphos_subgraph_name.clone(), - }, - client, - ) - .await - .map_err(RoverError::from)?; - let routing_url = match (routing_url, response.sdl.r#type) { - (Some(routing_url), _) => routing_url, - ( - None, - rover_client::shared::SdlType::Subgraph { - routing_url: Some(graph_registry_routing_url), - }, - ) => graph_registry_routing_url.parse().context(format!( - "Could not parse graph registry routing url {}", - graph_registry_routing_url - ))?, - (None, _) => { - return Err(RoverError::new(anyhow!( - "Could not find routing URL in GraphOS for subgraph {graphos_subgraph_name}" - )) - .with_suggestion(RoverErrorSuggestion::AddRoutingUrlToSupergraphYaml) - .with_suggestion( - RoverErrorSuggestion::PublishSubgraphWithRoutingUrl { - subgraph_name: yaml_subgraph_name, - graph_ref: graph_ref.to_string(), - }, - )); - } - }; - Self::new_from_sdl( - (yaml_subgraph_name, routing_url), - response.sdl.contents, - message_sender, - subgraph_retries, - ) - } - - pub fn new_from_introspect_runner( - subgraph_key: SubgraphKey, - introspect_runner: IntrospectRunnerKind, - message_sender: FollowerMessenger, - polling_interval: u64, - subgraph_retries: u64, - ) -> RoverResult { - Ok(Self { - schema_watcher_kind: SubgraphSchemaWatcherKind::Introspect( - introspect_runner, - polling_interval, - ), - subgraph_key, - message_sender, - subgraph_retries, - subgraph_retry_countdown: 0, - }) - } - - pub async fn get_subgraph_definition_and_maybe_new_runner( - &self, - retry_period: Duration, - ) -> RoverResult<(SubgraphDefinition, Option)> { - let (name, url) = self.subgraph_key.clone(); - let (sdl, refresher) = match &self.schema_watcher_kind { - SubgraphSchemaWatcherKind::Introspect(introspect_runner_kind, polling_interval) => { - match introspect_runner_kind { - IntrospectRunnerKind::Graph(graph_runner) => { - let sdl = graph_runner.run().await?; - (sdl, None) - } - IntrospectRunnerKind::Subgraph(subgraph_runner) => { - let sdl = subgraph_runner.run().await?; - (sdl, None) - } - IntrospectRunnerKind::Unknown(unknown_runner) => { - let (sdl, specific_runner) = unknown_runner.run(retry_period).await?; - ( - sdl, - Some(SubgraphSchemaWatcherKind::Introspect( - specific_runner, - *polling_interval, - )), - ) - } - } - } - SubgraphSchemaWatcherKind::File(file_path) => { - let sdl = Fs::read_file(file_path)?; - (sdl, None) - } - SubgraphSchemaWatcherKind::Once(sdl) => (sdl.clone(), None), - }; - - let subgraph_definition = SubgraphDefinition { - name, - url: url.to_string(), - sdl, - }; - - Ok((subgraph_definition, refresher)) - } - - async fn update_subgraph( - &mut self, - last_message: Option<&String>, - retry_period: Duration, - ) -> RoverResult> { - let maybe_update_message = match self - .get_subgraph_definition_and_maybe_new_runner(retry_period) - .await - { - Ok((subgraph_definition, maybe_new_refresher)) => { - if let Some(new_refresher) = maybe_new_refresher { - self.set_schema_refresher(new_refresher); - } - match last_message { - Some(last_message) => { - if &subgraph_definition.sdl != last_message { - if self.subgraph_retry_countdown < self.subgraph_retries { - eprintln!( - "subgraph connectivity restored for {}", - self.subgraph_key.0 - ) - } - self.message_sender.update_subgraph(&subgraph_definition)?; - } - } - None => { - self.message_sender.add_subgraph(&subgraph_definition)?; - } - } - self.subgraph_retry_countdown = self.subgraph_retries; - Some(subgraph_definition.sdl) - } - Err(e) => { - // `subgraph-retries` can be set by the user away from the default value of 0, - // this defaults to Rover's current behaviour. - // - // If a user does set this value to a non-zero one, and we get a non-retryable error - // from one of our subgraphs, we'll retain the old schema we had and continue - // operation. This will happen until the countdown hits 0 at which point the - // subgraph will be disconnected from the supergraph. - // - // Every time we successfully communicate with the subgraph we set the countdown - // back to the maximum value. - // - if self.subgraph_retry_countdown > 0 { - self.subgraph_retry_countdown -= 1; - errln!("error detected communicating with subgraph '{}', schema changes will not be reflected.\nWill retry but subgraph logs should be inspected", &self.subgraph_key.0); - errln!("{:}", e); - Some(e.to_string()) - } else { - eprintln!( - "retries exhausted for subgraph {}. To add more run `rover dev` with the --subgraph-retries flag.", - &self.subgraph_key.0, - ); - self.message_sender.remove_subgraph(&self.subgraph_key.0)?; - None - } - } - }; - - Ok(maybe_update_message) - } - - /// Start checking for subgraph updates and sending them to the main process. - /// - /// This function will block forever for `SubgraphSchemaWatcherKind` that poll for changes—so it - /// should be started in a separate thread. - pub async fn watch_subgraph_for_changes(&mut self, retry_period: Duration) -> RoverResult<()> { - let mut last_message = None; - match self.schema_watcher_kind.clone() { - SubgraphSchemaWatcherKind::Introspect(introspect_runner_kind, polling_interval) => { - let endpoint = introspect_runner_kind.endpoint(); - eprintln!( - "polling {} every {} {}", - &endpoint, - polling_interval, - match polling_interval { - 1 => "second", - _ => "seconds", - } - ); - let mut interval = tokio::time::interval(Duration::from_secs(polling_interval)); - interval.set_missed_tick_behavior(Delay); - loop { - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - interval.tick().await; - } - } - SubgraphSchemaWatcherKind::File(path) => { - // populate the schema for the first time (last_message is always None to start) - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - - let watch_path: PathBuf = path.as_path().into(); - - Fs::watch_file(watch_path, tx); - - while let Some(res) = rx.recv().await { - match res { - Ok(()) => (), - Err(err) => return Err(anyhow::Error::from(err).into()), - } - last_message = self - .update_subgraph(last_message.as_ref(), retry_period) - .await?; - } - } - SubgraphSchemaWatcherKind::Once(_) => { - self.update_subgraph(None, retry_period).await?; - } - } - Ok(()) - } - - pub fn set_schema_refresher(&mut self, new_refresher: SubgraphSchemaWatcherKind) { - self.schema_watcher_kind = new_refresher; - } - - pub fn get_name(&self) -> String { - self.subgraph_key.0.to_string() - } -} - -#[derive(Debug, Clone)] -pub enum SubgraphSchemaWatcherKind { - /// Poll an endpoint via introspection - Introspect(IntrospectRunnerKind, u64), - /// Watch a file on disk - File(Utf8PathBuf), - /// Don't ever update, schema is only pulled once - Once(String), -} diff --git a/src/command/dev/mod.rs b/src/command/dev/mod.rs index c862cad28..2ddb3bb7e 100644 --- a/src/command/dev/mod.rs +++ b/src/command/dev/mod.rs @@ -1,7 +1,17 @@ -use std::net::IpAddr; +#![warn(missing_docs)] use apollo_federation_types::config::FederationVersion; use camino::Utf8PathBuf; + +#[cfg(feature = "composition-js")] +mod do_dev; +#[cfg(not(feature = "composition-js"))] +mod no_dev; +#[cfg(feature = "composition-js")] +mod router; + +use std::net::IpAddr; + use clap::Parser; use derive_getters::Getters; use rover_client::shared::GraphRef; @@ -12,12 +22,9 @@ use crate::{ utils::parsers::FileDescriptorType, }; -#[cfg(not(feature = "dev-next"))] -pub mod legacy; -#[cfg(feature = "dev-next")] -pub mod next; - #[derive(Debug, Serialize, Parser)] +/// Command that represents running a local router, and composition to test local changes to +/// subgraphs. pub struct Dev { #[clap(flatten)] pub(crate) opts: DevOpts, diff --git a/src/command/dev/legacy/no_dev.rs b/src/command/dev/no_dev.rs similarity index 100% rename from src/command/dev/legacy/no_dev.rs rename to src/command/dev/no_dev.rs diff --git a/src/command/dev/next/router/binary.rs b/src/command/dev/router/binary.rs similarity index 100% rename from src/command/dev/next/router/binary.rs rename to src/command/dev/router/binary.rs diff --git a/src/command/dev/next/router/config/mod.rs b/src/command/dev/router/config/mod.rs similarity index 96% rename from src/command/dev/next/router/config/mod.rs rename to src/command/dev/router/config/mod.rs index a451084d6..843a7e3d0 100644 --- a/src/command/dev/next/router/config/mod.rs +++ b/src/command/dev/router/config/mod.rs @@ -140,11 +140,13 @@ impl RunRouterConfig { // file // // Development note: any future overrides should go into this default config - let mut default_config = RunRouterConfigFinal::default(); - default_config.address = self.state.router_address; + let default_config = RunRouterConfigFinal { + address: self.state.router_address, + ..Default::default() + }; match path { - Some(path) => match read_file_impl.read_file(&path).await { + Some(path) => match read_file_impl.read_file(path).await { Ok(contents) => { let yaml = serde_yaml::from_str(&contents).map_err(|err| { ReadRouterConfigError::Deserialization { diff --git a/src/command/dev/next/router/config/parser.rs b/src/command/dev/router/config/parser.rs similarity index 97% rename from src/command/dev/next/router/config/parser.rs rename to src/command/dev/router/config/parser.rs index 5b1dd7a2d..63be658b0 100644 --- a/src/command/dev/next/router/config/parser.rs +++ b/src/command/dev/router/config/parser.rs @@ -143,9 +143,8 @@ mod tests { use rstest::rstest; use speculoos::prelude::*; - use crate::command::dev::next::router::config::RouterAddress; - use super::RouterConfigParser; + use crate::command::dev::router::config::RouterAddress; #[rstest] #[case("127.0.0.1", SocketAddr::from_str("127.0.0.1:80").unwrap())] @@ -234,7 +233,7 @@ health_check: r#"--- "# }; - let config_yaml = serde_yaml::from_str(&config_yaml_str)?; + let config_yaml = serde_yaml::from_str(config_yaml_str)?; let router_config = RouterConfigParser { yaml: &config_yaml, address: SocketAddr::from_str("127.0.0.1:80")?, @@ -251,7 +250,7 @@ health_check: r#"--- "# }; - let config_yaml = serde_yaml::from_str(&config_yaml_str)?; + let config_yaml = serde_yaml::from_str(config_yaml_str)?; let router_config = RouterConfigParser { yaml: &config_yaml, address: SocketAddr::from_str("127.0.0.1:80")?, @@ -268,7 +267,7 @@ supergraph: path: /custom-path "# }; - let config_yaml = serde_yaml::from_str(&config_yaml_str)?; + let config_yaml = serde_yaml::from_str(config_yaml_str)?; let router_config = RouterConfigParser { yaml: &config_yaml, diff --git a/src/command/dev/next/router/config/remote.rs b/src/command/dev/router/config/remote.rs similarity index 100% rename from src/command/dev/next/router/config/remote.rs rename to src/command/dev/router/config/remote.rs diff --git a/src/command/dev/next/router/config/state.rs b/src/command/dev/router/config/state.rs similarity index 100% rename from src/command/dev/next/router/config/state.rs rename to src/command/dev/router/config/state.rs diff --git a/src/command/dev/next/router/hot_reload.rs b/src/command/dev/router/hot_reload.rs similarity index 99% rename from src/command/dev/next/router/hot_reload.rs rename to src/command/dev/router/hot_reload.rs index ec0617207..eeaa95a12 100644 --- a/src/command/dev/next/router/hot_reload.rs +++ b/src/command/dev/router/hot_reload.rs @@ -6,15 +6,13 @@ use std::{ use buildstructor::Builder; use camino::Utf8PathBuf; use futures::StreamExt; +use rover_std::{debugln, errln, infoln}; use serde_yaml::Value; use tap::TapFallible; use tracing::debug; -use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile}; - use super::config::{parser::RouterConfigParser, RouterConfig}; - -use rover_std::{debugln, errln, infoln}; +use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile}; pub enum RouterUpdateEvent { SchemaChanged { schema: String }, @@ -194,10 +192,11 @@ where mod tests { use std::net::{IpAddr, Ipv4Addr}; - use super::*; use rstest::{fixture, rstest}; use speculoos::prelude::*; + use super::*; + #[fixture] fn router_config() -> &'static str { indoc::indoc! { r#" @@ -250,7 +249,7 @@ headers: println!("{config}"); println!("{router_config_expectation}"); - &config.to_string() == router_config_expectation + config.to_string() == router_config_expectation }); } } diff --git a/src/command/dev/next/router/install.rs b/src/command/dev/router/install.rs similarity index 95% rename from src/command/dev/next/router/install.rs rename to src/command/dev/router/install.rs index b803582e5..3fed3c66c 100644 --- a/src/command/dev/next/router/install.rs +++ b/src/command/dev/router/install.rs @@ -3,14 +3,13 @@ use async_trait::async_trait; use camino::{Utf8Path, Utf8PathBuf}; use semver::Version; +use super::binary::RouterBinary; use crate::{ command::{install::Plugin, Install}, options::LicenseAccepter, utils::{client::StudioClientConfig, effect::install::InstallBinary}, }; -use super::binary::RouterBinary; - #[derive(thiserror::Error, Debug)] #[error("Failed to install the router")] pub enum InstallRouterError { @@ -94,7 +93,7 @@ fn version_from_path(path: &Utf8Path) -> Result { #[cfg(test)] mod tests { - use std::{str::FromStr, time::Duration}; + use std::{env, str::FromStr, time::Duration}; use anyhow::Result; use apollo_federation_types::config::RouterVersion; @@ -109,6 +108,7 @@ mod tests { use speculoos::prelude::*; use tracing_test::traced_test; + use super::InstallRouter; use crate::{ options::LicenseAccepter, utils::{ @@ -117,8 +117,6 @@ mod tests { }, }; - use super::InstallRouter; - #[fixture] #[once] fn http_server() -> MockServer { @@ -205,7 +203,7 @@ mod tests { let mut archive = tar::Builder::new(enc); let contents = b"router"; let mut header = tar::Header::new_gnu(); - header.set_path("dist/router")?; + header.set_path(format!("{}{}", "dist/router", env::consts::EXE_SUFFIX))?; header.set_size(contents.len().try_into().unwrap()); header.set_cksum(); archive.append(&header, &contents[..]).unwrap(); @@ -237,9 +235,11 @@ mod tests { let subject = assert_that!(binary).is_ok().subject; assert_that!(subject.version()).is_equal_to(&Version::from_str("1.57.1")?); - let installed_binary_path = override_install_path - .path() - .join(".rover/bin/router-v1.57.1"); + let installed_binary_path = override_install_path.path().join(format!( + "{}{}", + ".rover/bin/router-v1.57.1", + env::consts::EXE_SUFFIX + )); assert_that!(subject.exe()) .is_equal_to(&Utf8PathBuf::from_path_buf(installed_binary_path.clone()).unwrap()); assert_that!(installed_binary_path.exists()).is_equal_to(true); diff --git a/src/command/dev/next/router/mod.rs b/src/command/dev/router/mod.rs similarity index 100% rename from src/command/dev/next/router/mod.rs rename to src/command/dev/router/mod.rs diff --git a/src/command/dev/next/router/run.rs b/src/command/dev/router/run.rs similarity index 98% rename from src/command/dev/next/router/run.rs rename to src/command/dev/router/run.rs index 8e58e400c..82ce79ae3 100644 --- a/src/command/dev/next/router/run.rs +++ b/src/command/dev/router/run.rs @@ -4,7 +4,7 @@ use apollo_federation_types::config::RouterVersion; use camino::{Utf8Path, Utf8PathBuf}; use futures::{ stream::{self, BoxStream}, - StreamExt, TryFutureExt, + StreamExt, }; use houston::Credential; use rover_client::{ @@ -24,9 +24,9 @@ use super::{ watchers::router_config::RouterConfigWatcher, }; use crate::{ - command::dev::next::{ + command::dev::{ router::hot_reload::{HotReloadConfig, HotReloadConfigOverrides}, - FileWatcher, + router::watchers::file::FileWatcher, }, composition::events::CompositionEvent, options::LicenseAccepter, @@ -55,7 +55,7 @@ impl Default for RunRouter { } impl RunRouter { - pub async fn install( + pub async fn install( self, router_version: RouterVersion, studio_client_config: StudioClientConfig, @@ -252,7 +252,7 @@ impl RunRouter { None => { return Err(RunRouterBinaryError::Internal { dependency: "Router Config Validation".to_string(), - err: format!("Router Config passed validation incorrectly, healthchecks are enabled but missing an endpoint"), + err: String::from("Router Config passed validation incorrectly, healthchecks are enabled but missing an endpoint") }) } }; @@ -396,7 +396,7 @@ mod state { use tokio::task::AbortHandle; use tokio_stream::wrappers::UnboundedReceiverStream; - use crate::command::dev::next::router::{ + use crate::command::dev::router::{ binary::{RouterBinary, RouterLog, RunRouterBinaryError}, config::{remote::RemoteRouterConfig, RouterConfigFinal}, hot_reload::HotReloadEvent, diff --git a/src/command/dev/next/router/watchers/file.rs b/src/command/dev/router/watchers/file.rs similarity index 99% rename from src/command/dev/next/router/watchers/file.rs rename to src/command/dev/router/watchers/file.rs index 297dfbbd5..4e3abb5a6 100644 --- a/src/command/dev/next/router/watchers/file.rs +++ b/src/command/dev/router/watchers/file.rs @@ -61,13 +61,14 @@ impl FileWatcher { #[cfg(test)] mod tests { - use futures::StreamExt; - use speculoos::assert_that; - use speculoos::option::OptionAssertions; use std::fs::OpenOptions; use std::io::Write; use std::time::Duration; + use futures::StreamExt; + use speculoos::assert_that; + use speculoos::option::OptionAssertions; + use super::*; #[tokio::test] diff --git a/src/command/dev/next/router/watchers/mod.rs b/src/command/dev/router/watchers/mod.rs similarity index 100% rename from src/command/dev/next/router/watchers/mod.rs rename to src/command/dev/router/watchers/mod.rs diff --git a/src/command/dev/next/router/watchers/router_config.rs b/src/command/dev/router/watchers/router_config.rs similarity index 87% rename from src/command/dev/next/router/watchers/router_config.rs rename to src/command/dev/router/watchers/router_config.rs index a5db3c816..52291304f 100644 --- a/src/command/dev/next/router/watchers/router_config.rs +++ b/src/command/dev/router/watchers/router_config.rs @@ -2,12 +2,12 @@ use futures::StreamExt; use tap::TapFallible; use crate::{ - command::dev::next::router::{config::RouterConfig, hot_reload::RouterUpdateEvent}, + command::dev::router::{ + config::RouterConfig, hot_reload::RouterUpdateEvent, watchers::file::FileWatcher, + }, subtask::SubtaskHandleUnit, }; -use crate::command::dev::next::FileWatcher; - /// Watches for router config changes pub struct RouterConfigWatcher { file_watcher: FileWatcher, diff --git a/src/command/graph/mod.rs b/src/command/graph/mod.rs index cb7747b18..15b9c2f78 100644 --- a/src/command/graph/mod.rs +++ b/src/command/graph/mod.rs @@ -6,16 +6,13 @@ mod lint; mod publish; use clap::Parser; -#[cfg(not(feature = "dev-next"))] -pub use introspect::Introspect; +use rover_client::shared::GitContext; use serde::Serialize; use crate::options::OutputOpts; use crate::utils::client::StudioClientConfig; use crate::{RoverOutput, RoverResult}; -use rover_client::shared::GitContext; - #[derive(Debug, Serialize, Parser)] pub struct Graph { #[clap(subcommand)] diff --git a/src/command/subgraph/mod.rs b/src/command/subgraph/mod.rs index 2f487af8f..d4853b1f4 100644 --- a/src/command/subgraph/mod.rs +++ b/src/command/subgraph/mod.rs @@ -6,18 +6,14 @@ mod lint; mod list; mod publish; -#[cfg(not(feature = "dev-next"))] -pub use introspect::Introspect; - use clap::Parser; +use rover_client::shared::GitContext; use serde::Serialize; use crate::options::OutputOpts; use crate::utils::client::StudioClientConfig; use crate::{RoverOutput, RoverResult}; -use rover_client::shared::GitContext; - #[derive(Debug, Serialize, Parser)] pub struct Subgraph { #[clap(subcommand)] diff --git a/src/command/supergraph/compose/do_compose.rs b/src/command/supergraph/compose/do_compose.rs index a5089d454..ee489a631 100644 --- a/src/command/supergraph/compose/do_compose.rs +++ b/src/command/supergraph/compose/do_compose.rs @@ -1,74 +1,21 @@ -// TODO: remove once we're no longer using the composition-rewrite feature flag -#[allow(unused_imports)] -use std::{ - env::current_dir, - fs::File, - io::{stdin, Read, Write}, - process::Command, - str, -}; +use std::io::stdin; -use anyhow::{anyhow, Context}; - -// TODO: remove once we're no longer using the composition-rewrite feature flag -#[cfg(not(feature = "dev-next"))] -use apollo_federation_types::config::FederationVersion::LatestFedTwo; -use apollo_federation_types::{ - config::{FederationVersion, PluginVersion, SupergraphConfig}, - rover::BuildResult, -}; +use apollo_federation_types::config::FederationVersion; use camino::Utf8PathBuf; use clap::{Args, Parser}; use derive_getters::Getters; -use rover_client::{shared::GraphRef, RoverClientError}; -use rover_std::warnln; -use semver::Version; +use rover_client::shared::GraphRef; use serde::Serialize; -#[cfg(feature = "composition-rewrite")] use tower::ServiceExt; -// TODO: remove once we're no longer using the composition-rewrite feature flag -#[allow(unused_imports)] -use tempfile::tempdir; - -// TODO: remove once we're no longer using the composition-rewrite feature flag -#[allow(unused_imports)] -use crate::{ - command::{ - install::{Install, Plugin}, - supergraph::compose::CompositionOutput, - }, - composition::{ - events::CompositionEvent, - runner::Runner, - supergraph::{ - binary::{OutputTarget, SupergraphBinary}, - config::{ - full::{ - FullyResolvedSubgraph, FullyResolvedSubgraphs, FullyResolvedSupergraphConfig, - }, - resolver::SupergraphConfigResolver, - unresolved::UnresolvedSupergraphConfig, - }, - install::InstallSupergraph, - version::SupergraphVersion, - }, - }, - options::PluginOpts, - utils::{ - client::StudioClientConfig, - effect::{ - exec::TokioCommand, - install::InstallBinary, - read_file::FsReadFile, - write_file::{FsWriteFile, WriteFile}, - }, - expansion::expand, - parsers::FileDescriptorType, - supergraph_config::{expand_supergraph_yaml, get_supergraph_config, RemoteSubgraphs}, - }, - RoverError, RoverErrorSuggestion, RoverOutput, RoverResult, -}; +use crate::composition::supergraph::config::resolver::SubgraphPrompt; +use crate::options::PluginOpts; +use crate::utils::client::StudioClientConfig; +use crate::utils::effect::exec::TokioCommand; +use crate::utils::effect::read_file::FsReadFile; +use crate::utils::effect::write_file::FsWriteFile; +use crate::utils::parsers::FileDescriptorType; +use crate::{RoverOutput, RoverResult}; #[derive(Debug, Serialize, Parser)] pub struct Compose { @@ -111,53 +58,6 @@ pub struct SupergraphComposeOpts { } impl Compose { - #[cfg(not(feature = "dev-next"))] - pub fn new(compose_opts: PluginOpts) -> Self { - Self { - opts: SupergraphComposeOpts { - plugin_opts: compose_opts, - federation_version: Some(LatestFedTwo), - supergraph_config_source: SupergraphConfigSource { - supergraph_yaml: Some(FileDescriptorType::File("RAM".into())), - graph_ref: None, - }, - }, - } - } - - pub(crate) async fn maybe_install_supergraph( - &self, - override_install_path: Option, - client_config: StudioClientConfig, - federation_version: FederationVersion, - ) -> RoverResult { - let plugin = Plugin::Supergraph(federation_version.clone()); - if federation_version.is_fed_two() { - self.opts - .plugin_opts - .elv2_license_accepter - .require_elv2_license(&client_config)?; - } - - // and create our plugin that we may need to install from it - let install_command = Install { - force: false, - plugin: Some(plugin), - elv2_license_accepter: self.opts.plugin_opts.elv2_license_accepter, - }; - - // maybe do the install, maybe find a pre-existing installation, maybe fail - let plugin_exe = install_command - .get_versioned_plugin( - override_install_path, - client_config, - self.opts.plugin_opts.skip_update, - ) - .await?; - Ok(plugin_exe) - } - - #[cfg(feature = "composition-rewrite")] pub async fn run( &self, override_install_path: Option, @@ -213,6 +113,7 @@ impl Compose { resolve_introspect_subgraph_factory, fetch_remote_subgraph_factory, self.opts.federation_version.clone(), + None::<&SubgraphPrompt>, ) .await? .install_supergraph_binary( @@ -233,226 +134,4 @@ impl Compose { Ok(RoverOutput::CompositionResult(composition_success.into())) } - - #[cfg(not(feature = "composition-rewrite"))] - pub async fn run( - &self, - override_install_path: Option, - client_config: StudioClientConfig, - output_file: Option, - ) -> RoverResult { - let mut supergraph_config = get_supergraph_config( - &self.opts.supergraph_config_source.graph_ref, - &self.opts.supergraph_config_source.supergraph_yaml.clone(), - self.opts.federation_version.as_ref(), - client_config.clone(), - &self.opts.plugin_opts.profile, - true, - ) - .await? - .ok_or_else(|| anyhow!("error getting supergraph config"))?; - - self.compose( - override_install_path, - client_config, - &mut supergraph_config, - output_file, - ) - .await - } - - pub async fn compose( - &self, - override_install_path: Option, - client_config: StudioClientConfig, - supergraph_config: &mut SupergraphConfig, - output_file: Option, - ) -> RoverResult { - let output = self - .exec( - override_install_path, - client_config, - supergraph_config, - output_file, - ) - .await?; - Ok(RoverOutput::CompositionResult(output)) - } - - pub async fn exec( - &self, - override_install_path: Option, - client_config: StudioClientConfig, - supergraph_config: &mut SupergraphConfig, - output_file: Option, - ) -> RoverResult { - let mut output_file = output_file; - // first, grab the _actual_ federation version from the config we just resolved - // (this will always be `Some` as long as we have created with `resolve_supergraph_yaml` so it is safe to unwrap) - let federation_version = supergraph_config.get_federation_version().unwrap(); - - let exe = self - .maybe_install_supergraph( - override_install_path, - client_config, - federation_version.clone(), - ) - .await?; - - // _then_, overwrite the federation_version with _only_ the major version - // before sending it to the supergraph plugin. - // we do this because the supergraph binaries _only_ check if the major version is correct - // and we may want to introduce other semver things in the future. - // this technique gives us forward _and_ backward compatibility - // because the supergraph plugin itself only has to parse "federation_version: 1" or "federation_version: 2" - let v = match federation_version.get_major_version() { - 0 | 1 => FederationVersion::LatestFedOne, - 2 => FederationVersion::LatestFedTwo, - _ => unreachable!("This version of Rover does not support major versions of federation other than 1 and 2.") - }; - supergraph_config.set_federation_version(v); - let num_subgraphs = supergraph_config.get_subgraph_definitions()?.len(); - let supergraph_config_yaml = serde_yaml::to_string(&supergraph_config)?; - let dir = tempfile::Builder::new().prefix("supergraph").tempdir()?; - tracing::debug!("temp dir created at {}", dir.path().display()); - let yaml_path = Utf8PathBuf::try_from(dir.path().join("config.yml"))?; - let mut f = File::create(&yaml_path)?; - f.write_all(supergraph_config_yaml.as_bytes())?; - f.sync_all()?; - tracing::debug!("config file written to {}", &yaml_path); - - let federation_version = Self::extract_federation_version(&exe)?; - let exact_version = federation_version - .get_exact() - // This should be impossible to get to because we convert to a FederationVersion a few - // lines above and so _should_ have an exact version - .ok_or(RoverError::new(anyhow!( - "failed to get exact Federation version" - )))?; - - eprintln!( - "composing supergraph with Federation {}", - &federation_version.get_tarball_version() - ); - - // When the `--output` flag is used, we need a supergraph binary version that is at least - // v2.9.0. We ignore that flag for composition when we have anything less than that - if output_file.is_some() - && (exact_version.major < 2 || (exact_version.major == 2 && exact_version.minor < 9)) - { - warnln!("ignoring `--output` because it is not supported in this version of the dependent binary, `supergraph`: {}. Upgrade to Federation 2.9.0 or greater to install a version of the binary that supports it.", federation_version); - output_file = None; - } - - // Whether we use stdout or a file dependson whether the the `--output` option was used - let content = match output_file { - // If it was, we use a file in the supergraph binary; this cuts down the overall time - // it takes to do composition when we're working on really large compositions, but it - // carries with it the assumption that stdout is superfluous - Some(filepath) => { - Command::new(&exe) - .args(["compose", yaml_path.as_ref(), filepath.as_ref()]) - .output() - .context("Failed to execute command")?; - - let mut composition_file = std::fs::File::open(&filepath).unwrap(); - let mut content: String = String::new(); - composition_file.read_to_string(&mut content).unwrap(); - content - } - // When we aren't using `--output`, we dump the composition directly to stdout - None => { - let output = Command::new(&exe) - .args(["compose", yaml_path.as_ref()]) - .output() - .context("Failed to execute command")?; - - let content = str::from_utf8(&output.stdout) - .with_context(|| format!("Could not parse output of `{} compose`", &exe))?; - content.to_string() - } - }; - - // Make sure the composition is well-formed - let composition = match serde_json::from_str::(&content) { - Ok(res) => res, - Err(err) => { - return Err(anyhow!("{}", err)) - .with_context(|| anyhow!("{} compose output: {}", &exe, content)) - .with_context(|| anyhow!("Output from `{} compose` was malformed.", &exe)) - .map_err(|e| { - let mut error = RoverError::new(e); - error.set_suggestion(RoverErrorSuggestion::SubmitIssue); - error - }) - } - }; - - match composition { - Ok(build_output) => Ok(CompositionOutput { - hints: build_output.hints, - supergraph_sdl: build_output.supergraph_sdl, - federation_version: Some(format_version(federation_version.to_string())), - }), - Err(build_errors) => Err(RoverError::from(RoverClientError::BuildErrors { - source: build_errors, - num_subgraphs, - })), - } - } - - /// Extracts the Federation Version from the executable - fn extract_federation_version(exe: &Utf8PathBuf) -> Result { - let file_name = exe.file_name().unwrap(); - let without_exe = file_name.strip_suffix(".exe").unwrap_or(file_name); - let version = match Version::parse( - without_exe - .strip_prefix("supergraph-v") - .unwrap_or(without_exe), - ) { - Ok(version) => version, - Err(err) => return Err(RoverError::new(err)), - }; - - match version.major { - 0 | 1 => Ok(FederationVersion::ExactFedOne(version)), - 2 => Ok(FederationVersion::ExactFedTwo(version)), - _ => Err(RoverError::new(anyhow!("unsupported Federation version"))), - } - } -} - -/// Format the a Version string (coming from an exact version, which includes a `=` rather than a -/// `v`) for readability -fn format_version(version: String) -> String { - let unformatted = &version[1..]; - let mut formatted = unformatted.to_string(); - formatted.insert(0, 'v'); - formatted -} - -#[cfg(test)] -mod tests { - use rstest::rstest; - use speculoos::assert_that; - - use super::*; - - #[rstest] - #[case::simple_binary("a/b/c/d/supergraph-v2.8.5", "v2.8.5")] - #[case::simple_windows_binary("a/b/supergraph-v2.9.1.exe", "v2.9.1")] - #[case::complicated_semver( - "a/b/supergraph-v1.2.3-SNAPSHOT.123+asdf", - "v1.2.3-SNAPSHOT.123+asdf" - )] - #[case::complicated_semver_windows( - "a/b/supergraph-v1.2.3-SNAPSHOT.123+asdf.exe", - "v1.2.3-SNAPSHOT.123+asdf" - )] - fn it_can_extract_a_version_correctly(#[case] file_path: &str, #[case] expected_value: &str) { - let mut fake_path = Utf8PathBuf::new(); - fake_path.push(file_path); - let result = Compose::extract_federation_version(&fake_path).unwrap(); - assert_that(&result).matches(|f| format_version(f.to_string()) == expected_value); - } } diff --git a/src/options/compose.rs b/src/options/compose.rs index cc48309e9..1b335ffcd 100644 --- a/src/options/compose.rs +++ b/src/options/compose.rs @@ -1,11 +1,8 @@ -use super::ProfileOpt; -use crate::options::LicenseAccepter; - use clap::Parser; use serde::Serialize; -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use crate::{utils::client::StudioClientConfig, RoverResult}; +use super::ProfileOpt; +use crate::options::LicenseAccepter; #[cfg_attr(test, derive(Default))] #[derive(Debug, Clone, Serialize, Parser)] @@ -22,11 +19,3 @@ pub struct PluginOpts { #[arg(long = "skip-update")] pub skip_update: bool, } - -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -impl PluginOpts { - pub fn prompt_for_license_accept(&self, client_config: &StudioClientConfig) -> RoverResult<()> { - self.elv2_license_accepter - .require_elv2_license(client_config) - } -} diff --git a/src/options/subgraph.rs b/src/options/subgraph.rs index b41cbd462..64cad377a 100644 --- a/src/options/subgraph.rs +++ b/src/options/subgraph.rs @@ -1,24 +1,7 @@ -#[cfg(not(feature = "dev-next"))] -use std::io::{self, IsTerminal}; - -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use anyhow::{Context, Result}; use camino::Utf8PathBuf; use clap::{self, Parser}; -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use clap::{error::ErrorKind as ClapErrorKind, CommandFactory}; -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use dialoguer::Input; -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use reqwest::Url; use serde::{Deserialize, Serialize}; -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use rover_std::{Fs, Style}; - -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -use crate::cli::Rover; - #[derive(Debug, Clone, Serialize, Deserialize, Parser)] pub struct SubgraphOpt { /// The name of the subgraph. @@ -69,99 +52,3 @@ pub struct OptionalSubgraphOpts { #[serde(skip_serializing)] pub subgraph_retries: u64, } - -#[cfg(all(feature = "composition-js", not(feature = "dev-next")))] -impl OptionalSubgraphOpts { - pub fn prompt_for_name(&self) -> Result { - if let Some(name) = &self.subgraph_name { - Ok(name.to_string()) - } else if io::stderr().is_terminal() { - let mut input = Input::new().with_prompt("what is the name of this subgraph?"); - if let Some(dirname) = Self::maybe_name_from_dir() { - input = input.default(dirname); - } - let name: String = input.interact_text()?; - Ok(name) - } else { - let mut cmd = Rover::command(); - cmd.error( - ClapErrorKind::MissingRequiredArgument, - "--name is required when not attached to a TTY", - ) - .exit(); - } - } - - pub fn prompt_for_url(&self) -> Result { - let url_context = |input| format!("'{}' is not a valid subgraph URL.", &input); - if let Some(subgraph_url) = &self.subgraph_url { - Ok(subgraph_url - .parse() - .with_context(|| url_context(subgraph_url))?) - } else if io::stderr().is_terminal() { - let input: String = Input::new() - .with_prompt("what URL is your subgraph running on?") - .interact_text()?; - Ok(input.parse().with_context(|| url_context(&input))?) - } else { - let mut cmd = Rover::command(); - cmd.error( - ClapErrorKind::MissingRequiredArgument, - "--url is required when not attached to a TTY", - ) - .exit(); - } - } - - pub fn prompt_for_schema(&self) -> Result> { - if let Some(schema) = &self.subgraph_schema_path { - Fs::assert_path_exists(schema)?; - Ok(Some(schema.clone())) - } else { - let possible_schemas: Vec = Fs::get_dir_entries("./") - .map(|entries| { - entries.flatten().filter_map(|entry| { - let mut result = None; - if let Ok(file_type) = entry.file_type() { - if file_type.is_file() { - let entry_path = entry.path(); - if let Some(extension) = entry_path.extension() { - if extension == "graphql" || extension == "gql" { - if let Some(file_stem) = entry_path.file_stem() { - if !file_stem.contains("supergraph") { - result = Some(entry.path().to_path_buf()); - } - } - } - } - } - } - result - }) - })? - .collect(); - - let warn_prefix = Style::WarningPrefix.paint("WARN:"); - match possible_schemas.len() { - 0 => { - eprintln!("{0} could not detect a schema in the current working directory. to watch a schema, pass the {1} argument", &warn_prefix, Style::Command.paint("'--schema '")); - Ok(None) - } - 1 => { - eprintln!("{0} if you would like to watch {1} for changes instead of introspecting every second, re-run this command with the {1} argument", &warn_prefix, Style::Command.paint(format!("'--schema {}'", possible_schemas[0]))); - Ok(None) - } - _ => { - eprintln!("{0} detected multiple schemas in the current working directory. you can only watch one schema at a time. to watch a schema, pass the {1} argument", &warn_prefix, Style::Command.paint("'--schema '")); - Ok(None) - } - } - } - } - - fn maybe_name_from_dir() -> Option { - std::env::current_dir() - .ok() - .and_then(|x| x.file_name().map(|x| x.to_string_lossy().to_lowercase())) - } -} diff --git a/src/utils/client.rs b/src/utils/client.rs index 3c8d8469a..87f04d1b8 100644 --- a/src/utils/client.rs +++ b/src/utils/client.rs @@ -1,20 +1,19 @@ use core::fmt; use std::{io, str::FromStr, time::Duration}; -use crate::{options::ProfileOpt, PKG_NAME, PKG_VERSION}; use anyhow::Result; - use derive_getters::Getters; use houston as config; use reqwest::Client; use rover_client::blocking::StudioClient; - use rover_http::{HttpService, ReqwestService}; use rover_studio::HttpStudioServiceLayer; use serde::Serialize; use tower::{ServiceBuilder, ServiceExt}; use url::Url; +use crate::{options::ProfileOpt, PKG_NAME, PKG_VERSION}; + /// the Apollo graph registry's production API endpoint const STUDIO_PROD_API_ENDPOINT: &str = "https://api.apollographql.com/graphql"; @@ -170,11 +169,6 @@ impl StudioClientConfig { } } - #[cfg(feature = "composition-js")] - pub(crate) fn get_builder(&self) -> ClientBuilder { - self.client_builder - } - pub fn service(&self) -> Result { let client = self.get_reqwest_client()?; Ok(ReqwestService::builder()