Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for collecting and sending function logs to APM Server #303

Merged
merged 11 commits into from
Sep 19, 2022
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits]
[float]
===== Features
- Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292]
- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303]

[float]
===== Bug fixes
Expand Down
12 changes: 9 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type App struct {

// New returns an App or an error if the
// creation failed.
func New(ctx context.Context, opts ...configOption) (*App, error) {
func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
c := appConfig{}

for _, opt := range opts {
Expand All @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
return nil, err
}

apmServerApiKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
if err != nil {
return nil, err
}
Expand All @@ -75,11 +75,17 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
addr = c.logsapiAddr
}

subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform}
if c.enableFunctionLogSubscription {
subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function)
}

lc, err := logsapi.NewClient(
logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)),
logsapi.WithListenerAddress(addr),
logsapi.WithLogBuffer(100),
logsapi.WithLogger(app.logger),
logsapi.WithSubscriptionTypes(subscriptionLogStreams...),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +130,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
apmOpts = append(apmOpts,
apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")),
apmproxy.WithLogger(app.logger),
apmproxy.WithAPIKey(apmServerApiKey),
apmproxy.WithAPIKey(apmServerAPIKey),
apmproxy.WithSecretToken(apmServerSecretToken),
)

Expand Down
37 changes: 24 additions & 13 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,56 +20,67 @@ package app
import "github.com/aws/aws-sdk-go-v2/aws"

type appConfig struct {
awsLambdaRuntimeAPI string
awsConfig aws.Config
extensionName string
disableLogsAPI bool
logLevel string
logsapiAddr string
awsLambdaRuntimeAPI string
awsConfig aws.Config
extensionName string
disableLogsAPI bool
enableFunctionLogSubscription bool
logLevel string
logsapiAddr string
}

type configOption func(*appConfig)
// ConfigOption is used to configure the lambda extension
type ConfigOption func(*appConfig)

// WithLambdaRuntimeAPI sets the AWS Lambda Runtime API
// endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API),
// used by the AWS client.
func WithLambdaRuntimeAPI(api string) configOption {
func WithLambdaRuntimeAPI(api string) ConfigOption {
return func(c *appConfig) {
c.awsLambdaRuntimeAPI = api
}
}

// WithExtensionName sets the extension name.
func WithExtensionName(name string) configOption {
func WithExtensionName(name string) ConfigOption {
return func(c *appConfig) {
c.extensionName = name
}
}

// WithoutLogsAPI disables the logs api.
func WithoutLogsAPI() configOption {
func WithoutLogsAPI() ConfigOption {
return func(c *appConfig) {
c.disableLogsAPI = true
}
}

// WithFunctionLogSubscription enables the logs api subscription
// to function log stream. This option will only work if LogsAPI
// is not disabled by the WithoutLogsAPI config option.
func WithFunctionLogSubscription() ConfigOption {
return func(c *appConfig) {
c.enableFunctionLogSubscription = true
}
}

// WithLogLevel sets the log level.
func WithLogLevel(level string) configOption {
func WithLogLevel(level string) ConfigOption {
return func(c *appConfig) {
c.logLevel = level
}
}

// WithLogsapiAddress sets the listener address of the
// server listening for logs event.
func WithLogsapiAddress(s string) configOption {
func WithLogsapiAddress(s string) ConfigOption {
return func(c *appConfig) {
c.logsapiAddr = s
}
}

// WithAWSConfig sets the AWS config.
func WithAWSConfig(awsConfig aws.Config) configOption {
func WithAWSConfig(awsConfig aws.Config) ConfigOption {
return func(c *appConfig) {
c.awsConfig = awsConfig
}
Expand Down
18 changes: 13 additions & 5 deletions app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package app

import (
"context"
"github.com/elastic/apm-aws-lambda/apmproxy"
"github.com/elastic/apm-aws-lambda/extension"
"github.com/elastic/apm-aws-lambda/logsapi"
"fmt"
"sync"
"time"

"github.com/elastic/apm-aws-lambda/apmproxy"
"github.com/elastic/apm-aws-lambda/extension"
)

// Run runs the app.
Expand Down Expand Up @@ -57,7 +57,7 @@ func (app *App) Run(ctx context.Context) error {
}()

if app.logsClient != nil {
if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil {
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)

// disable logs API if the service failed to start
Expand Down Expand Up @@ -169,7 +169,15 @@ func (app *App) processEvent(
runtimeDone := make(chan struct{})
if app.logsClient != nil {
go func() {
if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, app.apmClient, metadataContainer, runtimeDone, prevEvent); err != nil {
if err := app.logsClient.ProcessLogs(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient,
metadataContainer,
runtimeDone,
prevEvent,
); err != nil {
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
} else {
close(runtimeDone)
Expand Down
32 changes: 24 additions & 8 deletions logsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,33 @@ import (
"go.uber.org/zap"
)

// SubscriptionType represents the log streams that the Lambda Logs API
// provides for subscription
type SubscriptionType string

const (
// Platform logstream records events and errors related to
// invocations and extensions
Platform SubscriptionType = "platform"
// Function logstream records logs written by lambda function
// to stderr or stdout
Function SubscriptionType = "function"
// Extension logstream records logs generated by extension
Extension SubscriptionType = "extension"
)

// ClientOption is a config option for a Client.
type ClientOption func(*Client)

// Client is the client used to subscribe to the Logs API.
type Client struct {
httpClient *http.Client
logsAPIBaseURL string
logsChannel chan LogEvent
listenerAddr string
server *http.Server
logger *zap.SugaredLogger
httpClient *http.Client
logsAPIBaseURL string
logsAPISubscriptionTypes []SubscriptionType
logsChannel chan LogEvent
listenerAddr string
server *http.Server
logger *zap.SugaredLogger
}

// NewClient returns a new Client with the given URL.
Expand Down Expand Up @@ -69,7 +85,7 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}

// StartService starts the HTTP server listening for log events and subscribes to the Logs API.
func (lc *Client) StartService(eventTypes []EventType, extensionID string) error {
func (lc *Client) StartService(extensionID string) error {
addr, err := lc.startHTTPServer()
if err != nil {
return err
Expand All @@ -93,7 +109,7 @@ func (lc *Client) StartService(eventTypes []EventType, extensionID string) error

uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port))

if err := lc.subscribe(eventTypes, extensionID, uri); err != nil {
if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil {
if err := lc.Shutdown(); err != nil {
lc.logger.Warnf("failed to shutdown the server: %v", err)
}
Expand Down
20 changes: 14 additions & 6 deletions logsapi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package logsapi_test

import (
"bytes"
"github.com/elastic/apm-aws-lambda/logsapi"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/elastic/apm-aws-lambda/logsapi"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -103,13 +104,14 @@ func TestSubscribe(t *testing.T) {
}))
defer s.Close()

c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL))...)
cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform))
c, err := logsapi.NewClient(cOpts...)
require.NoError(t, err)

if tc.expectedErr {
require.Error(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
require.Error(t, c.StartService("foo"))
} else {
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
require.NoError(t, c.StartService("foo"))
}

require.NoError(t, c.Shutdown())
Expand Down Expand Up @@ -141,9 +143,15 @@ func TestSubscribeAWSRequest(t *testing.T) {
}))
defer s.Close()

c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...)
cOpts := append(
tc.opts,
logsapi.WithLogsAPIBaseURL(s.URL),
logsapi.WithLogBuffer(1),
logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function),
)
c, err := logsapi.NewClient(cOpts...)
require.NoError(t, err)
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "testID"))
require.NoError(t, c.StartService("testID"))

// Create a request to send to the logs listener
platformDoneEvent := `{
Expand Down
56 changes: 34 additions & 22 deletions logsapi/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,23 @@ import (
"github.com/elastic/apm-aws-lambda/extension"
)

// EventType represents the type of logs in Lambda
type EventType string
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
// LogEventType represents the log type that is received in the log messages
type LogEventType string

const (
// Platform is to receive logs emitted by the platform
Platform EventType = "platform"
// Function is to receive logs emitted by the function
Function EventType = "function"
// Extension is to receive logs emitted by the extension
Extension EventType = "extension"
)

// SubEventType is a Logs API sub event type
type SubEventType string

const (
// RuntimeDone event is sent when lambda function is finished it's execution
RuntimeDone SubEventType = "platform.runtimeDone"
Fault SubEventType = "platform.fault"
Report SubEventType = "platform.report"
Start SubEventType = "platform.start"
// PlatformRuntimeDone event is sent when lambda function is finished it's execution
PlatformRuntimeDone LogEventType = "platform.runtimeDone"
PlatformFault LogEventType = "platform.fault"
PlatformReport LogEventType = "platform.report"
PlatformStart LogEventType = "platform.start"
PlatformEnd LogEventType = "platform.end"
FunctionLog LogEventType = "function"
)

// LogEvent represents an event received from the Logs API
type LogEvent struct {
Time time.Time `json:"time"`
Type SubEventType `json:"type"`
Type LogEventType `json:"type"`
StringRecord string
Record LogEventRecord
}
Expand All @@ -68,19 +58,26 @@ type LogEventRecord struct {
func (lc *Client) ProcessLogs(
ctx context.Context,
requestID string,
invokedFnArn string,
apmClient *apmproxy.Client,
metadataContainer *apmproxy.MetadataContainer,
runtimeDoneSignal chan struct{},
prevEvent *extension.NextEventResponse,
) error {
// platformStartReqID is to identify the requestID for the function
// logs under the assumption that function logs for a specific request
// ID will be bounded by PlatformStart and PlatformEnd events.
var platformStartReqID string
for {
select {
case logEvent := <-lc.logsChannel:
lc.logger.Debugf("Received log event %v", logEvent.Type)
switch logEvent.Type {
case PlatformStart:
platformStartReqID = logEvent.Record.RequestID
// Check the logEvent for runtimeDone and compare the RequestID
// to the id that came in via the Next API
case RuntimeDone:
case PlatformRuntimeDone:
if logEvent.Record.RequestID == requestID {
lc.logger.Info("Received runtimeDone event for this function invocation")
runtimeDoneSignal <- struct{}{}
Expand All @@ -89,7 +86,7 @@ func (lc *Client) ProcessLogs(

lc.logger.Debug("Log API runtimeDone event request id didn't match")
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
case Report:
case PlatformReport:
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
lc.logger.Debug("Received platform report for the previous function invocation")
processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent)
Expand All @@ -102,6 +99,21 @@ func (lc *Client) ProcessLogs(
lc.logger.Warn("report event request id didn't match the previous event id")
lc.logger.Debug("Log API runtimeDone event request id didn't match")
}
case FunctionLog:
// TODO: @lahsivjar Buffer logs and send batches of data to APM-Server.
// Buffering should account for metadata being available before sending.
lc.logger.Debug("Received function log")
processedLog, err := ProcessFunctionLog(
metadataContainer,
platformStartReqID,
invokedFnArn,
logEvent,
)
if err != nil {
lc.logger.Errorf("Error processing function log : %v", err)
} else {
apmClient.EnqueueAPMData(processedLog)
}
}
case <-ctx.Done():
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")
Expand Down
Loading