Skip to content

Commit

Permalink
Allow for very long log lines
Browse files Browse the repository at this point in the history
With this environment-variable (log_buffer_size), very large
logs can be scanned and printed out to stdio.

This was reported by RateHub (an OpenFaaS Pro) user.

Tested with a 5MB JSON file and a Node.js process that
printed the value to console.log/console.error.

It worked as expected. After reducing the value to the
default, an error was returned.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Nov 8, 2021
1 parent aeccace commit 1c6daa7
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,6 @@ Environmental variables:
| `max_inflight` | Yes | Limit the maximum number of requests in flight |
| `mode` | Yes | The mode which of-watchdog operates in, Default `streaming` [see doc](#3-streaming-fork-modestreaming---default). Options are [http](#1-http-modehttp), [serialising fork](#2-serializing-fork-modeserializing), [streaming fork](#3-streaming-fork-modestreaming---default), [static](#4-static-modestatic) |
| `prefix_logs` | Yes | When set to `true` the watchdog will add a prefix of "Date Time" + "stderr/stdout" to every line read from the function process. Default `true` |

| `log_buffer_size` | The amount of bytes to read from stderr/stdout for log lines. When exceeded, the user will see an "bufio.Scanner: token too long" error. The default value is `bufio.MaxScanTokenSize` |

> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/.
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package config

import (
"bufio"
"fmt"
"log"
"strconv"
Expand Down Expand Up @@ -44,6 +45,9 @@ type WatchdogConfig struct {
// PrefixLogs adds a date time stamp and the stdio name to any
// logging from executing functions
PrefixLogs bool

// LogBufferSize is the size for scanning logs for stdout/stderr
LogBufferSize int
}

// Process returns a string for the process and a slice for the arguments from the FunctionProcess.
Expand All @@ -67,6 +71,8 @@ func New(env []string) (WatchdogConfig, error) {
upstreamURL string
)

logBufferSize := bufio.MaxScanTokenSize

// default behaviour for backwards compatibility
prefixLogs := true
if val, exists := envMap["prefix_logs"]; exists {
Expand Down Expand Up @@ -108,6 +114,13 @@ func New(env []string) (WatchdogConfig, error) {
healthcheckInterval = parseIntOrDurationValue(val, writeTimeout)
}

if val, exists := envMap["log_buffer_size"]; exists {
var err error
if logBufferSize, err = strconv.Atoi(val); err != nil {
return WatchdogConfig{}, fmt.Errorf("invalid log_buffer_size value: %s, error: %w", val, err)
}
}

c := WatchdogConfig{
TCPPort: getInt(envMap, "port", 8080),
HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10),
Expand All @@ -125,7 +138,9 @@ func New(env []string) (WatchdogConfig, error) {
MetricsPort: 8081,
MaxInflight: getInt(envMap, "max_inflight", 0),
PrefixLogs: prefixLogs,
LogBufferSize: logBufferSize,
}

if val := envMap["mode"]; len(val) > 0 {
c.OperationalMode = WatchdogModeConst(val)
}
Expand Down
22 changes: 22 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package config

import (
"bufio"
"fmt"
"testing"
"time"
Expand All @@ -14,6 +15,27 @@ func TestNew(t *testing.T) {
if defaults.TCPPort != 8080 {
t.Errorf("Want TCPPort: 8080, got: %d", defaults.TCPPort)
}

}

func Test_LogBufferSize_Default(t *testing.T) {
env := []string{}

actual, _ := New(env)
want := bufio.MaxScanTokenSize
if actual.LogBufferSize != want {
t.Errorf("Want %v. got: %v", want, actual.LogBufferSize)
}
}

func Test_LogBufferSize_Override(t *testing.T) {
env := []string{"log_buffer_size=1024"}

actual, _ := New(env)
want := 1024
if actual.LogBufferSize != want {
t.Errorf("Want %v. got: %v", want, actual.LogBufferSize)
}
}

func Test_OperationalMode_Default(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions executor/streaming_runner.go → executor/forking_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type FunctionRequest struct {

// ForkFunctionRunner forks a process for each invocation
type ForkFunctionRunner struct {
ExecTimeout time.Duration
LogPrefix bool
ExecTimeout time.Duration
LogPrefix bool
LogBufferSize int
}

// Run run a fork for each invocation
Expand Down Expand Up @@ -69,7 +70,7 @@ func (f *ForkFunctionRunner) Run(req FunctionRequest) error {
errPipe, _ := cmd.StderrPipe()

// Prints stderr to console and is picked up by container logging driver.
bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix)
bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix, f.LogBufferSize)

startErr := cmd.Start()

Expand Down
5 changes: 3 additions & 2 deletions executor/http_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type HTTPFunctionRunner struct {
UpstreamURL *url.URL
BufferHTTPBody bool
LogPrefix bool
LogBufferSize int
}

// Start forks the process used for processing incoming requests
Expand All @@ -57,8 +58,8 @@ func (f *HTTPFunctionRunner) Start() error {
errPipe, _ := cmd.StderrPipe()

// Logs lines from stderr and stdout to the stderr and stdout of this process
bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix)
bindLoggingPipe("stdout", f.StdoutPipe, os.Stdout, f.LogPrefix)
bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix, f.LogBufferSize)
bindLoggingPipe("stdout", f.StdoutPipe, os.Stdout, f.LogPrefix, f.LogBufferSize)

f.Client = makeProxyClient(f.ExecTimeout)

Expand Down
9 changes: 8 additions & 1 deletion executor/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ import (
)

// bindLoggingPipe spawns a goroutine for passing through logging of the given output pipe.
func bindLoggingPipe(name string, pipe io.Reader, output io.Writer, logPrefix bool) {
//
func bindLoggingPipe(name string, pipe io.Reader, output io.Writer, logPrefix bool, maxBufferSize int) {
log.Printf("Started logging: %s from function.", name)

scanner := bufio.NewScanner(pipe)

size := bufio.MaxScanTokenSize

buffer := make([]byte, size)
scanner.Buffer(buffer, size)

logFlags := log.Flags()
prefix := log.Prefix()
if logPrefix == false {
Expand Down
14 changes: 8 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func buildRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool)

switch watchdogConfig.OperationalMode {
case config.ModeStreaming:
requestHandler = makeForkRequestHandler(watchdogConfig, prefixLogs)
requestHandler = makeForkRequestHandler(watchdogConfig, prefixLogs, watchdogConfig.LogBufferSize)
case config.ModeSerializing:
requestHandler = makeSerializingForkRequestHandler(watchdogConfig, prefixLogs)
case config.ModeHTTP:
requestHandler = makeHTTPRequestHandler(watchdogConfig, prefixLogs)
requestHandler = makeHTTPRequestHandler(watchdogConfig, prefixLogs, watchdogConfig.LogBufferSize)
case config.ModeStatic:
requestHandler = makeStaticRequestHandler(watchdogConfig)
default:
Expand Down Expand Up @@ -245,10 +245,11 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log
}
}

func makeForkRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool) func(http.ResponseWriter, *http.Request) {
func makeForkRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool, logBufferSize int) func(http.ResponseWriter, *http.Request) {
functionInvoker := executor.ForkFunctionRunner{
ExecTimeout: watchdogConfig.ExecTimeout,
LogPrefix: prefixLogs,
ExecTimeout: watchdogConfig.ExecTimeout,
LogPrefix: prefixLogs,
LogBufferSize: logBufferSize,
}

return func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -305,14 +306,15 @@ func getEnvironment(r *http.Request) []string {
return envs
}

func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool) func(http.ResponseWriter, *http.Request) {
func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool, logBufferSize int) func(http.ResponseWriter, *http.Request) {
commandName, arguments := watchdogConfig.Process()
functionInvoker := executor.HTTPFunctionRunner{
ExecTimeout: watchdogConfig.ExecTimeout,
Process: commandName,
ProcessArgs: arguments,
BufferHTTPBody: watchdogConfig.BufferHTTPBody,
LogPrefix: prefixLogs,
LogBufferSize: logBufferSize,
}

if len(watchdogConfig.UpstreamURL) == 0 {
Expand Down

0 comments on commit 1c6daa7

Please sign in to comment.