From 85d285d13dc2bde71c3599d0fca876fd27ab1ece Mon Sep 17 00:00:00 2001 From: Lu Yang Date: Wed, 18 Sep 2024 22:43:22 +0100 Subject: [PATCH] update russh --- Cargo.lock | 85 ++++++++--------------------- Cargo.toml | 2 +- lapdev-api/src/server.rs | 19 +++---- lapdev-conductor/src/server.rs | 4 ++ lapdev-proxy-ssh/src/client.rs | 3 +- lapdev-proxy-ssh/src/server.rs | 11 ++-- lapdev-ws/Cargo.toml | 1 + lapdev-ws/src/server.rs | 60 ++++++--------------- lapdev-ws/src/watcher.rs | 98 ++++++++++++++++++++++++++++++++-- 9 files changed, 154 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96ed774..a937a94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1037,6 +1037,15 @@ dependencies = [ "syn 1.0.105", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "digest" version = "0.10.7" @@ -1049,27 +1058,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys 0.48.0", -] - [[package]] name = "docker-compose-types" version = "0.7.0" @@ -2378,6 +2366,7 @@ dependencies = [ "lapdev-rpc", "netstat2", "notify", + "parking_lot", "serde", "serde_json", "serde_yaml", @@ -2601,17 +2590,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libredox" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" -dependencies = [ - "bitflags 2.4.1", - "libc", - "redox_syscall 0.4.1", -] - [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -2677,12 +2655,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "manyhow" @@ -2740,9 +2715,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.5.0" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "mime" @@ -3082,12 +3057,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "ordered-float" version = "3.9.2" @@ -3184,9 +3153,9 @@ checksum = "56d80efc4b6721e8be2a10a5df21a30fa0b470f1539e53d8b4e6e75faf938b63" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", @@ -3629,17 +3598,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_users" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" -dependencies = [ - "getrandom", - "libredox", - "thiserror", -] - [[package]] name = "regex" version = "1.9.4" @@ -3825,9 +3783,9 @@ dependencies = [ [[package]] name = "russh" -version = "0.44.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6500eedfaf8cd81597899d896908a4b9cd5cb566db875e843c04ccf92add2c16" +checksum = "0a229f2a03daea3f62cee897b40329ce548600cca615906d98d58b8db3029b19" dependencies = [ "aes", "aes-gcm", @@ -3838,6 +3796,7 @@ dependencies = [ "chacha20", "ctr", "curve25519-dalek", + "des", "digest", "elliptic-curve", "flate2", @@ -3878,9 +3837,9 @@ dependencies = [ [[package]] name = "russh-keys" -version = "0.44.0" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb8c0bfe024d4edd242f65a2ac6c8bf38a892930050b9eb90909d8fc2c413c8d" +checksum = "89757474f7c9ee30121d8cc7fe293a954ba10b204a82ccf5850a5352a532ebc7" dependencies = [ "aes", "async-trait", @@ -3892,12 +3851,12 @@ dependencies = [ "data-encoding", "der", "digest", - "dirs", "ecdsa", "ed25519-dalek", "elliptic-curve", "futures", "hmac", + "home", "inout", "log", "md5", diff --git a/Cargo.toml b/Cargo.toml index 0bd0705..7320c37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ sqlx = { version = "0.7.3", default-features = false, features = ["sqlx-postgres sea-orm = { version = "0.12.12", features = [ "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-rustls", "macros" ] } sea-orm-migration = { version = "0.12.6", features = [ "sqlx-postgres", "runtime-tokio-rustls" ] } uuid = { version = "1.6.1", features = ["v4", "serde"] } -russh = {version = "0.44.1", features = ["vendored-openssl"] } +russh = {version = "0.45.0", features = ["vendored-openssl"] } lapdev-api = { path = "./lapdev-api" } lapdev-ws = { path = "./lapdev-ws" } lapdev-db = { path = "./lapdev-db" } diff --git a/lapdev-api/src/server.rs b/lapdev-api/src/server.rs index 42ceaf1..fe0dafe 100644 --- a/lapdev-api/src/server.rs +++ b/lapdev-api/src/server.rs @@ -15,7 +15,8 @@ use serde::Deserialize; use tokio::net::TcpListener; use tokio_rustls::TlsAcceptor; use tower::Service; -use tracing::error; +use tracing::{error, instrument::WithSubscriber}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; use crate::{cert::tls_config, router, state::CoreState}; @@ -96,7 +97,7 @@ async fn run( ) .await { - error!("ssh proxy error: {e}"); + error!("ssh proxy error: {e:?}"); } }); } @@ -191,16 +192,10 @@ async fn setup_log( .filename_prefix("lapdev.log") .build(folder)?; let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); - let filter = tracing_subscriber::EnvFilter::default() - .add_directive("lapdev=info".parse()?) - .add_directive("lapdev_api=info".parse()?) - .add_directive("lapdev_conductor=info".parse()?) - .add_directive("lapdev_rpc=info".parse()?) - .add_directive("lapdev_common=info".parse()?) - .add_directive("lapdev_db=info".parse()?) - .add_directive("lapdev_enterprise=info".parse()?) - .add_directive("lapdev_proxy_ssh=info".parse()?) - .add_directive("lapdev_proxy_http=info".parse()?); + let var = std::env::var("RUST_LOG").unwrap_or_default(); + let var = + format!("error,lapdev=info,lapdev_api=info,lapdev_conductor=info,lapdev_rpc=info,lapdev_common=info,lapdev_db=info,lapdev_enterprise=info,lapdev_proxy_ssh=info,lapdev_proxy_http=info,{var}"); + let filter = tracing_subscriber::EnvFilter::builder().parse_lossy(var); tracing_subscriber::fmt() .with_ansi(false) .with_env_filter(filter) diff --git a/lapdev-conductor/src/server.rs b/lapdev-conductor/src/server.rs index 8c4c099..bfe065d 100644 --- a/lapdev-conductor/src/server.rs +++ b/lapdev-conductor/src/server.rs @@ -605,6 +605,10 @@ impl Conductor { .split('/') .last() .ok_or_else(|| ApiError::RepositoryInvalid("invalid repo path".to_string()))?; + let path = path + .split('?') + .next() + .ok_or_else(|| ApiError::RepositoryInvalid("invalid repo path".to_string()))?; let repo_name = path.strip_suffix(".git").unwrap_or(path).to_string(); Ok(RepoDetails { diff --git a/lapdev-proxy-ssh/src/client.rs b/lapdev-proxy-ssh/src/client.rs index 4db57da..3879189 100644 --- a/lapdev-proxy-ssh/src/client.rs +++ b/lapdev-proxy-ssh/src/client.rs @@ -27,7 +27,8 @@ impl russh::client::Handler for SshProxyClient { impl ClientSession { pub async fn connect(addr: &str, key: &KeyPair) -> Result { let config = russh::client::Config { - inactivity_timeout: Some(std::time::Duration::from_secs(5)), + inactivity_timeout: Some(std::time::Duration::from_secs(30)), + keepalive_interval: Some(std::time::Duration::from_secs(10)), ..<_>::default() }; let config = Arc::new(config); diff --git a/lapdev-proxy-ssh/src/server.rs b/lapdev-proxy-ssh/src/server.rs index 0c80705..ca8e767 100644 --- a/lapdev-proxy-ssh/src/server.rs +++ b/lapdev-proxy-ssh/src/server.rs @@ -1,13 +1,15 @@ use std::{borrow::Cow, sync::Arc}; -use anyhow::Result; +use anyhow::{Context, Result}; use lapdev_conductor::Conductor; use russh::{server::Server, MethodSet, Preferred}; use crate::{key::host_keys, proxy::SshProxy}; pub async fn run(conductor: Conductor, bind: &str, port: u16) -> Result<()> { - let keys = host_keys(&conductor.db).await?; + let keys = host_keys(&conductor.db) + .await + .with_context(|| "when get host keys")?; let config = russh::server::Config { inactivity_timeout: Some(std::time::Duration::from_secs(3600)), @@ -32,7 +34,10 @@ pub async fn run(conductor: Conductor, bind: &str, port: u16) -> Result<()> { db: conductor.db.clone(), conductor: Arc::new(conductor), }; - proxy.run_on_address(config, (bind, port)).await?; + proxy + .run_on_address(config, (bind, port)) + .await + .with_context(|| "when run proxy on address")?; Ok(()) } diff --git a/lapdev-ws/Cargo.toml b/lapdev-ws/Cargo.toml index bb9787e..81a445e 100644 --- a/lapdev-ws/Cargo.toml +++ b/lapdev-ws/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true description.workspace = true [dependencies] +parking_lot = "0.12.3" crossbeam-channel = "0.5.13" notify = "6.1.1" git2.workspace = true diff --git a/lapdev-ws/src/server.rs b/lapdev-ws/src/server.rs index 5f61617..c0e571d 100644 --- a/lapdev-ws/src/server.rs +++ b/lapdev-ws/src/server.rs @@ -54,6 +54,7 @@ struct LapdevWsConfig { bind: Option, ws_port: Option, inter_ws_port: Option, + backup_host: Option, } #[derive(Parser)] @@ -105,7 +106,7 @@ async fn run(cli: &Cli) -> Result<()> { let ws_port = config.ws_port.unwrap_or(6123); let inter_ws_port = config.inter_ws_port.unwrap_or(6122); WorkspaceServer::new(cli.data_folder()) - .run(bind, ws_port, inter_ws_port) + .run(bind, ws_port, inter_ws_port, config.backup_host) .await } @@ -117,7 +118,13 @@ impl WorkspaceServer { } } - async fn run(&self, bind: &str, ws_port: u16, inter_ws_port: u16) -> Result<()> { + async fn run( + &self, + bind: &str, + ws_port: u16, + inter_ws_port: u16, + backup_host: Option, + ) -> Result<()> { { Command::new("mkdir") .arg("-p") @@ -130,7 +137,7 @@ impl WorkspaceServer { .status() .await?; - let watcher = FileWatcher::new(&self.data_folder)?; + let watcher = FileWatcher::new(&self.data_folder, backup_host)?; std::thread::spawn(move || { watcher.watch(); }); @@ -372,38 +379,6 @@ impl WorkspaceServer { .wait() .await?; - let containers_config_folder = format!("/home/{username}/.config/containers"); - Command::new("su") - .arg("-") - .arg(username) - .arg("-c") - .arg(format!("mkdir -p {containers_config_folder}")) - .spawn()? - .wait() - .await?; - tokio::fs::write( - format!("{containers_config_folder}/registries.conf"), - r#" -unqualified-search-registries = ["docker.io"] -"#, - ) - .await?; - tokio::fs::write( - format!("{containers_config_folder}/storage.conf"), - r#" -[storage] -driver = "overlay" -"#, - ) - .await?; - - Command::new("chown") - .arg("-R") - .arg(format!("{username}:{username}")) - .arg(&containers_config_folder) - .output() - .await?; - let uid = self._os_user_uid(username).await?; Command::new("loginctl") @@ -974,11 +949,9 @@ driver = "overlay" self.os_user_uid(&info.osuser).await?; let build_dir = self.build_base_folder(info); let repo_dir = format!("{build_dir}/{}", info.repo_name); - Command::new("su") - .arg("-") - .arg(&info.osuser) - .arg("-c") - .arg(format!("mkdir -p {repo_dir}")) + Command::new("mkdir") + .arg("-p") + .arg(&repo_dir) .spawn()? .wait() .await?; @@ -1281,10 +1254,9 @@ async fn setup_log( .filename_prefix("lapdev-ws.log") .build(folder)?; let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); - let filter = tracing_subscriber::EnvFilter::default() - .add_directive("lapdev_ws=info".parse()?) - .add_directive("lapdev_rpc=info".parse()?) - .add_directive("lapdev_common=info".parse()?); + let var = std::env::var("RUST_LOG").unwrap_or_default(); + let var = format!("error,lapdev_ws=info,lapdev_rpc=info,lapdev_common=info,{var}"); + let filter = tracing_subscriber::EnvFilter::builder().parse_lossy(var); tracing_subscriber::fmt() .with_ansi(false) .with_env_filter(filter) diff --git a/lapdev-ws/src/watcher.rs b/lapdev-ws/src/watcher.rs index 3d46d43..c6942f1 100644 --- a/lapdev-ws/src/watcher.rs +++ b/lapdev-ws/src/watcher.rs @@ -1,22 +1,88 @@ -use std::path::Path; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::Result; use crossbeam_channel::{unbounded, Receiver}; use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; +use parking_lot::Mutex; + +struct PendingPath { + workspace: String, + start: Instant, + last: Instant, +} pub struct FileWatcher { rx_event: Receiver>, pub watcher: RecommendedWatcher, + base: PathBuf, + pending: Arc>>, } impl FileWatcher { - pub fn new(data_folder: &Path) -> Result { + pub fn new(data_folder: &Path, backup_host: Option) -> Result { let (tx_event, rx_event) = unbounded(); + let folder = data_folder.join("workspaces"); let mut watcher = notify::recommended_watcher(tx_event)?; - watcher.watch(&data_folder.join("workspaces"), RecursiveMode::Recursive)?; + watcher.watch(&folder, RecursiveMode::Recursive)?; - Ok(Self { rx_event, watcher }) + let pending: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + + { + let pending = pending.clone(); + std::thread::spawn(move || loop { + let last_run = Instant::now(); + let paths = { + let mut paths = Vec::new(); + let mut pending = pending.lock(); + for (path, pending) in pending.iter() { + if pending.last.elapsed().as_millis() > 1000 + || pending.start.elapsed().as_millis() > 1000 * 10 + { + paths.push((path.clone(), pending.workspace.clone())); + } + } + for (p, _) in &paths { + pending.remove(p); + } + paths + }; + if let Some(backup_host) = backup_host.as_ref() { + for (p, workspace) in paths { + let cmd = format!( + r#"rsync -a --delete --filter=':- .gitignore' -e 'ssh -o ControlMaster=auto -o ControlPath="~/.ssh/%L-%r@%h:%p" -o ControlPersist=10m ' {}/ {backup_host}:~/workspaces/{workspace}/"#, + p.to_string_lossy() + ); + tracing::debug!("now run {cmd}"); + if let Err(e) = std::process::Command::new("bash") + .arg("-c") + .arg(cmd) + .output() + { + tracing::error!("got error when backing up {p:?}: {e:?}"); + } + } + } + + let elpased = last_run.elapsed().as_millis(); + if elpased < 1000 { + std::thread::sleep(Duration::from_millis(1000 - elpased as u64)); + } + }); + } + + Ok(Self { + rx_event, + watcher, + base: folder, + pending, + }) } pub fn watch(&self) { @@ -33,7 +99,29 @@ impl FileWatcher { notify::EventKind::Remove(_) => {} notify::EventKind::Other => {} } - println!("paths {:?}", event.paths); + for path in event.paths { + if let Ok(p) = path.strip_prefix(&self.base) { + let mut components = p.components(); + if let Some(user) = components.next() { + if let Some(workspace) = components.next() { + let workspace_full_path = self.base.join(user).join(workspace); + let mut pending = self.pending.lock(); + let pending = + pending.entry(workspace_full_path).or_insert_with(move || { + PendingPath { + workspace: workspace + .as_os_str() + .to_string_lossy() + .to_string(), + start: Instant::now(), + last: Instant::now(), + } + }); + pending.last = Instant::now(); + } + } + } + } } } tracing::info!("lapdev ws file watcher stopped");