From 2ec5ab35e73b5cc1ea82fa51b289bb687e0680d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:05:28 +0100 Subject: [PATCH 1/4] Add namespaces to `mirrord ls` output (#3003) * Additional command flag, rich output skeleton * listing functions extracted * Added namespaces to the output * mirrord_cli::list docs * mirrord_kube::api::kubernetes::list docs * Removed dead code * flag doc * Changelog * Moved output control to env var * Clippy * Reduce print_targets visibility, change display wrapper name * FoundTargetsList doc * Expect message fixed * print_targets doc * Move listing utils to KubeResourceSeeker --- changelog.d/2999.added.md | 1 + mirrord/cli/src/config.rs | 9 +- mirrord/cli/src/list.rs | 215 ++++++++++++++++++++++ mirrord/cli/src/main.rs | 123 ++----------- mirrord/cli/src/port_forward.rs | 22 +-- mirrord/kube/src/api/kubernetes/seeker.rs | 140 +++++++++----- 6 files changed, 335 insertions(+), 175 deletions(-) create mode 100644 changelog.d/2999.added.md create mode 100644 mirrord/cli/src/list.rs diff --git a/changelog.d/2999.added.md b/changelog.d/2999.added.md new file mode 100644 index 00000000000..ac9b5676f4d --- /dev/null +++ b/changelog.d/2999.added.md @@ -0,0 +1 @@ +Added available namespaces to `mirrord ls` output. New output format is enabled with `--rich` flag. diff --git a/mirrord/cli/src/config.rs b/mirrord/cli/src/config.rs index 367504ddbf4..96548235ad4 100644 --- a/mirrord/cli/src/config.rs +++ b/mirrord/cli/src/config.rs @@ -712,11 +712,18 @@ pub(super) struct ListTargetArgs { #[arg(short = 'n', long = "namespace")] pub namespace: Option, - /// Specify config file to use + /// Specify config file to use. #[arg(short = 'f', long, value_hint = ValueHint::FilePath)] pub config_file: Option, } +impl ListTargetArgs { + /// Controls the output of `mirrord ls`. + /// If set to `true`, the command outputs a JSON object that contains more data. + /// Otherwise, it outputs a plain array of target paths. + pub(super) const RICH_OUTPUT_ENV: &str = "MIRRORD_LS_RICH_OUTPUT"; +} + #[derive(Args, Debug)] pub(super) struct ExtensionExecArgs { /// Specify config file to use diff --git a/mirrord/cli/src/list.rs b/mirrord/cli/src/list.rs new file mode 100644 index 00000000000..2b8059bad62 --- /dev/null +++ b/mirrord/cli/src/list.rs @@ -0,0 +1,215 @@ +use std::sync::LazyLock; + +use futures::TryStreamExt; +use k8s_openapi::api::core::v1::Namespace; +use kube::Client; +use mirrord_analytics::NullReporter; +use mirrord_config::{ + config::{ConfigContext, MirrordConfig}, + LayerConfig, LayerFileConfig, +}; +use mirrord_kube::{ + api::kubernetes::{create_kube_config, seeker::KubeResourceSeeker}, + error::KubeApiError, +}; +use mirrord_operator::client::OperatorApi; +use semver::VersionReq; +use serde::{ser::SerializeSeq, Serialize, Serializer}; + +use crate::{util, CliError, CliResult, Format, ListTargetArgs}; + +/// A mirrord target found in the cluster. +#[derive(Serialize)] +struct FoundTarget { + /// E.g `pod/my-pod-1234/container/my-container`. + path: String, + + /// Whether this target is currently available. + /// + /// # Note + /// + /// Right now this is always true. Some preliminary checks are done in the + /// [`KubeResourceSeeker`] and results come filtered. + /// + /// This field is here for forward compatibility, because in the future we might want to return + /// unavailable targets as well (along with some validation error message) to improve UX. + available: bool, +} + +/// Result of mirrord targets lookup in the cluster. +#[derive(Serialize)] +struct FoundTargets { + /// In order: + /// 1. deployments + /// 2. rollouts + /// 3. statefulsets + /// 4. cronjobs + /// 5. jobs + /// 6. pods + targets: Vec, + + /// Current lookup namespace. + /// + /// Taken from [`LayerConfig::target`], defaults to [`Client`]'s default namespace. + current_namespace: String, + + /// Available lookup namespaces. + namespaces: Vec, +} + +impl FoundTargets { + /// Performs a lookup of mirrord targets in the cluster. + /// + /// Unless the operator is explicitly disabled, attempts to connect with it. + /// Operator lookup affects returned results (e.g some targets are only available via the + /// operator). + /// + /// If `fetch_namespaces` is set, returned [`FoundTargets`] will contain info about namespaces + /// available in the cluster. + async fn resolve(config: LayerConfig, fetch_namespaces: bool) -> CliResult { + let client = create_kube_config( + config.accept_invalid_certificates, + config.kubeconfig.clone(), + config.kube_context.clone(), + ) + .await + .and_then(|config| Client::try_from(config).map_err(From::from)) + .map_err(|error| { + CliError::friendlier_error_or_else(error, CliError::CreateKubeApiFailed) + })?; + + let mut reporter = NullReporter::default(); + let operator_api = if config.operator != Some(false) + && let Some(api) = OperatorApi::try_new(&config, &mut reporter).await? + { + let api = api.prepare_client_cert(&mut reporter).await; + + api.inspect_cert_error( + |error| tracing::error!(%error, "failed to prepare client certificate"), + ); + + Some(api) + } else { + None + }; + + let seeker = KubeResourceSeeker { + client: &client, + namespace: config.target.namespace.as_deref(), + }; + let paths = match operator_api { + None if config.operator == Some(true) => Err(CliError::OperatorNotInstalled), + + Some(api) + if ALL_TARGETS_SUPPORTED_OPERATOR_VERSION + .matches(&api.operator().spec.operator_version) => + { + seeker.all().await.map_err(|error| { + CliError::friendlier_error_or_else(error, CliError::ListTargetsFailed) + }) + } + + _ => seeker.all_open_source().await.map_err(|error| { + CliError::friendlier_error_or_else(error, CliError::ListTargetsFailed) + }), + }?; + + let targets = paths + .into_iter() + .map(|path| FoundTarget { + path, + available: true, + }) + .collect(); + let current_namespace = config + .target + .namespace + .as_deref() + .unwrap_or(client.default_namespace()) + .to_owned(); + + let namespaces = if fetch_namespaces { + seeker + .list_all_clusterwide::(None) + .try_filter_map(|namespace| std::future::ready(Ok(namespace.metadata.name))) + .try_collect::>() + .await + .map_err(KubeApiError::KubeError) + .map_err(|error| { + CliError::friendlier_error_or_else(error, CliError::ListTargetsFailed) + })? + } else { + Default::default() + }; + + Ok(Self { + targets, + current_namespace, + namespaces, + }) + } +} + +/// Thin wrapper over [`FoundTargets`] that implements [`Serialize`]. +/// Its serialized format is a sequence of available target paths. +/// +/// Used to print available targets when the plugin/extension does not support the full format +/// (backward compatibility). +struct FoundTargetsList<'a>(&'a FoundTargets); + +impl Serialize for FoundTargetsList<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let count = self.0.targets.iter().filter(|t| t.available).count(); + let mut list = serializer.serialize_seq(Some(count))?; + + for target in self.0.targets.iter().filter(|t| t.available) { + list.serialize_element(&target.path)?; + } + + list.end() + } +} + +/// Controls whether we support listing all targets or just the open source ones. +static ALL_TARGETS_SUPPORTED_OPERATOR_VERSION: LazyLock = + LazyLock::new(|| ">=3.84.0".parse().expect("version should be valid")); + +/// Fetches mirrord targets from the cluster and prints output to stdout. +/// +/// When `rich_output` is set, targets info is printed as a JSON object containing extra data. +/// Otherwise, targets are printed as a plain JSON array of strings (backward compatibility). +pub(super) async fn print_targets(args: ListTargetArgs, rich_output: bool) -> CliResult<()> { + let mut layer_config = if let Some(config) = &args.config_file { + let mut cfg_context = ConfigContext::default(); + LayerFileConfig::from_path(config)?.generate_config(&mut cfg_context)? + } else { + LayerConfig::from_env()? + }; + + if let Some(namespace) = args.namespace { + layer_config.target.namespace.replace(namespace); + }; + + if !layer_config.use_proxy { + util::remove_proxy_env(); + } + + let targets = FoundTargets::resolve(layer_config, rich_output).await?; + + match args.output { + Format::Json => { + let serialized = if rich_output { + serde_json::to_string(&targets).unwrap() + } else { + serde_json::to_string(&FoundTargetsList(&targets)).unwrap() + }; + + println!("{serialized}"); + } + } + + Ok(()) +} diff --git a/mirrord/cli/src/main.rs b/mirrord/cli/src/main.rs index dde03d6a73e..4631af499dc 100644 --- a/mirrord/cli/src/main.rs +++ b/mirrord/cli/src/main.rs @@ -5,7 +5,7 @@ use std::{ collections::HashMap, env::vars, ffi::CString, net::SocketAddr, os::unix::ffi::OsStrExt, - sync::LazyLock, time::Duration, + time::Duration, }; use clap::{CommandFactory, Parser}; @@ -17,12 +17,10 @@ use diagnose::diagnose_command; use execution::MirrordExecution; use extension::extension_exec; use extract::extract_library; -use kube::Client; use mirrord_analytics::{ - AnalyticsError, AnalyticsReporter, CollectAnalytics, ExecutionKind, NullReporter, Reporter, + AnalyticsError, AnalyticsReporter, CollectAnalytics, ExecutionKind, Reporter, }; use mirrord_config::{ - config::{ConfigContext, MirrordConfig}, feature::{ fs::FsModeConfig, network::{ @@ -33,16 +31,13 @@ use mirrord_config::{ LayerConfig, LayerFileConfig, MIRRORD_CONFIG_FILE_ENV, }; use mirrord_intproxy::agent_conn::{AgentConnection, AgentConnectionError}; -use mirrord_kube::api::kubernetes::{create_kube_config, seeker::KubeResourceSeeker}; -use mirrord_operator::client::OperatorApi; use mirrord_progress::{messages::EXEC_CONTAINER_BINARY, Progress, ProgressTracker}; #[cfg(all(target_os = "macos", target_arch = "aarch64"))] use nix::errno::Errno; use operator::operator_command; use port_forward::{PortForwardError, PortForwarder, ReversePortForwarder}; use regex::Regex; -use semver::{Version, VersionReq}; -use serde_json::json; +use semver::Version; use tracing::{error, info, warn}; use which::which; @@ -56,9 +51,10 @@ mod extension; mod external_proxy; mod extract; mod internal_proxy; +mod list; mod logging; mod operator; -pub mod port_forward; +mod port_forward; mod teams; mod util; mod verify_config; @@ -67,12 +63,6 @@ mod vpn; pub(crate) use error::{CliError, CliResult}; use verify_config::verify_config; -use crate::util::remove_proxy_env; - -/// Controls whether we support listing all targets or just the open source ones. -static ALL_TARGETS_SUPPORTED_OPERATOR_VERSION: LazyLock = - LazyLock::new(|| ">=3.84.0".parse().expect("verion should be valid")); - async fn exec_process

( config: LayerConfig, args: &ExecArgs, @@ -381,100 +371,6 @@ async fn exec(args: &ExecArgs, watch: drain::Watch) -> CliResult<()> { execution_result } -/// Lists targets based on whether or not the operator has been enabled in `layer_config`. -/// If the operator is enabled (and we can reach it), then we list [`KubeResourceSeeker::all`] -/// targets, otherwise we list [`KubeResourceSeeker::all_open_source`] only. -async fn list_targets(layer_config: &LayerConfig, args: &ListTargetArgs) -> CliResult> { - let client = create_kube_config( - layer_config.accept_invalid_certificates, - layer_config.kubeconfig.clone(), - layer_config.kube_context.clone(), - ) - .await - .and_then(|config| Client::try_from(config).map_err(From::from)) - .map_err(|error| CliError::friendlier_error_or_else(error, CliError::CreateKubeApiFailed))?; - - let namespace = args - .namespace - .as_deref() - .or(layer_config.target.namespace.as_deref()); - - let seeker = KubeResourceSeeker { - client: &client, - namespace, - }; - - let mut reporter = NullReporter::default(); - - let operator_api = if layer_config.operator != Some(false) - && let Some(api) = OperatorApi::try_new(layer_config, &mut reporter).await? - { - let api = api.prepare_client_cert(&mut reporter).await; - - api.inspect_cert_error( - |error| tracing::error!(%error, "failed to prepare client certificate"), - ); - - Some(api) - } else { - None - }; - - match operator_api { - None if layer_config.operator == Some(true) => Err(CliError::OperatorNotInstalled), - Some(api) - if ALL_TARGETS_SUPPORTED_OPERATOR_VERSION - .matches(&api.operator().spec.operator_version) => - { - seeker.all().await.map_err(|error| { - CliError::friendlier_error_or_else(error, CliError::ListTargetsFailed) - }) - } - _ => seeker.all_open_source().await.map_err(|error| { - CliError::friendlier_error_or_else(error, CliError::ListTargetsFailed) - }), - } -} - -/// Lists all possible target paths. -/// Tries to use operator if available, otherwise falls back to k8s API (if operator isn't -/// explicitly true). Example: -/// ``` -/// [ -/// "pod/metalbear-deployment-85c754c75f-982p5", -/// "pod/nginx-deployment-66b6c48dd5-dc9wk", -/// "pod/py-serv-deployment-5c57fbdc98-pdbn4/container/py-serv", -/// "deployment/nginx-deployment" -/// "deployment/nginx-deployment/container/nginx" -/// "rollout/nginx-rollout" -/// "statefulset/nginx-statefulset" -/// "statefulset/nginx-statefulset/container/nginx" -/// ] -/// ``` -async fn print_targets(args: &ListTargetArgs) -> CliResult<()> { - let mut layer_config = if let Some(config) = &args.config_file { - let mut cfg_context = ConfigContext::default(); - LayerFileConfig::from_path(config)?.generate_config(&mut cfg_context)? - } else { - LayerConfig::from_env()? - }; - - if let Some(namespace) = &args.namespace { - layer_config.target.namespace = Some(namespace.clone()); - }; - - if !layer_config.use_proxy { - remove_proxy_env(); - } - - // The targets come sorted in the following order: - // `deployments - rollouts - statefulsets - cronjobs - jobs - pods` - let targets = list_targets(&layer_config, args).await?; - let json_obj = json!(targets); - println!("{json_obj}"); - Ok(()) -} - async fn port_forward(args: &PortForwardArgs, watch: drain::Watch) -> CliResult<()> { fn hash_port_mappings( args: &PortForwardArgs, @@ -661,7 +557,14 @@ fn main() -> miette::Result<()> { false, )?; } - Commands::ListTargets(args) => print_targets(&args).await?, + Commands::ListTargets(args) => { + let rich_output = std::env::var(ListTargetArgs::RICH_OUTPUT_ENV) + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or_default(); + + list::print_targets(*args, rich_output).await? + } Commands::Operator(args) => operator_command(*args).await?, Commands::ExtensionExec(args) => { extension_exec(*args, watch).await?; diff --git a/mirrord/cli/src/port_forward.rs b/mirrord/cli/src/port_forward.rs index ba59679d945..002a904ae1b 100644 --- a/mirrord/cli/src/port_forward.rs +++ b/mirrord/cli/src/port_forward.rs @@ -29,8 +29,7 @@ use mirrord_protocol::{ LayerClose, LayerConnect, LayerWrite, SocketAddress, }, tcp::{Filter, HttpFilter, LayerTcp, LayerTcpSteal, StealType}, - ClientMessage, ConnectionId, DaemonMessage, LogLevel, Port, ResponseError, - CLIENT_READY_FOR_LOGS, + ClientMessage, ConnectionId, DaemonMessage, LogLevel, Port, CLIENT_READY_FOR_LOGS, }; use thiserror::Error; use tokio::{ @@ -969,19 +968,12 @@ impl IncomingMode { #[derive(Debug, Error)] pub enum PortForwardError { - // setup errors - #[error("wrong combination of arguments used: {0}")] - ArgsError(String), - #[error("multiple port forwarding mappings found for local address `{0}`")] PortMapSetupError(SocketAddr), #[error("multiple port forwarding mappings found for desination port `{0:?}`")] ReversePortMapSetupError(RemotePort), - #[error("no port forwarding mappings were provided")] - NoMappingsError(), - // running errors #[error("agent closed connection with error: `{0}`")] AgentError(String), @@ -992,18 +984,9 @@ pub enum PortForwardError { #[error("error from Incoming Proxy task")] IncomingProxyError(IntProxyError), - #[error("failed to send Ping to agent: `{0}`")] - PingError(String), - #[error("TcpListener operation failed with error: `{0}`")] TcpListenerError(std::io::Error), - #[error("TcpStream operation failed with error: `{0}`")] - TcpStreamError(std::io::Error), - - #[error("no destination address found for local address `{0}`")] - SocketMappingNotFound(SocketAddr), - #[error("no task for socket {0} ready to receive connection ID: `{1}`")] ReadyTaskNotFound(SocketAddr, ConnectionId), @@ -1012,9 +995,6 @@ pub enum PortForwardError { #[error("failed to establish connection with remote process: `{0}`")] ConnectionError(String), - - #[error("failed to subscribe to remote port: `{0}`")] - SubscriptionError(ResponseError), } impl From> for PortForwardError { diff --git a/mirrord/kube/src/api/kubernetes/seeker.rs b/mirrord/kube/src/api/kubernetes/seeker.rs index b9429610f8e..e5208fcd8b5 100644 --- a/mirrord/kube/src/api/kubernetes/seeker.rs +++ b/mirrord/kube/src/api/kubernetes/seeker.rs @@ -1,6 +1,5 @@ use std::fmt; -use async_stream::stream; use futures::{stream, Stream, StreamExt, TryStreamExt}; use k8s_openapi::{ api::{ @@ -8,17 +7,17 @@ use k8s_openapi::{ batch::v1::{CronJob, Job}, core::v1::Pod, }, - Metadata, NamespaceResourceScope, + ClusterResourceScope, Metadata, NamespaceResourceScope, }; -use kube::{api::ListParams, Resource}; -use serde::de; +use kube::{api::ListParams, Api, Resource}; +use serde::de::{self, DeserializeOwned}; use crate::{ api::{ container::SKIP_NAMES, kubernetes::{get_k8s_resource_api, rollout::Rollout}, }, - error::Result, + error::{KubeApiError, Result}, }; pub struct KubeResourceSeeker<'a> { @@ -95,7 +94,7 @@ impl KubeResourceSeeker<'_> { Some((name, containers)) } - self.list_resource::(Some("status.phase=Running")) + self.list_all_namespaced(Some("status.phase=Running")) .try_filter(|pod| std::future::ready(check_pod_status(pod))) .try_filter_map(|pod| std::future::ready(Ok(create_pod_container_map(pod)))) .map_ok(|(pod, containers)| { @@ -111,6 +110,7 @@ impl KubeResourceSeeker<'_> { .try_flatten() .try_collect() .await + .map_err(KubeApiError::KubeError) } /// The list of deployments that have at least 1 `Replicas` and a deployment name. @@ -123,7 +123,7 @@ impl KubeResourceSeeker<'_> { .unwrap_or(false) } - self.list_resource::(None) + self.list_all_namespaced::(None) .filter(|response| std::future::ready(response.is_ok())) .try_filter(|deployment| std::future::ready(check_deployment_replicas(deployment))) .try_filter_map(|deployment| { @@ -134,60 +134,114 @@ impl KubeResourceSeeker<'_> { }) .try_collect() .await + .map_err(From::from) } - /// Helper to get the list of a resource type ([`Pod`], [`Deployment`], [`Rollout`], [`Job`], - /// [`CronJob`], [`StatefulSet`], or whatever satisfies `R`) through the kube api. - fn list_resource<'s, R>( - &self, - field_selector: Option<&'s str>, - ) -> impl Stream> + 's + async fn simple_list_resource<'s, R>(&self, prefix: &'s str) -> Result> where - R: Clone + fmt::Debug + for<'de> de::Deserialize<'de> + 's, - R: Resource, + R: 'static + + Clone + + fmt::Debug + + for<'de> de::Deserialize<'de> + + Resource + + Metadata + + Send, { - let Self { client, namespace } = self; - let resource_api = get_k8s_resource_api::(client, *namespace); + self.list_all_namespaced::(None) + .filter(|response| std::future::ready(response.is_ok())) + .try_filter_map(|rollout| { + std::future::ready(Ok(rollout + .meta() + .name + .as_ref() + .map(|name| format!("{prefix}/{name}")))) + }) + .try_collect() + .await + .map_err(From::from) + } + + /// Prepares [`ListParams`] that: + /// 1. Excludes our own resources + /// 2. Adds a limit for item count in a response + fn make_list_params(field_selector: Option<&str>) -> ListParams { + ListParams { + label_selector: Some("app!=mirrord,!operator.metalbear.co/owner".to_string()), + field_selector: field_selector.map(ToString::to_string), + limit: Some(500), + ..Default::default() + } + } - stream! { - let mut params = ListParams { - label_selector: Some("app!=mirrord,!operator.metalbear.co/owner".to_string()), - field_selector: field_selector.map(ToString::to_string), - limit: Some(500), - ..Default::default() - }; + /// Returns a [`Stream`] of all objects in this [`KubeResourceSeeker`]'s namespace. + /// + /// 1. `field_selector` can be used for filtering. + /// 2. Our own resources are excluded. + pub fn list_all_namespaced( + &self, + field_selector: Option<&str>, + ) -> impl 'static + Stream> + Send + where + R: 'static + + Resource + + fmt::Debug + + Clone + + DeserializeOwned + + Send, + { + let api = get_k8s_resource_api(self.client, self.namespace); + let mut params = Self::make_list_params(field_selector); + async_stream::stream! { loop { - let resource = resource_api.list(¶ms).await?; + let response = api.list(¶ms).await?; - for resource in resource.items { + for resource in response.items { yield Ok(resource); } - if let Some(continue_token) = resource.metadata.continue_ && !continue_token.is_empty() { - params = params.continue_token(&continue_token); - } else { + let continue_token = response.metadata.continue_.unwrap_or_default(); + if continue_token.is_empty() { break; } + params.continue_token.replace(continue_token); } } } - async fn simple_list_resource<'s, R>(&self, prefix: &'s str) -> Result> + /// Returns a [`Stream`] of all objects in the cluster. + /// + /// 1. `field_selector` can be used for filtering. + /// 2. Our own resources are excluded. + pub fn list_all_clusterwide( + &self, + field_selector: Option<&str>, + ) -> impl 'static + Stream> + Send where - R: Clone + fmt::Debug + for<'de> de::Deserialize<'de>, - R: Resource + Metadata, + R: 'static + + Resource + + fmt::Debug + + Clone + + DeserializeOwned + + Send, { - self.list_resource::(None) - .filter(|response| std::future::ready(response.is_ok())) - .try_filter_map(|rollout| { - std::future::ready(Ok(rollout - .meta() - .name - .as_ref() - .map(|name| format!("{prefix}/{name}")))) - }) - .try_collect() - .await + let api = Api::all(self.client.clone()); + let mut params = Self::make_list_params(field_selector); + + async_stream::stream! { + loop { + let response = api.list(¶ms).await?; + + for resource in response.items { + yield Ok(resource); + } + + let continue_token = response.metadata.continue_.unwrap_or_default(); + if continue_token.is_empty() { + break; + } + params.continue_token.replace(continue_token); + } + } } } From a51e981ac6f54fa04c978e326acb3a570d53319e Mon Sep 17 00:00:00 2001 From: meowjesty <43983236+meowjesty@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:18:11 -0300 Subject: [PATCH 2/4] Add policy for file ops. (#2997) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add policy to exclude env vars. * docs * changelog * fix tests * Make policy optional for backwards compat. * typo * other typo * default serde and more explicit docs * fix mismatching policy * make it look more like config * make it pub * fix test * better docs Co-authored-by: Michał Smolarek <34063647+Razz4780@users.noreply.github.com> * better docs due Co-authored-by: Michał Smolarek <34063647+Razz4780@users.noreply.github.com> * rustfmt * Add policy for file ops. * no exclude * update protocol with open_local_version * bump protocol * lint test * docs * fix test * change min protocol version * e2e test for fspolicy * Ignore fs policy test/ * namespaced test * hopefully fixed policy test * the children get to live * fix test policy name Co-authored-by: t4lz * fix go fs test * remove read_write * add newline to python test * changelog --------- Co-authored-by: Michał Smolarek <34063647+Razz4780@users.noreply.github.com> Co-authored-by: t4lz --- Cargo.lock | 377 ++++++++++-------- changelog.d/+104-policy-fs.added.md | 1 + mirrord/layer/src/detour.rs | 4 + mirrord/layer/src/error.rs | 1 + mirrord/layer/src/file/ops.rs | 7 +- mirrord/operator/src/crd/policy.rs | 34 ++ mirrord/protocol/Cargo.toml | 2 +- mirrord/protocol/src/error.rs | 3 + mirrord/protocol/src/file.rs | 3 + tests/go-e2e-dir/main.go | 15 +- .../fspolicy/test_operator_fs_policy.mjs | 54 +++ tests/python-e2e/files_ro.py | 4 +- tests/python-e2e/ops.py | 2 +- tests/src/operator/policies.rs | 9 + tests/src/operator/policies/fs.rs | 87 ++++ tests/src/utils.rs | 8 + 16 files changed, 437 insertions(+), 174 deletions(-) create mode 100644 changelog.d/+104-policy-fs.added.md create mode 100644 tests/node-e2e/fspolicy/test_operator_fs_policy.mjs create mode 100644 tests/src/operator/policies/fs.rs diff --git a/Cargo.lock b/Cargo.lock index bcd9d13b8a7..89528a03ca1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -182,7 +182,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -440,7 +440,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "synstructure 0.13.1", ] @@ -463,7 +463,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -508,7 +508,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -540,7 +540,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -551,13 +551,13 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -583,9 +583,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.12" +version = "1.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "649316840239f4e58df0b7f620c428f5fababbbca2d504488c641534050bd141" +checksum = "c03a50b30228d3af8865ce83376b4e99e1ffa34728220fe2860e4df0bb5278d6" dependencies = [ "aws-credential-types", "aws-runtime", @@ -650,9 +650,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44f6f1124d6e19ab6daf7f2e615644305dc6cb2d706892a8a8c0b98db35de020" +checksum = "b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -675,9 +675,9 @@ dependencies = [ [[package]] name = "aws-sdk-sqs" -version = "1.52.0" +version = "1.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9466fd797274f6c55454ea5aac51fc55a9bac6ca2116ed32cfb3134bb3fbcf0" +checksum = "6493ce2b27a2687b0d8a2453bf6ad2499012e9720c3367cb1206496ede475443" dependencies = [ "aws-credential-types", "aws-runtime", @@ -697,9 +697,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.52.0" +version = "1.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb25f7129c74d36afe33405af4517524df8f74b635af8c2c8e91c1552b8397b2" +checksum = "1605dc0bf9f0a4b05b451441a17fcb0bda229db384f23bf5cead3adbab0664ac" dependencies = [ "aws-credential-types", "aws-runtime", @@ -719,9 +719,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.53.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03a3d5ef14851625eafd89660a751776f938bf32f309308b20dcca41c44b568" +checksum = "59f3f73466ff24f6ad109095e0f3f2c830bfb4cd6c8b12f744c8e61ebf4d3ba1" dependencies = [ "aws-credential-types", "aws-runtime", @@ -741,9 +741,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.53.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf3a9f073ae3a53b54421503063dfb87ff1ea83b876f567d92e8b8d9942ba91b" +checksum = "249b2acaa8e02fd4718705a9494e3eb633637139aa4bb09d70965b0448e865db" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1109,7 +1109,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.93", + "syn 2.0.95", "which 4.4.2", ] @@ -1269,9 +1269,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.11.1" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "786a307d683a5bf92e6fd5fd69a7eb613751668d1d8d67d802846dfe367c62c8" +checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" dependencies = [ "memchr", "serde", @@ -1380,9 +1380,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.6" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d6dbb628b8f8555f86d0323c2eb39e3ec81901f4b83e091db8a6a76d316a333" +checksum = "a012a0df96dd6d06ba9a1b29d6402d1a5d77c6befd2566afdc26e10603dc93d7" dependencies = [ "jobserver", "libc", @@ -1470,9 +1470,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +checksum = "9560b07a799281c7e0958b9296854d6fafd4c5f31444a7e5bb1ad6dde5ccf1bd" dependencies = [ "clap_builder", "clap_derive", @@ -1480,9 +1480,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +checksum = "874e0dd3eb68bf99058751ac9712f622e61e6f393a94f7128fa26e3f02f5c7cd" dependencies = [ "anstream", "anstyle", @@ -1492,23 +1492,23 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac2e663e3e3bed2d32d065a8404024dad306e699a04263ec59919529f803aee9" +checksum = "942dc5991a34d8cf58937ec33201856feba9cbceeeab5adf04116ec7c763bff1" dependencies = [ "clap", ] [[package]] name = "clap_derive" -version = "4.5.18" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1789,7 +1789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a2785755761f3ddc1492979ce1e48d2c00d09311c39e4466429188f3dd6501" dependencies = [ "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1821,7 +1821,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1845,7 +1845,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1856,7 +1856,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1932,7 +1932,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1942,7 +1942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -1955,7 +1955,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2054,7 +2054,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2116,7 +2116,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2168,7 +2168,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2188,7 +2188,7 @@ checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2200,7 +2200,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2466,7 +2466,7 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcca6a476beb610ebeab9c0bc60b3535aa103b52a2c265dc9d3d26209bea666c" dependencies = [ - "reqwest 0.12.11", + "reqwest 0.12.12", "tar", "xz", ] @@ -2585,7 +2585,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -2624,6 +2624,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2657,7 +2670,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -3204,7 +3217,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -3331,7 +3344,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -3771,7 +3784,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -3913,6 +3926,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lzma-sys" version = "0.1.20" @@ -3962,7 +3988,7 @@ dependencies = [ "clap", "glob", "rand", - "syn 2.0.93", + "syn 2.0.95", "thiserror 2.0.9", "tracing", "tracing-subscriber", @@ -4030,7 +4056,7 @@ checksum = "23c9b935fbe1d6cbd1dac857b54a688145e2d93f48db36010514d0f612d0ad67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -4123,7 +4149,7 @@ dependencies = [ "rand", "rcgen", "regex", - "reqwest 0.12.11", + "reqwest 0.12.12", "rstest", "rustls 0.23.20", "rustls-pemfile 2.2.0", @@ -4204,7 +4230,7 @@ dependencies = [ "assert-json-diff", "base64 0.22.1", "drain", - "reqwest 0.12.11", + "reqwest 0.12.12", "serde", "serde_json", "tokio", @@ -4222,7 +4248,7 @@ dependencies = [ "k8s-openapi", "kube", "pem", - "reqwest 0.12.11", + "reqwest 0.12.12", "serde", "serde_yaml", "thiserror 2.0.9", @@ -4262,7 +4288,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -4299,7 +4325,7 @@ dependencies = [ "mirrord-operator", "mirrord-protocol", "rand", - "reqwest 0.12.11", + "reqwest 0.12.12", "rstest", "rustls 0.23.20", "rustls-pemfile 2.2.0", @@ -4395,7 +4421,7 @@ version = "3.128.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -4405,7 +4431,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "semver 1.0.24", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -4453,7 +4479,7 @@ dependencies = [ [[package]] name = "mirrord-protocol" -version = "1.13.2" +version = "1.13.3" dependencies = [ "actix-codec", "bincode", @@ -4530,26 +4556,25 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] name = "moka" -version = "0.12.8" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "once_cell", + "loom", "parking_lot", - "quanta", + "portable-atomic", "rustc_version", "smallvec", "tagptr", "thiserror 1.0.69", - "triomphe", "uuid", ] @@ -4704,7 +4729,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -4952,7 +4977,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5011,7 +5036,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5037,18 +5062,18 @@ dependencies = [ [[package]] name = "phf" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" dependencies = [ "phf_shared", ] [[package]] name = "phf_codegen" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" dependencies = [ "phf_generator", "phf_shared", @@ -5056,9 +5081,9 @@ dependencies = [ [[package]] name = "phf_generator" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", "rand", @@ -5066,38 +5091,38 @@ dependencies = [ [[package]] name = "phf_shared" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" dependencies = [ "siphasher", ] [[package]] name = "pin-project" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] name = "pin-project-lite" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -5207,7 +5232,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5302,12 +5327,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.25" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +checksum = "483f8c21f64f3ea09fe0f30f5d48c3e8eefe5dac9129f0075f76593b4c1da705" dependencies = [ "proc-macro2", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5361,7 +5386,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5381,7 +5406,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "version_check", "yansi", ] @@ -5437,7 +5462,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.93", + "syn 2.0.95", "tempfile", ] @@ -5451,7 +5476,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -5463,21 +5488,6 @@ dependencies = [ "prost", ] -[[package]] -name = "quanta" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773ce68d0bb9bc7ef20be3536ffe94e223e1f365bd374108b2659fac0c65cfe6" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi", - "web-sys", - "winapi", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -5628,15 +5638,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "raw-cpuid" -version = "11.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" -dependencies = [ - "bitflags 2.6.0", -] - [[package]] name = "rawsocket" version = "0.1.0" @@ -5823,9 +5824,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.11" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe060fe50f524be480214aba758c71f99f90ee8c83c5a36b5e9e1d568eb4eb3" +checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", @@ -5919,7 +5920,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.93", + "syn 2.0.95", "unicode-ident", ] @@ -6086,7 +6087,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.1.0", + "security-framework 3.2.0", ] [[package]] @@ -6217,9 +6218,15 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.93", + "syn 2.0.95", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -6243,7 +6250,7 @@ checksum = "7f81c2fde025af7e69b1d1420531c8a8811ca898919db177141a85313b1cb932" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -6294,9 +6301,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.1.0" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81d3f8c9bfcc3cbb6b0179eb57042d75b1582bdc65c3cb95f3fa999509c03cbc" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags 2.6.0", "core-foundation 0.10.0", @@ -6307,9 +6314,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1863fd3768cd83c56a7f60faa4dc0d403f1b6df0a38c3c25f44b7894e45370d5" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -6367,7 +6374,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -6378,14 +6385,14 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa", "memchr", @@ -6411,7 +6418,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -6544,9 +6551,9 @@ checksum = "5dd19be0257552dd56d1bb6946f89f193c6e5b9f13cc9327c4bc84a357507c74" [[package]] name = "siphasher" -version = "0.3.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "slab" @@ -6686,7 +6693,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -6729,9 +6736,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.93" +version = "2.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c786062daee0d6db1132800e623df74274a0a87322d8e183338e01b3d98d058" +checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a" dependencies = [ "proc-macro2", "quote", @@ -6773,7 +6780,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -6832,12 +6839,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.14.0" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 0.59.0", @@ -6943,7 +6951,7 @@ dependencies = [ "mirrord-operator", "rand", "regex", - "reqwest 0.12.11", + "reqwest 0.12.12", "rstest", "rustls 0.23.20", "serde", @@ -6990,7 +6998,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -7001,7 +7009,7 @@ checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -7105,7 +7113,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -7273,7 +7281,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -7363,7 +7371,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -7418,12 +7426,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "triomphe" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" - [[package]] name = "try-lock" version = "0.2.5" @@ -7783,7 +7785,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "wasm-bindgen-shared", ] @@ -7818,7 +7820,7 @@ checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7942,6 +7944,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -7951,6 +7963,41 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.95", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -8131,9 +8178,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.6.20" +version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +checksum = "39281189af81c07ec09db316b302a3e67bf9bd7cbf6c820b50e35fee9c2fa980" dependencies = [ "memchr", ] @@ -8263,9 +8310,9 @@ dependencies = [ [[package]] name = "xattr" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +checksum = "e105d177a3871454f754b33bb0ee637ecaaac997446375fd3e5d43a2ed00c909" dependencies = [ "libc", "linux-raw-sys", @@ -8337,7 +8384,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "synstructure 0.13.1", ] @@ -8359,7 +8406,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -8379,7 +8426,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", "synstructure 0.13.1", ] @@ -8400,7 +8447,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] @@ -8422,7 +8469,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.93", + "syn 2.0.95", ] [[package]] diff --git a/changelog.d/+104-policy-fs.added.md b/changelog.d/+104-policy-fs.added.md new file mode 100644 index 00000000000..1ad43a13736 --- /dev/null +++ b/changelog.d/+104-policy-fs.added.md @@ -0,0 +1 @@ +Add policy to control file ops. diff --git a/mirrord/layer/src/detour.rs b/mirrord/layer/src/detour.rs index a89e79ce0f6..92a014d20dd 100644 --- a/mirrord/layer/src/detour.rs +++ b/mirrord/layer/src/detour.rs @@ -215,6 +215,10 @@ pub(crate) enum Bypass { /// Useful for operations that are version gated, and we want to bypass when the protocol /// doesn't support them. NotImplemented, + + /// File `open` (any `open`-ish operation) was forced to be local, instead of remote, most + /// likely due to an operator fs policy. + OpenLocal, } impl Bypass { diff --git a/mirrord/layer/src/error.rs b/mirrord/layer/src/error.rs index ea0cbabe3c8..da4797a1916 100644 --- a/mirrord/layer/src/error.rs +++ b/mirrord/layer/src/error.rs @@ -248,6 +248,7 @@ impl From for i64 { HookError::BincodeEncode(_) => libc::EINVAL, HookError::ResponseError(response_fail) => match response_fail { ResponseError::IdsExhausted(_) => libc::ENOMEM, + ResponseError::OpenLocal => libc::ENOENT, ResponseError::NotFound(_) => libc::ENOENT, ResponseError::NotDirectory(_) => libc::ENOTDIR, ResponseError::NotFile(_) => libc::EISDIR, diff --git a/mirrord/layer/src/file/ops.rs b/mirrord/layer/src/file/ops.rs index 580f8eacc8a..ca9dd10f951 100644 --- a/mirrord/layer/src/file/ops.rs +++ b/mirrord/layer/src/file/ops.rs @@ -206,7 +206,12 @@ pub(crate) fn open(path: Detour, open_options: OpenOptionsInternal) -> ensure_not_ignored!(path, open_options.is_write()); - let OpenFileResponse { fd: remote_fd } = RemoteFile::remote_open(path.clone(), open_options)?; + let OpenFileResponse { fd: remote_fd } = RemoteFile::remote_open(path.clone(), open_options) + .or_else(|fail| match fail { + // The operator has a policy that matches this `path` as local-only. + HookError::ResponseError(ResponseError::OpenLocal) => Detour::Bypass(Bypass::OpenLocal), + other => Detour::Error(other), + })?; // TODO: Need a way to say "open a directory", right now `is_dir` always returns false. // This requires having a fake directory name (`/fake`, for example), instead of just converting diff --git a/mirrord/operator/src/crd/policy.rs b/mirrord/operator/src/crd/policy.rs index 1ad9447d1e8..e236164da98 100644 --- a/mirrord/operator/src/crd/policy.rs +++ b/mirrord/operator/src/crd/policy.rs @@ -58,6 +58,11 @@ pub struct MirrordPolicySpec { /// target. #[serde(default)] pub env: EnvPolicy, + + /// Overrides fs ops behaviour, granting control over them to the operator policy, instead of + /// the user config. + #[serde(default)] + pub fs: FsPolicy, } /// Custom cluster-wide resource for policies that limit what mirrord features users can use. @@ -90,6 +95,11 @@ pub struct MirrordClusterPolicySpec { /// target. #[serde(default)] pub env: EnvPolicy, + + /// Overrides fs ops behaviour, granting control over them to the operator policy, instead of + /// the user config. + #[serde(default)] + pub fs: FsPolicy, } /// Policy for controlling environment variables access from mirrord instances. @@ -104,9 +114,33 @@ pub struct EnvPolicy { /// Variable names can be matched using `*` and `?` where `?` matches exactly one occurrence of /// any character and `*` matches arbitrary many (including zero) occurrences of any character, /// e.g. `DATABASE_*` will match `DATABASE_URL` and `DATABASE_PORT`. + #[serde(default)] pub exclude: HashSet, } +/// File operations policy that mimics the mirrord fs config. +/// +/// Allows the operator control over remote file ops behaviour, overriding what the user has set in +/// their mirrord config file, if it matches something in one of the lists (regex sets) of this +/// struct. +#[derive(Clone, Default, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub struct FsPolicy { + /// The file can only be opened in read-only mode, otherwise the operator returns an IO error. + #[serde(default)] + pub read_only: HashSet, + + /// The file cannot be opened in the remote target. + /// + /// `open` calls that match this are forced to be opened in the local user's machine. + #[serde(default)] + pub local: HashSet, + + /// Any file that matches this returns a file not found error from the operator. + #[serde(default)] + pub not_found: HashSet, +} + #[test] fn check_one_api_group() { use kube::Resource; diff --git a/mirrord/protocol/Cargo.toml b/mirrord/protocol/Cargo.toml index 34832bbe47f..1904cc97f1c 100644 --- a/mirrord/protocol/Cargo.toml +++ b/mirrord/protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mirrord-protocol" -version = "1.13.2" +version = "1.13.3" authors.workspace = true description.workspace = true documentation.workspace = true diff --git a/mirrord/protocol/src/error.rs b/mirrord/protocol/src/error.rs index 9a8e451658c..efb7ff08198 100644 --- a/mirrord/protocol/src/error.rs +++ b/mirrord/protocol/src/error.rs @@ -67,6 +67,9 @@ pub enum ResponseError { #[error("Failed stripping path with `{0}`!")] StripPrefix(String), + + #[error("File has to be opened locally!")] + OpenLocal, } impl From for ResponseError { diff --git a/mirrord/protocol/src/file.rs b/mirrord/protocol/src/file.rs index b2e8e3773f8..c0b4cfe2f18 100644 --- a/mirrord/protocol/src/file.rs +++ b/mirrord/protocol/src/file.rs @@ -25,6 +25,9 @@ pub static READDIR_BATCH_VERSION: LazyLock = pub static MKDIR_VERSION: LazyLock = LazyLock::new(|| ">=1.13.0".parse().expect("Bad Identifier")); +pub static OPEN_LOCAL_VERSION: LazyLock = + LazyLock::new(|| ">=1.13.3".parse().expect("Bad Identifier")); + /// Internal version of Metadata across operating system (macOS, Linux) /// Only mutual attributes #[derive(Encode, Decode, Debug, PartialEq, Clone, Copy, Eq, Default)] diff --git a/tests/go-e2e-dir/main.go b/tests/go-e2e-dir/main.go index b608f01b53b..80006ce58c5 100644 --- a/tests/go-e2e-dir/main.go +++ b/tests/go-e2e-dir/main.go @@ -15,13 +15,20 @@ func main() { os.Exit(-1) } fmt.Printf("DirEntries: %s\n", dir) + // `os.ReadDir` does not include `.` and `..`. - if len(dir) != 2 { + if len(dir) < 2 { os.Exit(-1) } - // `os.ReadDir` sorts the result by file name. - if dir[0].Name() != "app.py" || dir[1].Name() != "test.txt" { - os.Exit(-1) + + // Iterate over the files in this dir, exiting if it's not an expected file name. + for i := 0; i < len(dir); i++ { + dirName := dir[i].Name() + + if dirName != "app.py" && dirName != "test.txt" && dirName != "file.local" && dirName != "file.not-found" && dirName != "file.read-only" && dirName != "file.read-write" { + os.Exit(-1) + } + } err = os.Mkdir("/app/test_mkdir", 0755) diff --git a/tests/node-e2e/fspolicy/test_operator_fs_policy.mjs b/tests/node-e2e/fspolicy/test_operator_fs_policy.mjs new file mode 100644 index 00000000000..8e58bf52cec --- /dev/null +++ b/tests/node-e2e/fspolicy/test_operator_fs_policy.mjs @@ -0,0 +1,54 @@ +import fs from 'fs'; + +fs.open("/app/file.local", (fail, fd) => { + console.log(`open file.local ${fd}`); + if (fd) { + console.log(`SUCCESS /app/file.local ${fd}`); + } + + if (fail) { + console.error(`FAIL /app/file.local ${fail}`); + } +}); + +fs.open("/app/file.not-found", (fail, fd) => { + console.log(`open file.not-found ${fd}`); + if (fd) { + console.log(`SUCCESS /app/file.not-found ${fd}`); + } + + if (fail) { + console.error(`FAIL /app/file.not-found ${fail}`); + } +}); + +fs.open("/app/file.read-only", (fail, fd) => { + if (fd) { + console.log(`SUCCESS /app/file.read-only ${fd}`); + } + + if (fail) { + console.error(`FAIL /app/file.read-only ${fail}`); + } +}); + +fs.open("/app/file.read-only", "r+", (fail, fd) => { + if (fd) { + console.log(`SUCCESS r+ /app/file.read-only ${fd}`); + } + + if (fail) { + console.error(`FAIL r+ /app/file.read-only ${fail}`); + } +}); + +fs.open("/app/file.read-write", "r+", (fail, fd) => { + if (fd) { + console.log(`SUCCESS /app/file.read-write ${fd}`); + } + + if (fail) { + console.error(`FAIL /app/file.read-write ${fail}`); + } +}); + diff --git a/tests/python-e2e/files_ro.py b/tests/python-e2e/files_ro.py index ed99eab5d7f..c6e4e8d5631 100644 --- a/tests/python-e2e/files_ro.py +++ b/tests/python-e2e/files_ro.py @@ -3,7 +3,7 @@ import uuid import unittest -TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." +TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n" class FileOpsTest(unittest.TestCase): @@ -22,4 +22,4 @@ def test_read_only(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/python-e2e/ops.py b/tests/python-e2e/ops.py index 8e83271628f..9f94425921e 100644 --- a/tests/python-e2e/ops.py +++ b/tests/python-e2e/ops.py @@ -2,7 +2,7 @@ import uuid import unittest -TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." +TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n" class FileOpsTest(unittest.TestCase): diff --git a/tests/src/operator/policies.rs b/tests/src/operator/policies.rs index 61a17d90af5..114d42968e6 100644 --- a/tests/src/operator/policies.rs +++ b/tests/src/operator/policies.rs @@ -18,6 +18,8 @@ use crate::utils::{ config_dir, kube_client, service, Application, KubeService, ResourceGuard, TestProcess, }; +mod fs; + /// Guard that deletes a mirrord policy when dropped. struct PolicyGuard { _inner: ResourceGuard, @@ -128,6 +130,7 @@ fn block_steal_without_qualifiers() -> PolicyTestCase { selector: None, block: vec![BlockedFeature::Steal], env: Default::default(), + fs: Default::default(), }, ), service_b_can_steal: No, @@ -147,6 +150,7 @@ fn block_steal_with_path_pattern() -> PolicyTestCase { selector: None, block: vec![BlockedFeature::Steal], env: Default::default(), + fs: Default::default(), }, ), service_b_can_steal: EvenWithoutFilter, @@ -166,6 +170,7 @@ fn block_unfiltered_steal_with_path_pattern() -> PolicyTestCase { selector: None, block: vec![BlockedFeature::StealWithoutFilter], env: Default::default(), + fs: Default::default(), }, ), service_b_can_steal: EvenWithoutFilter, @@ -185,6 +190,7 @@ fn block_unfiltered_steal_with_deployment_path_pattern() -> PolicyTestCase { selector: None, block: vec![BlockedFeature::StealWithoutFilter], env: Default::default(), + fs: Default::default(), }, ), service_a_can_steal: OnlyWithFilter, @@ -210,6 +216,7 @@ fn block_steal_with_label_selector() -> PolicyTestCase { }), block: vec![BlockedFeature::Steal], env: Default::default(), + fs: Default::default(), }, ), service_b_can_steal: EvenWithoutFilter, @@ -236,6 +243,7 @@ fn block_steal_with_unmatching_policy() -> PolicyTestCase { }), block: vec![BlockedFeature::Steal], env: Default::default(), + fs: Default::default(), }, ), service_b_can_steal: EvenWithoutFilter, @@ -377,6 +385,7 @@ pub async fn create_cluster_policy_and_try_to_mirror( selector: None, block: vec![BlockedFeature::Mirror], env: Default::default(), + fs: Default::default(), }, ), ) diff --git a/tests/src/operator/policies/fs.rs b/tests/src/operator/policies/fs.rs new file mode 100644 index 00000000000..d54ed7bb98d --- /dev/null +++ b/tests/src/operator/policies/fs.rs @@ -0,0 +1,87 @@ +use std::{collections::HashSet, time::Duration}; + +use mirrord_operator::crd::policy::{FsPolicy, MirrordPolicy, MirrordPolicySpec}; +use rstest::{fixture, rstest}; + +use crate::{ + operator::policies::PolicyGuard, + utils::{kube_client, service, Application, KubeService}, +}; + +#[fixture] +async fn fs_service(#[future] kube_client: kube::Client) -> KubeService { + let namespace = format!("e2e-tests-fs-policies-{}", crate::utils::random_string()); + + service( + &namespace, + "NodePort", + "ghcr.io/metalbear-co/mirrord-pytest:latest", + "fs-policy-e2e-test-service", + false, + kube_client, + ) + .await +} + +#[rstest] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[timeout(Duration::from_secs(60))] +pub async fn create_cluster_fs_policy_and_try_file_operations( + #[future] service: KubeService, + #[future] kube_client: kube::Client, +) { + let kube_client = kube_client.await; + let service = service.await; + + // Create policy, delete it when test exits. + let _policy_guard = PolicyGuard::namespaced( + kube_client, + &MirrordPolicy::new( + "e2e-test-fs-policy-with-path-pattern", + MirrordPolicySpec { + target_path: Some("fs_policy_e2e-test-*".into()), + selector: None, + block: Default::default(), + env: Default::default(), + fs: FsPolicy { + read_only: HashSet::from_iter(vec!["file.read-only".to_string()]), + local: HashSet::from_iter(vec!["file.local".to_string()]), + not_found: HashSet::from_iter(vec!["file.not-found".to_string()]), + }, + }, + ), + &service.namespace, + ) + .await; + + let application = Application::NodeFsPolicy; + println!("Running mirrord {application:?} against {}", &service.name); + + let mut test_process = application + .run( + &service.target, + Some(&service.namespace), + Some(vec!["--fs-mode=write"]), + None, + ) + .await; + + test_process.wait_assert_success().await; + + test_process + .assert_stderr_contains("FAIL /app/file.local") + .await; + test_process + .assert_stderr_contains("FAIL /app/file.not-found") + .await; + test_process + .assert_stderr_contains("FAIL r+ /app/file.read-only") + .await; + + test_process + .assert_stdout_contains("SUCCESS /app/file.read-only") + .await; + test_process + .assert_stdout_contains("SUCCESS /app/file.read-write") + .await; +} diff --git a/tests/src/utils.rs b/tests/src/utils.rs index bf807337a09..538a4d753e6 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -100,6 +100,11 @@ pub enum Application { PythonCloseSocketKeepConnection, RustWebsockets, RustSqs, + /// Tries to open files in the remote target, but these operations should succeed or fail based + /// on mirrord `FsPolicy`. + /// + /// - `node-e2e/fspolicy/test_operator_fs_policy.mjs` + NodeFsPolicy, } #[derive(Debug)] @@ -408,6 +413,9 @@ impl Application { Application::NodeHTTP2 => { vec!["node", "node-e2e/http2/test_http2_traffic_steal.mjs"] } + Application::NodeFsPolicy => { + vec!["node", "node-e2e/fspolicy/test_operator_fs_policy.mjs"] + } Application::Go21HTTP => vec!["go-e2e/21.go_test_app"], Application::Go22HTTP => vec!["go-e2e/22.go_test_app"], Application::Go23HTTP => vec!["go-e2e/23.go_test_app"], From 81da4e4eb36889f04892419517c8a4b71c1856df Mon Sep 17 00:00:00 2001 From: t4lz Date: Thu, 9 Jan 2025 22:34:17 +0100 Subject: [PATCH 3/4] IPv6 support for traffic stealing (#2976) * E2E IPv6 steal test * e2e ipv6 service * Local E2E IPv6 testing * No ephemeral, need to delete or uncomment later * For local testing. DROP * add ipv6 flag * allow IPv6 in socket if enabled in config * enable ipv6 in test config * don't change CONTRIBUTING.md formatting * Use IpAddr instad of Ipv4Addr for pod IPs * E2E test with portforwarding * fix tests import * move ipv6 config up to network * Propagate ipv6 setting to an agent arg * fallback agent listener * stealer, iptables - start * add ipv6 listener and iptables, still need to adapt more places * iptable listeners * use filter table for ipv6 * oh no * Revert "oh no" This reverts commit 8fa0954f95c434497b2839db4483269fc2db5132. * try with flush connections * use input chain for IPv6 * fix dumb bug (ip6tables command switch) * add debug logs * add debug logs * revert some stuff * use nat table in ip6tables * ipv6 manual test app * fix test request * fix doc? * thanks clippy * ignore ipv6 test * fix config test * cfg test for ipv6 utils * easy way out * fix tests utils * ipv6 support default to false * fix iptables tests * remove unused methods * fix policies test * update schema * run medschool * fix kube UT * use test image agent * changelog * use published test image again * TODOs * add ipv6 test to CI * add kind cluster config for IPv6 * fix cluster config * CI IPv6 job name * patch kind config to fix fail * use kind bash script * fix cargo test command * agent logs? * maybe with a longer TTL I'll get some logs? * print intproxy logs on failure * show nodes on failure * modprobe? * exec modprobe as command * which modprobe * docker file install kmod * modprobe ip6_tables * load 3 modules * unused vars * undo modprobes * protocol cargo * don't test ipv6 on CI * delete kind cluster creation script, since not testing in CI * CR * apply change to new policy test --- CONTRIBUTING.md | 19 ++ changelog.d/2956.added.md | 1 + mirrord-schema.json | 10 +- mirrord/agent/src/cli.rs | 11 +- mirrord/agent/src/entrypoint.rs | 44 ++- mirrord/agent/src/error.rs | 4 + mirrord/agent/src/steal/connection.rs | 26 +- mirrord/agent/src/steal/ip_tables.rs | 22 +- mirrord/agent/src/steal/ip_tables/output.rs | 5 +- mirrord/agent/src/steal/subscriptions.rs | 253 +++++++++++++++--- mirrord/config/configuration.md | 4 + mirrord/config/src/config/from_env.rs | 4 + mirrord/config/src/feature/network.rs | 17 +- .../config/src/feature/network/incoming.rs | 12 +- mirrord/config/src/lib.rs | 1 + mirrord/kube/src/api/container.rs | 9 +- mirrord/kube/src/api/container/job.rs | 10 +- mirrord/kube/src/api/container/util.rs | 3 +- mirrord/kube/src/api/kubernetes.rs | 10 +- mirrord/kube/src/api/runtime.rs | 8 +- mirrord/layer/src/socket/ops.rs | 3 +- mirrord/protocol/Cargo.toml | 2 +- mirrord/protocol/src/lib.rs | 2 + tests/ipv6-app.yaml | 49 ++++ tests/kind-cluster-ipv6-config.yaml | 9 + tests/src/env.rs | 2 +- tests/src/file_ops.rs | 38 ++- tests/src/http.rs | 14 +- tests/src/issue1317.rs | 9 +- tests/src/lib.rs | 1 + tests/src/operator/concurrent_steal.rs | 15 +- tests/src/operator/policies.rs | 6 +- tests/src/operator/policies/fs.rs | 2 +- tests/src/traffic.rs | 138 +++++++--- tests/src/traffic/steal.rs | 88 ++++-- tests/src/utils.rs | 74 +++-- tests/src/utils/ipv6.rs | 100 +++++++ 37 files changed, 838 insertions(+), 187 deletions(-) create mode 100644 changelog.d/2956.added.md create mode 100644 tests/ipv6-app.yaml create mode 100644 tests/kind-cluster-ipv6-config.yaml create mode 100644 tests/src/utils/ipv6.rs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 818a39e3a89..1b99ddc4c65 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -113,6 +113,25 @@ For example, a test which only tests sanity of the ephemeral container feature s On Linux, running tests may exhaust a large amount of RAM and crash the machine. To prevent this, limit the number of concurrent jobs by running the command with e.g. `-j 4` +### IPv6 + +Some tests create a single-stack IPv6 service. They can only be run on clusters with IPv6 enabled. +In order to test IPv6 on a local cluster on macOS, you can use Kind: + +1. `brew install kind` +2. ```shell + cat >kind-config.yaml < Result<()> { trace!("start_agent -> Starting agent with args: {args:?}"); - let listener = TcpListener::bind(SocketAddrV4::new( + // listen for client connections + let ipv4_listener_result = TcpListener::bind(SocketAddrV4::new( Ipv4Addr::UNSPECIFIED, args.communicate_port, )) - .await?; + .await; + + let listener = if args.ipv6 && ipv4_listener_result.is_err() { + debug!("IPv6 Support enabled, and IPv4 bind failed, binding IPv6 listener"); + TcpListener::bind(SocketAddrV6::new( + Ipv6Addr::UNSPECIFIED, + args.communicate_port, + 0, + 0, + )) + .await + } else { + ipv4_listener_result + }?; + + match listener.local_addr() { + Ok(addr) => debug!( + client_listener_address = addr.to_string(), + "Created listener." + ), + Err(err) => error!(%err, "listener local address error"), + } let state = State::new(&args).await?; @@ -566,13 +588,15 @@ async fn start_agent(args: Args) -> Result<()> { let cancellation_token = cancellation_token.clone(); let watched_task = WatchedTask::new( TcpConnectionStealer::TASK_NAME, - TcpConnectionStealer::new(stealer_command_rx).and_then(|stealer| async move { - let res = stealer.start(cancellation_token).await; - if let Err(err) = res.as_ref() { - error!("Stealer failed: {err}"); - } - res - }), + TcpConnectionStealer::new(stealer_command_rx, args.ipv6).and_then( + |stealer| async move { + let res = stealer.start(cancellation_token).await; + if let Err(err) = res.as_ref() { + error!("Stealer failed: {err}"); + } + res + }, + ), ); let status = watched_task.status(); let task = run_thread_in_namespace( diff --git a/mirrord/agent/src/error.rs b/mirrord/agent/src/error.rs index ad04e49c8c5..d9ae7cb8b9d 100644 --- a/mirrord/agent/src/error.rs +++ b/mirrord/agent/src/error.rs @@ -84,6 +84,10 @@ pub(crate) enum AgentError { /// Temporary error for vpn feature #[error("Generic error in vpn: {0}")] VpnError(String), + + /// When we neither create a redirector for IPv4, nor for IPv6 + #[error("Could not create a listener for stolen connections")] + CannotListenForStolenConnections, } impl From> for AgentError { diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index 463c61f88d0..37435176b8b 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - net::{IpAddr, Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }; use fancy_regex::Regex; @@ -289,6 +289,9 @@ pub(crate) struct TcpConnectionStealer { /// Set of active connections stolen by [`Self::port_subscriptions`]. connections: StolenConnections, + + /// Shen set, the stealer will use IPv6 if needed. + support_ipv6: bool, } impl TcpConnectionStealer { @@ -297,14 +300,21 @@ impl TcpConnectionStealer { /// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work. /// You need to call [`TcpConnectionStealer::start`] to do so. #[tracing::instrument(level = "trace")] - pub(crate) async fn new(command_rx: Receiver) -> Result { + pub(crate) async fn new( + command_rx: Receiver, + support_ipv6: bool, + ) -> Result { let config = envy::prefixed("MIRRORD_AGENT_") .from_env::() .unwrap_or_default(); let port_subscriptions = { - let redirector = - IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?; + let redirector = IpTablesRedirector::new( + config.stealer_flush_connections, + config.pod_ips, + support_ipv6, + ) + .await?; PortSubscriptions::new(redirector, 4) }; @@ -315,6 +325,7 @@ impl TcpConnectionStealer { clients: HashMap::with_capacity(8), clients_closed: Default::default(), connections: StolenConnections::with_capacity(8), + support_ipv6, }) } @@ -371,9 +382,14 @@ impl TcpConnectionStealer { #[tracing::instrument(level = "trace", skip(self))] async fn incoming_connection(&mut self, stream: TcpStream, peer: SocketAddr) -> Result<()> { let mut real_address = orig_dst::orig_dst_addr(&stream)?; + let localhost = if self.support_ipv6 && real_address.is_ipv6() { + IpAddr::V6(Ipv6Addr::LOCALHOST) + } else { + IpAddr::V4(Ipv4Addr::LOCALHOST) + }; // If we use the original IP we would go through prerouting and hit a loop. // localhost should always work. - real_address.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)); + real_address.set_ip(localhost); let Some(port_subscription) = self.port_subscriptions.get(real_address.port()).cloned() else { diff --git a/mirrord/agent/src/steal/ip_tables.rs b/mirrord/agent/src/steal/ip_tables.rs index 5583b485000..68bddb6a406 100644 --- a/mirrord/agent/src/steal/ip_tables.rs +++ b/mirrord/agent/src/steal/ip_tables.rs @@ -111,6 +111,18 @@ pub fn new_iptables() -> iptables::IPTables { .expect("IPTables initialization may not fail!") } +/// wrapper around iptables::new that uses nft or legacy based on env +pub fn new_ip6tables() -> iptables::IPTables { + if let Ok(val) = std::env::var("MIRRORD_AGENT_NFTABLES") + && val.to_lowercase() == "true" + { + iptables::new_with_cmd("/usr/sbin/ip6tables-nft") + } else { + iptables::new_with_cmd("/usr/sbin/ip6tables-legacy") + } + .expect("IPTables initialization may not fail!") +} + impl Debug for IPTablesWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("IPTablesWrapper") @@ -140,7 +152,7 @@ impl IPTables for IPTablesWrapper { } } - #[tracing::instrument(level = "trace")] + #[tracing::instrument(level = tracing::Level::TRACE, skip(self), ret, fields(table_name=%self.table_name))] fn create_chain(&self, name: &str) -> Result<()> { self.tables .new_chain(self.table_name, name) @@ -220,6 +232,7 @@ where ipt: IPT, flush_connections: bool, pod_ips: Option<&str>, + ipv6: bool, ) -> Result { let ipt = Arc::new(ipt); @@ -231,6 +244,7 @@ where _ => Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor, pod_ips)?), } } else { + tracing::trace!(ipv6 = ipv6, "creating standard redirect"); match StandardRedirect::create(ipt.clone(), pod_ips) { Err(err) => { warn!("Unable to create StandardRedirect chain: {err}"); @@ -280,7 +294,7 @@ where /// Adds the redirect rule to iptables. /// /// Used to redirect packets when mirrord incoming feature is set to `steal`. - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = tracing::Level::DEBUG, skip(self))] pub(super) async fn add_redirect( &self, redirected_port: Port, @@ -408,7 +422,7 @@ mod tests { .times(1) .returning(|_| Ok(())); - let ipt = SafeIpTables::create(mock, false, None) + let ipt = SafeIpTables::create(mock, false, None, false) .await .expect("Create Failed"); @@ -541,7 +555,7 @@ mod tests { .times(1) .returning(|_| Ok(())); - let ipt = SafeIpTables::create(mock, false, None) + let ipt = SafeIpTables::create(mock, false, None, false) .await .expect("Create Failed"); diff --git a/mirrord/agent/src/steal/ip_tables/output.rs b/mirrord/agent/src/steal/ip_tables/output.rs index 944bc26f95b..2286469c00c 100644 --- a/mirrord/agent/src/steal/ip_tables/output.rs +++ b/mirrord/agent/src/steal/ip_tables/output.rs @@ -20,8 +20,11 @@ where { const ENTRYPOINT: &'static str = "OUTPUT"; + #[tracing::instrument(skip(ipt), level = tracing::Level::TRACE)] pub fn create(ipt: Arc, chain_name: String, pod_ips: Option<&str>) -> Result { - let managed = IPTableChain::create(ipt, chain_name)?; + let managed = IPTableChain::create(ipt, chain_name.clone()).inspect_err( + |e| tracing::error!(%e, "Could not create iptables chain \"{chain_name}\"."), + )?; let exclude_source_ips = pod_ips .map(|pod_ips| format!("! -s {pod_ips}")) diff --git a/mirrord/agent/src/steal/subscriptions.rs b/mirrord/agent/src/steal/subscriptions.rs index 0ff0e1fa8ea..0468719bc9c 100644 --- a/mirrord/agent/src/steal/subscriptions.rs +++ b/mirrord/agent/src/steal/subscriptions.rs @@ -1,16 +1,20 @@ use std::{ collections::{hash_map::Entry, HashMap}, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + ops::Not, sync::Arc, }; use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap}; use mirrord_protocol::{Port, RemoteResult, ResponseError}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + select, +}; use super::{ http::HttpFilter, - ip_tables::{new_iptables, IPTablesWrapper, SafeIpTables}, + ip_tables::{new_ip6tables, new_iptables, IPTablesWrapper, SafeIpTables}, }; use crate::{error::AgentError, util::ClientId}; @@ -47,19 +51,82 @@ pub trait PortRedirector { async fn next_connection(&mut self) -> Result<(TcpStream, SocketAddr), Self::Error>; } -/// Implementation of [`PortRedirector`] that manipulates iptables to steal connections by -/// redirecting TCP packets to inner [`TcpListener`]. -pub(crate) struct IpTablesRedirector { +/// A TCP listener, together with an iptables wrapper to set rules that send traffic to the +/// listener. +pub(crate) struct IptablesListener { /// For altering iptables rules. iptables: Option>, - /// Whether exisiting connections should be flushed when adding new redirects. - flush_connections: bool, - /// Port of [`IpTablesRedirector::listener`]. + /// Port of [`listener`](Self::listener). redirect_to: Port, /// Listener to which redirect all connections. listener: TcpListener, - + /// Optional comma-seperated list of IPs of the pod, originating in the pod's `Status.PodIps` pod_ips: Option, + /// Whether existing connections should be flushed when adding new redirects. + flush_connections: bool, + /// Is this for connections incoming over IPv6 + ipv6: bool, +} + +#[async_trait::async_trait] +impl PortRedirector for IptablesListener { + type Error = AgentError; + + #[tracing::instrument(skip(self), err, ret, level=tracing::Level::DEBUG, fields(self.ipv6 = %self.ipv6))] + async fn add_redirection(&mut self, from: Port) -> Result<(), Self::Error> { + let iptables = if let Some(iptables) = self.iptables.as_ref() { + iptables + } else { + let safe = crate::steal::ip_tables::SafeIpTables::create( + if self.ipv6 { + new_ip6tables() + } else { + new_iptables() + } + .into(), + self.flush_connections, + self.pod_ips.as_deref(), + self.ipv6, + ) + .await?; + self.iptables.insert(safe) + }; + iptables.add_redirect(from, self.redirect_to).await + } + + async fn remove_redirection(&mut self, from: Port) -> Result<(), Self::Error> { + if let Some(iptables) = self.iptables.as_ref() { + iptables.remove_redirect(from, self.redirect_to).await?; + } + + Ok(()) + } + + async fn cleanup(&mut self) -> Result<(), Self::Error> { + if let Some(iptables) = self.iptables.take() { + iptables.cleanup().await?; + } + + Ok(()) + } + + async fn next_connection(&mut self) -> Result<(TcpStream, SocketAddr), Self::Error> { + self.listener.accept().await.map_err(Into::into) + } +} + +/// Implementation of [`PortRedirector`] that manipulates iptables to steal connections by +/// redirecting TCP packets to inner [`TcpListener`]. +/// +/// Holds TCP listeners + iptables, for redirecting IPv4 and/or IPv6 connections. +pub(crate) enum IpTablesRedirector { + Ipv4Only(IptablesListener), + /// Could be used if IPv6 support is enabled, and we cannot bind an IPv4 address. + Ipv6Only(IptablesListener), + Dual { + ipv4_listener: IptablesListener, + ipv6_listener: IptablesListener, + }, } impl IpTablesRedirector { @@ -67,28 +134,116 @@ impl IpTablesRedirector { /// [`Ipv4Addr::UNSPECIFIED`] address and a random port. This listener will be used to accept /// redirected connections. /// + /// If `support_ipv6` is set, will also listen on IPv6, and a fail to listen over IPv4 will be + /// accepted. + /// /// # Note /// /// Does not yet alter iptables. /// /// # Params /// - /// * `flush_connections` - whether exisitng connections should be flushed when adding new + /// * `flush_connections` - whether existing connections should be flushed when adding new /// redirects pub(crate) async fn new( flush_connections: bool, pod_ips: Option, + support_ipv6: bool, ) -> Result { - let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).await?; - let redirect_to = listener.local_addr()?.port(); - - Ok(Self { - iptables: None, - flush_connections, - redirect_to, - listener, - pod_ips, - }) + let (pod_ips4, pod_ips6) = pod_ips.map_or_else( + || (None, None), + |ips| { + // TODO: probably nicer to split at the client and avoid the conversion to and back + // from a string. + let (ip4s, ip6s): (Vec<_>, Vec<_>) = ips.split(',').partition(|ip_str| { + ip_str + .parse::() + .inspect_err(|e| tracing::warn!(%e, "failed to parse pod IP {ip_str}")) + .as_ref() + .map(IpAddr::is_ipv4) + .unwrap_or_default() + }); + // Convert to options, `None` if vector is empty. + ( + ip4s.is_empty().not().then(|| ip4s.join(",")), + ip6s.is_empty().not().then(|| ip6s.join(",")), + ) + }, + ); + tracing::debug!("pod IPv4 addresses: {pod_ips4:?}, pod IPv6 addresses: {pod_ips6:?}"); + + tracing::debug!("Creating IPv4 iptables redirection listener"); + let listener4 = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).await + .inspect_err( + |err| tracing::debug!(%err, "Could not bind IPv4, continuing with IPv6 only."), + ) + .ok() + .and_then(|listener| { + let redirect_to = listener + .local_addr() + .inspect_err( + |err| tracing::debug!(%err, "Get IPv4 listener address, continuing with IPv6 only."), + ) + .ok()? + .port(); + Some(IptablesListener { + iptables: None, + redirect_to, + listener, + pod_ips: pod_ips4, + flush_connections, + ipv6: false, + }) + }); + tracing::debug!("Creating IPv6 iptables redirection listener"); + let listener6 = if support_ipv6 { + TcpListener::bind((Ipv6Addr::UNSPECIFIED, 0)).await + .inspect_err( + |err| tracing::debug!(%err, "Could not bind IPv6, continuing with IPv4 only."), + ) + .ok() + .and_then(|listener| { + let redirect_to = listener + .local_addr() + .inspect_err( + |err| tracing::debug!(%err, "Get IPv6 listener address, continuing with IPv4 only."), + ) + .ok()? + .port(); + Some(IptablesListener { + iptables: None, + redirect_to, + listener, + pod_ips: pod_ips6, + flush_connections, + ipv6: true, + }) + }) + } else { + None + }; + match (listener4, listener6) { + (None, None) => Err(AgentError::CannotListenForStolenConnections), + (Some(ipv4_listener), None) => Ok(Self::Ipv4Only(ipv4_listener)), + (None, Some(ipv6_listener)) => Ok(Self::Ipv6Only(ipv6_listener)), + (Some(ipv4_listener), Some(ipv6_listener)) => Ok(Self::Dual { + ipv4_listener, + ipv6_listener, + }), + } + } + + pub(crate) fn get_listeners_mut( + &mut self, + ) -> (Option<&mut IptablesListener>, Option<&mut IptablesListener>) { + match self { + IpTablesRedirector::Ipv4Only(ipv4_listener) => (Some(ipv4_listener), None), + IpTablesRedirector::Ipv6Only(ipv6_listener) => (None, Some(ipv6_listener)), + IpTablesRedirector::Dual { + ipv4_listener, + ipv6_listener, + } => (Some(ipv4_listener), Some(ipv6_listener)), + } } } @@ -97,41 +252,53 @@ impl PortRedirector for IpTablesRedirector { type Error = AgentError; async fn add_redirection(&mut self, from: Port) -> Result<(), Self::Error> { - let iptables = match self.iptables.as_ref() { - Some(iptables) => iptables, - None => { - let iptables = new_iptables(); - let safe = SafeIpTables::create( - iptables.into(), - self.flush_connections, - self.pod_ips.as_deref(), - ) - .await?; - self.iptables.insert(safe) - } - }; - - iptables.add_redirect(from, self.redirect_to).await + let (ipv4_listener, ipv6_listener) = self.get_listeners_mut(); + if let Some(ip4_listener) = ipv4_listener { + tracing::debug!("Adding IPv4 redirection from port {from}"); + ip4_listener.add_redirection(from).await?; + } + if let Some(ip6_listener) = ipv6_listener { + tracing::debug!("Adding IPv6 redirection from port {from}"); + ip6_listener.add_redirection(from).await?; + } + Ok(()) } async fn remove_redirection(&mut self, from: Port) -> Result<(), Self::Error> { - if let Some(iptables) = self.iptables.as_ref() { - iptables.remove_redirect(from, self.redirect_to).await?; + let (ipv4_listener, ipv6_listener) = self.get_listeners_mut(); + if let Some(ip4_listener) = ipv4_listener { + ip4_listener.remove_redirection(from).await?; + } + if let Some(ip6_listener) = ipv6_listener { + ip6_listener.remove_redirection(from).await?; } - Ok(()) } async fn cleanup(&mut self) -> Result<(), Self::Error> { - if let Some(iptables) = self.iptables.take() { - iptables.cleanup().await?; + let (ipv4_listener, ipv6_listener) = self.get_listeners_mut(); + if let Some(ip4_listener) = ipv4_listener { + ip4_listener.cleanup().await?; + } + if let Some(ip6_listener) = ipv6_listener { + ip6_listener.cleanup().await?; } - Ok(()) } async fn next_connection(&mut self) -> Result<(TcpStream, SocketAddr), Self::Error> { - self.listener.accept().await.map_err(Into::into) + match self { + Self::Dual { + ipv4_listener, + ipv6_listener, + } => { + select! { + con = ipv4_listener.next_connection() => con, + con = ipv6_listener.next_connection() => con, + } + } + Self::Ipv4Only(listener) | Self::Ipv6Only(listener) => listener.next_connection().await, + } } } diff --git a/mirrord/config/configuration.md b/mirrord/config/configuration.md index 93dab79c3c0..3a3f8b4fa57 100644 --- a/mirrord/config/configuration.md +++ b/mirrord/config/configuration.md @@ -1266,6 +1266,10 @@ List of ports to mirror/steal traffic from. Other ports will remain local. Mutually exclusive with [`feature.network.incoming.ignore_ports`](#feature-network-ignore_ports). +### feature.network.ipv6 {#feature-network-dns} + +Enable ipv6 support. Turn on if your application listens to incoming traffic over IPv6. + ### feature.network.outgoing {#feature-network-outgoing} Tunnel outgoing network operations through mirrord. diff --git a/mirrord/config/src/config/from_env.rs b/mirrord/config/src/config/from_env.rs index 9770456721a..0f87ef59034 100644 --- a/mirrord/config/src/config/from_env.rs +++ b/mirrord/config/src/config/from_env.rs @@ -20,6 +20,10 @@ where { type Value = T; + /// Returns: + /// - `None` if there is no env var with that name. + /// - `Some(Err(ConfigError::InvalidValue{...}))` if the value of the env var cannot be parsed. + /// - `Some(Ok(...))` if the env var exists and was parsed successfully. fn source_value(self, _context: &mut ConfigContext) -> Option> { std::env::var(self.0).ok().map(|var| { var.parse::() diff --git a/mirrord/config/src/feature/network.rs b/mirrord/config/src/feature/network.rs index 2f2b7901aee..227bd82a915 100644 --- a/mirrord/config/src/feature/network.rs +++ b/mirrord/config/src/feature/network.rs @@ -6,10 +6,12 @@ use serde::Serialize; use self::{incoming::*, outgoing::*}; use crate::{ - config::{ConfigContext, ConfigError}, + config::{from_env::FromEnv, source::MirrordConfigSource, ConfigContext, ConfigError}, util::MirrordToggleableConfig, }; +const IPV6_ENV_VAR: &str = "MIRRORD_INCOMING_ENABLE_IPV6"; + pub mod dns; pub mod filter; pub mod incoming; @@ -67,14 +69,26 @@ pub struct NetworkConfig { /// ### feature.network.dns {#feature-network-dns} #[config(toggleable, nested)] pub dns: DnsConfig, + + /// ### feature.network.ipv6 {#feature-network-dns} + /// + /// Enable ipv6 support. Turn on if your application listens to incoming traffic over IPv6. + #[config(env = IPV6_ENV_VAR, default = false)] + pub ipv6: bool, } impl MirrordToggleableConfig for NetworkFileConfig { fn disabled_config(context: &mut ConfigContext) -> Result { + let ipv6 = FromEnv::new(IPV6_ENV_VAR) + .source_value(context) + .transpose()? + .unwrap_or_default(); + Ok(NetworkConfig { incoming: IncomingFileConfig::disabled_config(context)?, dns: DnsFileConfig::disabled_config(context)?, outgoing: OutgoingFileConfig::disabled_config(context)?, + ipv6, }) } } @@ -84,6 +98,7 @@ impl CollectAnalytics for &NetworkConfig { analytics.add("incoming", &self.incoming); analytics.add("outgoing", &self.outgoing); analytics.add("dns", &self.dns); + analytics.add("ipv6", self.ipv6); } } diff --git a/mirrord/config/src/feature/network/incoming.rs b/mirrord/config/src/feature/network/incoming.rs index d56199e003e..fddd46260d5 100644 --- a/mirrord/config/src/feature/network/incoming.rs +++ b/mirrord/config/src/feature/network/incoming.rs @@ -58,7 +58,7 @@ use http_filter::*; /// }, /// "port_mapping": [[ 7777, 8888 ]], /// "ignore_localhost": false, -/// "ignore_ports": [9999, 10000] +/// "ignore_ports": [9999, 10000], /// "listen_ports": [[80, 8111]] /// } /// } @@ -96,9 +96,7 @@ impl MirrordConfig for IncomingFileConfig { .unwrap_or_default(), http_filter: HttpFilterFileConfig::default().generate_config(context)?, on_concurrent_steal: FromEnv::new("MIRRORD_OPERATOR_ON_CONCURRENT_STEAL") - .layer(|layer| { - Unstable::new("IncomingFileConfig", "on_concurrent_steal", layer) - }) + .layer(|layer| Unstable::new("incoming", "on_concurrent_steal", layer)) .source_value(context) .transpose()? .unwrap_or_default(), @@ -129,9 +127,7 @@ impl MirrordConfig for IncomingFileConfig { .unwrap_or_default(), on_concurrent_steal: FromEnv::new("MIRRORD_OPERATOR_ON_CONCURRENT_STEAL") .or(advanced.on_concurrent_steal) - .layer(|layer| { - Unstable::new("IncomingFileConfig", "on_concurrent_steal", layer) - }) + .layer(|layer| Unstable::new("incoming", "on_concurrent_steal", layer)) .source_value(context) .transpose()? .unwrap_or_default(), @@ -149,7 +145,7 @@ impl MirrordToggleableConfig for IncomingFileConfig { .unwrap_or_else(|| Ok(IncomingMode::Off))?; let on_concurrent_steal = FromEnv::new("MIRRORD_OPERATOR_ON_CONCURRENT_STEAL") - .layer(|layer| Unstable::new("IncomingFileConfig", "on_concurrent_steal", layer)) + .layer(|layer| Unstable::new("incoming", "on_concurrent_steal", layer)) .source_value(context) .transpose()? .unwrap_or_default(); diff --git a/mirrord/config/src/lib.rs b/mirrord/config/src/lib.rs index 8ffd1d48c29..055adac98d2 100644 --- a/mirrord/config/src/lib.rs +++ b/mirrord/config/src/lib.rs @@ -878,6 +878,7 @@ mod tests { udp: Some(false), ..Default::default() })), + ipv6: None, })), copy_target: None, hostname: None, diff --git a/mirrord/kube/src/api/container.rs b/mirrord/kube/src/api/container.rs index b87a088a412..a651dc13458 100644 --- a/mirrord/kube/src/api/container.rs +++ b/mirrord/kube/src/api/container.rs @@ -44,10 +44,16 @@ pub struct ContainerParams { /// the agent container. pub tls_cert: Option, pub pod_ips: Option, + /// Support IPv6-only clusters + pub support_ipv6: bool, } impl ContainerParams { - pub fn new(tls_cert: Option, pod_ips: Option) -> ContainerParams { + pub fn new( + tls_cert: Option, + pod_ips: Option, + support_ipv6: bool, + ) -> ContainerParams { let port: u16 = rand::thread_rng().gen_range(30000..=65535); let gid: u16 = rand::thread_rng().gen_range(3000..u16::MAX); @@ -64,6 +70,7 @@ impl ContainerParams { port, tls_cert, pod_ips, + support_ipv6, } } } diff --git a/mirrord/kube/src/api/container/job.rs b/mirrord/kube/src/api/container/job.rs index 907aefffeeb..d9958e6620b 100644 --- a/mirrord/kube/src/api/container/job.rs +++ b/mirrord/kube/src/api/container/job.rs @@ -241,12 +241,14 @@ mod test { fn targetless() -> Result<(), Box> { let mut config_context = ConfigContext::default(); let agent = AgentFileConfig::default().generate_config(&mut config_context)?; + let support_ipv6 = false; let params = ContainerParams { name: "foobar".to_string(), port: 3000, gid: 13, tls_cert: None, pod_ips: None, + support_ipv6, }; let update = JobVariant::new(&agent, ¶ms).as_update(); @@ -298,7 +300,8 @@ mod test { { "name": "RUST_LOG", "value": agent.log_level }, { "name": "MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS", "value": agent.flush_connections.to_string() }, { "name": "MIRRORD_AGENT_NFTABLES", "value": agent.nftables.to_string() }, - { "name": "MIRRORD_AGENT_JSON_LOG", "value": Some(agent.json_log.to_string()) } + { "name": "MIRRORD_AGENT_JSON_LOG", "value": Some(agent.json_log.to_string()) }, + { "name": "MIRRORD_AGENT_SUPPORT_IPV6", "value": Some(support_ipv6.to_string()) } ], "resources": // Add requests to avoid getting defaulted https://github.com/metalbear-co/mirrord/issues/579 @@ -330,12 +333,14 @@ mod test { fn targeted() -> Result<(), Box> { let mut config_context = ConfigContext::default(); let agent = AgentFileConfig::default().generate_config(&mut config_context)?; + let support_ipv6 = false; let params = ContainerParams { name: "foobar".to_string(), port: 3000, gid: 13, tls_cert: None, pod_ips: None, + support_ipv6, }; let update = JobTargetedVariant::new( @@ -432,7 +437,8 @@ mod test { { "name": "RUST_LOG", "value": agent.log_level }, { "name": "MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS", "value": agent.flush_connections.to_string() }, { "name": "MIRRORD_AGENT_NFTABLES", "value": agent.nftables.to_string() }, - { "name": "MIRRORD_AGENT_JSON_LOG", "value": Some(agent.json_log.to_string()) } + { "name": "MIRRORD_AGENT_JSON_LOG", "value": Some(agent.json_log.to_string()) }, + { "name": "MIRRORD_AGENT_SUPPORT_IPV6", "value": Some(support_ipv6.to_string()) } ], "resources": // Add requests to avoid getting defaulted https://github.com/metalbear-co/mirrord/issues/579 { diff --git a/mirrord/kube/src/api/container/util.rs b/mirrord/kube/src/api/container/util.rs index 77f917378ce..23fd752181b 100644 --- a/mirrord/kube/src/api/container/util.rs +++ b/mirrord/kube/src/api/container/util.rs @@ -4,7 +4,7 @@ use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::api::core::v1::{EnvVar, Pod, Toleration}; use kube::{api::LogParams, Api}; use mirrord_config::agent::{AgentConfig, LinuxCapability}; -use mirrord_protocol::{AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV}; +use mirrord_protocol::{AGENT_IPV6_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV}; use regex::Regex; use tracing::warn; @@ -59,6 +59,7 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec, + support_ipv6: bool, ) -> Result<(ContainerParams, Option), KubeApiError> { let runtime_data = match target.path.as_ref().unwrap_or(&Target::Targetless) { Target::Targetless => None, @@ -187,7 +188,7 @@ impl KubernetesAPI { .join(",") }); - let params = ContainerParams::new(tls_cert, pod_ips); + let params = ContainerParams::new(tls_cert, pod_ips, support_ipv6); Ok((params, runtime_data)) } @@ -209,7 +210,12 @@ impl KubernetesAPI { where P: Progress + Send + Sync, { - let (params, runtime_data) = self.create_agent_params(target, tls_cert).await?; + let support_ipv6 = config + .map(|layer_conf| layer_conf.feature.network.ipv6) + .unwrap_or_default(); + let (params, runtime_data) = self + .create_agent_params(target, tls_cert, support_ipv6) + .await?; if let Some(RuntimeData { guessed_container: true, container_name, diff --git a/mirrord/kube/src/api/runtime.rs b/mirrord/kube/src/api/runtime.rs index a77afd1b5b5..a431a3b4c03 100644 --- a/mirrord/kube/src/api/runtime.rs +++ b/mirrord/kube/src/api/runtime.rs @@ -3,7 +3,7 @@ use std::{ collections::BTreeMap, convert::Infallible, fmt::{self, Display, Formatter}, - net::Ipv4Addr, + net::IpAddr, ops::FromResidual, str::FromStr, }; @@ -71,7 +71,7 @@ impl Display for ContainerRuntime { #[derive(Debug)] pub struct RuntimeData { pub pod_name: String, - pub pod_ips: Vec, + pub pod_ips: Vec, pub pod_namespace: Option, pub node_name: String, pub container_id: String, @@ -128,9 +128,9 @@ impl RuntimeData { .filter_map(|pod_ip| { pod_ip .ip - .parse::() + .parse::() .inspect_err(|e| { - tracing::warn!("failed to parse pod IP {ip}: {e:?}", ip = pod_ip.ip); + tracing::warn!("failed to parse pod IP {ip}: {e:#?}", ip = pod_ip.ip); }) .ok() }) diff --git a/mirrord/layer/src/socket/ops.rs b/mirrord/layer/src/socket/ops.rs index 543a512629c..efc0095c8a4 100644 --- a/mirrord/layer/src/socket/ops.rs +++ b/mirrord/layer/src/socket/ops.rs @@ -4,6 +4,7 @@ use std::{ collections::HashMap, io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream}, + ops::Not, os::{ fd::{BorrowedFd, FromRawFd, IntoRawFd}, unix::io::RawFd, @@ -129,7 +130,7 @@ pub(super) fn socket(domain: c_int, type_: c_int, protocol: c_int) -> Detour, command: Vec<&str>) { let service = service.await; - let mut process = run_exec_with_target(command, &service.target, None, None, None).await; + let mut process = + run_exec_with_target(command, &service.pod_container_target(), None, None, None).await; let res = process.wait().await; assert!(res.success()); } @@ -458,8 +485,14 @@ mod traffic_tests { pub async fn listen_localhost(#[future] service: KubeService) { let service = service.await; let node_command = vec!["node", "node-e2e/listen/test_listen_localhost.mjs"]; - let mut process = - run_exec_with_target(node_command, &service.target, None, None, None).await; + let mut process = run_exec_with_target( + node_command, + &service.pod_container_target(), + None, + None, + None, + ) + .await; let res = process.wait().await; assert!(res.success()); } @@ -471,8 +504,14 @@ mod traffic_tests { pub async fn gethostname_remote_result(#[future] hostname_service: KubeService) { let service = hostname_service.await; let node_command = vec!["python3", "-u", "python-e2e/hostname.py"]; - let mut process = - run_exec_with_target(node_command, &service.target, None, None, None).await; + let mut process = run_exec_with_target( + node_command, + &service.pod_container_target(), + None, + None, + None, + ) + .await; let res = process.wait().await; assert!(res.success()); @@ -511,7 +550,9 @@ mod traffic_tests { "MIRRORD_OUTGOING_REMOTE_UNIX_STREAMS", "/app/unix-socket-server.sock", )]); - let mut process = run_exec_with_target(executable, &service.target, None, None, env).await; + let mut process = + run_exec_with_target(executable, &service.pod_container_target(), None, None, env) + .await; let res = process.wait().await; // The test application panics if it does not successfully connect to the socket, send data, @@ -534,7 +575,14 @@ mod traffic_tests { .to_string(); let executable = vec![app_path.as_str()]; - let mut process = run_exec_with_target(executable, &service.target, None, None, None).await; + let mut process = run_exec_with_target( + executable, + &service.pod_container_target(), + None, + None, + None, + ) + .await; let res = process.wait().await; // The test application panics if it does not successfully connect to the socket, send data, @@ -551,8 +599,14 @@ mod traffic_tests { "node", "node-e2e/outgoing/test_outgoing_traffic_many_requests.mjs", ]; - let mut process = - run_exec_with_target(node_command, &service.target, None, None, None).await; + let mut process = run_exec_with_target( + node_command, + &service.pod_container_target(), + None, + None, + None, + ) + .await; let res = process.child.wait().await.unwrap(); assert!(res.success()); @@ -571,7 +625,7 @@ mod traffic_tests { let mirrord_args = vec!["--no-outgoing"]; let mut process = run_exec_with_target( node_command, - &service.target, + &service.pod_container_target(), None, Some(mirrord_args), None, diff --git a/tests/src/traffic/steal.rs b/tests/src/traffic/steal.rs index 518aa0bc13e..31b5f668a2b 100644 --- a/tests/src/traffic/steal.rs +++ b/tests/src/traffic/steal.rs @@ -9,7 +9,10 @@ mod steal_tests { }; use futures_util::{SinkExt, StreamExt}; - use kube::Client; + use hyper::{body, client::conn, Request, StatusCode}; + use hyper_util::rt::TokioIo; + use k8s_openapi::api::core::v1::Pod; + use kube::{Api, Client}; use reqwest::{header::HeaderMap, Url}; use rstest::*; use tokio::time::sleep; @@ -19,13 +22,13 @@ mod steal_tests { }; use crate::utils::{ - config_dir, get_service_host_and_port, get_service_url, http2_service, kube_client, - send_request, send_requests, service, tcp_echo_service, websocket_service, Application, - KubeService, + config_dir, get_service_host_and_port, get_service_url, http2_service, + ipv6::{ipv6_service, portforward_http_requests}, + kube_client, send_request, send_requests, service, tcp_echo_service, websocket_service, + Application, KubeService, }; #[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)] - #[cfg(target_os = "linux")] #[rstest] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[timeout(Duration::from_secs(240))] @@ -49,7 +52,12 @@ mod steal_tests { } let mut process = application - .run(&service.target, Some(&service.namespace), Some(flags), None) + .run( + &service.pod_container_target(), + Some(&service.namespace), + Some(flags), + None, + ) .await; process @@ -63,6 +71,47 @@ mod steal_tests { application.assert(&process).await; } + #[ignore] // Needs special cluster setup, so ignore by default. + #[rstest] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[timeout(Duration::from_secs(240))] + async fn steal_http_ipv6_traffic( + #[future] ipv6_service: KubeService, + #[future] kube_client: Client, + ) { + let application = Application::PythonFastApiHTTPIPv6; + let service = ipv6_service.await; + let kube_client = kube_client.await; + + let mut flags = vec!["--steal"]; + + if cfg!(feature = "ephemeral") { + flags.extend(["-e"].into_iter()); + } + + let mut process = application + .run( + &service.pod_container_target(), + Some(&service.namespace), + Some(flags), + Some(vec![("MIRRORD_INCOMING_ENABLE_IPV6", "true")]), + ) + .await; + + process + .wait_for_line(Duration::from_secs(40), "daemon subscribed") + .await; + + let api = Api::::namespaced(kube_client.clone(), &service.namespace); + portforward_http_requests(&api, service).await; + + tokio::time::timeout(Duration::from_secs(40), process.wait()) + .await + .unwrap(); + + application.assert(&process).await; + } + #[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)] #[cfg(target_os = "linux")] #[rstest] @@ -89,7 +138,7 @@ mod steal_tests { let mut process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_AGENT_STEALER_FLUSH_CONNECTIONS", "true")]), @@ -125,7 +174,12 @@ mod steal_tests { } let mut process = application - .run(&service.target, Some(&service.namespace), Some(flags), None) + .run( + &service.pod_container_target(), + Some(&service.namespace), + Some(flags), + None, + ) .await; // Verify that we hooked the socket operations and the agent started stealing. @@ -208,7 +262,7 @@ mod steal_tests { let mut process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![ @@ -288,7 +342,7 @@ mod steal_tests { let mut client = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), @@ -329,7 +383,7 @@ mod steal_tests { let mut client = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), None, Some(vec![("MIRRORD_CONFIG_FILE", config_path.to_str().unwrap())]), @@ -370,7 +424,7 @@ mod steal_tests { let mut client = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), None, Some(vec![("MIRRORD_CONFIG_FILE", config_path.to_str().unwrap())]), @@ -423,7 +477,7 @@ mod steal_tests { let mut mirrored_process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), @@ -494,7 +548,7 @@ mod steal_tests { let mut mirrorded_process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), @@ -559,7 +613,7 @@ mod steal_tests { let mut mirrorded_process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), @@ -629,7 +683,7 @@ mod steal_tests { let mut mirrorded_process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(flags), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), @@ -708,7 +762,7 @@ mod steal_tests { let mut mirrorded_process = application .run( - &service.target, + &service.pod_container_target(), Some(&service.namespace), Some(vec!["--steal"]), Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]), diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 538a4d753e6..3b596efa357 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -4,7 +4,7 @@ use std::{ collections::HashMap, fmt::Debug, - net::Ipv4Addr, + net::IpAddr, ops::Not, path::PathBuf, process::{ExitStatus, Stdio}, @@ -39,6 +39,7 @@ use tokio::{ task::JoinHandle, }; +pub(crate) mod ipv6; pub mod sqs_resources; const TEXT: &str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."; @@ -90,6 +91,7 @@ fn format_time() -> String { pub enum Application { PythonFlaskHTTP, PythonFastApiHTTP, + PythonFastApiHTTPIPv6, NodeHTTP, NodeHTTP2, Go21HTTP, @@ -399,6 +401,15 @@ impl Application { "app_fastapi:app", ] } + Application::PythonFastApiHTTPIPv6 => { + vec![ + "uvicorn", + "--port=80", + "--host=::", + "--app-dir=./python-e2e/", + "app_fastapi:app", + ] + } Application::PythonCloseSocket => { vec!["python3", "-u", "python-e2e/close_socket.py"] } @@ -447,7 +458,7 @@ impl Application { } pub async fn assert(&self, process: &TestProcess) { - if let Application::PythonFastApiHTTP = self { + if matches!(self, Self::PythonFastApiHTTP | Self::PythonFastApiHTTPIPv6) { process.assert_log_level(true, "ERROR").await; process.assert_log_level(false, "ERROR").await; process.assert_log_level(true, "CRITICAL").await; @@ -728,15 +739,19 @@ impl Drop for ResourceGuard { pub struct KubeService { pub name: String, pub namespace: String, - pub target: String, guards: Vec, namespace_guard: Option, + pub pod_name: String, } impl KubeService { pub fn deployment_target(&self) -> String { format!("deployment/{}", self.name) } + + pub fn pod_container_target(&self) -> String { + format!("pod/{}/container/{CONTAINER_NAME}", self.pod_name) + } } impl Drop for KubeService { @@ -817,6 +832,17 @@ fn deployment_from_json(name: &str, image: &str, env: Value) -> Deployment { .expect("Failed creating `deployment` from json spec!") } +/// Change the `ipFamilies` and `ipFamilyPolicy` fields to make the service IPv6-only. +/// +/// # Panics +/// +/// Will panic if the given service does not have a spec. +fn set_ipv6_only(service: &mut Service) { + let spec = service.spec.as_mut().unwrap(); + spec.ip_families = Some(vec!["IPv6".to_string()]); + spec.ip_family_policy = Some("SingleStack".to_string()); +} + fn service_from_json(name: &str, service_type: &str) -> Service { serde_json::from_value(json!({ "apiVersion": "v1", @@ -1065,6 +1091,7 @@ pub async fn service( randomize_name, kube_client.await, default_env(), + false, ) .await } @@ -1093,6 +1120,7 @@ pub async fn service_with_env( randomize_name, kube_client, env, + false, ) .await } @@ -1108,6 +1136,7 @@ pub async fn service_with_env( /// This behavior can be changed, see [`PRESERVE_FAILED_ENV_NAME`]. /// * `randomize_name` - whether a random suffix should be added to the end of the resource names /// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions. +#[allow(clippy::too_many_arguments)] async fn internal_service( namespace: &str, service_type: &str, @@ -1116,6 +1145,7 @@ async fn internal_service( randomize_name: bool, kube_client: Client, env: Value, + ipv6_only: bool, ) -> KubeService { let delete_after_fail = std::env::var_os(PRESERVE_FAILED_ENV_NAME).is_none(); @@ -1182,7 +1212,10 @@ async fn internal_service( watch_resource_exists(&deployment_api, &name).await; // `Service` - let service = service_from_json(&name, service_type); + let mut service = service_from_json(&name, service_type); + if ipv6_only { + set_ipv6_only(&mut service); + } let service_guard = ResourceGuard::create( service_api.clone(), name.clone(), @@ -1193,13 +1226,13 @@ async fn internal_service( .unwrap(); watch_resource_exists(&service_api, "default").await; - let target = get_instance_name::(kube_client.clone(), &name, namespace) + let pod_name = get_instance_name::(kube_client.clone(), &name, namespace) .await .unwrap(); let pod_api: Api = Api::namespaced(kube_client.clone(), namespace); - await_condition(pod_api, &target, is_pod_running()) + await_condition(pod_api, &pod_name, is_pod_running()) .await .unwrap(); @@ -1211,7 +1244,7 @@ async fn internal_service( KubeService { name, namespace: namespace.to_string(), - target: format!("pod/{target}/container/{CONTAINER_NAME}"), + pod_name, guards: vec![pod_guard, service_guard], namespace_guard, } @@ -1304,13 +1337,13 @@ pub async fn service_for_mirrord_ls( .unwrap(); watch_resource_exists(&service_api, "default").await; - let target = get_instance_name::(kube_client.clone(), &name, namespace) + let pod_name = get_instance_name::(kube_client.clone(), &name, namespace) .await .unwrap(); let pod_api: Api = Api::namespaced(kube_client.clone(), namespace); - await_condition(pod_api, &target, is_pod_running()) + await_condition(pod_api, &pod_name, is_pod_running()) .await .unwrap(); @@ -1322,7 +1355,7 @@ pub async fn service_for_mirrord_ls( KubeService { name, namespace: namespace.to_string(), - target: format!("pod/{target}/container/{CONTAINER_NAME}"), + pod_name, guards: vec![pod_guard, service_guard], namespace_guard, } @@ -1458,13 +1491,13 @@ pub async fn service_for_mirrord_ls( .unwrap(); watch_resource_exists(&job_api, &name).await; - let target = get_instance_name::(kube_client.clone(), &name, namespace) + let pod_name = get_instance_name::(kube_client.clone(), &name, namespace) .await .unwrap(); let pod_api: Api = Api::namespaced(kube_client.clone(), namespace); - await_condition(pod_api, &target, is_pod_running()) + await_condition(pod_api, &pod_name, is_pod_running()) .await .unwrap(); @@ -1476,7 +1509,6 @@ pub async fn service_for_mirrord_ls( KubeService { name, namespace: namespace.to_string(), - target: format!("pod/{target}/container/{CONTAINER_NAME}"), guards: vec![ pod_guard, service_guard, @@ -1485,6 +1517,7 @@ pub async fn service_for_mirrord_ls( job_guard, ], namespace_guard, + pod_name, } } @@ -1607,12 +1640,15 @@ async fn get_pod_or_node_host(kube_client: Client, name: &str, namespace: &str) .next() .and_then(|pod| pod.status) .and_then(|status| status.host_ip) - .and_then(|ip| { - ip.parse::() - .unwrap() - .is_private() - .not() - .then_some(ip) + .filter(|ip| { + // use this IP only if it's a public one. + match ip.parse::().unwrap() { + IpAddr::V4(ip4) => ip4.is_private(), + IpAddr::V6(ip6) => { + ip6.is_unicast_link_local() || ip6.is_unique_local() || ip6.is_loopback() + } + } + .not() }) .unwrap_or_else(resolve_node_host) } diff --git a/tests/src/utils/ipv6.rs b/tests/src/utils/ipv6.rs new file mode 100644 index 00000000000..4b766e7f42d --- /dev/null +++ b/tests/src/utils/ipv6.rs @@ -0,0 +1,100 @@ +#![cfg(test)] + +use http_body_util::{BodyExt, Empty}; +use hyper::{ + client::{conn, conn::http1::SendRequest}, + Request, +}; +use k8s_openapi::api::core::v1::Pod; +use kube::{Api, Client}; +use rstest::fixture; + +use crate::utils::{internal_service, kube_client, KubeService}; + +/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted +/// when the returned service is dropped, unless it is dropped during panic. +/// This behavior can be changed, see +/// [`PRESERVE_FAILED_ENV_NAME`](crate::utils::PRESERVE_FAILED_ENV_NAME). +/// +/// * `randomize_name` - whether a random suffix should be added to the end of the resource names +#[fixture] +pub async fn ipv6_service( + #[default("default")] namespace: &str, + #[default("NodePort")] service_type: &str, + #[default("ghcr.io/metalbear-co/mirrord-pytest:latest")] image: &str, + #[default("http-echo")] service_name: &str, + #[default(true)] randomize_name: bool, + #[future] kube_client: Client, +) -> KubeService { + internal_service( + namespace, + service_type, + image, + service_name, + randomize_name, + kube_client.await, + serde_json::json!([ + { + "name": "HOST", + "value": "::" + } + ]), + true, + ) + .await +} + +/// Send an HTTP request using the referenced `request_sender`, with the provided `method`, +/// then verify a success status code, and a response body that is the used method. +/// +/// # Panics +/// - If the request cannot be sent. +/// - If the response's code is not OK +/// - If the response's body is not the method's name. +pub async fn send_request_with_method( + method: &str, + request_sender: &mut SendRequest>, +) { + let req = Request::builder() + .method(method) + .header("Host", "::") + .body(Empty::::new()) + .unwrap(); + + println!("Request: {:?}", req); + + let res = request_sender.send_request(req).await.unwrap(); + println!("Response: {:?}", res); + assert_eq!(res.status(), hyper::StatusCode::OK); + let bytes = res.collect().await.unwrap().to_bytes(); + let response_string = String::from_utf8(bytes.to_vec()).unwrap(); + assert_eq!(response_string, method); +} + +/// Create a portforward to the pod of the test service, and send HTTP requests over it. +/// Send four HTTP request (GET, POST, PUT, DELETE), using the referenced `request_sender`, with the +/// provided `method`, verify OK status, and a response body that is the used method. +/// +/// # Panics +/// - If a request cannot be sent. +/// - If a response's code is not OK +/// - If a response's body is not the method's name. +pub async fn portforward_http_requests(api: &Api, service: KubeService) { + let mut portforwarder = api + .portforward(&service.pod_name, &[80]) + .await + .expect("Failed to start portforward to test pod"); + + let stream = portforwarder.take_stream(80).unwrap(); + let stream = hyper_util::rt::TokioIo::new(stream); + + let (mut request_sender, connection) = conn::http1::handshake(stream).await.unwrap(); + tokio::spawn(async move { + if let Err(err) = connection.await { + eprintln!("Error in connection from test function to deployed test app {err:#?}"); + } + }); + for method in ["GET", "POST", "PUT", "DELETE"] { + send_request_with_method(method, &mut request_sender).await; + } +} From 007597cd38d523a159b5e611c2ec27f36d1b5fcf Mon Sep 17 00:00:00 2001 From: meowjesty <43983236+meowjesty@users.noreply.github.com> Date: Thu, 9 Jan 2025 18:34:22 -0300 Subject: [PATCH 4/4] Fix fs policy test. (#3010) * Fix fs policy test. * changelog * no changelog --- changelog.d/+104-policy-fs.added.md | 2 +- tests/src/operator/policies/fs.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/changelog.d/+104-policy-fs.added.md b/changelog.d/+104-policy-fs.added.md index 1ad43a13736..312cf56f3fa 100644 --- a/changelog.d/+104-policy-fs.added.md +++ b/changelog.d/+104-policy-fs.added.md @@ -1 +1 @@ -Add policy to control file ops. +Add policy to control file ops from the operator. diff --git a/tests/src/operator/policies/fs.rs b/tests/src/operator/policies/fs.rs index afa2cdf62bc..3feae1d514c 100644 --- a/tests/src/operator/policies/fs.rs +++ b/tests/src/operator/policies/fs.rs @@ -26,12 +26,12 @@ async fn fs_service(#[future] kube_client: kube::Client) -> KubeService { #[rstest] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[timeout(Duration::from_secs(60))] -pub async fn create_cluster_fs_policy_and_try_file_operations( - #[future] service: KubeService, +pub async fn create_namespaced_fs_policy_and_try_file_open( + #[future] fs_service: KubeService, #[future] kube_client: kube::Client, ) { let kube_client = kube_client.await; - let service = service.await; + let service = fs_service.await; // Create policy, delete it when test exits. let _policy_guard = PolicyGuard::namespaced(