diff --git a/src/k8s_helpers.rs b/src/k8s_helpers.rs index c7f2598..4755462 100644 --- a/src/k8s_helpers.rs +++ b/src/k8s_helpers.rs @@ -1,5 +1,5 @@ use { - crate::docker::DockerImage, + crate::{docker::DockerImage, kubernetes::ServiceType}, k8s_openapi::{ api::{ apps::v1::{ReplicaSet, ReplicaSetSpec}, @@ -125,8 +125,12 @@ pub fn create_service( service_name: String, namespace: String, label_selector: BTreeMap, - is_load_balancer: bool, + service_type: ServiceType, ) -> Service { + let (type_, cluster_ip, node_port) = match service_type { + ServiceType::Standard => (None, Some("None".to_string()), None), + ServiceType::LoadBalancer(port) => (Some("LoadBalancer".to_string()), None, Some(port)), + }; Service { metadata: ObjectMeta { name: Some(service_name), @@ -135,21 +139,13 @@ pub fn create_service( }, spec: Some(ServiceSpec { selector: Some(label_selector), - type_: if is_load_balancer { - Some("LoadBalancer".to_string()) - } else { - None - }, - cluster_ip: if is_load_balancer { - None - } else { - Some("None".to_string()) - }, + type_, + cluster_ip, ports: Some(vec![ ServicePort { port: 8899, // RPC Port name: Some("rpc-port".to_string()), - node_port: if is_load_balancer { Some(30000) } else { None }, + node_port, ..Default::default() }, ServicePort { diff --git a/src/kubernetes.rs b/src/kubernetes.rs index 8f8a9e6..3dea15c 100644 --- a/src/kubernetes.rs +++ b/src/kubernetes.rs @@ -17,12 +17,16 @@ use { apimachinery::pkg::api::resource::Quantity, }, kube::{ - api::{Api, ListParams, PostParams}, + api::{Api, ListParams, ObjectList, PostParams}, Client, }, log::*, solana_sdk::pubkey::Pubkey, - std::{collections::BTreeMap, error::Error, path::Path}, + std::{ + collections::{BTreeMap, HashSet}, + error::Error, + path::Path, + }, }; #[derive(Debug, Clone)] @@ -41,6 +45,12 @@ impl PodRequests { } } +#[derive(Debug, PartialEq)] +pub enum ServiceType { + Standard, + LoadBalancer(/* External Port */ i32), +} + pub struct Kubernetes<'a> { k8s_client: Client, namespace: String, @@ -79,10 +89,14 @@ impl<'a> Kubernetes<'a> { self.validator_config.shred_version = Some(shred_version); } - pub async fn namespace_exists(&self) -> Result { + async fn get_namespaces(&self) -> Result, kube::Error> { let namespaces: Api = Api::all(self.k8s_client.clone()); let namespace_list = namespaces.list(&ListParams::default()).await?; + Ok(namespace_list) + } + pub async fn namespace_exists(&self) -> Result { + let namespace_list = self.get_namespaces().await?; let exists = namespace_list .items .iter() @@ -383,7 +397,7 @@ impl<'a> Kubernetes<'a> { service_name.to_string(), self.namespace.clone(), label_selector.clone(), - false, + ServiceType::Standard, ) } @@ -397,7 +411,7 @@ impl<'a> Kubernetes<'a> { format!("{}-{}-{}", service_name, self.deployment_tag, index), self.namespace.clone(), label_selector.clone(), - false, + ServiceType::Standard, ) } @@ -411,17 +425,20 @@ impl<'a> Kubernetes<'a> { service_api.create(&post_params, service).await } - pub fn create_validator_load_balancer( + pub async fn create_validator_load_balancer( &self, service_name: &str, label_selector: &BTreeMap, - ) -> Service { - k8s_helpers::create_service( + ) -> Result> { + let node_port = self.get_open_external_port_for_rpc_service().await?; + info!("Deploying Load Balancer Service with external port: {node_port}"); + + Ok(k8s_helpers::create_service( service_name.to_string(), self.namespace.clone(), label_selector.clone(), - true, - ) + ServiceType::LoadBalancer(node_port), + )) } pub async fn is_replica_set_ready(&self, replica_set_name: &str) -> Result { @@ -758,4 +775,53 @@ impl<'a> Kubernetes<'a> { None, ) } + + async fn get_open_external_port_for_rpc_service(&self) -> Result> { + let used_ports = self.get_all_used_ports().await?; + + // This Node Port range is standard for kubernetes + const MIN_NODE_PORT: i32 = 30000; + const MAX_NODE_PORT: i32 = 32767; + // Find an available NodePort + let mut available_port = MIN_NODE_PORT; + while used_ports.contains(&available_port) { + available_port += 1; + } + if available_port > MAX_NODE_PORT { + return Err(format!( + "No available NodePort found in the range {MIN_NODE_PORT}-{MAX_NODE_PORT}" + ) + .into()); + } + + Ok(available_port) + } + + async fn get_all_used_ports(&self) -> Result, kube::Error> { + let mut used_ports = HashSet::new(); + let namespaces = self.get_namespaces().await?; + let namespaces: Vec = namespaces + .items + .into_iter() + .filter_map(|ns| ns.metadata.name) + .collect(); + + // Iterate over namespaces to collect used NodePorts + for ns in namespaces { + let services: Api = Api::namespaced(self.k8s_client.clone(), &ns); + let service_list = services.list(&Default::default()).await?; + for svc in service_list { + if let Some(spec) = svc.spec { + if let Some(ports) = spec.ports { + for port in ports { + if let Some(node_port) = port.node_port { + used_ports.insert(node_port); + } + } + } + } + } + } + Ok(used_ports) + } } diff --git a/src/main.rs b/src/main.rs index 72e42ca..e86b515 100644 --- a/src/main.rs +++ b/src/main.rs @@ -755,10 +755,12 @@ async fn main() -> Result<(), Box> { let load_balancer_label = kub_controller.create_selector("load-balancer/name", "load-balancer-selector"); //create load balancer - let load_balancer = kub_controller.create_validator_load_balancer( - "bootstrap-and-rpc-node-lb-service", - &load_balancer_label, - ); + let load_balancer = kub_controller + .create_validator_load_balancer( + "bootstrap-and-rpc-node-lb-service", + &load_balancer_label, + ) + .await?; //deploy load balancer kub_controller.deploy_service(&load_balancer).await?;