Skip to content

Commit

Permalink
lumberjack: added support for transport compression (#76)
Browse files Browse the repository at this point in the history
Co-authored-by: Heiko Reese <[email protected]>
  • Loading branch information
hreese and Heiko Reese authored Jun 13, 2023
1 parent 6f659c4 commit c5af4ff
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
9 changes: 7 additions & 2 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,10 @@ no encryption), `tls://` (TLS encryption) or `tlsnoverify://` (TLS encryption wi
certificate verification). The schema is followed by the hostname or IP address, a colon `:`,
and a port number. IPv6 addresses must be surrounded by square brackets.

Transport compression is disabled by default. Use `compression` to set the compression level
for all hosts. Compression levels can vary between 0 (no compression) and 9 (maximum compression).
To set per-host transport compression adding `?compression=<level>` to the server URI.

To prevent blocking, flows are buffered in a channel between the segment and the output
go routines. Each output go routine maintains a buffer of flows which are send either when the
buffer is full or after a configurable timeout. Proper parameter sizing for the queue,
Expand All @@ -936,13 +940,14 @@ These options help to observe the performance characteristics of the segment:

To see debug output, set the `-l debug` flag when starting `flowpipeline`.

See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for legal duration format
See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for proper duration format
strings and [strconv.ParseBool](https://pkg.go.dev/strconv#ParseBool) for allowed bool keywords.

```
- segment: lumberjack
config:
servers: tcp://foo.example.com:5044, tls://bar.example.com:5044, tlsnoverify://[2001:db8::1]:5044
servers: tcp://foo.example.com:5044, tls://bar.example.com:5044?compression=3, tlsnoverify://[2001:db8::1]:5044
compression: 0
batchsize: 1024
queuesize: = 2048
batchtimeout: "2000ms"
Expand Down
2 changes: 1 addition & 1 deletion segments/output/lumberjack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *resilientClient) connect() {
}
// try connecting indefinitely
for {
c.sc, err = lumber.SyncDialWith(dialFunc, c.ServerName, lumber.JSONEncoder(jsonEncoderWrapper))
c.sc, err = lumber.SyncDialWith(dialFunc, c.ServerName, lumber.JSONEncoder(jsonEncoderWrapper), lumber.CompressionLevel(c.Options.CompressionLevel))
if err == nil {
return
}
Expand Down
44 changes: 41 additions & 3 deletions segments/output/lumberjack/lumberjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
type ServerOptions struct {
UseTLS bool
VerifyCertificate bool
CompressionLevel int
}

type Lumberjack struct {
Expand All @@ -44,10 +45,25 @@ func DoDebugPrintf(format string, v ...any) {

func (segment *Lumberjack) New(config map[string]string) segments.Segment {
var (
err error
buflen int
err error
buflen int
defaultCompression int
)

// parse default compression level
defaultCompressionString := config["compression"]
if defaultCompressionString == "" {
defaultCompression = 0
} else {
defaultCompression, err = strconv.Atoi(defaultCompressionString)
if err != nil {
log.Fatalf("[error] Lumberjack: Failed to parse default compression level %s: %s", defaultCompressionString, err)
}
if defaultCompression < 0 || defaultCompression > 9 {
log.Fatalf("[error] Lumberjack: Default compression level %d is out of range", defaultCompression)
}
}

// parse server URLs
rawServerStrings := strings.Split(config["servers"], ",")
for idx, serverName := range rawServerStrings {
Expand All @@ -62,6 +78,9 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment {
if err != nil {
log.Fatalf("[error] Lumberjack: Failed to parse server URL %s: %s", rawServerString, err)
}
urlQueryParams := serverURL.Query()

// parse TLS options
var useTLS, verifyTLS bool
switch serverURL.Scheme {
case "tcp":
Expand All @@ -76,9 +95,28 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment {
default:
log.Fatalf("[error] Lumberjack: Unknown scheme %s in server URL %s", serverURL.Scheme, rawServerString)
}

// parse compression level
var compressionLevel int
compressionString := urlQueryParams.Get("compression")

if compressionString == "" {
// use global default if not specified
compressionLevel = defaultCompression
} else {
compressionLevel, err = strconv.Atoi(compressionString)
if err != nil {
log.Fatalf("[error] Lumberjack: Failed to parse compression level %s for host %s: %s", compressionString, serverURL.Host, err)
}
if compressionLevel < 0 || compressionLevel > 9 {
log.Fatalf("[error] Lumberjack: Compression level %d out of range for host %s", compressionLevel, serverURL.Host)
}
}

segment.Servers[serverURL.Host] = ServerOptions{
UseTLS: useTLS,
VerifyCertificate: verifyTLS,
CompressionLevel: compressionLevel,
}
}
}
Expand Down Expand Up @@ -190,7 +228,7 @@ func (segment *Lumberjack) Run(wg *sync.WaitGroup) {
// connect to lumberjack server
client := NewResilientClient(server, options, segment.ReconnectWait)
defer client.Close()
log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v)", server, options.UseTLS, options.VerifyCertificate)
log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v, Compression: %d)", server, options.UseTLS, options.VerifyCertificate, options.CompressionLevel)

flowInterface := make([]interface{}, segment.BatchSize)
idx := 0
Expand Down

0 comments on commit c5af4ff

Please sign in to comment.