Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Apache Kafka log source #940

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ RUN apk add --update git make
WORKDIR /go/src/github.com/google/mtail
COPY . /go/src/github.com/google/mtail
RUN make depclean && make install_deps && PREFIX=/go make STATIC=y -B install

RUN apk add -U --no-cache ca-certificates

FROM scratch
COPY --from=builder /go/bin/mtail /usr/bin/mtail
COPY --from=alpine /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
ENTRYPOINT ["/usr/bin/mtail"]
EXPOSE 3903
WORKDIR /tmp


ARG version=0.0.0-local
Expand Down
135 changes: 135 additions & 0 deletions cmd/config-gen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package main

import (
"bytes"
"flag"
"fmt"
"os"
"reflect"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/segmentio/kafka-go"
)

const (
KafkaConfig = "KafkaConfig"
S3Config = "S3Config"
)

var (
configType string
targetFile string
targetModule string
)

func init() {
flag.StringVar(&configType, "type", KafkaConfig, "The type of config to generate")
flag.StringVar(&targetFile, "file", "config_generated.go", "The target file to generate the config from")
flag.StringVar(&targetModule, "module", "main", "The target module name")
}

func main() {
flag.Parse()

ims := make(map[string]bool)

ims["net/url"] = true

var p interface{}

// file header buffer
hb := new(bytes.Buffer)

// function buffer
fb := new(bytes.Buffer)

fmt.Fprintf(hb, "// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT.\n")
fmt.Fprintf(hb, "package %s\n\n", targetModule)

switch configType {
case KafkaConfig:
ims["github.com/segmentio/kafka-go"] = true
p = kafka.ReaderConfig{}
fmt.Fprintf(fb, "func parse%s(u *url.URL, config *kafka.ReaderConfig) error {\n\n", configType)
case S3Config:
ims["github.com/aws/aws-sdk-go-v2/aws"] = true
p = aws.Config{}
fmt.Fprintf(fb, "func parse%s(u *url.URL, config *aws.Config) error {\n\n", configType)
}

v := reflect.ValueOf(p)
t := v.Type()

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)

switch field.Type.Kind() {
case reflect.String:
switch field.Type.String() {
case "string":
fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name)
fmt.Fprintf(fb, " config.%s = %s\n", field.Name, field.Name)
fmt.Fprintf(fb, " }\n\n")
default:
fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name)
fmt.Fprintf(fb, " config.%s = %s(%s)\n", field.Name, field.Type.String(), field.Name)
fmt.Fprintf(fb, " }\n\n")
}

case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
ims["strconv"] = true
fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name)
fmt.Fprintf(fb, " i, err := strconv.Atoi(%s)\n", field.Name)
fmt.Fprintf(fb, " if err != nil {\n")
fmt.Fprintf(fb, " return err\n")
fmt.Fprintf(fb, " }\n")
switch field.Type.String() {
case "int":
fmt.Fprintf(fb, " config.%s = i\n", field.Name)
case "time.Duration":
ims["time"] = true
fmt.Fprintf(fb, " config.%s = time.Second * time.Duration(i)\n", field.Name)
default:
fmt.Fprintf(fb, " config.%s = %s(i)\n", field.Name, field.Type.String())
}
fmt.Fprintf(fb, " }\n\n")

case reflect.Bool:
fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name)
fmt.Fprintf(fb, " b, err := strconv.ParseBool(%s)\n", field.Name)
fmt.Fprintf(fb, " if err != nil {\n")
fmt.Fprintf(fb, " return err\n")
fmt.Fprintf(fb, " }\n")
fmt.Fprintf(fb, " config.%s = b\n", field.Name)
fmt.Fprintf(fb, " }\n\n")

case reflect.Slice:
if field.Type.Elem().Kind() == reflect.String {
ims["strings"] = true
fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name)
fmt.Fprintf(fb, " config.%s = strings.Split(%s, \",\")\n", field.Name, field.Name)
fmt.Fprintf(fb, " }\n\n")
}
default:
fmt.Fprintf(fb, " // %s is not supported\n\n", field.Name)

}
}

fmt.Fprintf(hb, "import (\n")
for im := range ims {
fmt.Fprintf(hb, " \"%s\"\n", im)
}
fmt.Fprintf(hb, ")\n\n")

fmt.Fprintf(fb, " return nil\n")
fmt.Fprintf(fb, "}\n")

hb.Write(fb.Bytes())

err := os.WriteFile(targetFile, hb.Bytes(), 0644)
if err != nil {
fmt.Println("Failed to write file:", err)
os.Exit(1)
}
}
10 changes: 10 additions & 0 deletions docs/Deploying.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ default.
Example: `mtail --progs /etc/mtail --logs /var/log/syslog --poll_interval 250ms
--poll_log_interval 250ms`

### Consume data in Apache Kafka

Use `--logs` flag to read data from Apache Kafka.

You need to convert kafka configuration into URL format.

You can refer to [parseKafkaURL](../internal//tailer/logstream/kafka.go#L26) function to write URL.

Example: `mtail --progs /etc/mtail --logs "kafka://test-group@localhost:9092/test-topic"`

### Setting garbage collection intervals

`mtail` accumulates metrics and log files during its operation. By default,
Expand Down
22 changes: 21 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,47 @@ go 1.21.1

require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1
github.com/aws/aws-sdk-go-v2 v1.32.7
github.com/aws/aws-sdk-go-v2/config v1.28.7
github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1
github.com/aws/smithy-go v1.22.1
github.com/golang/glog v1.2.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/google/go-cmp v0.6.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/common v0.60.0
github.com/segmentio/kafka-go v0.4.47
go.opencensus.io v0.24.0
golang.org/x/sys v0.26.0
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.48 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/api v0.105.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.3 // indirect
Expand Down
Loading