diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 94faf2e372..4dbb353a8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,8 +68,8 @@ jobs: - name: Install nextest run: curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin - name: Build - run: cargo build -p qt -p quilkin --tests - - run: cargo nextest run -p qt -p quilkin + run: cargo build -p qt -p quilkin -p xds --tests + - run: cargo nextest run -p qt -p quilkin -p xds quilkin build: name: Build diff --git a/crates/xds/Cargo.toml b/crates/xds/Cargo.toml index 3ad4260e6b..0ecd97ea3c 100644 --- a/crates/xds/Cargo.toml +++ b/crates/xds/Cargo.toml @@ -4,7 +4,8 @@ version = "0.1.0" edition.workspace = true license.workspace = true -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +test = true [dependencies] quilkin-proto = { path = "../quilkin-proto" } diff --git a/crates/xds/src/client.rs b/crates/xds/src/client.rs index 3f7a025c31..6a5295de4e 100644 --- a/crates/xds/src/client.rs +++ b/crates/xds/src/client.rs @@ -30,16 +30,12 @@ use tryhard::{ }; use crate::{ - generated::{ - envoy::{ - config::core::v3::Node, - service::discovery::v3::{ - aggregated_discovery_service_client::AggregatedDiscoveryServiceClient, - DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, - }, - }, - quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient, + core::Node, + discovery::{ + aggregated_discovery_service_client::AggregatedDiscoveryServiceClient, + DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, }, + generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient, Result, }; diff --git a/crates/xds/src/config.rs b/crates/xds/src/config.rs index 99afffd038..fb67e8d474 100644 --- a/crates/xds/src/config.rs +++ b/crates/xds/src/config.rs @@ -199,7 +199,7 @@ pub trait Configuration: Send + Sync + Sized + 'static { } pub struct DeltaDiscoveryRes { - pub resources: Vec, + pub resources: Vec, pub removed: std::collections::HashSet, } @@ -276,7 +276,7 @@ pub fn handle_delta_discovery_responses( let error_detail = if let Err(error) = result { crate::metrics::nacks(control_plane_identifier, &type_url).inc(); - Some(crate::generated::google::rpc::Status { + Some(quilkin_proto::generated::google::rpc::Status { code: 3, message: error.to_string(), ..Default::default() diff --git a/crates/xds/src/lib.rs b/crates/xds/src/lib.rs index 5b0fcdc353..9679d88ace 100644 --- a/crates/xds/src/lib.rs +++ b/crates/xds/src/lib.rs @@ -19,281 +19,16 @@ pub mod config; pub mod locality; pub mod metrics; pub mod net; -pub mod resource; pub mod server; -pub use crate::generated::envoy::{ +pub use client::{AdsClient, Client}; + +pub use generated::envoy::{ config::core::v3::{self as core, socket_address}, config::listener::v3 as listener, service::discovery::v3 as discovery, }; -pub use client::{AdsClient, Client}; -pub use quilkin_proto as generated; -pub use resource::{Resource, ResourceType}; +pub use generated::quilkin::config::v1alpha1 as proto; +pub use quilkin_proto::generated; pub type Result = std::result::Result; - -#[cfg(test)] -mod tests { - use super::*; - - use std::sync::Arc; - - use crate::test::AddressType; - use crate::{ - config::Config, - filters::*, - net::{endpoint::Endpoint, TcpListener}, - }; - - #[tokio::test] - #[ignore = "flaky, ignoring for now"] - async fn token_routing() { - let mut helper = crate::test::TestHelper::default(); - let token = "mytoken"; - let address = { - let mut addr = Endpoint::new(helper.run_echo_server(AddressType::Ipv6).await); - addr.metadata.known.tokens.insert(token.into()); - crate::test::map_to_localhost(&mut addr.address).await; - addr - }; - let clusters = crate::net::cluster::ClusterMap::default(); - - tracing::debug!(?address); - clusters.insert_default([address].into()); - tracing::debug!(?clusters); - - let xds_config: Arc = serde_json::from_value(serde_json::json!({ - "version": "v1alpha1", - "id": "test-proxy", - "clusters": clusters, - })) - .map(Arc::new) - .unwrap(); - - let client_addr = crate::test::available_addr(AddressType::Random).await; - let client_config = serde_json::from_value(serde_json::json!({ - "version": "v1alpha1", - "id": "test-proxy", - })) - .map(Arc::new) - .unwrap(); - - let xds_one = TcpListener::bind(None).unwrap(); - let xds_two = TcpListener::bind(None).unwrap(); - - let xds_one_port = xds_one.port(); - let xds_two_port = xds_two.port(); - - // Test that the client can handle the manager dropping out. - let handle = tokio::spawn( - server::ControlPlane::from_arc( - xds_config.clone(), - crate::components::admin::IDLE_REQUEST_INTERVAL, - ) - .management_server(xds_one) - .unwrap(), - ); - - let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(crate::ShutdownKind::Testing); - tokio::spawn( - server::ControlPlane::from_arc( - xds_config.clone(), - crate::components::admin::IDLE_REQUEST_INTERVAL, - ) - .management_server(xds_two) - .unwrap(), - ); - let client_proxy = crate::cli::Proxy { - port: client_addr.port(), - management_server: vec![ - format!("http://[::1]:{xds_one_port}").parse().unwrap(), - format!("http://[::1]:{xds_two_port}").parse().unwrap(), - ], - qcmp_port: 0, - ..<_>::default() - }; - - tokio::spawn(async move { - client_proxy - .run(client_config, Default::default(), None, shutdown_rx) - .await - }); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - handle.abort(); - let _ = handle.await; - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - const VERSION_KEY: &str = "quilkin.dev/load_balancer/version"; - const TOKEN_KEY: &str = "quilkin.dev/load_balancer/token"; - - xds_config.filters.store(Arc::new( - FilterChain::try_create([ - Capture::as_filter_config(capture::Config { - metadata_key: VERSION_KEY.into(), - strategy: capture::Suffix { - size: 1, - remove: true, - } - .into(), - }) - .unwrap(), - Match::as_filter_config(r#match::Config { - on_write: None, - on_read: Some(r#match::DirectionalConfig { - metadata_key: VERSION_KEY.into(), - branches: vec![r#match::Branch { - value: 1.into(), - filter: Capture::as_filter_config(capture::Config { - metadata_key: TOKEN_KEY.into(), - strategy: capture::Suffix { - size: 16, - remove: true, - } - .into(), - }) - .unwrap(), - }], - fallthrough: <_>::default(), - }), - }) - .unwrap(), - Match::as_filter_config(r#match::Config { - on_write: None, - on_read: Some(r#match::DirectionalConfig { - metadata_key: VERSION_KEY.into(), - branches: vec![r#match::Branch { - value: 1.into(), - filter: TokenRouter::as_filter_config(token_router::Config { - metadata_key: TOKEN_KEY.into(), - }) - .unwrap(), - }], - fallthrough: <_>::default(), - }), - }) - .unwrap(), - ]) - .unwrap(), - )); - - let fixture = "Hello World!"; - let data = fixture.as_bytes(); - let mut packet = data.to_vec(); - packet.extend(token.as_bytes()); - - let client = helper.open_socket_and_recv_single_packet().await; - - client - .socket - .send_to(&packet, (std::net::Ipv6Addr::LOCALHOST, client_addr.port())) - .await - .unwrap(); - let response = - tokio::time::timeout(std::time::Duration::from_millis(100), client.packet_rx) - .await - .unwrap() - .unwrap(); - - assert_eq!(format!("{}{}", fixture, token), response); - } - - #[tokio::test] - async fn basic() { - let config: Arc = serde_json::from_value(serde_json::json!({ - "version": "v1alpha1", - "id": "test-proxy", - })) - .map(Arc::new) - .unwrap(); - - let proxy_config = crate::components::proxy::Ready::default(); - let listener = TcpListener::bind(None).unwrap(); - let port = listener.port(); - tokio::spawn( - crate::net::xds::server::ControlPlane::from_arc( - config.clone(), - proxy_config.idle_request_interval, - ) - .management_server(listener) - .unwrap(), - ); - let client = Client::connect( - "test-client".into(), - vec![format!("http://127.0.0.1:{port}").try_into().unwrap()], - ) - .await - .unwrap(); - let mut stream = client.xds_client_stream(config.clone(), proxy_config); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - // Each time, we create a new upstream endpoint and send a cluster update for it. - let concat_bytes = vec![("b", "c,"), ("d", "e")]; - for (b1, b2) in concat_bytes.into_iter() { - let socket = std::net::UdpSocket::bind((std::net::Ipv6Addr::LOCALHOST, 0)).unwrap(); - let local_addr: crate::net::endpoint::EndpointAddress = - socket.local_addr().unwrap().into(); - - config.clusters.modify(|clusters| { - clusters.insert( - None, - Some(Endpoint::new(local_addr.clone())) - .into_iter() - .collect(), - ); - }); - - let filters = FilterChain::try_create([ - Concatenate::as_filter_config(concatenate::Config { - on_read: concatenate::Strategy::Append, - on_write: <_>::default(), - bytes: b1.as_bytes().to_vec(), - }) - .unwrap(), - Concatenate::as_filter_config(concatenate::Config { - on_read: concatenate::Strategy::Append, - on_write: <_>::default(), - bytes: b2.as_bytes().to_vec(), - }) - .unwrap(), - ]) - .unwrap(); - - config.filters.modify(|chain| *chain = filters.clone()); - - stream - .aggregated_subscribe(ResourceType::Cluster, &[]) - .await - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - assert_eq!( - local_addr, - config - .clusters - .read() - .get_default() - .unwrap() - .endpoints - .iter() - .next() - .unwrap() - .address - ); - - stream - .aggregated_subscribe(ResourceType::Listener, &[]) - .await - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - let changed_filters = config.filters.load(); - - assert_eq!(changed_filters.len(), 2); - - let mut iter = changed_filters.iter(); - assert_eq!(iter.next().unwrap(), filters[0].clone().into()); - assert_eq!(iter.next().unwrap(), filters[1].clone().into()); - } - } -} diff --git a/crates/xds/src/locality.rs b/crates/xds/src/locality.rs index 974ead0dad..a52078a9f4 100644 --- a/crates/xds/src/locality.rs +++ b/crates/xds/src/locality.rs @@ -135,14 +135,14 @@ impl std::str::FromStr for Locality { } } -impl From for Locality { +impl From for Locality { #[inline] - fn from(value: crate::resource::proto::Locality) -> Self { + fn from(value: crate::proto::Locality) -> Self { Self::new(value.region, value.zone, value.sub_zone) } } -impl From for crate::resource::proto::Locality { +impl From for crate::proto::Locality { #[inline] fn from(value: Locality) -> Self { Self { diff --git a/crates/xds/src/resource.rs b/crates/xds/src/resource.rs deleted file mode 100644 index 110e4c0382..0000000000 --- a/crates/xds/src/resource.rs +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -pub use crate::generated::quilkin::config::v1alpha1 as proto; -use prost_types::Any; - -pub type Resource = Box; - -pub trait ResourceType { - fn name(&self) -> String; - fn type_url(&self) -> &'static str; - - fn decode(&mut self, any: Any) -> Result<(), eyre::Error>; - fn encode(&self) -> Result; -} diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs index 60faf8944e..7600b0ce9e 100644 --- a/crates/xds/src/server.rs +++ b/crates/xds/src/server.rs @@ -461,125 +461,3 @@ impl AggregatedControlPlaneDiscoveryService for }))) } } - -#[cfg(test)] -mod tests { - use pretty_assertions::assert_eq; - use tokio::time::timeout; - - use super::*; - use crate::{ - core::Node, - listener::v3::{FilterChain, Listener}, - listener::{FilterChain, Listener}, - ResourceType, - }; - - const TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(10); - - #[tokio::test] - async fn valid_response() { - const RESOURCE: ResourceType = ResourceType::Cluster; - const LISTENER_TYPE: ResourceType = ResourceType::Listener; - - let mut response = DeltaDiscoveryResponse { - resources: vec![], - type_url: RESOURCE.type_url().into(), - ..<_>::default() - }; - - let mut listener_response = DeltaDiscoveryResponse { - resources: vec![prost_types::Any { - type_url: LISTENER_TYPE.type_url().into(), - value: crate::codec::prost::encode(&Listener { - filter_chains: vec![FilterChain { - filters: vec![], - ..<_>::default() - }], - ..<_>::default() - }) - .unwrap(), - }], - type_url: LISTENER_TYPE.type_url().into(), - ..<_>::default() - }; - - let config = Arc::new(Config::default_non_agent()); - let client = ControlPlane::from_arc(config.clone(), TIMEOUT_DURATION); - let (tx, rx) = tokio::sync::mpsc::channel(256); - - let mut request = DiscoveryRequest { - node: Some(Node { - id: "quilkin".into(), - user_agent_name: "quilkin".into(), - ..Node::default() - }), - resource_names: vec![], - type_url: RESOURCE.type_url().into(), - ..DiscoveryRequest::default() - }; - - let mut listener_request = DiscoveryRequest { - node: Some(Node { - id: "quilkin".into(), - user_agent_name: "quilkin".into(), - ..Node::default() - }), - resource_names: vec![], - type_url: LISTENER_TYPE.type_url().into(), - ..DiscoveryRequest::default() - }; - - timeout(TIMEOUT_DURATION, tx.send(Ok(request.clone()))) - .await - .unwrap() - .unwrap(); - - let mut stream = timeout( - TIMEOUT_DURATION, - client.stream_resources(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))), - ) - .await - .unwrap() - .unwrap(); - - let message = timeout(TIMEOUT_DURATION, stream.next()) - .await - .unwrap() - .unwrap() - .unwrap(); - response.version_info = message.version_info.clone(); - response.nonce = message.nonce.clone(); - response.control_plane = message.control_plane.clone(); - request.response_nonce = message.nonce.clone(); - - assert_eq!(response, message); - - timeout(TIMEOUT_DURATION, tx.send(Ok(request.clone()))) - .await - .unwrap() - .unwrap(); - - timeout(TIMEOUT_DURATION, tx.send(Ok(listener_request.clone()))) - .await - .unwrap() - .unwrap(); - - let message = timeout(TIMEOUT_DURATION, stream.next()) - .await - .unwrap() - .unwrap() - .unwrap(); - listener_response.control_plane = message.control_plane.clone(); - listener_response.version_info = message.version_info.clone(); - listener_response.nonce = message.nonce.clone(); - listener_request.response_nonce = message.nonce.clone(); - - assert_eq!(listener_response, message); - - timeout(TIMEOUT_DURATION, tx.send(Ok(listener_request.clone()))) - .await - .unwrap() - .unwrap(); - } -}