Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Serialize for lambda telemetry #759

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 216 additions & 15 deletions lambda-extension/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use http::{Request, Response};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use lambda_runtime_api_client::body::Body;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::{boxed::Box, fmt, sync::Arc};
use tokio::sync::Mutex;
use tower::Service;
use tracing::{error, trace};

/// Payload received from the Telemetry API
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct LambdaTelemetry {
/// Time when the telemetry was generated
pub time: DateTime<Utc>,
Expand All @@ -20,7 +20,7 @@ pub struct LambdaTelemetry {
}

/// Record in a LambdaTelemetry entry
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
pub enum LambdaTelemetryRecord {
/// Function log records
Expand All @@ -37,8 +37,10 @@ pub enum LambdaTelemetryRecord {
/// Phase of initialisation
phase: InitPhase,
/// Lambda runtime version
#[serde(skip_serializing_if = "Option::is_none")]
runtime_version: Option<String>,
/// Lambda runtime version ARN
#[serde(skip_serializing_if = "Option::is_none")]
runtime_version_arn: Option<String>,
},
/// Platform init runtime done record
Expand All @@ -47,10 +49,12 @@ pub enum LambdaTelemetryRecord {
/// Type of initialization
initialization_type: InitType,
/// Phase of initialisation
#[serde(skip_serializing_if = "Option::is_none")]
phase: Option<InitPhase>,
/// Status of initalization
status: Status,
/// When the status = failure, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Spans
#[serde(default)]
Expand All @@ -75,8 +79,10 @@ pub enum LambdaTelemetryRecord {
/// Request identifier
request_id: String,
/// Version of the Lambda function
#[serde(skip_serializing_if = "Option::is_none")]
version: Option<String>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},
/// Record marking the completion of an invocation
Expand All @@ -87,13 +93,16 @@ pub enum LambdaTelemetryRecord {
/// Status of the invocation
status: Status,
/// When unsuccessful, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Metrics corresponding to the runtime
#[serde(skip_serializing_if = "Option::is_none")]
metrics: Option<RuntimeDoneMetrics>,
/// Spans
#[serde(default)]
spans: Vec<Span>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},
/// Platfor report record
Expand All @@ -104,13 +113,15 @@ pub enum LambdaTelemetryRecord {
/// Status of the invocation
status: Status,
/// When unsuccessful, the error_type describes what kind of error occurred
#[serde(skip_serializing_if = "Option::is_none")]
error_type: Option<String>,
/// Metrics
metrics: ReportMetrics,
/// Spans
#[serde(default)]
spans: Vec<Span>,
/// Trace Context
#[serde(skip_serializing_if = "Option::is_none")]
tracing: Option<TraceContext>,
},

Expand Down Expand Up @@ -147,7 +158,7 @@ pub enum LambdaTelemetryRecord {
}

/// Type of Initialization
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum InitType {
/// Initialised on demand
Expand All @@ -159,7 +170,7 @@ pub enum InitType {
}

/// Phase in which initialization occurs
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum InitPhase {
/// Initialization phase
Expand All @@ -169,7 +180,7 @@ pub enum InitPhase {
}

/// Status of invocation/initialization
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum Status {
/// Success
Expand All @@ -183,7 +194,7 @@ pub enum Status {
}

/// Span
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Span {
/// Duration of the span
Expand All @@ -195,7 +206,7 @@ pub struct Span {
}

/// Tracing Context
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TraceContext {
/// Span ID
Expand All @@ -207,23 +218,23 @@ pub struct TraceContext {
}

/// Type of tracing
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub enum TracingType {
/// Amazon trace type
#[serde(rename = "X-Amzn-Trace-Id")]
AmznTraceId,
}

///Init report metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct InitReportMetrics {
/// Duration of initialization
pub duration_ms: f64,
}

/// Report metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ReportMetrics {
/// Duration in milliseconds
Expand All @@ -237,15 +248,15 @@ pub struct ReportMetrics {
#[serde(rename = "maxMemoryUsedMB")]
pub max_memory_used_mb: u64,
/// Init duration in case of a cold start
#[serde(default = "Option::default")]
#[serde(default = "Option::default", skip_serializing_if = "Option::is_none")]
pub init_duration_ms: Option<f64>,
/// Restore duration in milliseconds
#[serde(default = "Option::default")]
#[serde(default = "Option::default", skip_serializing_if = "Option::is_none")]
pub restore_duration_ms: Option<f64>,
}

/// Runtime done metrics
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeDoneMetrics {
/// Duration in milliseconds
Expand Down Expand Up @@ -303,7 +314,7 @@ where
}

#[cfg(test)]
mod tests {
mod deserialization_tests {
use super::*;
use chrono::{Duration, TimeZone};

Expand Down Expand Up @@ -459,3 +470,193 @@ mod tests {
),
}
}

#[cfg(test)]
mod serialization_tests {
use chrono::{Duration, TimeZone};

use super::*;
macro_rules! serialize_tests {
($($name:ident: $value:expr,)*) => {
$(
#[test]
fn $name() {
let (input, expected) = $value;
let actual = serde_json::to_string(&input).expect("unable to serialize");
println!("Input: {:?}\n", input);
println!("Expected:\n {:?}\n", expected);
println!("Actual:\n {:?}\n", actual);

assert!(actual == expected);
}
)*
}
}

serialize_tests! {
// function
function: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Function("hello world".to_string()),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#,
),
// extension
extension: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::Extension("hello world".to_string()),
},
r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#,
),
//platform.Start
platform_start: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformStart {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
version: Some("$LATEST".to_string()),
tracing: Some(TraceContext{
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
}
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.initStart
platform_init_start: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitStart {
initialization_type: InitType::OnDemand,
phase: InitPhase::Init,
runtime_version: None,
runtime_version_arn: None,
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#,
),
// platform.runtimeDone
platform_runtime_done: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformRuntimeDone {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
status: Status::Success,
error_type: None,
metrics: Some(RuntimeDoneMetrics {
duration_ms: 2599.0,
produced_bytes: Some(8),
}),
spans: vec!(
Span {
name:"responseLatency".to_string(),
start: Utc
.with_ymd_and_hms(2022, 10, 21, 14, 5, 3)
.unwrap()
.checked_add_signed(Duration::milliseconds(165))
.unwrap(),
duration_ms: 2598.0
},
Span {
name:"responseDuration".to_string(),
start: Utc
.with_ymd_and_hms(2022, 10, 21, 14, 5, 5)
.unwrap()
.checked_add_signed(Duration::milliseconds(763))
.unwrap(),
duration_ms: 0.0
},
),
tracing: Some(TraceContext{
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.0,"producedBytes":8},"spans":[{"durationMs":2598.0,"name":"responseLatency","start":"2022-10-21T14:05:03.165Z"},{"durationMs":0.0,"name":"responseDuration","start":"2022-10-21T14:05:05.763Z"}],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.report
platform_report: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformReport {
request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
status: Status::Success,
error_type: None,
metrics: ReportMetrics {
duration_ms: 2599.4,
billed_duration_ms: 2600,
memory_size_mb:128,
max_memory_used_mb:94,
init_duration_ms: Some(549.04),
restore_duration_ms: None,
},
spans: Vec::new(),
tracing: Some(TraceContext {
span_id: Some("24cd7d670fa455f0".to_string()),
r#type: TracingType::AmznTraceId,
value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
}),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"spans":[],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
),
// platform.telemetrySubscription
platform_telemetry_subscription: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformTelemetrySubscription {
name: "my-extension".to_string(),
state: "Subscribed".to_string(),
types: vec!("platform".to_string(), "function".to_string()),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#,
),
// platform.initRuntimeDone
platform_init_runtime_done: (
LambdaTelemetry{
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitRuntimeDone {
initialization_type: InitType::OnDemand,
status: Status::Success,
phase: None,
error_type: None,
spans: Vec::new(),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success","spans":[]}}"#,
),
// platform.extension
platform_extension: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformExtension {
name: "my-extension".to_string(),
state: "Ready".to_string(),
events: vec!("SHUTDOWN".to_string(), "INVOKE".to_string()),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#,
),
// platform.initReport
platform_init_report: (
LambdaTelemetry {
time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
record: LambdaTelemetryRecord::PlatformInitReport {
initialization_type: InitType::OnDemand,
phase: InitPhase::Init,
metrics: InitReportMetrics { duration_ms: 500.0 },
spans: Vec::new(),
},
},
r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initReport","record":{"initializationType":"on-demand","phase":"init","metrics":{"durationMs":500.0},"spans":[]}}"#,
),

}
}