Skip to content

Commit

Permalink
pick unique port for rpc load balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Jul 31, 2024
1 parent 6601934 commit 9cbf62e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 27 deletions.
22 changes: 9 additions & 13 deletions src/k8s_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::docker::DockerImage,
crate::{docker::DockerImage, kubernetes::ServiceType},
k8s_openapi::{
api::{
apps::v1::{ReplicaSet, ReplicaSetSpec},
Expand Down Expand Up @@ -125,8 +125,12 @@ pub fn create_service(
service_name: String,
namespace: String,
label_selector: BTreeMap<String, String>,
is_load_balancer: bool,
service_type: ServiceType,
) -> Service {
let (type_, cluster_ip, node_port) = match service_type {
ServiceType::Standard => (None, Some("None".to_string()), None),
ServiceType::LoadBalancer(port) => (Some("LoadBalancer".to_string()), None, Some(port)),
};
Service {
metadata: ObjectMeta {
name: Some(service_name),
Expand All @@ -135,21 +139,13 @@ pub fn create_service(
},
spec: Some(ServiceSpec {
selector: Some(label_selector),
type_: if is_load_balancer {
Some("LoadBalancer".to_string())
} else {
None
},
cluster_ip: if is_load_balancer {
None
} else {
Some("None".to_string())
},
type_,
cluster_ip,
ports: Some(vec![
ServicePort {
port: 8899, // RPC Port
name: Some("rpc-port".to_string()),
node_port: if is_load_balancer { Some(30000) } else { None },
node_port,
..Default::default()
},
ServicePort {
Expand Down
86 changes: 76 additions & 10 deletions src/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ use {
apimachinery::pkg::api::resource::Quantity,
},
kube::{
api::{Api, ListParams, PostParams},
api::{Api, ListParams, ObjectList, PostParams},
Client,
},
log::*,
solana_sdk::pubkey::Pubkey,
std::{collections::BTreeMap, error::Error, path::Path},
std::{
collections::{BTreeMap, HashSet},
error::Error,
path::Path,
},
};

#[derive(Debug, Clone)]
Expand All @@ -41,6 +45,12 @@ impl PodRequests {
}
}

#[derive(Debug, PartialEq)]
pub enum ServiceType {
Standard,
LoadBalancer(/* External Port */ i32),
}

pub struct Kubernetes<'a> {
k8s_client: Client,
namespace: String,
Expand Down Expand Up @@ -79,10 +89,14 @@ impl<'a> Kubernetes<'a> {
self.validator_config.shred_version = Some(shred_version);
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
async fn get_namespaces(&self) -> Result<ObjectList<Namespace>, kube::Error> {
let namespaces: Api<Namespace> = Api::all(self.k8s_client.clone());
let namespace_list = namespaces.list(&ListParams::default()).await?;
Ok(namespace_list)
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
let namespace_list = self.get_namespaces().await?;
let exists = namespace_list
.items
.iter()
Expand Down Expand Up @@ -383,7 +397,7 @@ impl<'a> Kubernetes<'a> {
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -397,7 +411,7 @@ impl<'a> Kubernetes<'a> {
format!("{}-{}-{}", service_name, self.deployment_tag, index),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -411,17 +425,20 @@ impl<'a> Kubernetes<'a> {
service_api.create(&post_params, service).await
}

pub fn create_validator_load_balancer(
pub async fn create_validator_load_balancer(
&self,
service_name: &str,
label_selector: &BTreeMap<String, String>,
) -> Service {
k8s_helpers::create_service(
) -> Result<Service, Box<dyn Error>> {
let node_port = self.get_open_external_port_for_rpc_service().await?;
info!("Deploying Load Balancer Service with external port: {node_port}");

Ok(k8s_helpers::create_service(
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
true,
)
ServiceType::LoadBalancer(node_port),
))
}

pub async fn is_replica_set_ready(&self, replica_set_name: &str) -> Result<bool, kube::Error> {
Expand Down Expand Up @@ -758,4 +775,53 @@ impl<'a> Kubernetes<'a> {
None,
)
}

async fn get_open_external_port_for_rpc_service(&self) -> Result<i32, Box<dyn Error>> {
let used_ports = self.get_all_used_ports().await?;

// This Node Port range is standard for kubernetes
const MIN_NODE_PORT: i32 = 30000;
const MAX_NODE_PORT: i32 = 32767;
// Find an available NodePort
let mut available_port = MIN_NODE_PORT;
while used_ports.contains(&available_port) {
available_port += 1;
}
if available_port > MAX_NODE_PORT {
return Err(format!(
"No available NodePort found in the range {MIN_NODE_PORT}-{MAX_NODE_PORT}"
)
.into());
}

Ok(available_port)
}

async fn get_all_used_ports(&self) -> Result<HashSet<i32>, kube::Error> {
let mut used_ports = HashSet::new();
let namespaces = self.get_namespaces().await?;
let namespaces: Vec<String> = namespaces
.items
.into_iter()
.filter_map(|ns| ns.metadata.name)
.collect();

// Iterate over namespaces to collect used NodePorts
for ns in namespaces {
let services: Api<Service> = Api::namespaced(self.k8s_client.clone(), &ns);
let service_list = services.list(&Default::default()).await?;
for svc in service_list {
if let Some(spec) = svc.spec {
if let Some(ports) = spec.ports {
for port in ports {
if let Some(node_port) = port.node_port {
used_ports.insert(node_port);
}
}
}
}
}
}
Ok(used_ports)
}
}
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,10 +755,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let load_balancer_label =
kub_controller.create_selector("load-balancer/name", "load-balancer-selector");
//create load balancer
let load_balancer = kub_controller.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
);
let load_balancer = kub_controller
.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
)
.await?;

//deploy load balancer
kub_controller.deploy_service(&load_balancer).await?;
Expand Down

0 comments on commit 9cbf62e

Please sign in to comment.