diff --git a/src/command/dev/next/mod.rs b/src/command/dev/next/mod.rs index 72a03fc26..4fb1e32e7 100644 --- a/src/command/dev/next/mod.rs +++ b/src/command/dev/next/mod.rs @@ -15,7 +15,9 @@ use rover_std::{errln, infoln, warnln}; use semver::Version; use tower::ServiceExt; +use self::router::config::{RouterAddress, RunRouterConfig}; use crate::composition::supergraph::binary::OutputTarget; +use crate::composition::FederationUpdaterConfig; use crate::{ command::{ dev::{OVERRIDE_DEV_COMPOSITION_VERSION, OVERRIDE_DEV_ROUTER_VERSION}, @@ -43,8 +45,6 @@ use crate::{ RoverOutput, RoverResult, }; -use self::router::config::RouterAddress; - mod router; impl Dev { @@ -163,6 +163,17 @@ impl Dev { &Config::new(home_override.as_ref(), api_key_override)?, )?; + // Set up an updater config, but only if we're not overriding the version ourselves. If + // we are then we don't need one, so it becomes None. + let federation_updater_config = match self.opts.supergraph_opts.federation_version { + Some(_) => None, + None => Some(FederationUpdaterConfig { + studio_client_config: client_config.clone(), + elv2_licence_accepter: elv2_license_accepter, + skip_update, + }), + }; + let composition_runner = composition_pipeline .runner( exec_command_impl, @@ -174,6 +185,7 @@ impl Dev { tmp_config_dir_path.clone(), OutputTarget::Stdout, false, + federation_updater_config, ) .await?; diff --git a/src/command/dev/next/router/run.rs b/src/command/dev/next/router/run.rs index b4646edb7..8e58e400c 100644 --- a/src/command/dev/next/router/run.rs +++ b/src/command/dev/next/router/run.rs @@ -332,7 +332,6 @@ impl RunRouter { let composition_messages = tokio_stream::StreamExt::filter_map(composition_messages, |event| match event { - CompositionEvent::Started => None, CompositionEvent::Error(err) => { tracing::error!("Composition error {:?}", err); None @@ -340,6 +339,7 @@ impl RunRouter { CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged { schema: success.supergraph_sdl().to_string(), }), + _ => None, }) .boxed(); diff --git a/src/command/lsp/mod.rs b/src/command/lsp/mod.rs index 048f4feb8..b9767cad8 100644 --- a/src/command/lsp/mod.rs +++ b/src/command/lsp/mod.rs @@ -27,10 +27,11 @@ use crate::composition::supergraph::config::lazy::LazilyResolvedSupergraphConfig use crate::composition::supergraph::config::resolver::fetch_remote_subgraph::MakeFetchRemoteSubgraph; use crate::composition::supergraph::config::resolver::fetch_remote_subgraphs::MakeFetchRemoteSubgraphs; use crate::composition::supergraph::config::resolver::{SubgraphPrompt, SupergraphConfigResolver}; +use crate::composition::supergraph::install::InstallSupergraphError; use crate::composition::SupergraphConfigResolutionError::PathDoesNotPointToAFile; use crate::composition::{ CompositionError, CompositionSubgraphAdded, CompositionSubgraphRemoved, CompositionSuccess, - SupergraphConfigResolutionError, + FederationUpdaterConfig, SupergraphConfigResolutionError, }; use crate::options::ProfileOpt; use crate::utils::effect::exec::TokioCommand; @@ -269,7 +270,12 @@ async fn start_composition( } CompositionEvent::Error(err) => { debug!("Composition failed: {err}"); - let message = format!("Composition failed to run: {err}",); + let message = match err { + CompositionError::ErrorUpdatingFederationVersion( + InstallSupergraphError::MissingDependency { err }, + ) => format!("Supergraph Version could not be updated: {err}"), + _ => format!("Composition failed to run: {err}",), + }; let diagnostic = Diagnostic::new_simple(Range::default(), message); language_server .publish_diagnostics(supergraph_yaml_url.clone(), vec![diagnostic]) @@ -345,6 +351,11 @@ async fn create_composition_stream( Utf8PathBuf::try_from(temp_dir())?, OutputTarget::InMemory, true, + Some(FederationUpdaterConfig { + studio_client_config: client_config, + elv2_licence_accepter: lsp_opts.plugin_opts.elv2_license_accepter, + skip_update: lsp_opts.plugin_opts.skip_update, + }), ) .await? .run()) diff --git a/src/composition/mod.rs b/src/composition/mod.rs index 7943084b7..1a4170c9a 100644 --- a/src/composition/mod.rs +++ b/src/composition/mod.rs @@ -13,6 +13,9 @@ use derive_getters::Getters; use crate::composition::supergraph::config::resolver::{ LoadRemoteSubgraphsError, LoadSupergraphConfigError, ResolveSupergraphConfigError, }; +use crate::composition::supergraph::install::InstallSupergraphError; +use crate::options::LicenseAccepter; +use crate::utils::client::StudioClientConfig; pub mod events; pub mod pipeline; @@ -25,6 +28,13 @@ pub mod types; #[cfg(feature = "composition-js")] mod watchers; +#[derive(Debug, Clone)] +pub struct FederationUpdaterConfig { + pub(crate) studio_client_config: StudioClientConfig, + pub(crate) elv2_licence_accepter: LicenseAccepter, + pub(crate) skip_update: bool, +} + #[derive(Getters, Debug, Clone, Eq, PartialEq)] pub struct CompositionSuccess { pub(crate) supergraph_sdl: String, @@ -65,6 +75,8 @@ pub enum CompositionError { SerdeYaml(#[from] serde_yaml::Error), #[error("{}", .0)] InvalidSupergraphConfig(String), + #[error("Error when updating Federation Version:\n{}", .0)] + ErrorUpdatingFederationVersion(#[from] InstallSupergraphError), } #[derive(Debug, Eq, PartialEq)] diff --git a/src/composition/pipeline.rs b/src/composition/pipeline.rs index dce9b85bc..99887e1f3 100644 --- a/src/composition/pipeline.rs +++ b/src/composition/pipeline.rs @@ -28,7 +28,7 @@ use super::{ }, install::{InstallSupergraph, InstallSupergraphError}, }, - CompositionError, CompositionSuccess, + CompositionError, CompositionSuccess, FederationUpdaterConfig, }; use crate::composition::pipeline::CompositionPipelineError::FederationOneWithFederationTwoSubgraphs; use crate::{ @@ -261,6 +261,7 @@ impl CompositionPipeline { output_dir: Utf8PathBuf, output_target: OutputTarget, compose_on_initialisation: bool, + federation_updater_config: Option, ) -> Result, CompositionPipelineError> where ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, @@ -293,6 +294,7 @@ impl CompositionPipeline { output_dir, compose_on_initialisation, output_target, + federation_updater_config, ); Ok(runner) } diff --git a/src/composition/runner/mod.rs b/src/composition/runner/mod.rs index ad8284309..ba756b58b 100644 --- a/src/composition/runner/mod.rs +++ b/src/composition/runner/mod.rs @@ -26,6 +26,7 @@ use super::{ }, }, watchers::{composition::CompositionWatcher, subgraphs::SubgraphWatchers}, + FederationUpdaterConfig, }; use crate::composition::supergraph::binary::OutputTarget; use crate::composition::watchers::federation::FederationWatcher; @@ -137,6 +138,7 @@ impl Runner { temp_dir: Utf8PathBuf, compose_on_initialisation: bool, output_target: OutputTarget, + federation_updater_config: Option, ) -> Runner> where ExecC: ExecCommand + Debug + Eq + PartialEq + Send + Sync + 'static, @@ -144,7 +146,7 @@ impl Runner { WriteF: WriteFile + Debug + Eq + PartialEq + Send + Sync + 'static, { // Create a handler for supergraph composition events. - let composition_watcher = CompositionWatcher::builder() + let composition_watcher_builder = CompositionWatcher::builder() .supergraph_config(supergraph_config) .supergraph_binary(supergraph_binary) .exec_command(exec_command) @@ -152,8 +154,17 @@ impl Runner { .write_file(write_file) .temp_dir(temp_dir) .compose_on_initialisation(compose_on_initialisation) - .output_target(output_target) - .build(); + .output_target(output_target); + + let composition_watcher = if let Some(federation_updater_config) = federation_updater_config + { + composition_watcher_builder + .federation_updater_config(federation_updater_config) + .build() + } else { + composition_watcher_builder.build() + }; + Runner { state: state::Run { subgraph_watchers: self.state.subgraph_watchers, @@ -209,7 +220,7 @@ where // events in order to trigger recomposition. let (composition_messages, composition_subtask) = Subtask::new(self.state.composition_watcher); - composition_subtask.run(subgraph_change_stream.boxed()); + composition_subtask.run(select(subgraph_change_stream, federation_watcher_stream).boxed()); // Start subgraph watchers, listening for events from the supergraph change stream. subgraph_watcher_subtask.run( @@ -245,6 +256,6 @@ where supergraph_config_subtask.run(); } - select(composition_messages, federation_watcher_stream).boxed() + composition_messages.boxed() } } diff --git a/src/composition/watchers/composition.rs b/src/composition/watchers/composition.rs index 5f70343ed..634f5385f 100644 --- a/src/composition/watchers/composition.rs +++ b/src/composition/watchers/composition.rs @@ -1,17 +1,23 @@ use apollo_federation_types::config::SchemaSource::Sdl; -use apollo_federation_types::config::SupergraphConfig; +use apollo_federation_types::config::{FederationVersion, SupergraphConfig}; use buildstructor::Builder; use camino::Utf8PathBuf; use futures::stream::BoxStream; -use rover_std::{errln, infoln}; +use rover_std::{errln, infoln, warnln}; use tap::TapFallible; use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle}; use tokio_stream::StreamExt; use tracing::error; +use crate::composition::supergraph::install::InstallSupergraph; +use crate::composition::watchers::composition::CompositionInputEvent::{ + Federation, Passthrough, Subgraph, +}; use crate::composition::{ CompositionError, CompositionSubgraphAdded, CompositionSubgraphRemoved, CompositionSuccess, + FederationUpdaterConfig, }; +use crate::utils::effect::install::InstallBinary; use crate::{ composition::{ events::CompositionEvent, @@ -25,9 +31,22 @@ use crate::{ utils::effect::{exec::ExecCommand, read_file::ReadFile, write_file::WriteFile}, }; +/// Event to represent an input to the CompositionWatcher, depending on the source the event comes +/// from. This is really like a Union type over multiple disparate events +pub enum CompositionInputEvent { + /// Variant to represent if the change comes from a change to subgraphs + Subgraph(SubgraphEvent), + /// Variant to represent if the change comes from a change in the Federation Version + Federation(FederationVersion), + /// Variant to something that we do not want to perform Composition on but needs to be passed + /// through to the final stream of Composition Events. + Passthrough(CompositionEvent), +} + #[derive(Builder, Debug)] pub struct CompositionWatcher { supergraph_config: FullyResolvedSupergraphConfig, + federation_updater_config: Option, supergraph_binary: SupergraphBinary, exec_command: ExecC, read_file: ReadF, @@ -43,11 +62,11 @@ where ReadF: ReadFile + Send + Sync + 'static, WriteF: WriteFile + Send + Sync + 'static, { - type Input = SubgraphEvent; + type Input = CompositionInputEvent; type Output = CompositionEvent; fn handle( - self, + mut self, sender: UnboundedSender, mut input: BoxStream<'static, Self::Input>, ) -> AbortHandle { @@ -84,7 +103,7 @@ where while let Some(event) = input.next().await { match event { - SubgraphEvent::SubgraphChanged(subgraph_schema_changed) => { + Subgraph(SubgraphEvent::SubgraphChanged(subgraph_schema_changed)) => { let name = subgraph_schema_changed.name().clone(); let sdl = subgraph_schema_changed.sdl().clone(); let message = format!("Schema change detected for subgraph: {}", &name); @@ -107,7 +126,7 @@ where .tap_err(|err| error!("{:?}", err)); }; } - SubgraphEvent::SubgraphRemoved(subgraph_removed) => { + Subgraph(SubgraphEvent::SubgraphRemoved(subgraph_removed)) => { let name = subgraph_removed.name(); tracing::info!("Subgraph removed: {}", name); supergraph_config.remove_subgraph(name); @@ -117,6 +136,35 @@ where )) .tap_err(|err| error!("{:?}", err)); } + Federation(fed_version) => { + if let Some(federation_updater_config) = self.federation_updater_config.clone() { + tracing::info!("Attempting to change supergraph version to {:?}", fed_version); + infoln!("Attempting to change supergraph version to {}", fed_version.get_exact().unwrap()); + let install_res = + InstallSupergraph::new(fed_version, federation_updater_config.studio_client_config.clone()) + .install(None, federation_updater_config.elv2_licence_accepter, federation_updater_config.skip_update) + .await; + match install_res { + Ok(supergraph_binary) => { + tracing::info!("Supergraph version changed to {:?}", supergraph_binary.version()); + infoln!("Supergraph version changed to {}", supergraph_binary.version().to_string()); + self.supergraph_binary = supergraph_binary + } + Err(err) => { + tracing::warn!("Failed to change supergraph version, current version has been retained..."); + warnln!("Failed to change supergraph version, current version has been retained..."); + let _ = sender.send(CompositionEvent::Error(err.into())).tap_err(|err| error!("{:?}", err)); + continue; + } + } + } else { + tracing::warn!("Detected Federation Version change but due to overrides (CLI flags, ENV vars etc.) this was not actioned.") + } + } + Passthrough(ev) => { + let _ = sender.send(ev).tap_err(|err| error!("{:?}", err)); + continue; + } } if let Err(err) = self @@ -232,8 +280,9 @@ mod tests { use speculoos::prelude::*; use tracing_test::traced_test; - use super::CompositionWatcher; + use super::{CompositionInputEvent, CompositionWatcher}; use crate::composition::supergraph::binary::OutputTarget; + use crate::composition::watchers::composition::CompositionInputEvent::Subgraph; use crate::composition::CompositionSubgraphAdded; use crate::{ composition::{ @@ -329,12 +378,12 @@ mod tests { .output_target(OutputTarget::Stdout) .build(); - let subgraph_change_events: BoxStream = once(async { - SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new( + let subgraph_change_events: BoxStream = once(async { + Subgraph(SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new( subgraph_name, subgraph_sdl, "https://example.com".to_string(), - )) + ))) }) .boxed(); let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler); diff --git a/src/composition/watchers/federation.rs b/src/composition/watchers/federation.rs index 220bf788e..e8111cae5 100644 --- a/src/composition/watchers/federation.rs +++ b/src/composition/watchers/federation.rs @@ -1,3 +1,4 @@ +use apollo_federation_types::config::FederationVersion::LatestFedTwo; use futures::stream::BoxStream; use futures::StreamExt; use tap::TapFallible; @@ -6,6 +7,7 @@ use tokio::task::AbortHandle; use tracing::error; use crate::composition::events::CompositionEvent; +use crate::composition::watchers::composition::CompositionInputEvent; use crate::composition::watchers::watcher::supergraph_config::{ SupergraphConfigDiff, SupergraphConfigSerialisationError, }; @@ -17,7 +19,7 @@ pub struct FederationWatcher {} impl SubtaskHandleStream for FederationWatcher { type Input = Result; - type Output = CompositionEvent; + type Output = CompositionInputEvent; fn handle( self, @@ -26,15 +28,26 @@ impl SubtaskHandleStream for FederationWatcher { ) -> AbortHandle { tokio::task::spawn(async move { while let Some(recv_res) = input.next().await { - if let Err(SupergraphConfigSerialisationError::DeserializingConfigError { - source, - }) = recv_res - { - let _ = sender - .send(CompositionEvent::Error( - CompositionError::InvalidSupergraphConfig(source.message()), - )) - .tap_err(|err| error!("{:?}", err)); + match recv_res { + Ok(diff) => { + if let Some(fed_version) = diff.federation_version() { + let _ = sender + .send(CompositionInputEvent::Federation( + fed_version.clone().unwrap_or(LatestFedTwo), + )) + .tap_err(|err| error!("{:?}", err)); + } + } + Err(SupergraphConfigSerialisationError::DeserializingConfigError { + source, + }) => { + let _ = sender + .send(CompositionInputEvent::Passthrough(CompositionEvent::Error( + CompositionError::InvalidSupergraphConfig(source.message()), + ))) + .tap_err(|err| error!("{:?}", err)); + } + Err(_) => {} } } }) diff --git a/src/composition/watchers/subgraphs.rs b/src/composition/watchers/subgraphs.rs index 077468676..500b902f8 100644 --- a/src/composition/watchers/subgraphs.rs +++ b/src/composition/watchers/subgraphs.rs @@ -12,6 +12,8 @@ use super::watcher::{ subgraph::{NonRepeatingFetch, SubgraphWatcher, SubgraphWatcherKind}, supergraph_config::SupergraphConfigDiff, }; +use crate::composition::watchers::composition::CompositionInputEvent; +use crate::composition::watchers::composition::CompositionInputEvent::Subgraph; use crate::composition::watchers::watcher::supergraph_config::SupergraphConfigSerialisationError; use crate::{ composition::supergraph::config::{ @@ -149,7 +151,7 @@ pub struct SubgraphSchemaRemoved { impl SubtaskHandleStream for SubgraphWatchers { type Input = Result; - type Output = SubgraphEvent; + type Output = CompositionInputEvent; fn handle( self, @@ -210,7 +212,7 @@ impl SubtaskHandleStream for SubgraphWatchers { struct SubgraphHandles { abort_handles: HashMap, - sender: UnboundedSender, + sender: UnboundedSender, resolve_introspect_subgraph_factory: ResolveIntrospectSubgraphFactory, fetch_remote_subgraph_factory: FetchRemoteSubgraphFactory, supergraph_config_root: Utf8PathBuf, @@ -218,7 +220,7 @@ struct SubgraphHandles { impl SubgraphHandles { pub fn new( - sender: UnboundedSender, + sender: UnboundedSender, watchers: HashMap, resolve_introspect_subgraph_factory: ResolveIntrospectSubgraphFactory, fetch_remote_subgraph_factory: FetchRemoteSubgraphFactory, @@ -238,7 +240,7 @@ impl SubgraphHandles { while let Some(subgraph) = messages.next().await { tracing::info!("Subgraph change detected: {:?}", subgraph); let _ = sender - .send(SubgraphEvent::SubgraphChanged(subgraph.into())) + .send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into()))) .tap_err(|err| tracing::error!("{:?}", err)); } } @@ -333,7 +335,7 @@ impl SubgraphHandles { .map(|subgraph| { let _ = self .sender - .send(SubgraphEvent::SubgraphChanged(subgraph.into())) + .send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into()))) .tap_err(|err| tracing::error!("{:?}", err)); }); } @@ -349,9 +351,11 @@ impl SubgraphHandles { let _ = self .sender - .send(SubgraphEvent::SubgraphRemoved(SubgraphSchemaRemoved { - name: subgraph.to_string(), - })) + .send(Subgraph(SubgraphEvent::SubgraphRemoved( + SubgraphSchemaRemoved { + name: subgraph.to_string(), + }, + ))) .tap_err(|err| tracing::error!("{:?}", err)); } @@ -367,7 +371,7 @@ impl SubgraphHandles { .map(|subgraph| { let _ = self .sender - .send(SubgraphEvent::SubgraphChanged(subgraph.into())) + .send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into()))) .tap_err(|err| tracing::error!("{:?}", err)); }); } @@ -382,7 +386,9 @@ impl SubgraphHandles { Subtask::::new(subgraph_watcher); let _ = self .sender - .send(SubgraphEvent::SubgraphChanged(subgraph.clone().into())) + .send(Subgraph(SubgraphEvent::SubgraphChanged( + subgraph.clone().into(), + ))) .tap_err(|err| tracing::error!("{:?}", err)); let messages_abort_handle = tokio::spawn({ @@ -390,7 +396,7 @@ impl SubgraphHandles { async move { while let Some(subgraph) = messages.next().await { let _ = sender - .send(SubgraphEvent::SubgraphChanged(subgraph.into())) + .send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into()))) .tap_err(|err| tracing::error!("{:?}", err)); } } diff --git a/src/composition/watchers/watcher/supergraph_config.rs b/src/composition/watchers/watcher/supergraph_config.rs index addbd9711..b1dd9e053 100644 --- a/src/composition/watchers/watcher/supergraph_config.rs +++ b/src/composition/watchers/watcher/supergraph_config.rs @@ -1,7 +1,9 @@ use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; -use apollo_federation_types::config::{ConfigError, SubgraphConfig, SupergraphConfig}; +use apollo_federation_types::config::{ + ConfigError, FederationVersion, SubgraphConfig, SupergraphConfig, +}; use derive_getters::Getters; use futures::StreamExt; use rover_std::errln; @@ -9,8 +11,10 @@ use tap::TapFallible; use thiserror::Error; use tokio::sync::broadcast::Sender; use tokio::task::AbortHandle; +use tracing::debug; use super::file::FileWatcher; +use crate::composition::supergraph::config::federation::FederationVersionResolver; use crate::composition::supergraph::config::{ error::ResolveSubgraphError, lazy::LazilyResolvedSupergraphConfig, unresolved::UnresolvedSupergraphConfig, @@ -59,6 +63,7 @@ impl SubtaskHandleMultiStream for SupergraphConfigWatcher { let unresolved_supergraph_config = UnresolvedSupergraphConfig::builder() .origin_path(supergraph_config_path.clone()) .subgraphs(subgraphs) + .federation_version_resolver(FederationVersionResolver::default().from_supergraph_config(Some(&supergraph_config))) .build(); let supergraph_config = LazilyResolvedSupergraphConfig::resolve( &supergraph_config_path.parent().unwrap().to_path_buf(), @@ -123,6 +128,7 @@ pub struct SupergraphConfigDiff { added: Vec<(String, SubgraphConfig)>, changed: Vec<(String, SubgraphConfig)>, removed: Vec, + federation_version: Option>, } impl SupergraphConfigDiff { @@ -184,10 +190,22 @@ impl SupergraphConfigDiff { }) .collect::>(); + let federation_version = if old.get_federation_version() != new.get_federation_version() { + debug!( + "Detected federation version change. Changing from {:?} to {:?}", + old.get_federation_version(), + new.get_federation_version() + ); + Some(new.get_federation_version()) + } else { + None + }; + Ok(SupergraphConfigDiff { added, changed, removed, + federation_version, }) } }