Skip to content

Commit

Permalink
Add xDS server
Browse files Browse the repository at this point in the history
Co-Authored-By: rezvaneh <[email protected]>
Co-Authored-By: markmandel <[email protected]>
  • Loading branch information
3 people committed Apr 19, 2022
1 parent 49adc6a commit cc35b7a
Show file tree
Hide file tree
Showing 14 changed files with 1,076 additions and 94 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,31 @@ quilkin-macros = { version = "0.4.0-dev", path = "./macros" }

# Crates.io
arc-swap = "1.5.0"
async-stream = "0.3.3"
base64 = "0.13.0"
base64-serde = "0.6.1"
bytes = { version = "1.1.0", features = ["serde"] }
cached = "0.34.0"
chrono = "0.4.19"
clap = { version = "3.1.2", features = ["cargo", "derive", "env"] }
dashmap = "4.0.2"
either = "1.6.1"
eyre = "0.6.7"
futures = "0.3.21"
hyper = "0.14.17"
ipnetwork = "0.18.0"
k8s-openapi = { version = "0.14.0", features = ["v1_21", "schemars"] }
kube = { version = "0.70.0", features = ["derive"] }
num_cpus = "1.13.0"
once_cell = "1.9.0"
openssl = { version = "0.10", features = ["vendored"] }
parking_lot = "0.12"
prometheus = { version = "0.13.0", default-features = false }
prost = "=0.9.0"
prost-types = "=0.9.0"
rand = "0.8.5"
regex = "1.5.4"
schemars = { version = "0.8.8", features = ["bytes"] }
schemars = { version = "0.8.8", features = ["chrono", "bytes"] }
serde = { version = "1.0.130", features = ["derive", "rc"] }
serde_json = "1.0.79"
serde_regex = "1.1.0"
Expand Down
1 change: 1 addition & 0 deletions build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ build-windows-binary: ensure-build-image
build-macos-binary:
docker run --rm -v $(project_path):/workspace -w /workspace \
-v $(CARGO_HOME)/registry:/root/.cargo/registry \
-e "CC=o64-clang" -e "CXX=o64-clang++" \
joseluisq/rust-linux-darwin-builder:$(rust_toolchain) \
sh -c "rustup target add x86_64-apple-darwin && cargo build --target x86_64-apple-darwin && cargo build --release --target x86_64-apple-darwin"

Expand Down
24 changes: 24 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,30 @@ pub struct Filter {
pub config: Option<serde_json::Value>,
}

impl TryFrom<Filter> for crate::xds::config::listener::v3::Filter {
type Error = crate::filters::Error;

fn try_from(filter: Filter) -> Result<Self, Self::Error> {
use crate::xds::config::listener::v3::filter::ConfigType;

let config_type = match filter.config {
Some(config) => {
let any = crate::filters::FilterRegistry::get_factory(&filter.name)
.ok_or_else(|| crate::filters::Error::NotFound(filter.name.clone()))?
.encode_config_to_protobuf(config)?;

Some(ConfigType::TypedConfig(any))
}
None => None,
};

Ok(Self {
name: filter.name,
config_type,
})
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub mod filters;

#[doc(hidden)]
pub mod test_utils;
#[doc(hidden)]
pub mod xds;

pub type Result<T, E = eyre::Error> = std::result::Result<T, E>;
Expand All @@ -39,6 +38,7 @@ pub use self::{
config::Config,
proxy::{Builder, PendingValidation, Server, Validated},
runner::run,
xds::manage,
};

pub use quilkin_macros::include_proto;
Expand Down
59 changes: 59 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,44 @@ enum Commands {
)]
filter_ids: Vec<String>,
},
Manage {
#[clap(
short,
long,
default_value = "18000",
help = "The listening port for the management server."
)]
port: u16,
#[clap(
short,
long,
default_value = "18090",
help = "The listening port for the admin server."
)]
admin_port: u16,
#[clap(subcommand)]
provider: ProviderCommands,
},
}

#[derive(clap::Subcommand)]
enum ProviderCommands {
Agones {
#[clap(
short,
long,
default_value = "quilkin",
help = "Namespace under which the proxies run."
)]
config_namespace: String,
#[clap(
short,
long,
default_value = "gameservers",
help = "Namespace under which the game servers run."
)]
gameservers_namespace: String,
},
}

#[tokio::main]
Expand Down Expand Up @@ -93,6 +131,27 @@ async fn main() -> quilkin::Result<()> {
quilkin::run(config, vec![]).await
}

Commands::Manage {
port,
admin_port,
provider,
} => {
let provider = match provider {
ProviderCommands::Agones {
gameservers_namespace,
config_namespace,
} => std::sync::Arc::from(
quilkin::xds::provider::AgonesProvider::new(
gameservers_namespace,
config_namespace,
)
.await?,
),
};

quilkin::manage(port, admin_port, provider).await
}

Commands::GenerateConfigSchema {
output_directory,
filter_ids,
Expand Down
28 changes: 28 additions & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

/// Common utilities for testing
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
str::from_utf8,
sync::Arc,
Expand All @@ -30,6 +31,7 @@ use crate::{
filters::{prelude::*, FilterRegistry},
metadata::Value,
proxy::{Builder, PendingValidation},
xds::{DiscoveryServiceProvider, ResourceType, service::discovery::v3::DiscoveryResponse},
};

// TestFilter is useful for testing that commands are executing filters appropriately.
Expand Down Expand Up @@ -307,6 +309,32 @@ pub fn load_test_filters() {
FilterRegistry::register([TestFilter::factory()]);
}

pub struct TestProvider {
resources: HashMap<ResourceType, DiscoveryResponse>,
}

impl TestProvider {
pub fn new(resources: HashMap<ResourceType, DiscoveryResponse>) -> Self {
Self { resources }
}
}

#[tonic::async_trait]
impl DiscoveryServiceProvider for TestProvider {
async fn discovery_request(
&self,
_node_id: &str,
_version: u64,
kind: ResourceType,
_names: &[String],
) -> Result<DiscoveryResponse, tonic::Status> {
self.resources
.get(&kind)
.cloned()
.ok_or_else(|| tonic::Status::not_found("No resource supplied"))
}
}

#[cfg(test)]
mod tests {
use crate::test_utils::TestHelper;
Expand Down
50 changes: 46 additions & 4 deletions src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,57 @@ mod google {
}
}

const ENDPOINT_TYPE: &str = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
const CLUSTER_TYPE: &str = "type.googleapis.com/envoy.config.cluster.v3.Cluster";
const LISTENER_TYPE: &str = "type.googleapis.com/envoy.config.listener.v3.Listener";
macro_rules! type_urls {
($($base_url:literal : {$($const_name:ident = $type_url:literal),+ $(,)?})+) => {
$(
$(
pub const $const_name : &str = concat!($base_url, "/", $type_url);
)+
)+
}
}

pub use xds::*;
type_urls! {
"type.googleapis.com": {
CLUSTER_TYPE = "envoy.config.cluster.v3.Cluster",
ENDPOINT_TYPE = "envoy.config.endpoint.v3.ClusterLoadAssignment",
EXTENSION_CONFIG_TYPE = "envoy.config.core.v3.TypedExtensionConfig",
LISTENER_TYPE = "envoy.config.listener.v3.Listener",
ROUTE_TYPE = "envoy.config.route.v3.RouteConfiguration",
RUNTIME_TYPE = "envoy.service.runtime.v3.Runtime",
SCOPED_ROUTE_TYPE = "envoy.config.route.v3.ScopedRouteConfiguration",
SECRET_TYPE = "envoy.extensions.transport_sockets.tls.v3.Secret",
VIRTUAL_HOST_TYPE = "envoy.config.route.v3.VirtualHost",
}
}

pub(crate) mod ads_client;
mod cache;
pub(crate) mod cluster;
pub(crate) mod listener;
mod metrics;
pub mod provider;
mod resource;
pub(crate) mod server;

pub(crate) use ads_client::AdsClient;
pub use cache::Cache;
pub use provider::DiscoveryServiceProvider;
pub use resource::ResourceType;
pub use server::ControlPlane;
pub use service::discovery::v3::aggregated_discovery_service_client::AggregatedDiscoveryServiceClient;
pub use xds::*;

use service::discovery::v3::aggregated_discovery_service_server::AggregatedDiscoveryServiceServer;

pub async fn manage(
port: u16,
_admin_port: u16,
provider: std::sync::Arc<dyn DiscoveryServiceProvider>,
) -> crate::Result<()> {
let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc(provider));
let server = tonic::transport::Server::builder().add_service(server);
Ok(server
.serve((std::net::Ipv4Addr::UNSPECIFIED, port).into())
.await?)
}
2 changes: 1 addition & 1 deletion src/xds/ads_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl RpcSender {
&self,
node_id: String,
) -> Result<(), RpcSessionError> {
for resource_type in &[CLUSTER_TYPE, LISTENER_TYPE] {
for resource_type in &[ENDPOINT_TYPE, LISTENER_TYPE] {
let send_result = self
.send_discovery_request(DiscoveryRequest {
node: Some(Node {
Expand Down
44 changes: 44 additions & 0 deletions src/xds/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use cached::CachedAsync;
use tokio::sync::Mutex;

use crate::xds::{
service::discovery::v3::DiscoveryResponse, DiscoveryServiceProvider, ResourceType,
};

const CACHE_LIFESPAN_IN_SECONDS: u64 = 5;

/// A generic [DiscoveryServiceProvider] cache, that will cache any matching
/// request from the underlying provider for a limited duration.
pub struct Cache {
provider: Box<dyn DiscoveryServiceProvider>,
cache: Mutex<cached::TimedCache<(String, u64, ResourceType), DiscoveryResponse>>,
}

impl Cache {
pub fn new<P: DiscoveryServiceProvider + 'static>(provider: P) -> Self {
Self {
provider: Box::from(provider),
cache: Mutex::new(cached::TimedCache::with_lifespan(CACHE_LIFESPAN_IN_SECONDS)),
}
}
}

#[tonic::async_trait]
impl crate::xds::DiscoveryServiceProvider for Cache {
async fn discovery_request(
&self,
node_id: &str,
version: u64,
kind: ResourceType,
names: &[String],
) -> Result<DiscoveryResponse, tonic::Status> {
let mut lock = self.cache.lock().await;

lock.try_get_or_set_with((node_id.to_owned(), version, kind), || {
self.provider
.discovery_request(node_id, version, kind, names)
})
.await
.map(|response| response.clone())
}
}
18 changes: 18 additions & 0 deletions src/xds/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
mod agones;

pub use agones::AgonesProvider;

use crate::xds::{service::discovery::v3::DiscoveryResponse, ResourceType};

/// A trait over a discovery service provider responsible for returning
/// the [DiscoveryResponse]s. The type of resource returned is based on the [ResourceType].
#[tonic::async_trait]
pub trait DiscoveryServiceProvider: Send + Sync {
async fn discovery_request(
&self,
node_id: &str,
version: u64,
kind: ResourceType,
names: &[String],
) -> Result<DiscoveryResponse, tonic::Status>;
}
Loading

0 comments on commit cc35b7a

Please sign in to comment.