diff --git a/Gopkg.lock b/Gopkg.lock index 4fb9a6b..d2340fb 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -263,6 +263,15 @@ revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" version = "v2.2.1" +[[projects]] + branch = "master" + digest = "1:41da83b5ccde4d8c66fa0c8590cf3947c51e1f42ba6eb450e91d2703abb47a9a" + name = "github.com/stvp/go-udp-testing" + packages = ["."] + pruneopts = "" + revision = "c4434f09ec131ecf30f986d5dcb1636508bfa49a" + + [solve-meta] analyzer-name = "dep" analyzer-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index af90e94..8dcba46 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -43,3 +43,7 @@ [[constraint]] name = "gopkg.in/alecthomas/kingpin.v2" version = "~2.1.11" + +[[constraint]] + branch = "master" + name = "github.com/stvp/go-udp-testing" \ No newline at end of file diff --git a/Makefile b/Makefile index 3dab99e..5cacfa3 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,8 @@ compile: linux32 linux64 darwin64 test: ginkgo -r -v . + # For tests that aren't ginkgo based + go test ./logging linux32: CGO_ENABLED=0 GOARCH=386 GOOS=linux go build --ldflags="-X main.version=${BUILD_NUMBER}" -o dist/linux/386/firehose-to-syslog_linux_386 @@ -22,4 +24,4 @@ docker-dev: $(SHELL) ./Docker/build-dev.sh docker-final: - $(SHELL) ./Docker/build.sh \ No newline at end of file + $(SHELL) ./Docker/build.sh \ No newline at end of file diff --git a/README.md b/README.md index a31e624..9c3a974 100644 --- a/README.md +++ b/README.md @@ -67,8 +67,8 @@ Flags: --path-prof="" Set the Path to write profiling file --log-formatter-type=LOG-FORMATTER-TYPE Log formatter type to use. Valid options - are text, json. If none provided, defaults - to json. + are text, json, json-cee. If none + provided, defaults to json. --cert-pem-syslog="" Certificate Pem file --ignore-missing-apps Enable throttling on cache lookup for missing apps @@ -85,7 +85,10 @@ Since v3 firehose-to-syslog support TLS syslog `--cert-pem-syslog` using PEM enc Please refer to https://github.com/RackSec/srslog/blob/master/script/gen-certs.py for Cert generation. +# JSON CEE rsyslog compatibility +rsyslog supports parsing JSON log messages throught its [mmjsonparse module](https://www.rsyslog.com/doc/master/configuration/modules/mmjsonparse.html). This module expects a parsable JSON log message to be prepended with the value `@cee:`. +For `LOG-FORMATTER-TYPE`, using `json-cee` will yield the same result as using `json` but the JSON sent will have `@cee:` prepended to it. The `json-cee` formatter type also ensures that a RFC3164 compatible log message is sent, which was found to be required by rsyslog when using the mmjsonparse module. # Event documentation @@ -107,7 +110,7 @@ We have 3 caching strategies: cd $GOPATH/src/github.com/cloudfoundry-community/firehose-to-syslog # Test - ginkgo -r . + make test # Build binary go build @@ -186,7 +189,7 @@ Showing top 10 nodes out of 44 (cum >= 20ms) cf set-env firehose-to-syslog FIREHOSE_SUBSCRIPTION_ID firehose-to-syslog cf set-env firehose-to-syslog FIREHOSE_CLIENT_ID [your doppler.firehose enabled client id] cf set-env firehose-to-syslog FIREHOSE_CLIENT_SECRET [your doppler.firehose enabled client secret] - cf set-env firehose-to-syslog LOG_FORMATTER_TYPE [Log formatter type to use. Valid options are : text, json] + cf set-env firehose-to-syslog LOG_FORMATTER_TYPE [Log formatter type to use. Valid options are : text, json, json-cee] ``` 1. Turn off the health check if you're staging to Diego. ``` diff --git a/cli.go b/cli.go index a4379c5..e874589 100644 --- a/cli.go +++ b/cli.go @@ -44,7 +44,7 @@ var ( orgs = kingpin.Flag("orgs", "Forwarded on the app logs from theses organisations' example: --orgs=org1,org2").Default("").Envar("ORGS").String() modeProf = kingpin.Flag("mode-prof", "Enable profiling mode, one of [cpu, mem, block]").Default("").Envar("MODE_PROF").String() pathProf = kingpin.Flag("path-prof", "Set the Path to write profiling file").Default("").Envar("PATH_PROF").String() - logFormatterType = kingpin.Flag("log-formatter-type", "Log formatter type to use. Valid options are text, json. If none provided, defaults to json.").Envar("LOG_FORMATTER_TYPE").String() + logFormatterType = kingpin.Flag("log-formatter-type", "Log formatter type to use. Valid options are text, json, json-cee. If none provided, defaults to json.").Envar("LOG_FORMATTER_TYPE").String() certPath = kingpin.Flag("cert-pem-syslog", "Certificate Pem file").Envar("CERT_PEM").Default("").String() ignoreMissingApps = kingpin.Flag("ignore-missing-apps", "Enable throttling on cache lookup for missing apps").Envar("IGNORE_MISSING_APPS").Default("false").Bool() stripAppSuffixes = kingpin.Flag("strip-app-name-suffixes", "Suffixes that should be stripped from application names, comma separated").Envar("STRIP_APP_NAME_SUFFIXES").Default("").String() diff --git a/logging/logging_logrus.go b/logging/logging_logrus.go index 2ef6200..575ef73 100644 --- a/logging/logging_logrus.go +++ b/logging/logging_logrus.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "os" + "time" syslog "github.com/RackSec/srslog" logrus_syslog "github.com/shinji62/logrus-syslog-ng" @@ -32,6 +33,18 @@ func NewLogging(SyslogServerFlag string, SysLogProtocolFlag string, LogFormatter } } +// This srslog formatter is based on srslog/formatter.go's RFC3164Formatter. +// The default rsyslog input module expects RFC3164 formatted logs, using srslog's DefaultFormatter with "@cee:" wasn't +// compatible enough to get it working. +// some historical context on the efforts to standardize the syslog message +// format(s): https://www.rsyslog.com/doc/syslog_parsing.html +func CeeFormatter(p syslog.Priority, hostname, tag, content string) string { + timestamp := time.Now().Format(time.Stamp) + msg := fmt.Sprintf("<%d>%s %s %s[%d]: @cee: %s", + p, timestamp, hostname, tag, os.Getpid(), content) + return msg +} + func (l *LoggingLogrus) Connect() bool { success := false @@ -55,6 +68,9 @@ func (l *LoggingLogrus) Connect() bool { if err != nil { LogError(fmt.Sprintf("Unable to connect to syslog server [%s]!\n", l.syslogServer), err.Error()) } else { + if l.logFormatterType == "json-cee" { + hook.(*logrus_syslog.SyslogHook).Writer.SetFormatter(CeeFormatter) + } LogStd(fmt.Sprintf("Received hook to syslog server [%s]!\n", l.syslogServer), false) l.Logger.Hooks.Add(hook) success = true diff --git a/logging/logging_suite_test.go b/logging/logging_suite_test.go index aed5305..0b9c696 100644 --- a/logging/logging_suite_test.go +++ b/logging/logging_suite_test.go @@ -1,6 +1,11 @@ package logging import ( + "encoding/json" + "fmt" + "github.com/stvp/go-udp-testing" + "math/rand" + "strings" "testing" . "github.com/onsi/ginkgo" @@ -13,6 +18,28 @@ func TestEvents(t *testing.T) { RunSpecs(t, "Logging Suite") } +// Can't use ginkgo because go-udp-testing requires to be passed testing.T +func TestJSONCeeLoggingFormatter(t *testing.T) { + RegisterTestingT(t) + + port := rand.Intn(65535 - 1080) + 1081 + listeningUDPSocket := fmt.Sprintf("127.0.0.1:%d", port) + + udp.SetAddr(listeningUDPSocket) + result := udp.ReceiveString(t, func() { + logging := NewLogging(listeningUDPSocket, "udp", "json-cee", "", false, true) + logging.Connect() + logging.ShipEvents(nil, "msg field content") + }) + + Ω(result).Should(ContainSubstring("@cee:")) + + jsonPayload := strings.Split(result, "@cee:")[1] + var payload map[string]interface{} + json.Unmarshal([]byte(jsonPayload), &payload) + Ω(payload["msg"]).Should(Equal("msg field content")) +} + var _ = Describe("Logging", func() { Describe("SetupLogging", func() { Context("called with a Text formatter", func() { diff --git a/vendor/github.com/stvp/go-udp-testing/.gitignore b/vendor/github.com/stvp/go-udp-testing/.gitignore new file mode 100644 index 0000000..ebf8aae --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/.gitignore @@ -0,0 +1 @@ +go-udp-testing.test diff --git a/vendor/github.com/stvp/go-udp-testing/.travis.yml b/vendor/github.com/stvp/go-udp-testing/.travis.yml new file mode 100644 index 0000000..f1309c9 --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/.travis.yml @@ -0,0 +1,2 @@ +language: go + diff --git a/vendor/github.com/stvp/go-udp-testing/LICENSE b/vendor/github.com/stvp/go-udp-testing/LICENSE new file mode 100644 index 0000000..2fd91fe --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Stovepipe Studios, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/stvp/go-udp-testing/README.md b/vendor/github.com/stvp/go-udp-testing/README.md new file mode 100644 index 0000000..a7fa17d --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/README.md @@ -0,0 +1,73 @@ +go-udp-testing +============== + +[![Build Status](https://travis-ci.org/stvp/go-udp-testing.png?branch=master)](https://travis-ci.org/stvp/go-udp-testing) + +Provides UDP socket test helpers for Go. + +[Documentation](http://godoc.org/github.com/stvp/go-udp-testing) + +Examples +-------- + +```go +package main + +import ( + "github.com/stvp/go-udp-testing" + "testing" +) + +func TestStatsdReporting(t *testing.T) { + udp.SetAddr(":8125") + + udp.ShouldReceiveOnly(t, "mystat:2|g", func() { + statsd.Gauge("mystat", 2) + }) + + udp.ShouldNotReceiveOnly(t, "mystat:1|c", func() { + statsd.Gauge("bukkit", 2) + }) + + udp.ShouldReceive(t, "bar:2|g", func() { + statsd.Gauge("foo", 2) + statsd.Gauge("bar", 2) + statsd.Gauge("baz", 2) + }) + + udp.ShouldNotReceive(t, "bar:2|g", func() { + statsd.Gauge("foo", 2) + statsd.Gauge("baz", 2) + }) + + expected := []string{ + "bar:2|g", + "baz:5|g", + } + udp.ShouldReceiveAll(t, expected, func() { + statsd.Gauge("bar", 2) + statsd.Gauge("baz", 2) + }) + + unexpected := []string{ + "bar", + "baz", + } + udp.ShouldNotReceiveAny(t, unexpected, func() { + statsd.Gauge("foo", 1) + }) + + expected := []string{ "" } + "bar:2|g", + "baz:5|g", + } + unexpected := []string{ + "foo", + } + udp.ShouldReceiveAllAndNotReceiveAny(t, expected, unexpected, func() { + statsd.Gauge("bar", 2) + statsd.Gauge("baz", 5) + }) +} +``` + diff --git a/vendor/github.com/stvp/go-udp-testing/udp.go b/vendor/github.com/stvp/go-udp-testing/udp.go new file mode 100644 index 0000000..a7a5a55 --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/udp.go @@ -0,0 +1,191 @@ +// Package udp implements UDP test helpers. It lets you assert that certain +// strings must or must not be sent to a given local UDP listener. +package udp + +import ( + "net" + "runtime" + "strings" + "testing" + "time" +) + +var ( + addr *string + listener *net.UDPConn + Timeout time.Duration = time.Millisecond +) + +type fn func() + +// SetAddr sets the UDP port that will be listened on. +func SetAddr(a string) { + addr = &a +} + +func start(t *testing.T) { + resAddr, err := net.ResolveUDPAddr("udp", *addr) + if err != nil { + t.Fatal(err) + } + listener, err = net.ListenUDP("udp", resAddr) + if err != nil { + t.Fatal(err) + } +} + +func stop(t *testing.T) { + if err := listener.Close(); err != nil { + t.Fatal(err) + } +} + +func getMessage(t *testing.T, body fn) string { + start(t) + defer stop(t) + + body() + + message := make([]byte, 1024*32) + var bufLen int + for { + listener.SetReadDeadline(time.Now().Add(Timeout)) + n, _, _ := listener.ReadFrom(message[bufLen:]) + if n == 0 { + break + } else { + bufLen += n + } + } + + return string(message[0:bufLen]) +} + +func get(t *testing.T, match string, body fn) (got string, equals bool, contains bool) { + got = getMessage(t, body) + equals = got == match + contains = strings.Contains(got, match) + return got, equals, contains +} + +func printLocation(t *testing.T) { + _, file, line, _ := runtime.Caller(2) + t.Errorf("At: %s:%d", file, line) +} + +// ShouldReceiveOnly will fire a test error if the given function doesn't send +// exactly the given string over UDP. +func ShouldReceiveOnly(t *testing.T, expected string, body fn) { + got, equals, _ := get(t, expected, body) + if !equals { + printLocation(t) + t.Errorf("Expected: %#v", expected) + t.Errorf("But got: %#v", got) + } +} + +// ShouldNotReceiveOnly will fire a test error if the given function sends +// exactly the given string over UDP. +func ShouldNotReceiveOnly(t *testing.T, notExpected string, body fn) { + _, equals, _ := get(t, notExpected, body) + if equals { + printLocation(t) + t.Errorf("Expected not to get: %#v", notExpected) + } +} + +// ShouldReceive will fire a test error if the given function doesn't send the +// given string over UDP. +func ShouldReceive(t *testing.T, expected string, body fn) { + got, _, contains := get(t, expected, body) + if !contains { + printLocation(t) + t.Errorf("Expected to find: %#v", expected) + t.Errorf("But got: %#v", got) + } +} + +// ShouldNotReceive will fire a test error if the given function sends the +// given string over UDP. +func ShouldNotReceive(t *testing.T, expected string, body fn) { + got, _, contains := get(t, expected, body) + if contains { + printLocation(t) + t.Errorf("Expected not to find: %#v", expected) + t.Errorf("But got: %#v", got) + } +} + +// ShouldReceiveAll will fire a test error unless all of the given strings are +// sent over UDP. +func ShouldReceiveAll(t *testing.T, expected []string, body fn) { + got := getMessage(t, body) + failed := false + + for _, str := range expected { + if !strings.Contains(got, str) { + if !failed { + printLocation(t) + failed = true + } + t.Errorf("Expected to find: %#v", str) + } + } + + if failed { + t.Errorf("But got: %#v", got) + } +} + +// ShouldNotReceiveAny will fire a test error if any of the given strings are +// sent over UDP. +func ShouldNotReceiveAny(t *testing.T, unexpected []string, body fn) { + got := getMessage(t, body) + failed := false + + for _, str := range unexpected { + if strings.Contains(got, str) { + if !failed { + printLocation(t) + failed = true + } + t.Errorf("Expected not to find: %#v", str) + } + } + + if failed { + t.Errorf("But got: %#v", got) + } +} + +func ShouldReceiveAllAndNotReceiveAny(t *testing.T, expected []string, unexpected []string, body fn) { + got := getMessage(t, body) + failed := false + + for _, str := range expected { + if !strings.Contains(got, str) { + if !failed { + printLocation(t) + failed = true + } + t.Errorf("Expected to find: %#v", str) + } + } + for _, str := range unexpected { + if strings.Contains(got, str) { + if !failed { + printLocation(t) + failed = true + } + t.Errorf("Expected not to find: %#v", str) + } + } + + if failed { + t.Errorf("but got: %#v", got) + } +} + +func ReceiveString(t *testing.T, body fn) string { + return getMessage(t, body) +} diff --git a/vendor/github.com/stvp/go-udp-testing/udp_test.go b/vendor/github.com/stvp/go-udp-testing/udp_test.go new file mode 100644 index 0000000..4303ff2 --- /dev/null +++ b/vendor/github.com/stvp/go-udp-testing/udp_test.go @@ -0,0 +1,101 @@ +package udp + +import ( + "net" + "testing" + "time" +) + +var ( + testAddr = ":8126" +) + +func setup(t *testing.T) net.Conn { + udpClient, err := net.DialTimeout("udp", testAddr, time.Second) + if err != nil { + t.Fatal(err) + } + + SetAddr(testAddr) + + return udpClient +} + +func TestAll(t *testing.T) { + udpClient := setup(t) + + testValues := [][]interface{}{ + []interface{}{"foo", "foo", true, true}, + []interface{}{"foo", "bar", false, false}, + []interface{}{"foo", "foobar", false, true}, + []interface{}{"foo", "", false, false}, + []interface{}{"", "", true, true}, + } + + for _, values := range testValues { + shouldGet := values[0].(string) + sendString := values[1].(string) + shouldEquals := values[2].(bool) + shouldContains := values[3].(bool) + + got, equals, contains := get(t, shouldGet, func() { + udpClient.Write([]byte(sendString)) + }) + + if got != sendString { + t.Errorf("Should've got %#v but got %#v", sendString, got) + } + if equals != shouldEquals { + t.Errorf("Equals should've been %#v but was %#v", shouldEquals, equals) + } + if contains != shouldContains { + t.Errorf("Contains should've been %#v but was %#v", shouldContains, contains) + } + } + + ShouldReceiveOnly(t, "foo", func() { + udpClient.Write([]byte("foo")) + }) + + ShouldNotReceiveOnly(t, "bar", func() { + udpClient.Write([]byte("foo")) + }) + + ShouldReceive(t, "foo", func() { + udpClient.Write([]byte("barfoo")) + }) + + ShouldNotReceive(t, "bar", func() { + udpClient.Write([]byte("fooba")) + }) + + ShouldReceiveAll(t, []string{"foo", "bar"}, func() { + udpClient.Write([]byte("foobizbar")) + }) + + ShouldNotReceiveAny(t, []string{"fooby", "bars"}, func() { + udpClient.Write([]byte("foobizbar")) + }) + + ShouldReceiveAllAndNotReceiveAny(t, []string{"foo", "bar"}, []string{"fooby", "bars"}, func() { + udpClient.Write([]byte("foo")) + udpClient.Write([]byte("biz")) + udpClient.Write([]byte("bar")) + }) + + // This should fail, but it also shouldn't stall out + // ShouldReceive(t, "foo", func() {}) +} + +func TestRaceConditionInReadingResults(t *testing.T) { + udpClient := setup(t) + + ShouldReceiveAllAndNotReceiveAny(t, []string{"foo", "bar", "biz"}, []string{"fooby", "bars"}, func() { + time.Sleep(time.Millisecond * 100) + udpClient.Write([]byte("foo")) + time.Sleep(time.Millisecond * 200) + udpClient.Write([]byte("biz")) + time.Sleep(time.Millisecond * 500) + udpClient.Write([]byte("bar")) + }) +}