Skip to content

Commit

Permalink
change cpu core into an enum
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang2821 committed Aug 25, 2024
1 parent 6336ed5 commit 8b1d3da
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 87 deletions.
17 changes: 14 additions & 3 deletions lapdev-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ pub struct MachineType {
pub cost_per_second: usize,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
#[serde(untagged)]
pub enum CpuCore {
Shared(usize),
Dedicated(Vec<usize>),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CreateMachineType {
pub name: String,
Expand Down Expand Up @@ -159,7 +166,7 @@ pub struct RepoBuildInfo {
pub repo_name: String,
pub env: Vec<(String, String)>,
// the cpu cores that this build will use
pub cpus: Vec<usize>,
pub cpus: CpuCore,
// the memory limit that this build will use
pub memory: usize,
}
Expand Down Expand Up @@ -341,7 +348,7 @@ pub struct CreateWorkspaceRequest {
pub ssh_public_key: String,
pub repo_name: String,
pub env: Vec<(String, String)>,
pub cpus: Vec<usize>,
pub cpus: CpuCore,
pub memory: usize,
pub disk: usize,
}
Expand Down Expand Up @@ -440,8 +447,12 @@ pub struct NewContainerHostConfig {
pub publish_all_ports: bool,
#[serde(rename = "Binds")]
pub binds: Vec<String>,
#[serde(rename = "CpuPeriod")]
pub cpu_period: Option<i64>,
#[serde(rename = "CpuQuota")]
pub cpu_quota: Option<i64>,
#[serde(rename = "CpusetCpus")]
pub cpuset_cpus: String,
pub cpuset_cpus: Option<String>,
#[serde(rename = "Memory")]
pub memory: usize,
#[serde(rename = "NetworkMode")]
Expand Down
38 changes: 21 additions & 17 deletions lapdev-conductor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};

use anyhow::Result;
use itertools::Itertools;
use lapdev_common::{PrebuildStatus, WorkspaceHostStatus};
use lapdev_common::{CpuCore, PrebuildStatus, WorkspaceHostStatus};
use lapdev_db::{entities, links};
use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter,
Expand Down Expand Up @@ -56,7 +56,7 @@ pub async fn pick_workspce_host(
disk: usize,
region: String,
cpu_overcommit: usize,
) -> Result<(entities::workspace_host::Model, Vec<usize>)> {
) -> Result<(entities::workspace_host::Model, CpuCore)> {
let workspace_host = decide_workspace_host(
txn,
prebuild_workspace_host,
Expand All @@ -71,18 +71,18 @@ pub async fn pick_workspce_host(

let existing = get_existing_resource(txn, &workspace_host).await?;

let available = available_cores(workspace_host.cpu as usize, cpu_overcommit, &existing.cores);
let cores: Vec<usize> = if !shared {
let cores = if !shared {
let available =
available_cores(workspace_host.cpu as usize, cpu_overcommit, &existing.cores);
let available = available.dedicated();
available.into_iter().take(cpu).collect()
let cores: Vec<usize> = available.into_iter().take(cpu).collect();
if cores.len() != cpu {
return Err(anyhow::anyhow!("can't allocate the number of cpus"));
}
CpuCore::Dedicated(cores)
} else {
let available = available.sorted();
tracing::debug!("available cores are {available:?}");
available.into_iter().map(|(k, _)| k).take(cpu).collect()
CpuCore::Shared(cpu)
};
if cores.len() != cpu {
return Err(anyhow::anyhow!("can't allocate the number of cpus"));
}

Ok((workspace_host, cores))
}
Expand Down Expand Up @@ -218,9 +218,11 @@ async fn get_existing_resource(

for (workspace, machine_type) in models {
if let Some(machine_type) = machine_type {
let cores: Vec<usize> = serde_json::from_str(&workspace.cores)?;
let cores: HashSet<usize> = HashSet::from_iter(cores.into_iter());
existing.cores.push((cores, machine_type.shared));
let cores: CpuCore = serde_json::from_str(&workspace.cores)?;
if let CpuCore::Dedicated(cores) = cores {
let cores: HashSet<usize> = HashSet::from_iter(cores.into_iter());
existing.cores.push((cores, false));
}
existing.memory += machine_type.memory.max(0) as usize;
existing.disk += machine_type.disk.max(0) as usize;
}
Expand All @@ -236,9 +238,11 @@ async fn get_existing_resource(
.await?;
for (prebuild, machine_type) in result {
if let Some(machine_type) = machine_type {
let cores: Vec<usize> = serde_json::from_str(&prebuild.cores)?;
let cores: HashSet<usize> = HashSet::from_iter(cores.into_iter());
existing.cores.push((cores, machine_type.shared));
let cores: CpuCore = serde_json::from_str(&prebuild.cores)?;
if let CpuCore::Dedicated(cores) = cores {
let cores: HashSet<usize> = HashSet::from_iter(cores.into_iter());
existing.cores.push((cores, false));
}
existing.memory += machine_type.memory.max(0) as usize;
existing.disk += machine_type.disk.max(0) as usize;
}
Expand Down
18 changes: 12 additions & 6 deletions lapdev-conductor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use data_encoding::BASE64_MIME;
use futures::{channel::mpsc::UnboundedReceiver, stream::AbortHandle, SinkExt, StreamExt};
use git2::{Cred, FetchOptions, FetchPrune, RemoteCallbacks, Repository};
use lapdev_common::{
utils::rand_string, AuditAction, AuditResourceKind, AuthProvider, BuildTarget,
utils::rand_string, AuditAction, AuditResourceKind, AuthProvider, BuildTarget, CpuCore,
CreateWorkspaceRequest, DeleteWorkspaceRequest, GitBranch, NewProject, NewProjectResponse,
NewWorkspace, NewWorkspaceResponse, PrebuildInfo, PrebuildStatus, PrebuildUpdateEvent,
RepoBuildInfo, RepoBuildOutput, RepoContent, RepoContentPosition, RepoSource,
Expand All @@ -38,7 +38,6 @@ use sqlx::postgres::PgNotification;
use tarpc::{
context,
server::{BaseChannel, Channel},
tokio_serde::formats::Bincode,
};
use tokio::{
io::AsyncReadExt,
Expand Down Expand Up @@ -401,7 +400,11 @@ impl Conductor {

async fn connect_workspace_host_once(&self, id: Uuid, host: &str, port: u16) -> Result<()> {
tracing::debug!("start to connect to workspace host {host}:{port}");
let conn = tarpc::serde_transport::tcp::connect((host, port), Bincode::default).await?;
let conn = tarpc::serde_transport::tcp::connect(
(host, port),
tarpc::tokio_serde::formats::Json::default,
)
.await?;
let (server_chan, client_chan, abort_handle) = spawn_twoway(conn);
let ws_client =
WorkspaceServiceClient::new(tarpc::client::Config::default(), client_chan).spawn();
Expand Down Expand Up @@ -907,7 +910,7 @@ impl Conductor {
let (host_id, cores, host) = if let Some(ws) = ws {
// if the prebuild initiated by creating a workspace
// we'll use the resource of the workspace
let cores: Vec<usize> = serde_json::from_str(&ws.cores)?;
let cores: CpuCore = serde_json::from_str(&ws.cores)?;
(ws.host_id, cores, None)
} else {
let shared_cpu = if self.enterprise.has_valid_license().await {
Expand Down Expand Up @@ -1168,7 +1171,10 @@ impl Conductor {
self.cpu_overcommit().await,
)
.await
.map_err(|_| ApiError::NoAvailableWorkspaceHost)?;
.map_err(|e| {
tracing::error!("pick workspace host error: {e}");
ApiError::NoAvailableWorkspaceHost
})?;
let workspace_id = Uuid::new_v4();
let now = Utc::now();
let ws = entities::workspace::ActiveModel {
Expand Down Expand Up @@ -1873,7 +1879,7 @@ impl Conductor {

let build_output = serde_json::to_string(&output)?;

let cores: Vec<usize> = serde_json::from_str(&ws.cores)?;
let cores: CpuCore = serde_json::from_str(&ws.cores)?;
for (i, (service, tag, image_env)) in images.into_iter().enumerate() {
let workspace_name = if let Some(service) = service.clone() {
if i > 0 {
Expand Down
34 changes: 1 addition & 33 deletions lapdev-ws/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,4 @@
#[tokio::main]
pub async fn main() {
let _result = setup_log().await;

if let Err(e) = lapdev_ws::server::run().await {
tracing::error!("lapdev-ws server run error: {e:#}");
}
}

async fn setup_log() -> Result<tracing_appender::non_blocking::WorkerGuard, anyhow::Error> {
let folder = "/var/lib/lapdev/logs";
if !tokio::fs::try_exists(folder).await.unwrap_or(false) {
tokio::fs::create_dir_all(folder).await?;
let _ = tokio::process::Command::new("chown")
.arg("-R")
.arg("/var/lib/lapdev/")
.output()
.await;
}
let file_appender = tracing_appender::rolling::Builder::new()
.max_log_files(30)
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("lapdev-ws.log")
.build(folder)?;
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
let filter = tracing_subscriber::EnvFilter::default()
.add_directive("lapdev_ws=info".parse()?)
.add_directive("lapdev_rpc=info".parse()?)
.add_directive("lapdev_common=info".parse()?);
tracing_subscriber::fmt()
.with_ansi(false)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();
Ok(guard)
lapdev_ws::server::start().await;
}
99 changes: 82 additions & 17 deletions lapdev-ws/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use lapdev_common::{
devcontainer::{
DevContainerCmd, DevContainerConfig, DevContainerCwd, DevContainerLifeCycleCmd,
},
BuildTarget, ContainerImageInfo, RepoBuildInfo, RepoBuildOutput, RepoComposeService,
BuildTarget, ContainerImageInfo, CpuCore, RepoBuildInfo, RepoBuildOutput, RepoComposeService,
};
use lapdev_rpc::{
error::ApiError, spawn_twoway, ConductorServiceClient, InterWorkspaceService, WorkspaceService,
Expand All @@ -28,7 +28,7 @@ use serde::Deserialize;
use tarpc::{
context::current,
server::{BaseChannel, Channel},
tokio_serde::formats::Bincode,
tokio_serde::formats::Json,
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
Expand Down Expand Up @@ -58,6 +58,9 @@ struct Cli {
/// The config file path
#[clap(short, long, action, value_hint = clap::ValueHint::AnyPath)]
config_file: Option<PathBuf>,
/// The folder for putting data
#[clap(short, long, action, value_hint = clap::ValueHint::AnyPath)]
data_folder: Option<PathBuf>,
}

#[derive(Clone)]
Expand All @@ -71,10 +74,24 @@ impl Default for WorkspaceServer {
}
}

pub async fn run() -> Result<()> {
pub async fn start() {
let cli = Cli::parse();
let data_folder = cli
.data_folder
.clone()
.unwrap_or_else(|| PathBuf::from("/var/lib/lapdev"));

let _result = setup_log(&data_folder).await;

if let Err(e) = run(&cli).await {
tracing::error!("lapdev-ws server run error: {e:#}");
}
}

async fn run(cli: &Cli) -> Result<()> {
let config_file = cli
.config_file
.clone()
.unwrap_or_else(|| PathBuf::from("/etc/lapdev-ws.conf"));
let config_content = tokio::fs::read_to_string(&config_file)
.await
Expand Down Expand Up @@ -108,7 +125,7 @@ impl WorkspaceServer {
}

let mut listener =
tarpc::serde_transport::tcp::listen((bind, ws_port), Bincode::default).await?;
tarpc::serde_transport::tcp::listen((bind, ws_port), Json::default).await?;

{
let server = self.clone();
Expand Down Expand Up @@ -224,7 +241,7 @@ impl WorkspaceServer {

async fn run_inter_ws_service(&self, bind: &str, inter_ws_port: u16) -> Result<()> {
let mut listener =
tarpc::serde_transport::tcp::listen((bind, inter_ws_port), Bincode::default).await?;
tarpc::serde_transport::tcp::listen((bind, inter_ws_port), Json::default).await?;
listener.config_mut().max_frame_length(usize::MAX);
listener
// Ignore accept errors.
Expand Down Expand Up @@ -413,13 +430,22 @@ impl WorkspaceServer {
.arg(&info.osuser)
.arg("-c")
.arg(format!(
"cd {} && podman build --no-cache {build_args} --cpuset-cpus {} -m {}g -f {} -t {tag} {}",
"cd {} && podman build --no-cache {build_args} {} -m {}g -f {} -t {tag} {}",
cwd.to_string_lossy(),
info.cpus
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(","),
match &info.cpus {
CpuCore::Dedicated(cpus) => {
format!(
"--cpuset-cpus {}",
cpus.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(",")
)
}
CpuCore::Shared(n) => {
format!("--cpus {n}")
}
},
info.memory,
temp.to_string_lossy(),
context.to_string_lossy(),
Expand Down Expand Up @@ -777,12 +803,21 @@ impl WorkspaceServer {
.arg(&info.osuser)
.arg("-c")
.arg(format!(
"podman run --rm --cpuset-cpus {} -m {}g --security-opt label=disable -v {repo_folder}:/workspace -w /workspace --user root --entrypoint \"\" {image} {cmd}",
info.cpus
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(","),
"podman run --rm {} -m {}g --security-opt label=disable -v {repo_folder}:/workspace -w /workspace --user root --entrypoint \"\" {image} {cmd}",
match &info.cpus {
CpuCore::Dedicated(cpus) => {
format!(
"--cpuset-cpus {}",
cpus.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(",")
)
}
CpuCore::Shared(n) => {
format!("--cpus {n}")
}
},
info.memory,
))
.stdout(Stdio::piped())
Expand Down Expand Up @@ -918,3 +953,33 @@ pub fn unix_client(
) -> hyper_util::client::legacy::Client<UnixConnector, http_body_util::Full<hyper::body::Bytes>> {
hyper_util::client::legacy::Client::unix()
}

async fn setup_log(
data_folder: &Path,
) -> Result<tracing_appender::non_blocking::WorkerGuard, anyhow::Error> {
let folder = data_folder.join("logs");
if !tokio::fs::try_exists(&folder).await.unwrap_or(false) {
tokio::fs::create_dir_all(&folder).await?;
let _ = tokio::process::Command::new("chown")
.arg("-R")
.arg("/var/lib/lapdev/")
.output()
.await;
}
let file_appender = tracing_appender::rolling::Builder::new()
.max_log_files(30)
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("lapdev-ws.log")
.build(folder)?;
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
let filter = tracing_subscriber::EnvFilter::default()
.add_directive("lapdev_ws=info".parse()?)
.add_directive("lapdev_rpc=info".parse()?)
.add_directive("lapdev_common=info".parse()?);
tracing_subscriber::fmt()
.with_ansi(false)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();
Ok(guard)
}
Loading

0 comments on commit 8b1d3da

Please sign in to comment.