From 940cfee12618a1e8c1c0208165659bdc3fbb470d Mon Sep 17 00:00:00 2001
From: Shaun Remekie
Date: Wed, 23 Oct 2024 19:18:43 +0200
Subject: [PATCH 1/5] initial refactor, moved logs logic to sub-module logs

---
 src/clients.rs                       |  24 ++
 src/{combined_event.rs => events.rs} |  18 +-
 src/lib.rs                           | 427 +--------------------------
 src/{ => logs}/config.rs             |   0
 src/{ => logs}/coralogix.rs          |  31 +-
 src/{ => logs}/ecr.rs                |   2 +-
 src/logs/mod.rs                      | 413 ++++++++++++++++++++++++++
 src/{ => logs}/process.rs            |  10 +-
 src/main.rs                          |  94 ++++--
 tests/{integration.rs => logs.rs}    | 170 +++++------
 10 files changed, 630 insertions(+), 559 deletions(-)
 create mode 100644 src/clients.rs
 rename src/{combined_event.rs => events.rs} (84%)
 rename src/{ => logs}/config.rs (100%)
 rename src/{ => logs}/coralogix.rs (95%)
 rename src/{ => logs}/ecr.rs (99%)
 create mode 100644 src/logs/mod.rs
 rename src/{ => logs}/process.rs (99%)
 rename tests/{integration.rs => logs.rs} (95%)

diff --git a/src/clients.rs b/src/clients.rs
new file mode 100644
index 0000000..2afb4f3
--- /dev/null
+++ b/src/clients.rs
@@ -0,0 +1,24 @@
+use aws_sdk_s3::Client as S3Client;
+use aws_sdk_sqs::Client as SqsClient;
+use aws_sdk_ecr::Client as EcrClient;
+use aws_config::SdkConfig;
+
+
+/// A type used to hold the AWS clients required to interact with AWS services
+/// used by the lambda function.
+#[derive(Clone)]
+pub struct AwsClients {
+    pub s3: S3Client,
+    pub ecr: EcrClient,
+    pub sqs: SqsClient,
+}
+
+impl AwsClients {
+    pub fn new(sdk_config: &SdkConfig) -> Self {
+        AwsClients {
+            s3: S3Client::new(&sdk_config),
+            ecr: EcrClient::new(&sdk_config),
+            sqs: SqsClient::new(&sdk_config),
+        }
+    }
+}
diff --git a/src/combined_event.rs b/src/events.rs
similarity index 84%
rename from src/combined_event.rs
rename to src/events.rs
index 54833cb..7946a9c 100644
--- a/src/combined_event.rs
+++ b/src/events.rs
@@ -10,7 +10,7 @@ use serde_json::Value;
 use tracing::debug;
 
 #[derive(Debug)]
-pub enum CombinedEvent {
+pub enum Combined {
     S3(S3Event),
     Sns(SnsEvent),
     CloudWatchLogs(LogsEvent),
@@ -20,7 +20,7 @@ pub enum CombinedEvent {
     EcrScan(EcrScanEvent),
 }
 
-impl<'de> Deserialize<'de> for CombinedEvent {
+impl<'de> Deserialize<'de> for Combined {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
         D: Deserializer<'de>,
@@ -29,31 +29,31 @@ impl<'de> Deserialize<'de> for CombinedEvent {
         debug!("raw_value: {:?}", raw_value);
         if let Ok(event) = S3Event::deserialize(&raw_value) {
             tracing::info!("s3 event detected");
-            return Ok(CombinedEvent::S3(event));
+            return Ok(Combined::S3(event));
         }
 
         if let Ok(event) = SnsEvent::deserialize(&raw_value) {
             tracing::info!("sns event detected");
-            return Ok(CombinedEvent::Sns(event));
+            return Ok(Combined::Sns(event));
         }
 
        if let Ok(event) = EcrScanEvent::deserialize(&raw_value) {
             tracing::info!("ecr scan event detected");
-            return Ok(CombinedEvent::EcrScan(event));
+            return Ok(Combined::EcrScan(event));
        }
 
         if let Ok(event) = LogsEvent::deserialize(&raw_value) {
             tracing::info!("cloudwatch event detected");
-            return Ok(CombinedEvent::CloudWatchLogs(event));
+            return Ok(Combined::CloudWatchLogs(event));
         }
 
         if let Ok(event) = KinesisEvent::deserialize(&raw_value) {
             tracing::info!("kinesis event detected");
-            return Ok(CombinedEvent::Kinesis(event));
+            return Ok(Combined::Kinesis(event));
         }
 
         if let Ok(event) = SqsEvent::deserialize(&raw_value) {
             tracing::info!("sqs event detected");
-            return Ok(CombinedEvent::Sqs(event));
+            return Ok(Combined::Sqs(event));
         }
 
         // IMPORTANT: kafka must be evaluated last as it uses an arbitrary map to evaluate records.
@@ -69,7 +69,7 @@ impl<'de> Deserialize<'de> for CombinedEvent {
                     "unsupported or bad event type: {raw_value}"
                 )));
             }
-            return Ok(CombinedEvent::Kafka(event));
+            return Ok(Combined::Kafka(event));
         }
 
         Err(de::Error::custom(format!(
diff --git a/src/lib.rs b/src/lib.rs
index 99229e2..ac6239e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,32 +1,9 @@
-use async_recursion::async_recursion;
-use aws_config::SdkConfig;
-use aws_lambda_events::cloudwatch_logs::LogsEvent;
-use aws_lambda_events::event::cloudwatch_logs::AwsLogs;
-use aws_lambda_events::event::s3::S3Event;
-use aws_sdk_ecr::Client as EcrClient;
-use aws_sdk_s3::Client as S3Client;
-use aws_sdk_sqs::types::MessageAttributeValue;
-use aws_sdk_sqs::Client as SqsClient;
-use combined_event::CombinedEvent;
-use cx_sdk_rest_logs::config::{BackoffConfig, LogExporterConfig};
-use cx_sdk_rest_logs::{DynLogExporter, RestLogExporter};
-use http::header::USER_AGENT;
-use lambda_runtime::{Context, Error, LambdaEvent};
-use std::collections::HashMap;
-use std::string::String;
-use std::sync::Arc;
-use std::time::Duration;
 use tracing::level_filters::LevelFilter;
-use tracing::{debug, info};
 use tracing_subscriber::EnvFilter;
 
-use crate::config::{Config, IntegrationType};
-
-pub mod combined_event;
-pub mod config;
-pub mod coralogix;
-pub mod ecr;
-pub mod process;
+pub mod logs;
+pub mod clients;
+pub mod events;
 
 pub fn set_up_logging() {
     tracing_subscriber::fmt()
@@ -38,401 +15,3 @@ pub fn set_up_logging() {
         .with_ansi(false)
         .init();
 }
-
-pub fn set_up_coralogix_exporter(config: &Config) -> Result<DynLogExporter, Error> {
-    let backoff = BackoffConfig {
-        initial_delay: Duration::from_millis(10000),
-        max_delay: Duration::from_millis(60000),
-        max_elapsed_time: Duration::from_secs(config.max_elapsed_time),
-    };
-
-    let mut headers: HashMap<String, String> = HashMap::new();
-    headers.insert(
-        USER_AGENT.to_string(),
-        concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),).to_owned(),
-    );
-    headers.insert(
-        "X-Coralogix-Data-Source".to_owned(),
-        config.integration_type.to_string(),
-    );
-    let config = LogExporterConfig {
-        url: config.endpoint.clone(),
-        request_timeout: Duration::from_secs(30),
-        backoff_config: backoff,
-        user_agent: None,
-        additional_headers: headers,
-        request_body_size_limit: None,
-        keep_alive_interval: None,
-        keep_alive_timeout: None,
-        keep_alive_while_idle: None,
-    };
-    let exporter = Arc::new(RestLogExporter::builder().with_config(config).build()?);
-
-    Ok(exporter)
-}
-
-#[async_recursion]
-// lambda handler
-pub async fn function_handler(
-    clients: &AwsClients,
-    coralogix_exporter: DynLogExporter,
-    config: &Config,
-    evt: LambdaEvent<CombinedEvent>,
-) -> Result<(), Error> {
-    info!("Handling lambda invocation");
-
-    // TODO this may need to be moved process
-    // TODO will this always produce just one bucket/key?
(check this)
-    debug!("Handling event: {:?}", evt);
-    debug!("Handling event payload: {:?}", evt.payload);
-    match evt.payload {
-        CombinedEvent::S3(s3_event) => {
-            info!("S3 EVENT Detected");
-            let (bucket, key) = handle_s3_event(s3_event).await?;
-            crate::process::s3(&clients.s3, coralogix_exporter, config, bucket, key).await?;
-        }
-        CombinedEvent::Sns(sns_event) => {
-            debug!("SNS Event: {:?}", sns_event);
-            let message = &sns_event.records[0].sns.message;
-            if config.integration_type != IntegrationType::Sns {
-                let s3_event = serde_json::from_str::<S3Event>(message)?;
-                let (bucket, key) = handle_s3_event(s3_event).await?;
-                info!("SNS S3 EVENT Detected");
-                crate::process::s3(&clients.s3, coralogix_exporter, config, bucket, key).await?;
-            } else {
-                info!("SNS TEXT EVENT Detected");
-                crate::process::sns_logs(
-                    sns_event.records[0].sns.message.clone(),
-                    coralogix_exporter,
-                    config,
-                )
-                .await?;
-            }
-        }
-        CombinedEvent::CloudWatchLogs(logs_event) => {
-            info!("CLOUDWATCH EVENT Detected");
-            let cloudwatch_event_log = handle_cloudwatch_logs_event(logs_event).await?;
-            crate::process::cloudwatch_logs(cloudwatch_event_log, coralogix_exporter, config)
-                .await?;
-        }
-        CombinedEvent::Sqs(sqs_event) => {
-            debug!("SQS Event: {:?}", sqs_event.records[0]);
-            for record in &sqs_event.records {
-                if let Some(message) = &record.body {
-                    if config.integration_type != IntegrationType::Sqs {
-                        let evt: CombinedEvent = serde_json::from_str(message)?;
-                        let internal_event = LambdaEvent::new(evt, Context::default());
-
-                        // recursively call function_handler
-                        // note that there is no risk of hitting the recursion stack limit
-                        // here as recursiion will only be called as many times as there are nested
-                        // events in an SQS message
-                        let result = function_handler(
-                            clients,
-                            coralogix_exporter.clone(),
-                            config,
-                            internal_event,
-                        )
-                        .await;
-
-                        if result.is_ok() {
-                            continue;
-                        }
-
-                        if let (Some(dlq_arn), Some(event_source_arn), Some(dlq_url)) = (
-                            config.dlq_arn.clone(),
-                            record.event_source_arn.clone(),
-                            config.dlq_url.clone(),
-                        ) {
-                            if dlq_arn != event_source_arn {
-                                // if the message is not from the dlq, return the orginal result
-                                return result;
-                            }
-
-                            tracing::info!("DLQ event detected");
-                            let mut current_retry_count = record
-                                .message_attributes
-                                .get("retry")
-                                .and_then(|attr| attr.string_value.as_deref()) // Convert Option<String> to Option<&str>
-                                .map_or(Ok(0), str::parse::<i32>) // Parse as i32 or default to 0; map_or returns Result
-                                .unwrap_or(0); // In case of parse error, default to 0
-
-                            let retry_limit = config
-                                .dlq_retry_limit
-                                .clone()
-                                .unwrap_or("3".to_string())
-                                .parse::<i32>()
-                                .map_err(|e| format!("failed parse dlq retry limit - {}", e))?;
-
-                            if current_retry_count >= retry_limit {
-                                tracing::info!(
-                                    "Retry limit reached for message: {:?}",
-                                    record.body
-                                );
-                                s3_store_failed_event(
-                                    &clients.s3,
-                                    config.dlq_s3_bucket.clone().unwrap(),
-                                    record.body.clone().unwrap(),
-                                )
-                                .await?;
-
-                                continue;
-                            }
-
-                            // increment retry count
-                            current_retry_count += 1;
-
-                            let retry_attr = MessageAttributeValue::builder()
-                                .set_data_type(Some("String".to_string()))
-                                .set_string_value(Some(current_retry_count.to_string()))
-                                .build()?;
-
-                            let last_err_attr = MessageAttributeValue::builder()
-                                .set_data_type(Some("String".to_string()))
-                                .set_string_value(Some(result.err().unwrap().to_string()))
-                                .build()?;
-
-                            tracing::info!("sending message to DLQ");
-                            clients
-                                .sqs
-                                .send_message()
-                                .queue_url(dlq_url)
-                                .message_attributes("retry", retry_attr)
-                                .message_attributes("LastError",
last_err_attr)
-                                .message_body(message)
-                                .send()
-                                .await?;
-
-                            continue;
-                        }
-
-                        result?;
-                    } else {
-                        debug!("SQS TEXT EVENT Detected");
-                        crate::process::sqs_logs(
-                            message.clone(),
-                            coralogix_exporter.clone(),
-                            config,
-                        )
-                        .await?;
-                    }
-                }
-            }
-        }
-        CombinedEvent::Kinesis(kinesis_event) => {
-            for record in kinesis_event.records {
-                debug!("Kinesis record: {:?}", record);
-                let message = record.kinesis.data;
-                debug!("Kinesis data: {:?}", &message);
-                crate::process::kinesis_logs(message, coralogix_exporter.clone(), config).await?;
-            }
-        }
-        CombinedEvent::Kafka(kafka_event) => {
-            let mut all_records = Vec::new();
-            for (topic_partition, mut records) in kafka_event.records {
-                debug!("Kafka record: {topic_partition:?} --> {records:?}");
-                all_records.append(&mut records)
-            }
-            crate::process::kafka_logs(all_records, coralogix_exporter.clone(), config).await?;
-        }
-        CombinedEvent::EcrScan(ecr_scan_event) => {
-            debug!("ECR Scan event: {:?}", ecr_scan_event);
-            crate::process::ecr_scan_logs(
-                &clients.ecr,
-                ecr_scan_event,
-                coralogix_exporter.clone(),
-                config,
-            )
-            .await?;
-        }
-    };
-
-    Ok(())
-}
-
-pub async fn handle_cloudwatch_logs_event(logs_event: LogsEvent) -> Result<AwsLogs, Error> {
-    debug!("Cloudwatch Event: {:?}", logs_event.aws_logs.data);
-    Ok(logs_event.aws_logs)
-}
-pub async fn handle_s3_event(s3_event: S3Event) -> Result<(String, String), Error> {
-    debug!("S3 Event: {:?}", s3_event);
-    let bucket = s3_event.records[0]
-        .s3
-        .bucket
-        .name
-        .as_ref()
-        .expect("Bucket name to exist")
-        .to_owned();
-    let key = s3_event.records[0]
-        .s3
-        .object
-        .key
-        .as_ref()
-        .expect("Object key to exist")
-        .to_owned();
-
-    let decoded_key = percent_encoding::percent_decode_str(&key.replace("+", " "))
-        .decode_utf8()?
-        //.replace("+", " ");
-        .to_string();
-    Ok((bucket, decoded_key))
-}
-
-/// A type used to hold the AWS clients required to interact with AWS services
-/// used by the lambda function.
-#[derive(Clone)]
-pub struct AwsClients {
-    pub s3: S3Client,
-    pub ecr: EcrClient,
-    pub sqs: SqsClient,
-}
-
-impl AwsClients {
-    pub fn new(sdk_config: &SdkConfig) -> Self {
-        AwsClients {
-            s3: S3Client::new(&sdk_config),
-            ecr: EcrClient::new(&sdk_config),
-            sqs: SqsClient::new(&sdk_config),
-        }
-    }
-}
-
-async fn s3_store_failed_event(
-    s3client: &S3Client,
-    bucket: String,
-    event: String,
-) -> Result<(), String> {
-    // create object name using md5sum of the data string
-    let digest = md5::compute(event.as_bytes());
-    let mut object_name = format!("{:x}.json", digest);
-    let mut key = chrono::Local::now()
-        .format("coraligx-aws-shipper/failed-events/%Y/%m/%d/%H")
-        .to_string();
-
-    let mut data = event.clone().as_bytes().to_owned();
-
-    // if s3 event, read the object from s3
-    if let Ok(e) = serde_json::from_str::<S3Event>(&event) {
-        let b = e.records[0]
-            .s3
-            .bucket
-            .name
-            .as_deref()
-            .ok_or_else(|| format!("failed to get bucket name"))?;
-        let k = e.records[0]
-            .s3
-            .object
-            .key
-            .as_deref()
-            .ok_or_else(|| format!("failed to get object key name"))?;
-
-        match process::get_bytes_from_s3(s3client, b.to_string(), k.to_string()).await {
-            Ok(bytes) => {
-                data = bytes;
-                object_name = format!("{}/{}", b, k);
-            }
-            Err(e) => {
-                tracing::error!("failed to read object from s3 for dlq, storing original event instead - {}", e);
-            }
-        }
-    }
-
-    // if cloudwatch logs event, use loggroup name and md5 sum as object name
-    if let Ok(e) = serde_json::from_str::<LogsEvent>(&event) {
-        object_name = format!(
-            "{}/{}/{}",
-            e.aws_logs.data.log_group, e.aws_logs.data.log_stream, object_name
-        );
-    }
-
-    key = format!("{}/{}", key, object_name);
-    let buffer =
-        aws_smithy_types::byte_stream::ByteStream::new(aws_smithy_types::body::SdkBody::from(data));
-
-    tracing::info!("uploading failed event to S3: s3://{}/{}", bucket, key);
-    s3client
-        .put_object()
-        .bucket(bucket)
-        .key(key)
-        .body(buffer)
-        .send()
-        .await
-        .map_err(|e| format!("failed uploading file to bucket - {}", e.into_service_error()))?;
-
-    Ok(())
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use aws_lambda_events::event::s3::S3Event;
-
-    // Note: we test the s3_event handler directly here, since the integration tests will bypass it
-    // using the mock s3 client. The [handle_cloudwatch_logs_event] is however invoked as part of the
-    // integration test workflow.
-    #[tokio::test]
-    async fn test_handle_s3_event() {
-        let s3_event_str = |bucket: &str, key: &str| -> String {
-            format!(
-                r#"{{
-                "Records": [
-                    {{
-                    "eventVersion": "2.0",
-                    "eventSource": "aws:s3",
-                    "awsRegion": "eu-west-1",
-                    "eventTime": "1970-01-01T00:00:00.000Z",
-                    "eventName": "ObjectCreated:Put",
-                    "userIdentity": {{
-                        "principalId": "EXAMPLE"
-                    }},
-                    "requestParameters": {{
-                        "sourceIPAddress": "127.0.0.1"
-                    }},
-                    "responseElements": {{
-                        "x-amz-request-id": "EXAMPLE123456789",
-                        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
-                    }},
-                    "s3": {{
-                        "s3SchemaVersion": "1.0",
-                        "configurationId": "testConfigRule",
-                        "bucket": {{
-                            "name": "{}",
-                            "ownerIdentity": {{
-                                "principalId": "EXAMPLE"
-                            }},
-                            "arn": "arn:aws:s3:::{}"
-                        }},
-                        "object": {{
-                            "key": "{}",
-                            "size": 311000048,
-                            "eTag": "0123456789abcdef0123456789abcdef",
-                            "sequencer": "0A1B2C3D4E5F678901"
-                        }}
-                    }}
-                    }}
-                ]
-            }}"#,
-                bucket, bucket, key
-            )
-        };
-
-        // test normal s3 event
-        let s3_event = s3_event_str("coralogix-serverless-repo", "coralogix-aws-shipper/s3.log");
-        let evt: S3Event =
-            serde_json::from_str(s3_event.as_str()).expect("failed to parse s3_event");
-        let (bucket, key) = handle_s3_event(evt).await.unwrap();
-        assert_eq!(bucket, "coralogix-serverless-repo");
-        assert_eq!(key, "coralogix-aws-shipper/s3.log");
-
-        // test s3 event with spaces in key name (note: aws event replaces spaces with +)
-        let s3_event = s3_event_str(
-            "coralogix-serverless-repo",
-            "coralogix-aws-shipper/s3+with+spaces.log",
-        );
-        let evt: S3Event =
-            serde_json::from_str(s3_event.as_str()).expect("failed to parse s3_event");
-        let (bucket, key) = handle_s3_event(evt).await.unwrap();
-        assert_eq!(bucket, "coralogix-serverless-repo");
-        assert_eq!(key, "coralogix-aws-shipper/s3 with spaces.log");
-    }
-}
diff --git a/src/config.rs b/src/logs/config.rs
similarity index 100%
rename from src/config.rs
rename to src/logs/config.rs
diff --git a/src/coralogix.rs b/src/logs/coralogix.rs
similarity index 95%
rename from src/coralogix.rs
rename to src/logs/coralogix.rs
index 93aa44b..193856d 100644
--- a/src/coralogix.rs
+++ b/src/logs/coralogix.rs
@@ -1,6 +1,5 @@
-use crate::config::Config;
-use crate::process::Metadata;
-use crate::*;
+use crate::logs::config::Config;
+use crate::logs::process::Metadata;
 use cx_sdk_rest_logs::auth::AuthData;
 use cx_sdk_rest_logs::model::{LogSinglesEntry, LogSinglesRequest, Severity};
 use cx_sdk_rest_logs::DynLogExporter;
@@ -8,19 +7,21 @@ use futures::stream::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
+use std::env;
 use std::iter::IntoIterator;
 use std::time::Instant;
 use std::vec::Vec;
 use time::OffsetDateTime;
-use tracing::{error, info};
-use std::env;
+use tracing::{error, info, debug};
+use std::collections::HashMap;
+use crate::logs::*;
 
 pub async fn process_batches(
     logs: Vec<String>,
     configured_app_name: &str,
     configured_sub_name: &str,
     config: &Config,
-    metadata_instance: &process::Metadata,
+    metadata_instance: &Metadata,
     exporter: DynLogExporter,
 ) -> Result<(), Error> {
     let logs: Vec<String> = logs
@@ -139,7 +140,7 @@ fn convert_to_log_entry(
     tracing::debug!("Sub Name: {}", &subsystem_name);
     let severity = get_severity_level(&log);
     let stream_name = metadata_instance.stream_name.clone();
-    let topic_name = metadata_instance.topic_name.clone();
+    // let topic_name = metadata_instance.topic_name.clone();
     // let loggroup_name = metadata_instance.log_group.clone();
 
     tracing::debug!("Severity: {:?}",
severity);
@@ -197,27 +198,33 @@ fn convert_to_log_entry(
         debug!("Custom metadata STR: {}", custom_metadata_str);
         let mut metadata = HashMap::new();
         let pairs = custom_metadata_str.split(',');
-        
+
         for pair in pairs {
             let split_pair: Vec<&str> = pair.split('=').collect();
             match split_pair.as_slice() {
                 [key, value] => {
                     metadata.insert(key.to_string(), value.to_string());
-                },
+                }
                 _ => {
                     error!("Failed to split key-value pair: {}", pair);
                     continue;
                 }
             }
         }
-        
+
         if !metadata.is_empty() {
             debug!("Custom metadata: {:?}", metadata);
             message.custom_metadata = metadata;
         }
     }
     debug!("Message metadata: {:?}", message.custom_metadata);
-    let body = if message.stream_name.is_some() || message.loggroup_name.is_some() || message.bucket_name.is_some() || message.key_name.is_some() || message.topic_name.is_some()|| !message.custom_metadata.is_empty() {
+    let body = if message.stream_name.is_some()
+        || message.loggroup_name.is_some()
+        || message.bucket_name.is_some()
+        || message.key_name.is_some()
+        || message.topic_name.is_some()
+        || !message.custom_metadata.is_empty()
+    {
         serde_json::to_value(&message).unwrap_or(message.message)
     } else {
         message.message
@@ -331,7 +338,7 @@ fn get_severity_level(message: &str) -> Severity {
 
 #[cfg(test)]
 mod test {
-    use crate::coralogix::dynamic_metadata_for_log;
+    use crate::logs::coralogix::dynamic_metadata_for_log;
 
     #[test]
     fn test_nondynamic_app_name() {
diff --git a/src/ecr.rs b/src/logs/ecr.rs
similarity index 99%
rename from src/ecr.rs
rename to src/logs/ecr.rs
index 3801b1d..8aaf289 100644
--- a/src/ecr.rs
+++ b/src/logs/ecr.rs
@@ -1,4 +1,4 @@
-use crate::config::Config;
+use crate::logs::config::Config;
 use aws_lambda_events::ecr_scan::EcrScanEvent;
 use aws_sdk_ecr::types::ImageIdentifier;
 use aws_sdk_ecr::Client as EcrClient;
diff --git a/src/logs/mod.rs b/src/logs/mod.rs
new file mode 100644
index 0000000..4619228
--- /dev/null
+++ b/src/logs/mod.rs
@@ -0,0 +1,413 @@
+use async_recursion::async_recursion;
+use cx_sdk_rest_logs::{DynLogExporter, RestLogExporter};
+use lambda_runtime::{Context, Error, LambdaEvent};
+use aws_lambda_events::cloudwatch_logs::LogsEvent;
+use aws_lambda_events::event::cloudwatch_logs::AwsLogs;
+use cx_sdk_rest_logs::config::{BackoffConfig, LogExporterConfig};
+use tracing::{debug, info};
+use std::time::Duration;
+use std::collections::HashMap;
+use crate::clients::AwsClients;
+use crate::logs::config::IntegrationType;
+use crate::events;
+use http::header::USER_AGENT;
+use aws_lambda_events::event::s3::S3Event;
+use aws_sdk_s3::Client as S3Client;
+use aws_sdk_sqs::types::MessageAttributeValue;
+use std::sync::Arc;
+
+
+pub mod config;
+pub mod process;
+pub mod ecr;
+pub mod coralogix;
+
+
+pub fn set_up_coralogix_exporter(config: &config::Config) -> Result<DynLogExporter, Error> {
+    let backoff = BackoffConfig {
+        initial_delay: Duration::from_millis(10000),
+        max_delay: Duration::from_millis(60000),
+        max_elapsed_time: Duration::from_secs(config.max_elapsed_time),
+    };
+
+    let mut headers: HashMap<String, String> = HashMap::new();
+    headers.insert(
+        USER_AGENT.to_string(),
+        concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),).to_owned(),
+    );
+    headers.insert(
+        "X-Coralogix-Data-Source".to_owned(),
+        config.integration_type.to_string(),
+    );
+    let config = LogExporterConfig {
+        url: config.endpoint.clone(),
+        request_timeout: Duration::from_secs(30),
+        backoff_config: backoff,
+        user_agent: None,
+        additional_headers: headers,
+        request_body_size_limit: None,
+        keep_alive_interval: None,
+        keep_alive_timeout: None,
+        keep_alive_while_idle: None,
+    };
+    
let exporter = Arc::new(RestLogExporter::builder().with_config(config).build()?);
+
+    Ok(exporter)
+}
+
+#[async_recursion]
+// lambda handler
+pub async fn function_handler(
+    clients: &AwsClients,
+    coralogix_exporter: DynLogExporter,
+    config: &config::Config,
+    evt: LambdaEvent<events::Combined>,
+) -> Result<(), Error> {
+    info!("Handling lambda invocation");
+
+    // TODO this may need to be moved to process
+    // TODO will this always produce just one bucket/key? (check this)
+    debug!("Handling event: {:?}", evt);
+    debug!("Handling event payload: {:?}", evt.payload);
+    match evt.payload {
+        events::Combined::S3(s3_event) => {
+            info!("S3 EVENT Detected");
+            let (bucket, key) = handle_s3_event(s3_event).await?;
+            crate::logs::process::s3(&clients.s3, coralogix_exporter, config, bucket, key).await?;
+        }
+        events::Combined::Sns(sns_event) => {
+            debug!("SNS Event: {:?}", sns_event);
+            let message = &sns_event.records[0].sns.message;
+            if config.integration_type != IntegrationType::Sns {
+                let s3_event = serde_json::from_str::<S3Event>(message)?;
+                let (bucket, key) = handle_s3_event(s3_event).await?;
+                info!("SNS S3 EVENT Detected");
+                crate::logs::process::s3(&clients.s3, coralogix_exporter, config, bucket, key).await?;
+            } else {
+                info!("SNS TEXT EVENT Detected");
+                crate::logs::process::sns_logs(
+                    sns_event.records[0].sns.message.clone(),
+                    coralogix_exporter,
+                    config,
+                )
+                .await?;
+            }
+        }
+        events::Combined::CloudWatchLogs(logs_event) => {
+            info!("CLOUDWATCH EVENT Detected");
+            let cloudwatch_event_log = handle_cloudwatch_logs_event(logs_event).await?;
+            process::cloudwatch_logs(cloudwatch_event_log, coralogix_exporter, config)
+                .await?;
+        }
+        events::Combined::Sqs(sqs_event) => {
+            debug!("SQS Event: {:?}", sqs_event.records[0]);
+            for record in &sqs_event.records {
+                if let Some(message) = &record.body {
+                    if config.integration_type != IntegrationType::Sqs {
+                        let evt: events::Combined = serde_json::from_str(message)?;
+                        let internal_event = LambdaEvent::new(evt, Context::default());
+
+                        // recursively call function_handler
+                        // note that there is no risk of hitting the recursion stack limit
+                        // here as recursion will only be called as many times as there are nested
+                        // events in an SQS message
+                        let result = function_handler(
+                            clients,
+                            coralogix_exporter.clone(),
+                            config,
+                            internal_event,
+                        )
+                        .await;
+
+                        if result.is_ok() {
+                            continue;
+                        }
+
+                        if let (Some(dlq_arn), Some(event_source_arn), Some(dlq_url)) = (
+                            config.dlq_arn.clone(),
+                            record.event_source_arn.clone(),
+                            config.dlq_url.clone(),
+                        ) {
+                            if dlq_arn != event_source_arn {
+                                // if the message is not from the dlq, return the original result
+                                return result;
+                            }
+
+                            tracing::info!("DLQ event detected");
+                            let mut current_retry_count = record
+                                .message_attributes
+                                .get("retry")
+                                .and_then(|attr| attr.string_value.as_deref()) // Convert Option<String> to Option<&str>
+                                .map_or(Ok(0), str::parse::<i32>) // Parse as i32 or default to 0; map_or returns Result
+                                .unwrap_or(0); // In case of parse error, default to 0
+
+                            let retry_limit = config
+                                .dlq_retry_limit
+                                .clone()
+                                .unwrap_or("3".to_string())
+                                .parse::<i32>()
+                                .map_err(|e| format!("failed to parse dlq retry limit - {}", e))?;
+
+                            if current_retry_count >= retry_limit {
+                                tracing::info!(
+                                    "Retry limit reached for message: {:?}",
+                                    record.body
+                                );
+                                s3_store_failed_event(
+                                    &clients.s3,
+                                    config.dlq_s3_bucket.clone().unwrap(),
+                                    record.body.clone().unwrap(),
+                                )
+                                .await?;
+
+                                continue;
+                            }
+
+                            // increment retry count
+                            current_retry_count += 1;
+
+                            let retry_attr =
MessageAttributeValue::builder()
+                                .set_data_type(Some("String".to_string()))
+                                .set_string_value(Some(current_retry_count.to_string()))
+                                .build()?;
+
+                            let last_err_attr = MessageAttributeValue::builder()
+                                .set_data_type(Some("String".to_string()))
+                                .set_string_value(Some(result.err().unwrap().to_string()))
+                                .build()?;
+
+                            tracing::info!("sending message to DLQ");
+                            clients
+                                .sqs
+                                .send_message()
+                                .queue_url(dlq_url)
+                                .message_attributes("retry", retry_attr)
+                                .message_attributes("LastError", last_err_attr)
+                                .message_body(message)
+                                .send()
+                                .await?;
+
+                            continue;
+                        }
+
+                        result?;
+                    } else {
+                        debug!("SQS TEXT EVENT Detected");
+                        process::sqs_logs(
+                            message.clone(),
+                            coralogix_exporter.clone(),
+                            config,
+                        )
+                        .await?;
+                    }
+                }
+            }
+        }
+        events::Combined::Kinesis(kinesis_event) => {
+            for record in kinesis_event.records {
+                debug!("Kinesis record: {:?}", record);
+                let message = record.kinesis.data;
+                debug!("Kinesis data: {:?}", &message);
+                process::kinesis_logs(message, coralogix_exporter.clone(), config).await?;
+            }
+        }
+        events::Combined::Kafka(kafka_event) => {
+            let mut all_records = Vec::new();
+            for (topic_partition, mut records) in kafka_event.records {
+                debug!("Kafka record: {topic_partition:?} --> {records:?}");
+                all_records.append(&mut records)
+            }
+            process::kafka_logs(all_records, coralogix_exporter.clone(), config).await?;
+        }
+        events::Combined::EcrScan(ecr_scan_event) => {
+            debug!("ECR Scan event: {:?}", ecr_scan_event);
+            process::ecr_scan_logs(
+                &clients.ecr,
+                ecr_scan_event,
+                coralogix_exporter.clone(),
+                config,
+            )
+            .await?;
+        }
+    };
+
+    Ok(())
+}
+
+pub async fn handle_cloudwatch_logs_event(logs_event: LogsEvent) -> Result<AwsLogs, Error> {
+    debug!("Cloudwatch Event: {:?}", logs_event.aws_logs.data);
+    Ok(logs_event.aws_logs)
+}
+
+pub async fn handle_s3_event(s3_event: S3Event) -> Result<(String, String), Error> {
+    debug!("S3 Event: {:?}", s3_event);
+    let bucket = s3_event.records[0]
+        .s3
+        .bucket
+        .name
+        .as_ref()
+        .expect("Bucket name to exist")
+        .to_owned();
+    let key = s3_event.records[0]
+        .s3
+        .object
+        .key
+        .as_ref()
+        .expect("Object key to exist")
+        .to_owned();
+
+    let decoded_key = percent_encoding::percent_decode_str(&key.replace("+", " "))
+        .decode_utf8()?
+        //.replace("+", " ");
+        .to_string();
+    Ok((bucket, decoded_key))
+}
+
+
+async fn s3_store_failed_event(
+    s3client: &S3Client,
+    bucket: String,
+    event: String,
+) -> Result<(), String> {
+    // create object name using md5sum of the data string
+    let digest = md5::compute(event.as_bytes());
+    let mut object_name = format!("{:x}.json", digest);
+    let mut key = chrono::Local::now()
+        .format("coraligx-aws-shipper/failed-events/%Y/%m/%d/%H")
+        .to_string();
+
+    let mut data = event.clone().as_bytes().to_owned();
+
+    // if s3 event, read the object from s3
+    if let Ok(e) = serde_json::from_str::<S3Event>(&event) {
+        let b = e.records[0]
+            .s3
+            .bucket
+            .name
+            .as_deref()
+            .ok_or_else(|| format!("failed to get bucket name"))?;
+        let k = e.records[0]
+            .s3
+            .object
+            .key
+            .as_deref()
+            .ok_or_else(|| format!("failed to get object key name"))?;
+
+        match process::get_bytes_from_s3(s3client, b.to_string(), k.to_string()).await {
+            Ok(bytes) => {
+                data = bytes;
+                object_name = format!("{}/{}", b, k);
+            }
+            Err(e) => {
+                tracing::error!(
+                    "failed to read object from s3 for dlq, storing original event instead - {}",
+                    e
+                );
+            }
+        }
+    }
+
+    // if cloudwatch logs event, use loggroup name and md5 sum as object name
+    if let Ok(e) = serde_json::from_str::<LogsEvent>(&event) {
+        object_name = format!(
+            "{}/{}/{}",
+            e.aws_logs.data.log_group, e.aws_logs.data.log_stream, object_name
+        );
+    }
+
+    key = format!("{}/{}", key, object_name);
+    let buffer =
+        aws_smithy_types::byte_stream::ByteStream::new(aws_smithy_types::body::SdkBody::from(data));
+
+    tracing::info!("uploading failed event to S3: s3://{}/{}", bucket, key);
+    s3client
+        .put_object()
+        .bucket(bucket)
+        .key(key)
+        .body(buffer)
+        .send()
+        .await
+        .map_err(|e| {
+            format!(
+                "failed uploading file to bucket - {}",
+                e.into_service_error()
+            )
+        })?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use aws_lambda_events::event::s3::S3Event;
+
+    // Note: we test the s3_event handler directly here, since the integration tests will bypass it
+    // using the mock s3 client. The [handle_cloudwatch_logs_event] is however invoked as part of the
+    // integration test workflow.
+    #[tokio::test]
+    async fn test_handle_s3_event() {
+        let s3_event_str = |bucket: &str, key: &str| -> String {
+            format!(
+                r#"{{
+                "Records": [
+                    {{
+                    "eventVersion": "2.0",
+                    "eventSource": "aws:s3",
+                    "awsRegion": "eu-west-1",
+                    "eventTime": "1970-01-01T00:00:00.000Z",
+                    "eventName": "ObjectCreated:Put",
+                    "userIdentity": {{
+                        "principalId": "EXAMPLE"
+                    }},
+                    "requestParameters": {{
+                        "sourceIPAddress": "127.0.0.1"
+                    }},
+                    "responseElements": {{
+                        "x-amz-request-id": "EXAMPLE123456789",
+                        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
+                    }},
+                    "s3": {{
+                        "s3SchemaVersion": "1.0",
+                        "configurationId": "testConfigRule",
+                        "bucket": {{
+                            "name": "{}",
+                            "ownerIdentity": {{
+                                "principalId": "EXAMPLE"
+                            }},
+                            "arn": "arn:aws:s3:::{}"
+                        }},
+                        "object": {{
+                            "key": "{}",
+                            "size": 311000048,
+                            "eTag": "0123456789abcdef0123456789abcdef",
+                            "sequencer": "0A1B2C3D4E5F678901"
+                        }}
+                    }}
+                    }}
+                ]
+            }}"#,
+                bucket, bucket, key
+            )
+        };
+
+        // test normal s3 event
+        let s3_event = s3_event_str("coralogix-serverless-repo", "coralogix-aws-shipper/s3.log");
+        let evt: S3Event =
+            serde_json::from_str(s3_event.as_str()).expect("failed to parse s3_event");
+        let (bucket, key) = handle_s3_event(evt).await.unwrap();
+        assert_eq!(bucket, "coralogix-serverless-repo");
+        assert_eq!(key, "coralogix-aws-shipper/s3.log");
+
+        // test s3 event with spaces in key name (note: aws event replaces spaces with +)
+        let s3_event = s3_event_str(
+            "coralogix-serverless-repo",
+            "coralogix-aws-shipper/s3+with+spaces.log",
+        );
+        let evt: S3Event =
+            serde_json::from_str(s3_event.as_str()).expect("failed to parse s3_event");
+        let (bucket, key) = handle_s3_event(evt).await.unwrap();
+        assert_eq!(bucket, "coralogix-serverless-repo");
+        assert_eq!(key, "coralogix-aws-shipper/s3 with spaces.log");
+    }
+}
diff --git a/src/process.rs b/src/logs/process.rs
similarity index 99%
rename from src/process.rs
rename to src/logs/process.rs
index 96d5c0f..dd419aa 100644
--- a/src/process.rs
+++ b/src/logs/process.rs
@@ -20,9 +20,9 @@ use std::time::Instant;
 use tracing::{debug, info};
 use std::env;
 
-use crate::config::{Config, IntegrationType};
-use crate::coralogix;
-use crate::ecr;
+use crate::logs::config::{Config, IntegrationType};
+use crate::logs::coralogix;
+use crate::logs::ecr;
 
 pub async fn s3(
     s3_client: &Client,
@@ -654,7 +654,7 @@ pub async fn kafka_logs(
     Ok(())
 }
 
-fn ungzip(compressed_data: Vec<u8>, key: String) -> Result<Vec<u8>, Error> {
+fn ungzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
     if compressed_data.is_empty() {
         tracing::warn!("Input data is empty, cannot ungzip a zero-byte file.");
         return Ok(Vec::new());
@@ -757,7 +757,7 @@ mod test {
     use fancy_regex::Regex;
     use itertools::Itertools;
 
-    use crate::process::{block, sample, split};
+    use crate::logs::process::{block, sample, split};
 
     #[test]
     fn test() {
diff --git a/src/main.rs b/src/main.rs
index 3a53aa6..87ff269 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,34 @@
+pub mod logs;
+pub mod clients;
+pub mod events;
+
 use aws_config::BehaviorVersion;
-use coralogix_aws_shipper::combined_event::CombinedEvent;
-use coralogix_aws_shipper::config;
+use crate::events::Combined;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
-use tracing::info;
+use tracing::{info, warn};
+
+// define constant for the TELEMETRY_MODE env variable name
+const TELEMETRY_MODE: &str = "TELEMETRY_MODE";
+
+enum TelemetryMode {
+    Logs,
+    Metrics,
+    Traces,
+}
+
+impl From<&str> for TelemetryMode {
+    fn from(s: &str) -> Self {
+        match
s.to_lowercase().as_str() {
+            "logs" => TelemetryMode::Logs,
+            "metrics" => TelemetryMode::Metrics,
+            "traces" => TelemetryMode::Traces,
+            _ => {
+                warn!("Invalid telemetry mode {}, defaulting to [logs]", s);
+                TelemetryMode::Logs
+            }
+        }
+    }
+}
 
 #[tokio::main]
 async fn main() -> Result<(), Error> {
@@ -14,27 +40,49 @@ async fn main() -> Result<(), Error> {
         env!("CARGO_PKG_VERSION")
     );
 
+    let mode = TelemetryMode::from(
+        std::env::var(TELEMETRY_MODE)
+            .unwrap_or("".to_string())
+            .as_str(),
+    );
+
     let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
-    let clients = coralogix_aws_shipper::AwsClients::new(&aws_config);
-    let mut config = config::Config::load_from_env()?;
+    let aws_clients = clients::AwsClients::new(&aws_config);
+    // let mut config = config::Config::load_from_env()?;
+
+
+    match mode {
+        TelemetryMode::Metrics => {
+            warn!("metrics telemetry mode not implemented");
+            Ok(())
+        }
+
+        TelemetryMode::Traces => {
+            warn!("traces telemetry mode not implemented");
+            Ok(())
+        }
+
+        // default to logs telemetry mode
+        _ => {
+            info!("Running in logs telemetry mode");
+            let mut conf = crate::logs::config::Config::load_from_env()?;
+            let api_key_value = conf.api_key.token().to_string();
+            if api_key_value.starts_with("arn:aws") && api_key_value.contains(":secretsmanager") {
+                conf.api_key = crate::logs::config::get_api_key_from_secrets_manager(&aws_config, api_key_value)
+                    .await
+                    .map_err(|e| e.to_string())?;
+            };
 
-    // if APIKey provided is an ARN, get the APIKey from Secrets Manager
-    let api_key_value = config.api_key.token().to_string();
-    if api_key_value.starts_with("arn:aws") && api_key_value.contains(":secretsmanager") {
-        config.api_key = config::get_api_key_from_secrets_manager(&aws_config, api_key_value)
+            let coralogix_exporter = crate::logs::set_up_coralogix_exporter(&conf)?;
+            run(service_fn(|request: LambdaEvent<Combined>| {
+                logs::function_handler(
+                    &aws_clients,
+                    coralogix_exporter.clone(),
+                    &conf,
+                    request,
+                )
+            }))
             .await
-            .map_err(|e| e.to_string())?;
-    };
-
-    let coralogix_exporter = coralogix_aws_shipper::set_up_coralogix_exporter(&config)?;
-
-    run(service_fn(|request: LambdaEvent<CombinedEvent>| {
-        coralogix_aws_shipper::function_handler(
-            &clients,
-            coralogix_exporter.clone(),
-            &config,
-            request,
-        )
-    }))
-    .await
+        }
+    }
 }
diff --git a/tests/integration.rs b/tests/logs.rs
similarity index 95%
rename from tests/integration.rs
rename to tests/logs.rs
index 83be69d..84ce419 100644
--- a/tests/integration.rs
+++ b/tests/logs.rs
@@ -4,8 +4,10 @@ use aws_config::BehaviorVersion;
 use aws_sdk_ecr::Client as EcrClient;
 use aws_sdk_s3::Client as S3Client;
 use aws_sdk_sqs::Client as SqsClient;
-use coralogix_aws_shipper::combined_event::CombinedEvent;
-use coralogix_aws_shipper::config::Config;
+// use coralogix_aws_shipper::combined_event::Combined;
+use coralogix_aws_shipper::events::Combined;
+use coralogix_aws_shipper::logs::config::Config;
+use coralogix_aws_shipper::clients::AwsClients;
 use cx_sdk_core::auth::AuthData;
 use cx_sdk_rest_logs::model::{LogBulkRequest, LogSinglesRequest};
 use cx_sdk_rest_logs::LogExporter;
@@ -259,7 +261,7 @@ async fn run_test_s3_event() {
     let config = Config::load_from_env().expect("failed to load config from env");
 
     let (bucket, key) = ("coralogix-serverless-repo", "coralogix-aws-shipper/s3.log");
-    let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str())
+    let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str())
         .expect("failed to parse s3_event");
 
     let exporter =
Arc::new(FakeLogExporter::new()); @@ -267,13 +269,13 @@ async fn run_test_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -330,7 +332,7 @@ async fn run_test_folder_s3_event() { "coralogix-serverless-repo", "coralogix-aws-shipper/elb1/s3.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -338,12 +340,12 @@ async fn run_test_folder_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -396,7 +398,7 @@ async fn run_cloudtraillogs_s3_event() { .expect("failed to create s3 client"); let config = Config::load_from_env().expect("failed to load config from env"); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( s3event_string( "coralogix-serverless-repo", "coralogix-aws-shipper/cloudtrail.log.gz", @@ -410,13 +412,13 @@ async fn run_cloudtraillogs_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -474,7 +476,7 @@ async fn run_csv_s3_event() { "coralogix-serverless-repo", "coralogix-aws-shipper/s3csv.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -482,13 +484,13 @@ async fn run_csv_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -541,7 +543,7 @@ async fn run_vpcflowlgos_s3_event() { .expect("failed to create s3 client"); let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( s3event_string( "coralogix-serverless-repo", "coralogix-aws-shipper/vpcflow.log.gz", @@ -555,13 +557,13 @@ async fn run_vpcflowlgos_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = 
get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -611,7 +613,7 @@ async fn test_vpcflowlgos_s3_event() { async fn run_sns_event() { let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( r#"{ "Records": [ { @@ -643,13 +645,13 @@ async fn run_sns_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let s3_client = get_mock_s3client(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -707,7 +709,7 @@ async fn run_test_s3_event_large() { "coralogix-serverless-repo", "coralogix-aws-shipper/large.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -715,13 +717,13 @@ async fn run_test_s3_event_large() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -789,7 +791,7 @@ async fn run_test_s3_event_large_with_sampling() { "coralogix-serverless-repo", "coralogix-aws-shipper/large.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -797,13 +799,13 @@ async fn run_test_s3_event_large_with_sampling() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -855,7 +857,7 @@ async fn test_s3_event_large_with_sampling() { async fn run_cloudwatchlogs_event() { let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( r#"{ "awslogs": { "data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0bLYmibvPPN3wz00CJxmQnTO41whwWQRIctmEcB6sQbFC3CjW3XW8kxpOpP+OC22d1Wml1qZkQGtoMsScxaczKN3plG8zlaHIta5KqWsozoTYw3/djzwhpLwivWFGHGpAFe7DL68JlBUk+l7KSN7tCOEJ4M3/qOI49vMHj+zCKdlFqLaU2ZHV2a4Ct/an0/ivdX8oYc1UVX860fQDQiMdxRQEAAA==" @@ -868,13 +870,13 @@ async fn run_cloudwatchlogs_event() { let sqs_client = 
get_mock_sqsclient(None).unwrap(); let s3_client = get_mock_s3client(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -933,7 +935,7 @@ async fn run_blocking_and_newline_pattern() { "coralogix-aws-shipper/multiline.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -941,13 +943,13 @@ async fn run_blocking_and_newline_pattern() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -991,7 +993,7 @@ async fn test_blocking_and_newline_pattern() { ("AWS_REGION", Some("eu-central-1")), ("INTEGRATION_TYPE", Some("S3")), ("BLOCKING_PATTERN", Some("ERROR")), // blocking pattern - ("NEWLINE_PATTERN", Some(r"<\|>")), // newline pattern + ("NEWLINE_PATTERN", Some(r"<\|>")), // newline pattern ], run_blocking_and_newline_pattern(), ) @@ -1007,7 +1009,7 @@ async fn run_test_empty_s3_event() { "coralogix-serverless-repo", "coralogix-aws-shipper/empty.log", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -1015,12 +1017,12 @@ async fn run_test_empty_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1053,7 +1055,7 @@ async fn run_sqs_s3_event() { get_mock_s3client(Some("./tests/fixtures/s3.log")).expect("failed to create s3 client"); let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( r#"{ "Records": [ { @@ -1082,13 +1084,13 @@ async fn run_sqs_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1137,7 +1139,7 @@ async fn test_sqs_s3_event() { async fn run_sqs_event() { let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = 
serde_json::from_str( r#"{ "Records": [ { @@ -1167,13 +1169,13 @@ async fn run_sqs_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let s3_client = get_mock_s3client(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1223,7 +1225,7 @@ async fn test_sqs_event() { async fn run_kinesis_event() { let config = Config::load_from_env().unwrap(); - let evt: CombinedEvent = serde_json::from_str( + let evt: Combined = serde_json::from_str( r#"{ "Records": [ { @@ -1252,13 +1254,13 @@ async fn run_kinesis_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let s3_client = get_mock_s3client(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1315,7 +1317,7 @@ async fn run_cloudfront_s3_event() { "coralogix-serverless-repo", "coralogix-aws-shipper/cloudfront.gz", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -1323,13 +1325,13 @@ async fn run_cloudfront_s3_event() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1386,7 +1388,7 @@ async fn run_test_s3_event_with_metadata() { let config = Config::load_from_env().expect("failed to load config from env"); let (bucket, key) = ("coralogix-serverless-repo", "coralogix-aws-shipper/s3.log"); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let exporter = Arc::new(FakeLogExporter::new()); @@ -1394,13 +1396,13 @@ async fn run_test_s3_event_with_metadata() { let sqs_client = get_mock_sqsclient(None).unwrap(); let ecr_client = get_mock_ecrclient(None).unwrap(); - let clients = coralogix_aws_shipper::AwsClients { + let clients = AwsClients { s3: s3_client, sqs: sqs_client, ecr: ecr_client, }; - coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event) + coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event) .await .unwrap(); @@ -1459,7 +1461,7 @@ async fn run_test_s3_event_elb() { "coralogix-serverless-repo", "coralogix-aws-shipper/elb.log.gz", ); - let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str()) + let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str()) .expect("failed to parse s3_event"); let 
exporter = Arc::new(FakeLogExporter::new());
@@ -1467,13 +1469,13 @@ async fn run_test_s3_event_elb() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();

-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -1533,7 +1535,7 @@ async fn test_s3_event_elb() {
 async fn run_kafka_event() {
     let config = Config::load_from_env().unwrap();

-    let evt: CombinedEvent = serde_json::from_str(
+    let evt: Combined = serde_json::from_str(
         r#"{
             "eventSource": "SelfManagedKafka",
             "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
@@ -1577,13 +1579,13 @@ async fn run_kafka_event() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = get_mock_s3client(None).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -1614,7 +1616,7 @@ async fn run_kafka_event() {
 async fn run_kafka_event_with_base64() {
     let config = Config::load_from_env().unwrap();

-    let evt: CombinedEvent = serde_json::from_str(
+    let evt: Combined = serde_json::from_str(
         r#"{
             "Records": [
                 {
@@ -1664,13 +1666,13 @@ async fn run_kafka_event_with_base64() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = get_mock_s3client(None).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -1749,7 +1751,7 @@ async fn test_kafka_event() {

 #[tokio::test]
 async fn test_invalid_event() {
-    let r = serde_json::from_str::<CombinedEvent>(
+    let r = serde_json::from_str::<Combined>(
         r#"{
             "test": "unsupported event",
             "type": "invalid"
@@ -1765,7 +1767,7 @@ async fn run_test_ecrscan_event() {
     let ecr_client = get_mock_ecrclient(Some("./tests/fixtures/ecr_scan.log"))
         .expect("failed to create ecr client");
     let config = Config::load_from_env().expect("failed to load config from env");
-    let evt: CombinedEvent = serde_json::from_str(
+    let evt: Combined = serde_json::from_str(
         r#"{
             "account": "0000000000",
             "detail": {
@@ -1799,13 +1801,13 @@ async fn run_test_ecrscan_event() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = get_mock_s3client(None).unwrap();

-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -1844,7 +1846,7 @@ async fn test_ecrscan_event() {
 }

 async fn run_test_s3_retry_limit_reached_dlq_event() {
-    let evt: CombinedEvent = serde_json::from_str(r#"{
+    let evt: Combined = serde_json::from_str(r#"{
         "Records": [
             {
                 "messageId": "b5156cca-4ed0-4cb7-9442-8e46ae4b3e5d",
@@ -1936,7 +1938,7 @@ async fn run_test_s3_retry_limit_reached_dlq_event() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = make_client!(aws_sdk_s3, s3_relay_client);
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
@@ -1946,7 +1948,7 @@ async fn run_test_s3_retry_limit_reached_dlq_event() {

     let exporter = Arc::new(FakeLogExporter::new());
     let event = LambdaEvent::new(evt, Context::default());
-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2016,7 +2018,7 @@ async fn test_s3_retry_limit_reached_dlq_event() {
 }

 async fn run_test_cloudwatch_retry_limit_reached_dlq_event() {
-    let evt: CombinedEvent = serde_json::from_str(r#"{
+    let evt: Combined = serde_json::from_str(r#"{
         "Records": [
             {
                 "messageId": "b5156cca-4ed0-4cb7-9442-8e46ae4b3e5d",
@@ -2082,7 +2084,7 @@ async fn run_test_cloudwatch_retry_limit_reached_dlq_event() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = make_client!(aws_sdk_s3, s3_relay_client);
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
@@ -2092,7 +2094,7 @@ async fn run_test_cloudwatch_retry_limit_reached_dlq_event() {

     let exporter = Arc::new(FailingLogExporter::default());
     let event = LambdaEvent::new(evt, Context::default());
-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2150,7 +2152,7 @@ async fn test_cloudwatch_retry_limit_reached_dlq_event() {
 }

 async fn run_test_route_failed_event_to_dlq() {
-    let evt: CombinedEvent = serde_json::from_str(r#"{
+    let evt: Combined = serde_json::from_str(r#"{
         "Records": [
             {
                 "messageId": "b5156cca-4ed0-4cb7-9442-8e46ae4b3e5d",
@@ -2229,7 +2231,7 @@ async fn run_test_route_failed_event_to_dlq() {
     let sqs_client = make_client!(aws_sdk_sqs, sqs_replay_client);
     let s3_client = make_client!(aws_sdk_s3, s3_relay_client);
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
@@ -2239,7 +2241,7 @@ async fn run_test_route_failed_event_to_dlq() {

     let event = LambdaEvent::new(evt, Context::default());
     let config = Config::load_from_env().expect("failed to load config from env");
-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2282,7 +2284,7 @@ async fn test_route_failed_event_to_dlq() {
 }

 async fn run_dlq_success_msg() {
-    let evt: CombinedEvent = serde_json::from_str(r#"{
+    let evt: Combined = serde_json::from_str(r#"{
         "Records": [
             {
                 "messageId": "b5156cca-4ed0-4cb7-9442-8e46ae4b3e5d",
@@ -2333,7 +2335,7 @@ async fn run_dlq_success_msg() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let s3_client = get_mock_s3client(Some("./tests/fixtures/s3.log")).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();
-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
@@ -2343,7 +2345,7 @@ async fn run_dlq_success_msg() {

     let event = LambdaEvent::new(evt, Context::default());
     let config = Config::load_from_env().expect("failed to load config from env");
-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2422,14 +2424,13 @@ macro_rules! make_client {
     };
 }

-
 async fn run_test_s3_event_with_custom_metadata() {
     let s3_client =
         get_mock_s3client(Some("./tests/fixtures/s3.log")).expect("failed to create s3 client");
     let config = Config::load_from_env().expect("failed to load config from env");
     let (bucket, key) = ("coralogix-serverless-repo", "coralogix-aws-shipper/s3.log");
-    let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str())
+    let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str())
         .expect("failed to parse s3_event");

     let exporter = Arc::new(FakeLogExporter::new());
@@ -2437,13 +2438,13 @@ async fn run_test_s3_event_with_custom_metadata() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();

-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2476,7 +2477,6 @@ async fn run_test_s3_event_with_custom_metadata() {
     );
 }

-
 #[tokio::test]
 async fn test_s3_event_with_custom_metadata() {
     temp_env::async_with_vars(
@@ -2502,7 +2502,7 @@ async fn run_csv_s3_custom_headers_event() {
         "coralogix-serverless-repo",
         "coralogix-aws-shipper/s3csv.log",
     );
-    let evt: CombinedEvent = serde_json::from_str(s3event_string(bucket, key).as_str())
+    let evt: Combined = serde_json::from_str(s3event_string(bucket, key).as_str())
         .expect("failed to parse s3_event");

     let exporter = Arc::new(FakeLogExporter::new());
@@ -2510,13 +2510,13 @@ async fn run_csv_s3_custom_headers_event() {
     let sqs_client = get_mock_sqsclient(None).unwrap();
     let ecr_client = get_mock_ecrclient(None).unwrap();

-    let clients = coralogix_aws_shipper::AwsClients {
+    let clients = AwsClients {
         s3: s3_client,
         sqs: sqs_client,
         ecr: ecr_client,
     };

-    coralogix_aws_shipper::function_handler(&clients, exporter.clone(), &config, event)
+    coralogix_aws_shipper::logs::function_handler(&clients, exporter.clone(), &config, event)
         .await
         .unwrap();

@@ -2563,4 +2563,4 @@ async fn test_csv_s3_custom_headers_event() {
         run_csv_s3_custom_headers_event(),
     )
     .await;
-}
\ No newline at end of file
+}

From d8c4efaeed53b0ca67b29e833169d3d41012315a Mon Sep 17 00:00:00 2001
From: Shaun Remekie
Date: Wed, 23 Oct 2024 19:34:57 +0200
Subject: [PATCH 2/5] cargo.lock

---
 Cargo.lock | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index eb3afc2..5a30e68 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -882,7 +882,7 @@ checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"

 [[package]]
 name = "coralogix-aws-shipper"
-version = "1.0.13"
+version = "1.0.14"
 dependencies = [
  "anyhow",
  "async-recursion",

From aa271841e4f3a2ea8c0a6b35060d7443aac676bb Mon Sep 17 00:00:00 2001
From: Shaun Remekie
Date: Wed, 23 Oct 2024 19:35:33 +0200
Subject: [PATCH 3/5] change function_handler to just handler

---
 src/logs/mod.rs |  4 ++--
 tests/logs.rs   | 52 ++++++++++++++++++++++++++--------------------------
 2 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/src/logs/mod.rs b/src/logs/mod.rs
index 4619228..b845952 100644
--- a/src/logs/mod.rs
+++ b/src/logs/mod.rs
@@ -57,7 +57,7 @@ pub fn set_up_coralogix_exporter(config: &config::Config) -> Result<DynLogExporter, Error> {

 #[async_recursion]
 // lambda handler
-pub async fn function_handler(
+pub async fn handler(
     clients: &AwsClients,
     coralogix_exporter: DynLogExporter,
     config: &Config,

From: Shaun Remekie
Date: Wed, 23 Oct 2024 19:36:14 +0200
Subject: [PATCH 4/5] added placeholder handler function for metrics.

---
 src/main.rs        | 21 +++++++++++++--------
 src/metrics/mod.rs | 17 +++++++++++++++++
 2 files changed, 30 insertions(+), 8 deletions(-)
 create mode 100644 src/metrics/mod.rs

diff --git a/src/main.rs b/src/main.rs
index 87ff269..b2e6483 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,5 @@
 pub mod logs;
+pub mod metrics;
 pub mod clients;
 pub mod events;

@@ -48,20 +49,24 @@ async fn main() -> Result<(), Error> {
     let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
     let aws_clients = clients::AwsClients::new(&aws_config);

-    // let mut config = config::Config::load_from_env()?;
-
     match mode {
-        TelemetryMode::Metrics => {
-            warn!("metrics telemetry mode not implemented");
-            Ok(())
-        }
-
         TelemetryMode::Traces => {
             warn!("traces telemetry mode not implemented");
             Ok(())
         }

+        TelemetryMode::Metrics => {
+            warn!("metrics telemetry mode not implemented");
+            // TODO: implement metrics
+            run(service_fn(|request: LambdaEvent<events::Combined>| {
+                metrics::handler(
+                    &aws_clients,
+                    request,
+                )
+            })).await
+        }
+
         // default to logs telemetry mode
         _ => {
             info!("Running in logs telemetry mode");
@@ -75,7 +80,7 @@ async fn main() -> Result<(), Error> {
             let coralogix_exporter = crate::logs::set_up_coralogix_exporter(&conf)?;

             run(service_fn(|request: LambdaEvent<events::Combined>| {
-                logs::function_handler(
+                logs::handler(
                     &aws_clients,
                     coralogix_exporter.clone(),
                     &conf,
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
new file mode 100644
index 0000000..7fe5481
--- /dev/null
+++ b/src/metrics/mod.rs
@@ -0,0 +1,17 @@
+
+use crate::clients::AwsClients;
+use async_recursion::async_recursion;
+use lambda_runtime::{Error, LambdaEvent};
+use crate::events;
+use tracing::warn;
+
+#[async_recursion]
+// metric telemetry handler
+// TODO: implement
+pub async fn handler(
+    _: &AwsClients,
+    _: LambdaEvent<events::Combined>,
+) -> Result<(), Error> {
+    warn!("metrics telemetry mode not implemented");
+    Ok(())
+}

From 405477c89fc536c8e307c9a35fa13f28210550ea Mon Sep 17 00:00:00 2001
From: Shaun Remekie
Date: Thu, 24 Oct 2024 15:35:12 +0200
Subject: [PATCH 5/5] added changelog

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d9e6854..ea445a1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog

+## v1.0.15 / 2024-10-24
+### 💡 Enhancements 💡
+- Internal code refactoring to isolate the logs workflow from additional telemetry workflows to come.
+
 ## v1.0.14 / 2024-01-10
 ### 🧰 Bug fixes 🧰
 - Allow matches with arn of aws secretmanager in govcloud, previously only matched with public cloud secretmanager arn
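
Reviewer note: after this series, src/main.rs selects a workflow by matching on a TelemetryMode value, with the metrics and traces arms stubbed out and logs as the default arm. The hunks above do not show how `mode` is derived, so the sketch below is an illustration only: the `TELEMETRY_MODE` environment variable and the `from_env` helper are assumed names for this sketch, not part of the patches.

use std::env;

// Hypothetical stand-in for the TelemetryMode matched in src/main.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelemetryMode {
    Logs,
    Metrics,
    Traces,
}

impl TelemetryMode {
    // Assumption: the mode comes from a TELEMETRY_MODE env var and falls back
    // to Logs, mirroring the `_ =>` (logs) default arm in the patched main().
    fn from_env() -> Self {
        match env::var("TELEMETRY_MODE").unwrap_or_default().to_lowercase().as_str() {
            "metrics" => Self::Metrics,
            "traces" => Self::Traces,
            _ => Self::Logs,
        }
    }
}

fn main() {
    // Dispatch shaped like the match in src/main.rs: metrics and traces are
    // placeholders for now; anything else runs the existing logs workflow.
    match TelemetryMode::from_env() {
        TelemetryMode::Metrics => eprintln!("metrics telemetry mode not implemented"),
        TelemetryMode::Traces => eprintln!("traces telemetry mode not implemented"),
        TelemetryMode::Logs => println!("Running in logs telemetry mode"),
    }
}

Under these assumptions, running with TELEMETRY_MODE=metrics would exercise the placeholder arm added in PATCH 4/5, while leaving the variable unset keeps the shipper's existing logs behaviour.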