Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROVER-286 Removing routing_url gives correct error message #2360

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/composition/supergraph/config/full/subgraph/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,16 @@ impl Service<()> for ResolveFileSubgraph {
let schema = Fs::read_file(&file).map_err(|err| ResolveSubgraphError::Fs {
source: Arc::new(Box::new(err)),
})?;
let routing_url = unresolved_subgraph.routing_url().clone().ok_or_else(|| {
ResolveSubgraphError::MissingRoutingUrl {
subgraph: unresolved_subgraph.name().to_string(),
}
})?;

Ok(FullyResolvedSubgraph::builder()
let builder = FullyResolvedSubgraph::builder()
.name(subgraph_name)
.routing_url(routing_url)
.schema(schema)
.schema_source(schema_source)
.build())
.schema_source(schema_source);

Ok(match unresolved_subgraph.routing_url() {
None => builder.build(),
Some(routing_url) => builder.routing_url(routing_url).build(),
})
};
Box::pin(fut)
}
Expand Down
20 changes: 9 additions & 11 deletions src/composition/supergraph/config/full/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub type FullyResolveSubgraphService =
#[derive(Clone, Debug, Eq, PartialEq, Getters)]
pub struct FullyResolvedSubgraph {
name: String,
routing_url: String,
pub(crate) routing_url: Option<String>,
schema: String,
schema_source: SchemaSource,
pub(crate) is_fed_two: bool,
Expand All @@ -44,7 +44,7 @@ impl FullyResolvedSubgraph {
pub fn new(
name: String,
schema: String,
routing_url: String,
routing_url: Option<String>,
schema_source: SchemaSource,
) -> FullyResolvedSubgraph {
let is_fed_two = schema_contains_link_directive(&schema);
Expand Down Expand Up @@ -124,16 +124,14 @@ impl FullyResolvedSubgraph {
let unresolved_subgraph = unresolved_subgraph.clone();
let sdl = sdl.to_string();
async move {
Ok(FullyResolvedSubgraph::builder()
let builder = FullyResolvedSubgraph::builder()
.name(unresolved_subgraph.name().to_string())
.routing_url(unresolved_subgraph.routing_url().clone().ok_or_else(
|| ResolveSubgraphError::MissingRoutingUrl {
subgraph: unresolved_subgraph.name().to_string(),
},
)?)
.schema(sdl.to_string())
.schema_source(SchemaSource::Sdl { sdl })
.build())
.schema_source(SchemaSource::Sdl { sdl });
Ok(match unresolved_subgraph.routing_url() {
None => builder.build(),
Some(routing_url) => builder.routing_url(routing_url).build(),
})
}
})
.boxed_clone()),
Expand All @@ -149,7 +147,7 @@ impl FullyResolvedSubgraph {
impl From<FullyResolvedSubgraph> for SubgraphConfig {
fn from(value: FullyResolvedSubgraph) -> Self {
SubgraphConfig {
routing_url: Some(value.routing_url),
routing_url: value.routing_url,
schema: SchemaSource::Sdl { sdl: value.schema },
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/composition/supergraph/config/full/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use derive_getters::Getters;
use futures::{stream, StreamExt, TryFutureExt};
use itertools::Itertools;
use tower::{Service, ServiceExt};
use tracing::debug;

use super::FullyResolvedSubgraph;
use crate::composition::supergraph::config::full::introspect::ResolveIntrospectSubgraphFactory;
Expand Down Expand Up @@ -108,6 +109,32 @@ impl FullyResolvedSupergraphConfig {
self.subgraphs.insert(name, subgraph)
}

pub(crate) fn update_routing_url(
&mut self,
subgraph_name: &str,
routing_url: Option<String>,
) -> Option<Option<String>> {
match self.subgraphs.get_mut(subgraph_name) {
None => {
debug!("Could not find subgraph {}", subgraph_name);
None
}
Some(subgraph) => {
let original_value = subgraph.routing_url.clone();
if routing_url != subgraph.routing_url {
debug!(
"Updating routing URL from {:?} to {:?}",
subgraph.routing_url, routing_url
);
subgraph.routing_url = routing_url;
Some(original_value)
} else {
None
}
}
}
}

/// Removes the subgraph with the name provided
pub fn remove_subgraph(&mut self, name: &str) {
self.subgraphs.remove(name);
Expand Down
26 changes: 18 additions & 8 deletions src/composition/watchers/composition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tap::TapFallible;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::{error, info};

use crate::composition::supergraph::install::InstallSupergraph;
use crate::composition::watchers::composition::CompositionInputEvent::{
Expand Down Expand Up @@ -104,7 +104,7 @@ where
cancellation_token.run_until_cancelled(async {
while let Some(event) = input.next().await {
match event {
Subgraph(SubgraphEvent::SubgraphChanged(subgraph_schema_changed)) => {
Subgraph(SubgraphEvent::SubgraphSchemaChanged(subgraph_schema_changed)) => {
let name = subgraph_schema_changed.name().clone();
let schema_source = subgraph_schema_changed.schema_source().clone();
let message = format!("Schema change detected for subgraph: {}", &name);
Expand All @@ -127,6 +127,14 @@ where
.tap_err(|err| error!("{:?}", err));
};
}
Subgraph(SubgraphEvent::RoutingUrlChanged(routing_url_changed)) => {
let name = routing_url_changed.name();
info!("Change of routing_url detected for subgraph '{}'", name);
if supergraph_config.update_routing_url(name, routing_url_changed.routing_url().clone()).is_none() {
// If we get None back then continue, as we don't need to recompose
continue
}
}
Subgraph(SubgraphEvent::SubgraphRemoved(subgraph_removed)) => {
let name = subgraph_removed.name();
tracing::info!("Subgraph removed: {}", name);
Expand Down Expand Up @@ -380,12 +388,14 @@ mod tests {
.build();

let subgraph_change_events: BoxStream<CompositionInputEvent> = once(async {
Subgraph(SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new(
subgraph_name,
subgraph_sdl.clone(),
"https://example.com".to_string(),
SchemaSource::Sdl { sdl: subgraph_sdl },
)))
Subgraph(SubgraphEvent::SubgraphSchemaChanged(
SubgraphSchemaChanged::new(
subgraph_name,
subgraph_sdl.clone(),
"https://example.com".to_string(),
SchemaSource::Sdl { sdl: subgraph_sdl },
),
))
})
.boxed();
let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler);
Expand Down
58 changes: 43 additions & 15 deletions src/composition/watchers/subgraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ impl SubgraphWatchers {
/// name of the subgraph
pub enum SubgraphEvent {
/// A change to the watched subgraph
SubgraphChanged(SubgraphSchemaChanged),
SubgraphSchemaChanged(SubgraphSchemaChanged),
/// A change to the watched subgraph's routing URL
RoutingUrlChanged(SubgraphRoutingUrlChanged),
/// The subgraph is no longer watched
SubgraphRemoved(SubgraphSchemaRemoved),
}
Expand All @@ -109,7 +111,7 @@ pub struct SubgraphSchemaChanged {
name: String,
/// SDL with changes
sdl: String,
routing_url: String,
routing_url: Option<String>,
/// Schema Source
schema_source: SchemaSource,
}
Expand All @@ -125,20 +127,22 @@ impl SubgraphSchemaChanged {
SubgraphSchemaChanged {
name,
sdl,
routing_url,
routing_url: Some(routing_url),
schema_source,
}
}
}

impl From<SubgraphSchemaChanged> for FullyResolvedSubgraph {
fn from(value: SubgraphSchemaChanged) -> Self {
FullyResolvedSubgraph::builder()
let builder = FullyResolvedSubgraph::builder()
.name(value.name)
.schema(value.sdl)
.routing_url(value.routing_url)
.schema_source(value.schema_source)
.build()
.schema_source(value.schema_source);
match value.routing_url {
None => builder.build(),
Some(routing_url) => builder.routing_url(routing_url).build(),
}
}
}

Expand All @@ -147,12 +151,18 @@ impl From<FullyResolvedSubgraph> for SubgraphSchemaChanged {
SubgraphSchemaChanged {
name: value.name().to_string(),
sdl: value.schema().to_string(),
routing_url: value.routing_url().to_string(),
routing_url: value.routing_url().to_owned(),
schema_source: value.schema_source().to_owned(),
}
}
}

#[derive(derive_getters::Getters, Default)]
pub struct SubgraphRoutingUrlChanged {
name: String,
routing_url: Option<String>,
}

/// The subgraph is no longer watched
#[derive(derive_getters::Getters, Default)]
pub struct SubgraphSchemaRemoved {
Expand Down Expand Up @@ -258,7 +268,9 @@ impl SubgraphHandles {
while let Some(subgraph) = messages.next().await {
tracing::info!("Subgraph change detected: {:?}", subgraph);
let _ = sender
.send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into())))
.send(Subgraph(SubgraphEvent::SubgraphSchemaChanged(
subgraph.into(),
)))
.tap_err(|err| tracing::error!("{:?}", err));
}
})
Expand Down Expand Up @@ -337,7 +349,7 @@ impl SubgraphHandles {
)
.await?;
let subgraph_watcher = SubgraphWatcher::new(
lazily_resolved_subgraph,
lazily_resolved_subgraph.clone(),
resolver,
introspection_polling_interval,
subgraph.to_string(),
Expand All @@ -351,10 +363,23 @@ impl SubgraphHandles {
.map(|subgraph| {
let _ = self
.sender
.send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into())))
.send(Subgraph(SubgraphEvent::SubgraphSchemaChanged(
subgraph.into(),
)))
.tap_err(|err| tracing::error!("{:?}", err));
});
}

// It's possible that the routing_url was updated at this point so we need to update that
// and propagate the update through by forcing a recomposition. This may be unnecessary,
// but we'll figure that out on the receiving end rather than passing around more
// context.
let _ = self.sender.send(Subgraph(SubgraphEvent::RoutingUrlChanged(
SubgraphRoutingUrlChanged {
name: subgraph.to_string(),
routing_url: lazily_resolved_subgraph.routing_url().clone(),
},
)));
Ok(())
}

Expand Down Expand Up @@ -386,7 +411,9 @@ impl SubgraphHandles {
.map(|subgraph| {
let _ = self
.sender
.send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into())))
.send(Subgraph(SubgraphEvent::SubgraphSchemaChanged(
subgraph.into(),
)))
.tap_err(|err| tracing::error!("{:?}", err));
});
}
Expand All @@ -402,7 +429,7 @@ impl SubgraphHandles {
Subtask::<SubgraphWatcher, FullyResolvedSubgraph>::new(subgraph_watcher);
let _ = self
.sender
.send(Subgraph(SubgraphEvent::SubgraphChanged(
.send(Subgraph(SubgraphEvent::SubgraphSchemaChanged(
subgraph.clone().into(),
)))
.tap_err(|err| tracing::error!("{:?}", err));
Expand All @@ -414,9 +441,10 @@ impl SubgraphHandles {
cancellation_token
.run_until_cancelled(async move {
while let Some(subgraph) = messages.next().await {
tracing::info!("Subgraph change detected: {:?}", subgraph);
let _ = sender
.send(Subgraph(SubgraphEvent::SubgraphChanged(subgraph.into())))
.send(Subgraph(SubgraphEvent::SubgraphSchemaChanged(
subgraph.into(),
)))
.tap_err(|err| tracing::error!("{:?}", err));
}
})
Expand Down
21 changes: 8 additions & 13 deletions src/composition/watchers/watcher/supergraph_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,40 +172,35 @@ impl SupergraphConfigDiff {
old: &SupergraphConfig,
new: SupergraphConfig,
) -> Result<SupergraphConfigDiff, ConfigError> {
let old_subgraph_names_and_urls: HashSet<(String, Option<String>)> = old
let old_subgraph_names: HashSet<String> = old
.clone()
.into_iter()
.map(|(name, config)| (name, config.routing_url))
.map(|(name, _config)| name)
.collect();

let new_subgraph_names_and_urls: HashSet<(String, Option<String>)> = new
let new_subgraph_names: HashSet<String> = new
.clone()
.into_iter()
.map(|(name, config)| (name, config.routing_url))
.map(|(name, _config)| name)
.collect();

// Collect the subgraph definitions from the new supergraph config.
let new_subgraphs: BTreeMap<String, SubgraphConfig> = new.clone().into_iter().collect();

// Compare the old and new subgraph names to find additions.
let added_names: HashSet<String> = new_subgraph_names_and_urls
.difference(&old_subgraph_names_and_urls)
.map(|(a, _)| a.clone())
.collect();
let added_names: HashSet<String> =
HashSet::from_iter(new_subgraph_names.difference(&old_subgraph_names).cloned());

// Compare the old and new subgraph names to find removals.
let removed_names: HashSet<String> = old_subgraph_names_and_urls
.difference(&new_subgraph_names_and_urls)
.map(|(a, _)| a.clone())
.collect();
let removed_names = old_subgraph_names.difference(&new_subgraph_names);

// Filter the added and removed subgraphs from the new supergraph config.
let added = new_subgraphs
.clone()
.into_iter()
.filter(|(name, _)| added_names.contains(name))
.collect::<Vec<_>>();
let removed = removed_names.into_iter().collect::<Vec<_>>();
let removed = removed_names.into_iter().cloned().collect::<Vec<_>>();

// Find any in-place changes (eg, SDL, SchemaSource::Subgraph)
let changed = old
Expand Down
Loading