Skip to content

Commit

Permalink
ROVER-257 Hot-reloading of Federation Version in supergraph.yaml (#…
Browse files Browse the repository at this point in the history
…2347)

One of the key features that is required for the LSP is the ability to
change the Federation Version specified in the `supergraph.yaml` and for
it to then reload in the active session, rather than needing a restart.
This is implemented as of this PR, and due to the nature of the previous
refactor, has also been added to `rover dev` as well!

The way this has been done is to extend the FederationWatcher to track
when a change of version occurs and then to get the CompositionWatcher
to update its own internal state (downloading a new version when
necessary). The outcome has been tested and performs w.r.t the LSP as
we'd expect. Below is an example of the logging with it working for
`rover dev`:

I've also implemented a feature whereby we don't do the auto-updating
behaviour if a user is specifically overriding the version of
composition to be used via an environment variable, though for the
moment this is only relevant to `rover dev`.
  • Loading branch information
jonathanrainer authored Jan 20, 2025
1 parent 30a53c2 commit b95a753
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 43 deletions.
16 changes: 14 additions & 2 deletions src/command/dev/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use rover_std::{errln, infoln, warnln};
use semver::Version;
use tower::ServiceExt;

use self::router::config::{RouterAddress, RunRouterConfig};
use crate::composition::supergraph::binary::OutputTarget;
use crate::composition::FederationUpdaterConfig;
use crate::{
command::{
dev::{OVERRIDE_DEV_COMPOSITION_VERSION, OVERRIDE_DEV_ROUTER_VERSION},
Expand Down Expand Up @@ -43,8 +45,6 @@ use crate::{
RoverOutput, RoverResult,
};

use self::router::config::RouterAddress;

mod router;

impl Dev {
Expand Down Expand Up @@ -163,6 +163,17 @@ impl Dev {
&Config::new(home_override.as_ref(), api_key_override)?,
)?;

// Set up an updater config, but only if we're not overriding the version ourselves. If
// we are then we don't need one, so it becomes None.
let federation_updater_config = match self.opts.supergraph_opts.federation_version {
Some(_) => None,
None => Some(FederationUpdaterConfig {
studio_client_config: client_config.clone(),
elv2_licence_accepter: elv2_license_accepter,
skip_update,
}),
};

let composition_runner = composition_pipeline
.runner(
exec_command_impl,
Expand All @@ -174,6 +185,7 @@ impl Dev {
tmp_config_dir_path.clone(),
OutputTarget::Stdout,
false,
federation_updater_config,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion src/command/dev/next/router/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,14 @@ impl RunRouter<state::Watch> {

let composition_messages =
tokio_stream::StreamExt::filter_map(composition_messages, |event| match event {
CompositionEvent::Started => None,
CompositionEvent::Error(err) => {
tracing::error!("Composition error {:?}", err);
None
}
CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged {
schema: success.supergraph_sdl().to_string(),
}),
_ => None,
})
.boxed();

Expand Down
15 changes: 13 additions & 2 deletions src/command/lsp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ use crate::composition::supergraph::config::lazy::LazilyResolvedSupergraphConfig
use crate::composition::supergraph::config::resolver::fetch_remote_subgraph::MakeFetchRemoteSubgraph;
use crate::composition::supergraph::config::resolver::fetch_remote_subgraphs::MakeFetchRemoteSubgraphs;
use crate::composition::supergraph::config::resolver::{SubgraphPrompt, SupergraphConfigResolver};
use crate::composition::supergraph::install::InstallSupergraphError;
use crate::composition::SupergraphConfigResolutionError::PathDoesNotPointToAFile;
use crate::composition::{
CompositionError, CompositionSubgraphAdded, CompositionSubgraphRemoved, CompositionSuccess,
SupergraphConfigResolutionError,
FederationUpdaterConfig, SupergraphConfigResolutionError,
};
use crate::options::ProfileOpt;
use crate::utils::effect::exec::TokioCommand;
Expand Down Expand Up @@ -269,7 +270,12 @@ async fn start_composition(
}
CompositionEvent::Error(err) => {
debug!("Composition failed: {err}");
let message = format!("Composition failed to run: {err}",);
let message = match err {
CompositionError::ErrorUpdatingFederationVersion(
InstallSupergraphError::MissingDependency { err },
) => format!("Supergraph Version could not be updated: {err}"),
_ => format!("Composition failed to run: {err}",),
};
let diagnostic = Diagnostic::new_simple(Range::default(), message);
language_server
.publish_diagnostics(supergraph_yaml_url.clone(), vec![diagnostic])
Expand Down Expand Up @@ -345,6 +351,11 @@ async fn create_composition_stream(
Utf8PathBuf::try_from(temp_dir())?,

Check notice on line 351 in src/command/lsp/mod.rs

View check run for this annotation

Apollo SecOps / Static App Security Check

rules.providers.semgrep.security.rust.lang.security.temp-dir

temp_dir should not be used for security operations. From the docs: 'The temporary directory may be shared among users, or between processes with different privileges; thus, the creation of any files or directories in the temporary directory must use a secure method to create a uniquely named file. Creating a file or directory with a fixed or predictable name may result in “insecure temporary file” security vulnerabilities.'
OutputTarget::InMemory,
true,
Some(FederationUpdaterConfig {
studio_client_config: client_config,
elv2_licence_accepter: lsp_opts.plugin_opts.elv2_license_accepter,
skip_update: lsp_opts.plugin_opts.skip_update,
}),
)
.await?
.run())
Expand Down
12 changes: 12 additions & 0 deletions src/composition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use derive_getters::Getters;
use crate::composition::supergraph::config::resolver::{
LoadRemoteSubgraphsError, LoadSupergraphConfigError, ResolveSupergraphConfigError,
};
use crate::composition::supergraph::install::InstallSupergraphError;
use crate::options::LicenseAccepter;
use crate::utils::client::StudioClientConfig;

pub mod events;
pub mod pipeline;
Expand All @@ -25,6 +28,13 @@ pub mod types;
#[cfg(feature = "composition-js")]
mod watchers;

#[derive(Debug, Clone)]
pub struct FederationUpdaterConfig {
pub(crate) studio_client_config: StudioClientConfig,
pub(crate) elv2_licence_accepter: LicenseAccepter,
pub(crate) skip_update: bool,
}

#[derive(Getters, Debug, Clone, Eq, PartialEq)]
pub struct CompositionSuccess {
pub(crate) supergraph_sdl: String,
Expand Down Expand Up @@ -65,6 +75,8 @@ pub enum CompositionError {
SerdeYaml(#[from] serde_yaml::Error),
#[error("{}", .0)]
InvalidSupergraphConfig(String),
#[error("Error when updating Federation Version:\n{}", .0)]
ErrorUpdatingFederationVersion(#[from] InstallSupergraphError),
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
4 changes: 3 additions & 1 deletion src/composition/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{
},
install::{InstallSupergraph, InstallSupergraphError},
},
CompositionError, CompositionSuccess,
CompositionError, CompositionSuccess, FederationUpdaterConfig,
};
use crate::composition::pipeline::CompositionPipelineError::FederationOneWithFederationTwoSubgraphs;
use crate::{
Expand Down Expand Up @@ -261,6 +261,7 @@ impl CompositionPipeline<state::Run> {
output_dir: Utf8PathBuf,
output_target: OutputTarget,
compose_on_initialisation: bool,
federation_updater_config: Option<FederationUpdaterConfig>,
) -> Result<CompositionRunner<ExecC, ReadF, WriteF>, CompositionPipelineError>
where
ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static,
Expand Down Expand Up @@ -293,6 +294,7 @@ impl CompositionPipeline<state::Run> {
output_dir,
compose_on_initialisation,
output_target,
federation_updater_config,
);
Ok(runner)
}
Expand Down
21 changes: 16 additions & 5 deletions src/composition/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::{
},
},
watchers::{composition::CompositionWatcher, subgraphs::SubgraphWatchers},
FederationUpdaterConfig,
};
use crate::composition::supergraph::binary::OutputTarget;
use crate::composition::watchers::federation::FederationWatcher;
Expand Down Expand Up @@ -137,23 +138,33 @@ impl Runner<state::SetupCompositionWatcher> {
temp_dir: Utf8PathBuf,
compose_on_initialisation: bool,
output_target: OutputTarget,
federation_updater_config: Option<FederationUpdaterConfig>,
) -> Runner<state::Run<ExecC, ReadF, WriteF>>
where
ExecC: ExecCommand + Debug + Eq + PartialEq + Send + Sync + 'static,
ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static,
WriteF: WriteFile + Debug + Eq + PartialEq + Send + Sync + 'static,
{
// Create a handler for supergraph composition events.
let composition_watcher = CompositionWatcher::builder()
let composition_watcher_builder = CompositionWatcher::builder()
.supergraph_config(supergraph_config)
.supergraph_binary(supergraph_binary)
.exec_command(exec_command)
.read_file(read_file)
.write_file(write_file)
.temp_dir(temp_dir)
.compose_on_initialisation(compose_on_initialisation)
.output_target(output_target)
.build();
.output_target(output_target);

let composition_watcher = if let Some(federation_updater_config) = federation_updater_config
{
composition_watcher_builder
.federation_updater_config(federation_updater_config)
.build()
} else {
composition_watcher_builder.build()
};

Runner {
state: state::Run {
subgraph_watchers: self.state.subgraph_watchers,
Expand Down Expand Up @@ -209,7 +220,7 @@ where
// events in order to trigger recomposition.
let (composition_messages, composition_subtask) =
Subtask::new(self.state.composition_watcher);
composition_subtask.run(subgraph_change_stream.boxed());
composition_subtask.run(select(subgraph_change_stream, federation_watcher_stream).boxed());

// Start subgraph watchers, listening for events from the supergraph change stream.
subgraph_watcher_subtask.run(
Expand Down Expand Up @@ -245,6 +256,6 @@ where
supergraph_config_subtask.run();
}

select(composition_messages, federation_watcher_stream).boxed()
composition_messages.boxed()
}
}
69 changes: 59 additions & 10 deletions src/composition/watchers/composition.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use apollo_federation_types::config::SchemaSource::Sdl;
use apollo_federation_types::config::SupergraphConfig;
use apollo_federation_types::config::{FederationVersion, SupergraphConfig};
use buildstructor::Builder;
use camino::Utf8PathBuf;
use futures::stream::BoxStream;
use rover_std::{errln, infoln};
use rover_std::{errln, infoln, warnln};
use tap::TapFallible;
use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
use tokio_stream::StreamExt;
use tracing::error;

use crate::composition::supergraph::install::InstallSupergraph;
use crate::composition::watchers::composition::CompositionInputEvent::{
Federation, Passthrough, Subgraph,
};
use crate::composition::{
CompositionError, CompositionSubgraphAdded, CompositionSubgraphRemoved, CompositionSuccess,
FederationUpdaterConfig,
};
use crate::utils::effect::install::InstallBinary;
use crate::{
composition::{
events::CompositionEvent,
Expand All @@ -25,9 +31,22 @@ use crate::{
utils::effect::{exec::ExecCommand, read_file::ReadFile, write_file::WriteFile},
};

/// Event to represent an input to the CompositionWatcher, depending on the source the event comes
/// from. This is really like a Union type over multiple disparate events
pub enum CompositionInputEvent {
/// Variant to represent if the change comes from a change to subgraphs
Subgraph(SubgraphEvent),
/// Variant to represent if the change comes from a change in the Federation Version
Federation(FederationVersion),
/// Variant to something that we do not want to perform Composition on but needs to be passed
/// through to the final stream of Composition Events.
Passthrough(CompositionEvent),
}

#[derive(Builder, Debug)]
pub struct CompositionWatcher<ExecC, ReadF, WriteF> {
supergraph_config: FullyResolvedSupergraphConfig,
federation_updater_config: Option<FederationUpdaterConfig>,
supergraph_binary: SupergraphBinary,
exec_command: ExecC,
read_file: ReadF,
Expand All @@ -43,11 +62,11 @@ where
ReadF: ReadFile + Send + Sync + 'static,
WriteF: WriteFile + Send + Sync + 'static,
{
type Input = SubgraphEvent;
type Input = CompositionInputEvent;
type Output = CompositionEvent;

fn handle(
self,
mut self,
sender: UnboundedSender<Self::Output>,
mut input: BoxStream<'static, Self::Input>,
) -> AbortHandle {
Expand Down Expand Up @@ -84,7 +103,7 @@ where

while let Some(event) = input.next().await {
match event {
SubgraphEvent::SubgraphChanged(subgraph_schema_changed) => {
Subgraph(SubgraphEvent::SubgraphChanged(subgraph_schema_changed)) => {
let name = subgraph_schema_changed.name().clone();
let sdl = subgraph_schema_changed.sdl().clone();
let message = format!("Schema change detected for subgraph: {}", &name);
Expand All @@ -107,7 +126,7 @@ where
.tap_err(|err| error!("{:?}", err));
};
}
SubgraphEvent::SubgraphRemoved(subgraph_removed) => {
Subgraph(SubgraphEvent::SubgraphRemoved(subgraph_removed)) => {
let name = subgraph_removed.name();
tracing::info!("Subgraph removed: {}", name);
supergraph_config.remove_subgraph(name);
Expand All @@ -117,6 +136,35 @@ where
))
.tap_err(|err| error!("{:?}", err));
}
Federation(fed_version) => {
if let Some(federation_updater_config) = self.federation_updater_config.clone() {
tracing::info!("Attempting to change supergraph version to {:?}", fed_version);
infoln!("Attempting to change supergraph version to {}", fed_version.get_exact().unwrap());
let install_res =
InstallSupergraph::new(fed_version, federation_updater_config.studio_client_config.clone())
.install(None, federation_updater_config.elv2_licence_accepter, federation_updater_config.skip_update)
.await;
match install_res {
Ok(supergraph_binary) => {
tracing::info!("Supergraph version changed to {:?}", supergraph_binary.version());
infoln!("Supergraph version changed to {}", supergraph_binary.version().to_string());
self.supergraph_binary = supergraph_binary
}
Err(err) => {
tracing::warn!("Failed to change supergraph version, current version has been retained...");
warnln!("Failed to change supergraph version, current version has been retained...");
let _ = sender.send(CompositionEvent::Error(err.into())).tap_err(|err| error!("{:?}", err));
continue;
}
}
} else {
tracing::warn!("Detected Federation Version change but due to overrides (CLI flags, ENV vars etc.) this was not actioned.")
}
}
Passthrough(ev) => {
let _ = sender.send(ev).tap_err(|err| error!("{:?}", err));
continue;
}
}

if let Err(err) = self
Expand Down Expand Up @@ -232,8 +280,9 @@ mod tests {
use speculoos::prelude::*;
use tracing_test::traced_test;

use super::CompositionWatcher;
use super::{CompositionInputEvent, CompositionWatcher};
use crate::composition::supergraph::binary::OutputTarget;
use crate::composition::watchers::composition::CompositionInputEvent::Subgraph;
use crate::composition::CompositionSubgraphAdded;
use crate::{
composition::{
Expand Down Expand Up @@ -329,12 +378,12 @@ mod tests {
.output_target(OutputTarget::Stdout)
.build();

let subgraph_change_events: BoxStream<SubgraphEvent> = once(async {
SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new(
let subgraph_change_events: BoxStream<CompositionInputEvent> = once(async {
Subgraph(SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new(
subgraph_name,
subgraph_sdl,
"https://example.com".to_string(),
))
)))
})
.boxed();
let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler);
Expand Down
Loading

0 comments on commit b95a753

Please sign in to comment.