diff --git a/README.md b/README.md index 32ec951d..6a5bd197 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,6 @@ Environmental variables: | `max_inflight` | Yes | Limit the maximum number of requests in flight | | `mode` | Yes | The mode which of-watchdog operates in, Default `streaming` [see doc](#3-streaming-fork-modestreaming---default). Options are [http](#1-http-modehttp), [serialising fork](#2-serializing-fork-modeserializing), [streaming fork](#3-streaming-fork-modestreaming---default), [static](#4-static-modestatic) | | `prefix_logs` | Yes | When set to `true` the watchdog will add a prefix of "Date Time" + "stderr/stdout" to every line read from the function process. Default `true` | - +| `log_buffer_size` | The amount of bytes to read from stderr/stdout for log lines. When exceeded, the user will see an "bufio.Scanner: token too long" error. The default value is `bufio.MaxScanTokenSize` | > Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/. diff --git a/config/config.go b/config/config.go index 76a9549b..1dcba00e 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ package config import ( + "bufio" "fmt" "log" "strconv" @@ -44,6 +45,9 @@ type WatchdogConfig struct { // PrefixLogs adds a date time stamp and the stdio name to any // logging from executing functions PrefixLogs bool + + // LogBufferSize is the size for scanning logs for stdout/stderr + LogBufferSize int } // Process returns a string for the process and a slice for the arguments from the FunctionProcess. @@ -67,6 +71,8 @@ func New(env []string) (WatchdogConfig, error) { upstreamURL string ) + logBufferSize := bufio.MaxScanTokenSize + // default behaviour for backwards compatibility prefixLogs := true if val, exists := envMap["prefix_logs"]; exists { @@ -108,6 +114,13 @@ func New(env []string) (WatchdogConfig, error) { healthcheckInterval = parseIntOrDurationValue(val, writeTimeout) } + if val, exists := envMap["log_buffer_size"]; exists { + var err error + if logBufferSize, err = strconv.Atoi(val); err != nil { + return WatchdogConfig{}, fmt.Errorf("invalid log_buffer_size value: %s, error: %w", val, err) + } + } + c := WatchdogConfig{ TCPPort: getInt(envMap, "port", 8080), HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10), @@ -125,7 +138,9 @@ func New(env []string) (WatchdogConfig, error) { MetricsPort: 8081, MaxInflight: getInt(envMap, "max_inflight", 0), PrefixLogs: prefixLogs, + LogBufferSize: logBufferSize, } + if val := envMap["mode"]; len(val) > 0 { c.OperationalMode = WatchdogModeConst(val) } diff --git a/config/config_test.go b/config/config_test.go index 48eeef7f..fe293ef5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -4,6 +4,7 @@ package config import ( + "bufio" "fmt" "testing" "time" @@ -14,6 +15,27 @@ func TestNew(t *testing.T) { if defaults.TCPPort != 8080 { t.Errorf("Want TCPPort: 8080, got: %d", defaults.TCPPort) } + +} + +func Test_LogBufferSize_Default(t *testing.T) { + env := []string{} + + actual, _ := New(env) + want := bufio.MaxScanTokenSize + if actual.LogBufferSize != want { + t.Errorf("Want %v. got: %v", want, actual.LogBufferSize) + } +} + +func Test_LogBufferSize_Override(t *testing.T) { + env := []string{"log_buffer_size=1024"} + + actual, _ := New(env) + want := 1024 + if actual.LogBufferSize != want { + t.Errorf("Want %v. got: %v", want, actual.LogBufferSize) + } } func Test_OperationalMode_Default(t *testing.T) { diff --git a/executor/streaming_runner.go b/executor/forking_runner.go similarity index 92% rename from executor/streaming_runner.go rename to executor/forking_runner.go index bef850aa..0d72154b 100644 --- a/executor/streaming_runner.go +++ b/executor/forking_runner.go @@ -29,8 +29,9 @@ type FunctionRequest struct { // ForkFunctionRunner forks a process for each invocation type ForkFunctionRunner struct { - ExecTimeout time.Duration - LogPrefix bool + ExecTimeout time.Duration + LogPrefix bool + LogBufferSize int } // Run run a fork for each invocation @@ -69,7 +70,7 @@ func (f *ForkFunctionRunner) Run(req FunctionRequest) error { errPipe, _ := cmd.StderrPipe() // Prints stderr to console and is picked up by container logging driver. - bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix) + bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix, f.LogBufferSize) startErr := cmd.Start() diff --git a/executor/http_runner.go b/executor/http_runner.go index 0b158804..05ca1875 100644 --- a/executor/http_runner.go +++ b/executor/http_runner.go @@ -34,6 +34,7 @@ type HTTPFunctionRunner struct { UpstreamURL *url.URL BufferHTTPBody bool LogPrefix bool + LogBufferSize int } // Start forks the process used for processing incoming requests @@ -57,8 +58,8 @@ func (f *HTTPFunctionRunner) Start() error { errPipe, _ := cmd.StderrPipe() // Logs lines from stderr and stdout to the stderr and stdout of this process - bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix) - bindLoggingPipe("stdout", f.StdoutPipe, os.Stdout, f.LogPrefix) + bindLoggingPipe("stderr", errPipe, os.Stderr, f.LogPrefix, f.LogBufferSize) + bindLoggingPipe("stdout", f.StdoutPipe, os.Stdout, f.LogPrefix, f.LogBufferSize) f.Client = makeProxyClient(f.ExecTimeout) diff --git a/executor/logging.go b/executor/logging.go index a40ca48f..a76ba364 100644 --- a/executor/logging.go +++ b/executor/logging.go @@ -10,10 +10,17 @@ import ( ) // bindLoggingPipe spawns a goroutine for passing through logging of the given output pipe. -func bindLoggingPipe(name string, pipe io.Reader, output io.Writer, logPrefix bool) { +// +func bindLoggingPipe(name string, pipe io.Reader, output io.Writer, logPrefix bool, maxBufferSize int) { log.Printf("Started logging: %s from function.", name) scanner := bufio.NewScanner(pipe) + + size := bufio.MaxScanTokenSize + + buffer := make([]byte, size) + scanner.Buffer(buffer, size) + logFlags := log.Flags() prefix := log.Prefix() if logPrefix == false { diff --git a/main.go b/main.go index 39bb1cdf..a9b4aa26 100644 --- a/main.go +++ b/main.go @@ -175,11 +175,11 @@ func buildRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool) switch watchdogConfig.OperationalMode { case config.ModeStreaming: - requestHandler = makeForkRequestHandler(watchdogConfig, prefixLogs) + requestHandler = makeForkRequestHandler(watchdogConfig, prefixLogs, watchdogConfig.LogBufferSize) case config.ModeSerializing: requestHandler = makeSerializingForkRequestHandler(watchdogConfig, prefixLogs) case config.ModeHTTP: - requestHandler = makeHTTPRequestHandler(watchdogConfig, prefixLogs) + requestHandler = makeHTTPRequestHandler(watchdogConfig, prefixLogs, watchdogConfig.LogBufferSize) case config.ModeStatic: requestHandler = makeStaticRequestHandler(watchdogConfig) default: @@ -245,10 +245,11 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log } } -func makeForkRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool) func(http.ResponseWriter, *http.Request) { +func makeForkRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool, logBufferSize int) func(http.ResponseWriter, *http.Request) { functionInvoker := executor.ForkFunctionRunner{ - ExecTimeout: watchdogConfig.ExecTimeout, - LogPrefix: prefixLogs, + ExecTimeout: watchdogConfig.ExecTimeout, + LogPrefix: prefixLogs, + LogBufferSize: logBufferSize, } return func(w http.ResponseWriter, r *http.Request) { @@ -305,7 +306,7 @@ func getEnvironment(r *http.Request) []string { return envs } -func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool) func(http.ResponseWriter, *http.Request) { +func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs bool, logBufferSize int) func(http.ResponseWriter, *http.Request) { commandName, arguments := watchdogConfig.Process() functionInvoker := executor.HTTPFunctionRunner{ ExecTimeout: watchdogConfig.ExecTimeout, @@ -313,6 +314,7 @@ func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs boo ProcessArgs: arguments, BufferHTTPBody: watchdogConfig.BufferHTTPBody, LogPrefix: prefixLogs, + LogBufferSize: logBufferSize, } if len(watchdogConfig.UpstreamURL) == 0 {