Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #26026: Add a size limit on files handled by relayd #6070

Draft
wants to merge 3 commits into
base: branches/rudder/8.3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions relay/sources/relayd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ name = "benches"
# (it works since https://github.com/openssl/openssl/commit/e2590c3a162eb118c36b09c2168164283aa099b4)
anyhow = "1"
base64 = "0.22"
bytesize = { version = "1.3.0", features = ["serde"] }
bytes = "1"
chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
clap = { version = "4.4.6", features = ["derive"] }
Expand Down
3 changes: 3 additions & 0 deletions relay/sources/relayd/src/api/shared_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ pub fn routes_1(job_config: Arc<JobConfig>) -> BoxedFilter<(impl Reply,)> {
});

let job_config_put = job_config;
let max_size = job_config_put.cfg.shared_files.max_size;
let put = method::put()
.map(move || job_config_put.clone())
// Checking the header is enough as hyper will not read more.
.and(body::content_length_limit(max_size.as_u64()))
.and(base)
.and(query::<SharedFilesPutParams>())
.and(body::bytes())
Expand Down
15 changes: 14 additions & 1 deletion relay/sources/relayd/src/configuration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
};

use anyhow::{anyhow, bail, Context, Error};
use bytesize::ByteSize;
use secrecy::SecretString;
use serde::{
de::{Deserializer, Error as SerdeError, Unexpected, Visitor},
Expand Down Expand Up @@ -316,6 +317,8 @@ pub struct InventoryConfig {
pub catchup: CatchupConfig,
#[serde(default)]
pub cleanup: CleanupConfig,
#[serde_inline_default(ByteSize::mib(5))]
pub max_size: ByteSize,
}

impl Default for InventoryConfig {
Expand Down Expand Up @@ -350,6 +353,8 @@ pub struct ReportingConfig {
pub cleanup: CleanupConfig,
#[serde(default)]
pub skip_event_types: HashSet<String>,
#[serde_inline_default(ByteSize::mib(1))]
pub max_size: ByteSize,
}

impl Default for ReportingConfig {
Expand Down Expand Up @@ -427,6 +432,8 @@ pub struct SharedFiles {
pub path: PathBuf,
#[serde(default)]
pub cleanup: SharedFilesCleanupConfig,
#[serde_inline_default(ByteSize::mib(50))]
pub max_size: ByteSize,
}

impl Default for SharedFiles {
Expand Down Expand Up @@ -539,7 +546,7 @@ pub struct RsyncConfig {
#[serde_inline_default("localhost:5310".into())]
listen: String,

// False to allow non authenticated clients
// False to allow non-authenticated clients
#[serde_inline_default(true)]
authentication: bool,
}
Expand Down Expand Up @@ -614,6 +621,7 @@ mod tests {
frequency: Duration::from_secs(3600),
retention: Duration::from_secs(3600 * 24 * 7),
},
max_size: ByteSize::mib(5),
},
reporting: ReportingConfig {
directory: PathBuf::from("/var/rudder/reports/"),
Expand All @@ -627,6 +635,7 @@ mod tests {
retention: Duration::from_secs(3600 * 24 * 7),
},
skip_event_types: HashSet::new(),
max_size: ByteSize::mib(1),
},
},
output: OutputConfig {
Expand Down Expand Up @@ -655,6 +664,7 @@ mod tests {
cleanup: SharedFilesCleanupConfig {
frequency: Duration::from_secs(600),
},
max_size: ByteSize::mib(50),
},
shared_folder: SharedFolder {
path: PathBuf::from("/var/rudder/configuration-repository/shared-files/"),
Expand Down Expand Up @@ -720,6 +730,7 @@ mod tests {
frequency: Duration::from_secs(10),
retention: Duration::from_secs(10),
},
max_size: ByteSize::mib(150),
},
reporting: ReportingConfig {
directory: PathBuf::from("target/tmp/reporting/"),
Expand All @@ -733,6 +744,7 @@ mod tests {
retention: Duration::from_secs(30 * 60 + 20),
},
skip_event_types: HashSet::new(),
max_size: ByteSize::gib(1),
},
},
output: OutputConfig {
Expand Down Expand Up @@ -763,6 +775,7 @@ mod tests {
cleanup: SharedFilesCleanupConfig {
frequency: Duration::from_secs(43),
},
max_size: ByteSize::kib(42),
},
shared_folder: SharedFolder {
path: PathBuf::from("tests/api_shared_folder"),
Expand Down
30 changes: 29 additions & 1 deletion relay/sources/relayd/src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{
path::{Path, PathBuf},
};

use anyhow::Error;
use anyhow::{bail, Error};
use bytesize::ByteSize;
use sha2::{Digest, Sha256};
use tokio::fs::{remove_file, rename};
use tracing::debug;
Expand Down Expand Up @@ -66,6 +67,33 @@ async fn failure(file: ReceivedFile, directory: RootDirectory) -> Result<(), Err
Ok(())
}

/// Checks the size of an incoming file against the configured limit.
///
/// If the file is within `limit`, returns `Ok(())`. If it exceeds the limit,
/// it is moved to the failed directory under `directory` and an error is
/// returned. A file whose metadata cannot be read also yields an error,
/// but is left in place.
async fn ensure_file_size_limit(
    file: ReceivedFile,
    limit: ByteSize,
    directory: PathBuf,
) -> Result<(), Error> {
    // Read the on-disk size; an unreadable file is skipped without moving it.
    let size = match file.metadata() {
        Ok(metadata) => metadata.len(),
        Err(e) => bail!("skipping {:#?} as it could not be read: {}", file, e),
    };
    // Within the limit: nothing to do.
    if size <= limit.as_u64() {
        return Ok(());
    }
    // Too large: move to the failed directory, then report.
    // `failure` consumes the file handle, so clone it for the error message.
    failure(file.clone(), directory).await?;
    bail!(
        "skipping {:#?} as it is too large ({} bytes > {})",
        file,
        size,
        limit
    )
}

/// Computes an id from a file name. It is added as key-value in tracing and allows following a file across relays
/// with a simple id.
/// It does not need to be cryptographic but only to have reasonable statistic quality, but we already have sha2 available
Expand Down
15 changes: 15 additions & 0 deletions relay/sources/relayd/src/processing/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Error;
use tokio::sync::mpsc;
use tracing::{debug, error, info, instrument, span, Instrument, Level};

use crate::processing::ensure_file_size_limit;
use crate::{
configuration::main::InventoryOutputSelect,
input::watch::*,
Expand Down Expand Up @@ -83,6 +84,20 @@ async fn serve(
continue;
}

match ensure_file_size_limit(
file.clone(),
job_config.cfg.processing.inventory.max_size,
job_config.cfg.processing.inventory.directory.clone(),
)
.await
{
Ok(_) => (),
Err(e) => {
error!("{:?}", e);
continue;
}
}

let queue_id = queue_id_from_file(&file);
debug!("received: {:?}", file);
let span = span!(
Expand Down
15 changes: 15 additions & 0 deletions relay/sources/relayd/src/processing/reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Error;
use tokio::{sync::mpsc, task::spawn_blocking};
use tracing::{debug, error, info, instrument, span, warn, Instrument, Level};

use crate::processing::ensure_file_size_limit;
use crate::{
configuration::main::ReportingOutputSelect,
data::{RunInfo, RunLog},
Expand Down Expand Up @@ -65,6 +66,20 @@ async fn serve(job_config: Arc<JobConfig>, mut rx: mpsc::Receiver<ReceivedFile>)
continue;
}

match ensure_file_size_limit(
file.clone(),
job_config.cfg.processing.reporting.max_size,
job_config.cfg.processing.reporting.directory.clone(),
)
.await
{
Ok(_) => (),
Err(e) => {
error!("{:?}", e);
continue;
}
}

let queue_id = queue_id_from_file(&file);
let span = span!(
Level::INFO,
Expand Down
3 changes: 3 additions & 0 deletions relay/sources/relayd/tests/files/config/main.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ https_idle_timeout = "42s"
[processing.inventory]
directory = "target/tmp/inventories/"
output = "upstream"
max_size = "150 MiB"

[processing.inventory.catchup]
# to test compatibility with previous syntax
Expand All @@ -31,6 +32,7 @@ retention = "10s"
directory = "target/tmp/reporting/"
output = "database"
skip_event_types = []
max_size = "1GiB"

[processing.reporting.catchup]
frequency = "10s"
Expand Down Expand Up @@ -58,6 +60,7 @@ use_sudo = false

[shared_files]
path = "tests/api_shared_files"
max_size = "42kiB"

[shared_files.cleanup]
frequency = "43s"
Expand Down
6 changes: 6 additions & 0 deletions relay/sources/relayd/tests/shared_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ fn it_shares_files() {
.send().unwrap();
assert_eq!(500, upload.status());

// Too big

let upload = client.put("http://127.0.0.1:3030/rudder/relay-api/1/shared-files/37817c4d-fbf7-4850-a985-50021f4e8f41/e745a140-40bc-4b86-b6dc-084488fc906b/file2?ttl=1d").body(format!("{}\n{}", signature, content))
.header("Content-Length", "10000000000") .send().unwrap();
assert_eq!(500, upload.status());

// Correct upload

let upload = client.put("http://127.0.0.1:3030/rudder/relay-api/1/shared-files/37817c4d-fbf7-4850-a985-50021f4e8f41/e745a140-40bc-4b86-b6dc-084488fc906b/file2?ttl=1d").body(format!("{}\n{}", signature, content))
Expand Down