Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROVER-279 Funnels Errors from Supergraph Config Deserialisation to LSP #2345

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ timber = { workspace = true }
termimad = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "process", "sync"] }
tokio-stream = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"]}
tokio-util = { workspace = true }
toml = { workspace = true }
tower = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/command/lsp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async fn start_composition(
}
CompositionEvent::Error(err) => {
debug!("Composition failed: {err}");
let message = format!("Failed run composition: {err}");
let message = format!("Composition failed to run: {err}",);
let diagnostic = Diagnostic::new_simple(Range::default(), message);
language_server
.publish_diagnostics(supergraph_yaml_url.clone(), vec![diagnostic])
Expand Down
2 changes: 2 additions & 0 deletions src/composition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub enum CompositionError {
},
#[error("Serialization error.\n{}", .0)]
SerdeYaml(#[from] serde_yaml::Error),
#[error("{}", .0)]
InvalidSupergraphConfig(String),
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
58 changes: 47 additions & 11 deletions src/composition/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use camino::Utf8PathBuf;
use futures::stream::{BoxStream, StreamExt};
use futures::stream::{select, BoxStream, StreamExt};
use rover_http::HttpService;
use tower::ServiceExt;

Expand All @@ -28,11 +28,13 @@ use super::{
watchers::{composition::CompositionWatcher, subgraphs::SubgraphWatchers},
};
use crate::composition::supergraph::binary::OutputTarget;
use crate::composition::watchers::federation::FederationWatcher;
use crate::subtask::{BroadcastSubtask, SubtaskRunUnit};
use crate::{
composition::watchers::watcher::{
file::FileWatcher, supergraph_config::SupergraphConfigWatcher,
},
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
subtask::{Subtask, SubtaskRunStream},
utils::effect::{exec::ExecCommand, read_file::ReadFile, write_file::WriteFile},
};

Expand Down Expand Up @@ -173,42 +175,76 @@ where
{
/// Runs the [`Runner`]
pub fn run(self) -> BoxStream<'static, CompositionEvent> {
let (supergraph_config_stream, supergraph_config_subtask) = if let Some(
supergraph_config_watcher,
) =
self.state.supergraph_config_watcher
{
let (
supergraph_config_stream_for_subtask_watcher,
supergraph_config_stream_for_federation_watcher,
supergraph_config_subtask,
) = if let Some(supergraph_config_watcher) = self.state.supergraph_config_watcher {
tracing::info!("Watching subgraphs for changes...");
let (supergraph_config_stream, supergraph_config_subtask) =
Subtask::new(supergraph_config_watcher);
BroadcastSubtask::new(supergraph_config_watcher);
(
supergraph_config_stream.boxed(),
supergraph_config_subtask.subscribe().boxed(),
Some(supergraph_config_subtask),
)
} else {
tracing::warn!(
"No supergraph config detected, changes to subgraph configurations will not be applied automatically"
);
(tokio_stream::empty().boxed(), None)
(
tokio_stream::empty().boxed(),
tokio_stream::empty().boxed(),
None,
)
};

let (subgraph_change_stream, subgraph_watcher_subtask) =
Subtask::new(self.state.subgraph_watchers);

let (federation_watcher_stream, federation_watcher_subtask) =
Subtask::new(FederationWatcher {});

// Create a new subtask for the composition handler, passing in a stream of subgraph change
// events in order to trigger recomposition.
let (composition_messages, composition_subtask) =
Subtask::new(self.state.composition_watcher);
composition_subtask.run(subgraph_change_stream.boxed());

// Start subgraph watchers, listening for events from the supergraph change stream.
subgraph_watcher_subtask.run(supergraph_config_stream);
subgraph_watcher_subtask.run(
supergraph_config_stream_for_subtask_watcher
.filter_map(|recv_res| async move {
match recv_res {
Ok(res) => Some(res),
Err(e) => {
tracing::warn!("Error receiving from broadcast stream: {:?}", e);
None
}
}
})
.boxed(),
);

federation_watcher_subtask.run(
supergraph_config_stream_for_federation_watcher
.filter_map(|recv_res| async move {
match recv_res {
Ok(res) => Some(res),
Err(e) => {
tracing::warn!("Error receiving from broadcast stream: {:?}", e);
None
}
}
})
.boxed(),
);

// Start the supergraph watcher subtask.
if let Some(supergraph_config_subtask) = supergraph_config_subtask {
supergraph_config_subtask.run();
}

composition_messages.boxed()
select(composition_messages, federation_watcher_stream).boxed()
}
}
37 changes: 27 additions & 10 deletions src/composition/supergraph/config/error/subgraph.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::path::PathBuf;
use std::sync::Arc;

use camino::Utf8PathBuf;
use http::header::{InvalidHeaderName, InvalidHeaderValue};

/// Errors that may occur as a result of resolving subgraphs
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, Clone)]
pub enum ResolveSubgraphError {
/// Occurs when the subgraph schema file cannot found relative to the supplied
/// supergraph config file
Expand All @@ -19,37 +20,45 @@ pub enum ResolveSubgraphError {
/// The result of joining the paths together, that caused the failure
joined_path: PathBuf,
/// The source error
source: std::io::Error,
source: Arc<std::io::Error>,
},
/// Occurs as a result of an IO error
#[error(transparent)]
Io(#[from] std::io::Error),
Io {
/// Source error from std::io, wrapped in Arc to make this error Cloneable, and support
/// broadcasting.
source: Arc<std::io::Error>,
},
/// Occurs as a result of a rover_std::Fs error
#[error(transparent)]
Fs(Box<dyn std::error::Error + Send + Sync>),
Fs {
/// Source error from rover_std::Fs, wrapped in Arc to make this error Cloneable, and support
/// broadcasting.
source: Arc<Box<dyn std::error::Error + Send + Sync>>,
},
/// Occurs when a introspection against a subgraph fails
#[error("Failed to introspect the subgraph \"{subgraph_name}\".")]
IntrospectionError {
/// The subgraph name that failed to be resolved
subgraph_name: String,
/// The source error
source: Box<dyn std::error::Error + Send + Sync>,
source: Arc<Box<dyn std::error::Error + Send + Sync>>,
},
/// Occurs when a supplied graph ref cannot be parsed
#[error("Invalid graph ref: {graph_ref}")]
InvalidGraphRef {
/// The supplied graph ref
graph_ref: String,
/// The source error
source: Box<dyn std::error::Error + Send + Sync>,
source: Arc<Box<dyn std::error::Error + Send + Sync>>,
},
/// Occurs when fetching a remote subgraph fails
#[error("Failed to fetch the sdl for subgraph `{}` from remote.\n {}", .subgraph_name, .source)]
FetchRemoteSdlError {
/// The name of the subgraph that failed to be resolved
subgraph_name: String,
/// The source error
source: Box<dyn std::error::Error + Send + Sync>,
source: Arc<Box<dyn std::error::Error + Send + Sync>>,
},
/// Occurs when a supergraph config filepath waqs expected but not found
#[error("Failed to find the supergraph config, which is required when resolving schemas in a file relative to a supergraph config")]
Expand All @@ -68,11 +77,19 @@ pub enum ResolveSubgraphError {
},
/// Pass-through for [`http::InvalidHeaderName`]
#[error(transparent)]
HeaderName(#[from] InvalidHeaderName),
HeaderName {
/// Source error from hyper, wrapped in Arc to make this error Cloneable, and support
/// broadcasting.
source: Arc<InvalidHeaderName>,
},
/// Pass-through for [`http::InvalidHeaderValue`]
#[error(transparent)]
HeaderValue(#[from] InvalidHeaderValue),
HeaderValue {
/// Source error from hyper, wrapped in Arc to make this error Cloneable, and support
/// broadcasting.
source: Arc<InvalidHeaderValue>,
},
/// Pass-through error for when a [`tower::Service`] fails to be ready
#[error(transparent)]
ServiceReady(#[from] Box<dyn std::error::Error + Send + Sync>),
ServiceReady(#[from] Arc<Box<dyn std::error::Error + Send + Sync>>),
}
9 changes: 5 additions & 4 deletions src/composition/supergraph/config/full/subgraph/file.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! Utilities that allow for resolving file-based subgraphs

use std::pin::Pin;
use std::sync::Arc;

use buildstructor::Builder;
use camino::Utf8PathBuf;
use futures::Future;
use rover_std::Fs;
use tower::Service;

use super::FullyResolvedSubgraph;
use crate::composition::supergraph::config::{
error::ResolveSubgraphError, unresolved::UnresolvedSubgraph,
};

use super::FullyResolvedSubgraph;

/// Service that resolves a file-based subgraph
#[derive(Clone, Builder)]
pub struct ResolveFileSubgraph {
Expand Down Expand Up @@ -41,8 +41,9 @@ impl Service<()> for ResolveFileSubgraph {
let subgraph_name = unresolved_subgraph.name().to_string();
let fut = async move {
let file = unresolved_subgraph.resolve_file_path(&supergraph_config_root, &path)?;
let schema =
Fs::read_file(&file).map_err(|err| ResolveSubgraphError::Fs(Box::new(err)))?;
let schema = Fs::read_file(&file).map_err(|err| ResolveSubgraphError::Fs {
source: Arc::new(Box::new(err)),
})?;
let routing_url = unresolved_subgraph.routing_url().clone().ok_or_else(|| {
ResolveSubgraphError::MissingRoutingUrl {
subgraph: unresolved_subgraph.name().to_string(),
Expand Down
16 changes: 10 additions & 6 deletions src/composition/supergraph/config/full/subgraph/introspect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Utilities that help resolve a subgraph via introspection

use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};

use buildstructor::Builder;
Expand All @@ -13,9 +14,8 @@ use rover_http::{extend_headers::ExtendHeadersLayer, HttpService};
use tower::{util::BoxCloneService, Service, ServiceBuilder, ServiceExt};
use url::Url;

use crate::composition::supergraph::config::error::ResolveSubgraphError;

use super::FullyResolvedSubgraph;
use crate::composition::supergraph::config::error::ResolveSubgraphError;

/// Alias for a service that fully resolves a subgraph via introspection
pub type ResolveIntrospectSubgraphService =
Expand Down Expand Up @@ -76,10 +76,14 @@ impl Service<MakeResolveIntrospectSubgraphRequest> for MakeResolveIntrospectSubg
.iter()
.map(|(key, value)| {
HeaderName::from_bytes(key.as_bytes())
.map_err(ResolveSubgraphError::from)
.map_err(|err| ResolveSubgraphError::HeaderName {
source: Arc::new(err),
})
.and_then(|key| {
HeaderValue::from_str(value)
.map_err(ResolveSubgraphError::from)
.map_err(|err| ResolveSubgraphError::HeaderValue {
source: Arc::new(err),
})
.map(|value| (key, value))
})
})
Expand Down Expand Up @@ -130,7 +134,7 @@ where
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner
.poll_ready(cx)
.map_err(|err| ResolveSubgraphError::ServiceReady(Box::new(err)))
.map_err(|err| ResolveSubgraphError::ServiceReady(Arc::new(Box::new(err))))
}

fn call(&mut self, _req: ()) -> Self::Future {
Expand All @@ -143,7 +147,7 @@ where
let schema = inner.call(()).await.map_err(|err| {
ResolveSubgraphError::IntrospectionError {
subgraph_name: subgraph_name.to_string(),
source: Box::new(err),
source: Arc::new(Box::new(err)),
}
})?;
Ok(FullyResolvedSubgraph::builder()
Expand Down
16 changes: 8 additions & 8 deletions src/composition/supergraph/config/full/subgraph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::str::FromStr;
use std::sync::Arc;

use apollo_federation_types::config::{SchemaSource, SubgraphConfig};
use apollo_parser::{cst, Parser};
Expand All @@ -12,16 +13,15 @@ pub mod file;
pub mod introspect;
pub mod remote;

use crate::composition::supergraph::config::{
error::ResolveSubgraphError, resolver::fetch_remote_subgraph::FetchRemoteSubgraphFactory,
unresolved::UnresolvedSubgraph,
};

use self::{
file::ResolveFileSubgraph,
introspect::{MakeResolveIntrospectSubgraphRequest, ResolveIntrospectSubgraphFactory},
remote::ResolveRemoteSubgraph,
};
use crate::composition::supergraph::config::{
error::ResolveSubgraphError, resolver::fetch_remote_subgraph::FetchRemoteSubgraphFactory,
unresolved::UnresolvedSubgraph,
};

/// Alias for a [`tower::Service`] that fully resolves a subgraph
pub type FullyResolveSubgraphService =
Expand Down Expand Up @@ -89,21 +89,21 @@ impl FullyResolvedSubgraph {
let graph_ref = GraphRef::from_str(&graph_ref).map_err(|err| {
ResolveSubgraphError::InvalidGraphRef {
graph_ref: graph_ref.clone(),
source: Box::new(err),
source: Arc::new(Box::new(err)),
}
})?;

let fetch_remote_subgraph_factory = fetch_remote_subgraph_factory
.ready()
.await
.map_err(|err| ResolveSubgraphError::ServiceReady(Box::new(err)))?;
.map_err(|err| ResolveSubgraphError::ServiceReady(Arc::new(Box::new(err))))?;

let service = fetch_remote_subgraph_factory
.call(())
.await
.map_err(|err| ResolveSubgraphError::FetchRemoteSdlError {
subgraph_name: subgraph.to_string(),
source: Box::new(err),
source: Arc::new(Box::new(err)),
})?;
let service = ResolveRemoteSubgraph::builder()
.graph_ref(graph_ref)
Expand Down
Loading
Loading