From 92a729c10d0d6212461eb0ec4305512a3371b29a Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Thu, 4 Jul 2024 17:59:03 +0200 Subject: [PATCH 01/13] refactor: Merge all `Lines` channel functions into one base struct. --- internal/tailer/logstream/base.go | 18 ++++++++++++++++++ internal/tailer/logstream/dgramstream.go | 11 +++-------- internal/tailer/logstream/filestream.go | 11 +++-------- internal/tailer/logstream/pipestream.go | 11 +++-------- internal/tailer/logstream/socketstream.go | 11 +++-------- 5 files changed, 30 insertions(+), 32 deletions(-) create mode 100644 internal/tailer/logstream/base.go diff --git a/internal/tailer/logstream/base.go b/internal/tailer/logstream/base.go new file mode 100644 index 000000000..0f8669de2 --- /dev/null +++ b/internal/tailer/logstream/base.go @@ -0,0 +1,18 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// This file is available under the Apache license. + +package logstream + +import ( + "github.com/google/mtail/internal/logline" +) + +type streamBase struct { + lines chan *logline.LogLine // outbound channel for lines +} + +// Lines returns the output log line channel for this stream. The stream is +// completed when this channel closes. +func (s *streamBase) Lines() <-chan *logline.LogLine { + return s.lines +} diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 058b8174f..7059b019e 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -16,9 +16,9 @@ import ( ) type dgramStream struct { - cancel context.CancelFunc + streamBase - lines chan *logline.LogLine + cancel context.CancelFunc scheme string // Datagram scheme, either "unixgram" or "udp". address string // Given name for the underlying socket path on the filesystem or hostport. @@ -34,7 +34,7 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} + ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } @@ -145,8 +145,3 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak }() return nil } - -// Lines implements the LogStream interface, returning the output lines channel. -func (ds *dgramStream) Lines() <-chan *logline.LogLine { - return ds.lines -} diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index ad2807baa..d7779a4fb 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -34,9 +34,9 @@ var fileTruncates = expvar.NewMap("file_truncates_total") // a new goroutine and closes itself down. The shared context is used for // cancellation. type fileStream struct { - cancel context.CancelFunc + streamBase - lines chan *logline.LogLine + cancel context.CancelFunc pathname string // Given name for the underlying file on the filesystem @@ -49,7 +49,7 @@ type fileStream struct { // newFileStream creates a new log stream from a regular file. 
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} + fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} // Stream from the start of the file when in one shot mode. streamFromStart := oneShot == OneShotEnabled if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { @@ -262,8 +262,3 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake <-started return nil } - -// Lines implements the LogStream interface, returning the output lines channel. -func (fs *fileStream) Lines() <-chan *logline.LogLine { - return fs.lines -} diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 240ae99f1..79458ae9e 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -17,9 +17,9 @@ import ( ) type pipeStream struct { - cancel context.CancelFunc + streamBase - lines chan *logline.LogLine + cancel context.CancelFunc pathname string // Given name for the underlying named pipe on the filesystem @@ -33,7 +33,7 @@ type pipeStream struct { // `pathname` must already be verified as clean. func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - ps := &pipeStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} + ps := &pipeStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} if err := ps.stream(ctx, wg, waker, fi); err != nil { return nil, err } @@ -134,8 +134,3 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake }() return nil } - -// Lines implements the LogStream interface, returning the output lines channel. -func (ps *pipeStream) Lines() <-chan *logline.LogLine { - return ps.lines -} diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 81b30d2d8..46c2ed1a5 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -16,9 +16,9 @@ import ( ) type socketStream struct { - cancel context.CancelFunc + streamBase - lines chan *logline.LogLine + cancel context.CancelFunc oneShot OneShotMode scheme string // URL Scheme to listen with, either tcp or unix @@ -35,7 +35,7 @@ func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} + ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} if err := ss.stream(ctx, wg, waker); err != nil { return nil, err } @@ -163,8 +163,3 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake } } } - -// Lines implements the LogStream interface, returning the output lines channel. 
-func (ss *socketStream) Lines() <-chan *logline.LogLine { - return ss.lines -} From 2f8cc049886a5928e4fcd24b618c5a955d93c9bd Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Thu, 4 Jul 2024 18:20:54 +0200 Subject: [PATCH 02/13] refactor: Move the decode and send functions into `streamBase` --- internal/tailer/logstream/decode.go | 6 +++--- internal/tailer/logstream/dgramstream.go | 8 ++++---- internal/tailer/logstream/filestream.go | 10 +++++----- internal/tailer/logstream/pipestream.go | 4 ++-- internal/tailer/logstream/socketstream.go | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/internal/tailer/logstream/decode.go b/internal/tailer/logstream/decode.go index d3bb8da4b..8c3421847 100644 --- a/internal/tailer/logstream/decode.go +++ b/internal/tailer/logstream/decode.go @@ -17,7 +17,7 @@ import ( var logLines = expvar.NewMap("log_lines_total") // decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded. -func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int { +func (s *streamBase) decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int { var ( r rune width int @@ -50,7 +50,7 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname case r == '\r': // nom case r == '\n': - sendLine(ctx, pathname, partial, lines) + s.sendLine(ctx, pathname, partial, lines) default: partial.WriteRune(r) } @@ -60,7 +60,7 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname return count } -func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) { +func (s *streamBase) sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) { glog.V(2).Infof("sendline") logLines.Add(pathname, 1) lines <- logline.New(ctx, pathname, partial.String()) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 7059b019e..1c60df76b 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -88,7 +88,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if oneShot { glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address) if partial.Len() > 0 { - sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial, ds.lines) } return } @@ -96,7 +96,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak case <-ctx.Done(): glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address) if partial.Len() > 0 { - sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial, ds.lines) } return default: @@ -106,7 +106,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial) + ds.decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial) ds.mu.Lock() ds.lastReadTime = time.Now() ds.mu.Unlock() @@ -120,7 +120,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial, ds.lines) } 
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err) return diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index d7779a4fb..d0e6327f4 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -111,7 +111,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): decode and send", fs.pathname) needSend := lastBytes needSend = append(needSend, b[:count]...) - sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) + sendCount := fs.decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) if sendCount < len(needSend) { lastBytes = append([]byte{}, needSend[sendCount:]...) } else { @@ -163,7 +163,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if os.IsNotExist(serr) { glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) if partial.Len() > 0 { - sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial, fs.lines) } close(fs.lines) return @@ -198,7 +198,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): truncate? currentoffset is %d and size is %d", fs.pathname, currentOffset, newfi.Size()) // About to lose all remaining data because of the truncate so flush the accumulator. if partial.Len() > 0 { - sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial, fs.lines) } p, serr := fd.Seek(0, io.SeekStart) if serr != nil { @@ -219,7 +219,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Exit now, because oneShot means read only to EOF. glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.pathname) if partial.Len() > 0 { - sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial, fs.lines) } close(fs.lines) return @@ -228,7 +228,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake case <-ctx.Done(): glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.pathname) if partial.Len() > 0 { - sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial, fs.lines) } close(fs.lines) return diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 79458ae9e..e34121ae7 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -98,7 +98,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if n > 0 { total += n - decodeAndSend(ctx, ps.lines, ps.pathname, n, b[:n], partial) + ps.decodeAndSend(ctx, ps.lines, ps.pathname, n, b[:n], partial) // Update the last read time if we were able to read anything. ps.mu.Lock() ps.lastReadTime = time.Now() @@ -114,7 +114,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Test to see if we should exit. 
if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - sendLine(ctx, ps.pathname, partial, ps.lines) + ps.sendLine(ctx, ps.pathname, partial, ps.lines) } glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.pathname, err) return diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 46c2ed1a5..07cf92f8b 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -130,7 +130,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) + ss.decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() @@ -144,7 +144,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - sendLine(ctx, ss.address, partial, ss.lines) + ss.sendLine(ctx, ss.address, partial, ss.lines) } glog.V(2).Infof("stream(%s:%s): exiting, conn has error %s", ss.scheme, ss.address, err) From 226c24d61059946348748ab95bfbb47de1a5da83 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Thu, 4 Jul 2024 18:23:21 +0200 Subject: [PATCH 03/13] refactor: Remove `lines` parameter from the decode functions. --- internal/tailer/logstream/decode.go | 8 ++++---- internal/tailer/logstream/dgramstream.go | 8 ++++---- internal/tailer/logstream/filestream.go | 10 +++++----- internal/tailer/logstream/pipestream.go | 4 ++-- internal/tailer/logstream/socketstream.go | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/internal/tailer/logstream/decode.go b/internal/tailer/logstream/decode.go index 8c3421847..2f548d4c1 100644 --- a/internal/tailer/logstream/decode.go +++ b/internal/tailer/logstream/decode.go @@ -17,7 +17,7 @@ import ( var logLines = expvar.NewMap("log_lines_total") // decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded. 
-func (s *streamBase) decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int { +func (s *streamBase) decodeAndSend(ctx context.Context, pathname string, n int, b []byte, partial *bytes.Buffer) int { var ( r rune width int @@ -50,7 +50,7 @@ func (s *streamBase) decodeAndSend(ctx context.Context, lines chan<- *logline.Lo case r == '\r': // nom case r == '\n': - s.sendLine(ctx, pathname, partial, lines) + s.sendLine(ctx, pathname, partial) default: partial.WriteRune(r) } @@ -60,9 +60,9 @@ func (s *streamBase) decodeAndSend(ctx context.Context, lines chan<- *logline.Lo return count } -func (s *streamBase) sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) { +func (s *streamBase) sendLine(ctx context.Context, pathname string, partial *bytes.Buffer) { glog.V(2).Infof("sendline") logLines.Add(pathname, 1) - lines <- logline.New(ctx, pathname, partial.String()) + s.lines <- logline.New(ctx, pathname, partial.String()) partial.Reset() } diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 1c60df76b..c524a2e06 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -88,7 +88,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if oneShot { glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address) if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial) } return } @@ -96,7 +96,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak case <-ctx.Done(): glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address) if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial) } return default: @@ -106,7 +106,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - ds.decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial) + ds.decodeAndSend(ctx, ds.address, n, b[:n], partial) ds.mu.Lock() ds.lastReadTime = time.Now() ds.mu.Unlock() @@ -120,7 +120,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial, ds.lines) + ds.sendLine(ctx, ds.address, partial) } glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err) return diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index d0e6327f4..3128cbab2 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -111,7 +111,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): decode and send", fs.pathname) needSend := lastBytes needSend = append(needSend, b[:count]...) - sendCount := fs.decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) + sendCount := fs.decodeAndSend(ctx, fs.pathname, len(needSend), needSend, partial) if sendCount < len(needSend) { lastBytes = append([]byte{}, needSend[sendCount:]...) 
} else { @@ -163,7 +163,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if os.IsNotExist(serr) { glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial) } close(fs.lines) return @@ -198,7 +198,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): truncate? currentoffset is %d and size is %d", fs.pathname, currentOffset, newfi.Size()) // About to lose all remaining data because of the truncate so flush the accumulator. if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial) } p, serr := fd.Seek(0, io.SeekStart) if serr != nil { @@ -219,7 +219,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Exit now, because oneShot means read only to EOF. glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.pathname) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial) } close(fs.lines) return @@ -228,7 +228,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake case <-ctx.Done(): glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.pathname) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial, fs.lines) + fs.sendLine(ctx, fs.pathname, partial) } close(fs.lines) return diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index e34121ae7..7d2cefa45 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -98,7 +98,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if n > 0 { total += n - ps.decodeAndSend(ctx, ps.lines, ps.pathname, n, b[:n], partial) + ps.decodeAndSend(ctx, ps.pathname, n, b[:n], partial) // Update the last read time if we were able to read anything. ps.mu.Lock() ps.lastReadTime = time.Now() @@ -114,7 +114,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Test to see if we should exit. 
if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ps.sendLine(ctx, ps.pathname, partial, ps.lines) + ps.sendLine(ctx, ps.pathname, partial) } glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.pathname, err) return diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 07cf92f8b..eb09b8e65 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -130,7 +130,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if n > 0 { total += n //nolint:contextcheck - ss.decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) + ss.decodeAndSend(ctx, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() @@ -144,7 +144,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ss.sendLine(ctx, ss.address, partial, ss.lines) + ss.sendLine(ctx, ss.address, partial) } glog.V(2).Infof("stream(%s:%s): exiting, conn has error %s", ss.scheme, ss.address, err) From 32e4004c7624010b210d61bc025af4a8f94454fd Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 6 Jul 2024 13:16:53 +0200 Subject: [PATCH 04/13] refactor: Remove the `pathname` parameter to `sendLine`. This makes all the log invocations identical as well as we don't need to construct a name for dgram and socket streams. --- internal/tailer/logstream/base.go | 2 + internal/tailer/logstream/decode.go | 10 +-- internal/tailer/logstream/dgramstream.go | 37 +++++--- .../tailer/logstream/dgramstream_unix_test.go | 4 +- internal/tailer/logstream/filestream.go | 90 ++++++++++--------- internal/tailer/logstream/pipestream.go | 28 +++--- internal/tailer/logstream/socketstream.go | 38 +++++--- .../logstream/socketstream_unix_test.go | 4 +- 8 files changed, 126 insertions(+), 87 deletions(-) diff --git a/internal/tailer/logstream/base.go b/internal/tailer/logstream/base.go index 0f8669de2..4188886f9 100644 --- a/internal/tailer/logstream/base.go +++ b/internal/tailer/logstream/base.go @@ -8,6 +8,8 @@ import ( ) type streamBase struct { + sourcename string // human readable name of the logstream source + lines chan *logline.LogLine // outbound channel for lines } diff --git a/internal/tailer/logstream/decode.go b/internal/tailer/logstream/decode.go index 2f548d4c1..d0db5bea4 100644 --- a/internal/tailer/logstream/decode.go +++ b/internal/tailer/logstream/decode.go @@ -17,7 +17,7 @@ import ( var logLines = expvar.NewMap("log_lines_total") // decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded. 
-func (s *streamBase) decodeAndSend(ctx context.Context, pathname string, n int, b []byte, partial *bytes.Buffer) int { +func (s *streamBase) decodeAndSend(ctx context.Context, n int, b []byte, partial *bytes.Buffer) int { var ( r rune width int @@ -50,7 +50,7 @@ func (s *streamBase) decodeAndSend(ctx context.Context, pathname string, n int, case r == '\r': // nom case r == '\n': - s.sendLine(ctx, pathname, partial) + s.sendLine(ctx, partial) default: partial.WriteRune(r) } @@ -60,9 +60,9 @@ func (s *streamBase) decodeAndSend(ctx context.Context, pathname string, n int, return count } -func (s *streamBase) sendLine(ctx context.Context, pathname string, partial *bytes.Buffer) { +func (s *streamBase) sendLine(ctx context.Context, partial *bytes.Buffer) { glog.V(2).Infof("sendline") - logLines.Add(pathname, 1) - s.lines <- logline.New(ctx, pathname, partial.String()) + logLines.Add(s.sourcename, 1) + s.lines <- logline.New(ctx, s.sourcename, partial.String()) partial.Reset() } diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index c524a2e06..f1691c3a0 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -6,6 +6,7 @@ package logstream import ( "bytes" "context" + "fmt" "net" "sync" "time" @@ -34,7 +35,15 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} + ss := &dgramStream{cancel: cancel, + scheme: scheme, + address: address, + lastReadTime: time.Now(), + streamBase: streamBase{ + sourcename: fmt.Sprintf("%s://%s", scheme, address), + lines: make(chan *logline.LogLine), + }, + } if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } @@ -50,7 +59,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak logErrors.Add(ds.address, 1) return err } - glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c) + glog.V(2).Infof("stream(%s): opened new datagram socket %v", ds.sourcename, c) b := make([]byte, datagramReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -58,8 +67,8 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak go func() { defer wg.Done() defer func() { - glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total) - glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): read total %d bytes", ds.sourcename, total) + glog.V(2).Infof("stream(%s): closing connection", ds.sourcename) err := c.Close() if err != nil { logErrors.Add(ds.address, 1) @@ -75,7 +84,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak for { n, _, err := c.ReadFrom(b) - glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ds.sourcename, n, err) if ds.staleTimer != nil { ds.staleTimer.Stop() @@ -86,17 +95,17 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak // equivalent to an "EOF" in connection and file oriented streams. 
if n == 0 { if oneShot { - glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): exiting because zero byte read and one shot", ds.sourcename) if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial) + ds.sendLine(ctx, partial) } return } select { case <-ctx.Done(): - glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): exiting because zero byte read after cancellation", ds.sourcename) if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial) + ds.sendLine(ctx, partial) } return default: @@ -106,7 +115,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - ds.decodeAndSend(ctx, ds.address, n, b[:n], partial) + ds.decodeAndSend(ctx, n, b[:n], partial) ds.mu.Lock() ds.lastReadTime = time.Now() ds.mu.Unlock() @@ -120,14 +129,14 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ds.sendLine(ctx, ds.address, partial) + ds.sendLine(ctx, partial) } - glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err) + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ds.sourcename, err) return } // Yield and wait - glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): waiting", ds.sourcename) select { case <-ctx.Done(): // Exit after next read attempt. @@ -139,7 +148,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): Wake received", ds.sourcename) } } }() diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index c9e2347dd..0899dfe65 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -49,7 +49,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, + {Context: context.TODO(), Filename: sockName, Line: "1"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines()) @@ -99,7 +99,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, + {Context: context.TODO(), Filename: sockName, Line: "1"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines()) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 3128cbab2..c844ea2c1 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -49,7 +49,15 @@ type fileStream struct { // newFileStream creates a new log stream from a regular file. 
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} + fs := &fileStream{ + cancel: cancel, + pathname: pathname, + lastReadTime: time.Now(), + streamBase: streamBase{ + sourcename: pathname, + lines: make(chan *logline.LogLine), + }, + } // Stream from the start of the file when in one shot mode. streamFromStart := oneShot == OneShotEnabled if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { @@ -61,23 +69,23 @@ func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, oneShot OneShotMode, streamFromStart bool) error { fd, err := os.OpenFile(fs.pathname, os.O_RDONLY, 0o600) if err != nil { - logErrors.Add(fs.pathname, 1) + logErrors.Add(fs.sourcename, 1) return err } - logOpens.Add(fs.pathname, 1) - glog.V(2).Infof("stream(%s): opened new file", fs.pathname) + logOpens.Add(fs.sourcename, 1) + glog.V(2).Infof("stream(%s): opened new file", fs.sourcename) if !streamFromStart { // Normal operation for first stream is to ignore the past, and seek to // EOF immediately to start tailing. if _, err := fd.Seek(0, io.SeekEnd); err != nil { - logErrors.Add(fs.pathname, 1) + logErrors.Add(fs.sourcename, 1) if err := fd.Close(); err != nil { - logErrors.Add(fs.pathname, 1) - glog.Infof("stream(%s): closing file: %v", fs.pathname, err) + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): closing file: %v", fs.sourcename, err) } return err } - glog.V(2).Infof("stream(%s): seeked to end", fs.pathname) + glog.V(2).Infof("stream(%s): seeked to end", fs.sourcename) } b := make([]byte, defaultReadBufferSize) var lastBytes []byte @@ -88,19 +96,19 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("stream(%s): read total %d bytes", fs.pathname, total) - glog.V(2).Infof("stream(%s): closing file descriptor", fs.pathname) + glog.V(2).Infof("stream(%s): read total %d bytes", fs.sourcename, total) + glog.V(2).Infof("stream(%s): closing file descriptor", fs.sourcename) if err := fd.Close(); err != nil { - logErrors.Add(fs.pathname, 1) - glog.Infof("stream(%s): closing file: %v", fs.pathname, err) + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): closing file: %v", fs.sourcename, err) } - logCloses.Add(fs.pathname, 1) + logCloses.Add(fs.sourcename, 1) }() close(started) for { // Blocking read but regular files will return EOF straight away. count, err := fd.Read(b) - glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.pathname, count, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.sourcename, count, err) if fs.staleTimer != nil { fs.staleTimer.Stop() @@ -108,10 +116,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if count > 0 { total += count - glog.V(2).Infof("stream(%s): decode and send", fs.pathname) + glog.V(2).Infof("stream(%s): decode and send", fs.sourcename) needSend := lastBytes needSend = append(needSend, b[:count]...) 
- sendCount := fs.decodeAndSend(ctx, fs.pathname, len(needSend), needSend, partial) + sendCount := fs.decodeAndSend(ctx, len(needSend), needSend, partial) if sendCount < len(needSend) { lastBytes = append([]byte{}, needSend[sendCount:]...) } else { @@ -129,25 +137,25 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } if err != nil && err != io.EOF { - logErrors.Add(fs.pathname, 1) + logErrors.Add(fs.sourcename, 1) // TODO: This could be generalised to check for any retryable // errors, and end on unretriables; e.g. ESTALE looks // retryable. if errors.Is(err, syscall.ESTALE) { - glog.Infof("stream(%s): reopening stream due to %s", fs.pathname, err) + glog.Infof("stream(%s): reopening stream due to %s", fs.sourcename, err) // streamFromStart always true on a stream reopen if nerr := fs.stream(ctx, wg, waker, fi, oneShot, true); nerr != nil { - glog.Infof("stream(%s): new stream: %v", fs.pathname, nerr) + glog.Infof("stream(%s): new stream: %v", fs.sourcename, nerr) } // Close this stream. return } - glog.Infof("stream(%s): read error: %v", fs.pathname, err) + glog.Infof("stream(%s): read error: %v", fs.sourcename, err) } // If we have read no bytes and are at EOF, check for truncation and rotation. if err == io.EOF && count == 0 { - glog.V(2).Infof("stream(%s): eof an no bytes", fs.pathname) + glog.V(2).Infof("stream(%s): eof an no bytes", fs.sourcename) // Both rotation and truncation need to stat, so check for // rotation first. It is assumed that rotation is the more // common change pattern anyway. @@ -161,33 +169,33 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // us we're deleted because the tailer can only tell us to // cancel. if os.IsNotExist(serr) { - glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) + glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.sourcename) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial) + fs.sendLine(ctx, partial) } close(fs.lines) return } - logErrors.Add(fs.pathname, 1) + logErrors.Add(fs.sourcename, 1) goto Sleep } if !os.SameFile(fi, newfi) { - glog.V(2).Infof("stream(%s): adding a new file routine", fs.pathname) + glog.V(2).Infof("stream(%s): adding a new file routine", fs.sourcename) // Stream from start always true on a stream reopen if err := fs.stream(ctx, wg, waker, newfi, oneShot, true); err != nil { - glog.Info("stream(%s): new stream: %v", fs.pathname, err) + glog.Info("stream(%s): new stream: %v", fs.sourcename, err) } // We're at EOF so there's nothing left to read here. return } currentOffset, serr := fd.Seek(0, io.SeekCurrent) if serr != nil { - logErrors.Add(fs.pathname, 1) + logErrors.Add(fs.sourcename, 1) glog.Info(serr) continue } - glog.V(2).Infof("stream(%s): current seek is %d", fs.pathname, currentOffset) - glog.V(2).Infof("stream(%s): new size is %d", fs.pathname, newfi.Size()) + glog.V(2).Infof("stream(%s): current seek is %d", fs.sourcename, currentOffset) + glog.V(2).Infof("stream(%s): new size is %d", fs.sourcename, newfi.Size()) // We know that newfi is from the current file. Truncation can // only be detected if the new file is currently shorter than // the current seek offset. In test this can be a race, but in @@ -195,18 +203,18 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // than the previous after rotation in the time it takes for // mtail to notice. if newfi.Size() < currentOffset { - glog.V(2).Infof("stream(%s): truncate? 
currentoffset is %d and size is %d", fs.pathname, currentOffset, newfi.Size()) + glog.V(2).Infof("stream(%s): truncate? currentoffset is %d and size is %d", fs.sourcename, currentOffset, newfi.Size()) // About to lose all remaining data because of the truncate so flush the accumulator. if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial) + fs.sendLine(ctx, partial) } p, serr := fd.Seek(0, io.SeekStart) if serr != nil { - logErrors.Add(fs.pathname, 1) - glog.Infof("stream(%s): seek: %v", fs.pathname, serr) + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): seek: %v", fs.sourcename, serr) } - glog.V(2).Infof("stream(%s): Seeked to %d", fs.pathname, p) - fileTruncates.Add(fs.pathname, 1) + glog.V(2).Infof("stream(%s): Seeked to %d", fs.sourcename, p) + fileTruncates.Add(fs.sourcename, 1) continue } } @@ -217,18 +225,18 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if err == io.EOF { if oneShot == OneShotEnabled { // Exit now, because oneShot means read only to EOF. - glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.pathname) + glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.sourcename) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial) + fs.sendLine(ctx, partial) } close(fs.lines) return } select { case <-ctx.Done(): - glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.pathname) + glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.sourcename) if partial.Len() > 0 { - fs.sendLine(ctx, fs.pathname, partial) + fs.sendLine(ctx, partial) } close(fs.lines) return @@ -239,7 +247,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Don't exit, instead yield and wait for a termination signal or // wakeup. - glog.V(2).Infof("stream(%s): waiting", fs.pathname) + glog.V(2).Infof("stream(%s): waiting", fs.sourcename) select { case <-ctx.Done(): // Exit after next read attempt. @@ -254,7 +262,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", fs.pathname) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("stream(%s): Wake received", fs.pathname) + glog.V(2).Infof("stream(%s): Wake received", fs.sourcename) } } }() diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 7d2cefa45..a9a13ae83 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -33,7 +33,15 @@ type pipeStream struct { // `pathname` must already be verified as clean. 
func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - ps := &pipeStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} + ps := &pipeStream{ + cancel: cancel, + pathname: pathname, + lastReadTime: time.Now(), + streamBase: streamBase{ + sourcename: pathname, + lines: make(chan *logline.LogLine), + }, + } if err := ps.stream(ctx, wg, waker, fi); err != nil { return nil, err } @@ -67,7 +75,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake logErrors.Add(ps.pathname, 1) return err } - glog.V(2).Infof("stream(%s): opened new pipe %v", ps.pathname, fd) + glog.V(2).Infof("stream(%s): opened new pipe %v", ps.sourcename, fd) b := make([]byte, defaultPipeReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -75,8 +83,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("stream(%s): read total %d bytes", ps.pathname, total) - glog.V(2).Infof("stream(%s): closing file descriptor %v", ps.pathname, fd) + glog.V(2).Infof("stream(%s): read total %d bytes", ps.sourcename, total) + glog.V(2).Infof("stream(%s): closing file descriptor %v", ps.sourcename, fd) err := fd.Close() if err != nil { logErrors.Add(ps.pathname, 1) @@ -90,7 +98,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake for { n, err := fd.Read(b) - glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.pathname, n, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err) if ps.staleTimer != nil { ps.staleTimer.Stop() @@ -98,7 +106,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if n > 0 { total += n - ps.decodeAndSend(ctx, ps.pathname, n, b[:n], partial) + ps.decodeAndSend(ctx, n, b[:n], partial) // Update the last read time if we were able to read anything. ps.mu.Lock() ps.lastReadTime = time.Now() @@ -114,21 +122,21 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Test to see if we should exit. if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ps.sendLine(ctx, ps.pathname, partial) + ps.sendLine(ctx, partial) } - glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.pathname, err) + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) return } // Wait for wakeup or termination. - glog.V(2).Infof("stream(%s): waiting", ps.pathname) + glog.V(2).Infof("stream(%s): waiting", ps.sourcename) select { case <-ctx.Done(): // Exit after next read attempt. 
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", ps.pathname) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("stream(%s): Wake received", ps.pathname) + glog.V(2).Infof("stream(%s): Wake received", ps.sourcename) } } }() diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index eb09b8e65..434e6a4de 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -6,6 +6,7 @@ package logstream import ( "bytes" "context" + "fmt" "net" "sync" "time" @@ -35,7 +36,18 @@ func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), streamBase: streamBase{lines: make(chan *logline.LogLine)}} + ss := &socketStream{ + cancel: cancel, + oneShot: oneShot, + scheme: scheme, + address: address, + lastReadTime: time.Now(), + streamBase: streamBase{ + sourcename: fmt.Sprintf("%s://%s", scheme, address), + lines: make(chan *logline.LogLine), + }, + } + if err := ss.stream(ctx, wg, waker); err != nil { return nil, err } @@ -49,7 +61,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("stream(%s:%s): opened new socket listener %+v", ss.scheme, ss.address, l) + glog.V(2).Infof("stream(%s): opened new socket listener %+v", ss.sourcename, l) // signals when a connection has been opened started := make(chan struct{}) @@ -66,7 +78,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if !ss.oneShot { <-ctx.Done() } - glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s): closing listener", ss.sourcename) err := l.Close() if err != nil { glog.Info(err) @@ -86,12 +98,12 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa glog.Info(err) return } - glog.V(2).Infof("stream(%s:%s): got new conn %v", ss.scheme, ss.address, c) + glog.V(2).Infof("stream(%s): got new conn %v", ss.sourcename, c) connWg.Add(1) go ss.handleConn(ctx, &connWg, waker, c) connOnce.Do(func() { close(started) }) if ss.oneShot { - glog.Infof("stream(%s:%s): oneshot mode, exiting accept loop", ss.scheme, ss.address) + glog.Infof("stream(%s): oneshot mode, exiting accept loop", ss.sourcename) return } } @@ -106,8 +118,8 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake partial := bytes.NewBufferString("") var total int defer func() { - glog.V(2).Infof("stream(%s:%s): read total %d bytes from %s", ss.scheme, ss.address, c, total) - glog.V(2).Infof("stream(%s:%s): closing connection, %v", ss.scheme, ss.address, c) + glog.V(2).Infof("stream(%s): read total %d bytes from %s", ss.sourcename, c, total) + glog.V(2).Infof("stream(%s): closing connection, %v", ss.sourcename, c) err := c.Close() if err != nil { logErrors.Add(ss.address, 1) @@ -121,7 +133,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake for { n, err := c.Read(b) - glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ss.sourcename, n, err) if ss.staleTimer != nil { ss.staleTimer.Stop() @@ -130,7 +142,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake 
if n > 0 { total += n //nolint:contextcheck - ss.decodeAndSend(ctx, ss.address, n, b[:n], partial) + ss.decodeAndSend(ctx, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() @@ -144,22 +156,22 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - ss.sendLine(ctx, ss.address, partial) + ss.sendLine(ctx, partial) } - glog.V(2).Infof("stream(%s:%s): exiting, conn has error %s", ss.scheme, ss.address, err) + glog.V(2).Infof("stream(%s): exiting, conn has error %s", ss.sourcename, err) return } // Yield and wait - glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s): waiting", ss.sourcename) select { case <-ctx.Done(): // Exit after next read attempt. glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s): Wake received", ss.sourcename) } } } diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index 68ce5f9c8..0ffad74df 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -47,7 +47,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, + {Context: context.TODO(), Filename: sockName, Line: "1"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ss.Lines()) @@ -96,7 +96,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, + {Context: context.TODO(), Filename: sockName, Line: "1"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ss.Lines()) From aaf929d4cc1d0a47f5e740e2db94e87f5a45834f Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 6 Jul 2024 13:34:35 +0200 Subject: [PATCH 05/13] refactor: Move the stale timer to `streamBase` Remove the lastReadTime as we don't use it. --- internal/tailer/logstream/base.go | 4 ++++ internal/tailer/logstream/dgramstream.go | 16 ++++------------ internal/tailer/logstream/filestream.go | 13 ++----------- internal/tailer/logstream/pipestream.go | 14 ++------------ internal/tailer/logstream/socketstream.go | 17 ++++------------- 5 files changed, 16 insertions(+), 48 deletions(-) diff --git a/internal/tailer/logstream/base.go b/internal/tailer/logstream/base.go index 4188886f9..ee728a815 100644 --- a/internal/tailer/logstream/base.go +++ b/internal/tailer/logstream/base.go @@ -4,6 +4,8 @@ package logstream import ( + "time" + "github.com/google/mtail/internal/logline" ) @@ -11,6 +13,8 @@ type streamBase struct { sourcename string // human readable name of the logstream source lines chan *logline.LogLine // outbound channel for lines + + staleTimer *time.Timer // Expire the stream if no read in 24h. } // Lines returns the output log line channel for this stream. 
The stream is diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index f1691c3a0..ccdc32ea5 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -23,11 +23,6 @@ type dgramStream struct { scheme string // Datagram scheme, either "unixgram" or "udp". address string // Given name for the underlying socket path on the filesystem or hostport. - - mu sync.RWMutex // protects following fields - lastReadTime time.Time // Last time a log line was read from this named pipe - - staleTimer *time.Timer // Expire the stream if no read in 24h } func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) { @@ -35,10 +30,10 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &dgramStream{cancel: cancel, - scheme: scheme, - address: address, - lastReadTime: time.Now(), + ss := &dgramStream{ + cancel: cancel, + scheme: scheme, + address: address, streamBase: streamBase{ sourcename: fmt.Sprintf("%s://%s", scheme, address), lines: make(chan *logline.LogLine), @@ -116,9 +111,6 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak total += n //nolint:contextcheck ds.decodeAndSend(ctx, n, b[:n], partial) - ds.mu.Lock() - ds.lastReadTime = time.Now() - ds.mu.Unlock() ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel) // No error implies more to read, so restart the loop. diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index c844ea2c1..4fe92b164 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -39,20 +39,14 @@ type fileStream struct { cancel context.CancelFunc pathname string // Given name for the underlying file on the filesystem - - mu sync.RWMutex // protects following fields. - lastReadTime time.Time // Last time a log line was read from this file - - staleTimer *time.Timer // Expire the stream if no read in 24h } // newFileStream creates a new log stream from a regular file. func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) fs := &fileStream{ - cancel: cancel, - pathname: pathname, - lastReadTime: time.Now(), + cancel: cancel, + pathname: pathname, streamBase: streamBase{ sourcename: pathname, lines: make(chan *logline.LogLine), @@ -125,9 +119,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } else { lastBytes = []byte{} } - fs.mu.Lock() - fs.lastReadTime = time.Now() - fs.mu.Unlock() fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel) // No error implies there is more to read so restart the loop. 
diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index a9a13ae83..606bb6d1f 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -22,11 +22,6 @@ type pipeStream struct { cancel context.CancelFunc pathname string // Given name for the underlying named pipe on the filesystem - - mu sync.RWMutex // protects following fields - lastReadTime time.Time // Last time a log line was read from this named pipe - - staleTimer *time.Timer // Expire the stream if no read in 24h } // newPipeStream creates a new stream reader for Unix Pipes. @@ -34,9 +29,8 @@ type pipeStream struct { func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) ps := &pipeStream{ - cancel: cancel, - pathname: pathname, - lastReadTime: time.Now(), + cancel: cancel, + pathname: pathname, streamBase: streamBase{ sourcename: pathname, lines: make(chan *logline.LogLine), @@ -107,10 +101,6 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if n > 0 { total += n ps.decodeAndSend(ctx, n, b[:n], partial) - // Update the last read time if we were able to read anything. - ps.mu.Lock() - ps.lastReadTime = time.Now() - ps.mu.Unlock() ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel) // No error implies there is more to read so restart the loop. diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 434e6a4de..64ba0327e 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -24,11 +24,6 @@ type socketStream struct { oneShot OneShotMode scheme string // URL Scheme to listen with, either tcp or unix address string // Given name for the underlying socket path on the filesystem or host/port. - - mu sync.RWMutex // protects following fields - lastReadTime time.Time // Last time a log line was read from this socket - - staleTimer *time.Timer // Expire the stream if no read in 24h } func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) { @@ -37,11 +32,10 @@ func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, } ctx, cancel := context.WithCancel(ctx) ss := &socketStream{ - cancel: cancel, - oneShot: oneShot, - scheme: scheme, - address: address, - lastReadTime: time.Now(), + cancel: cancel, + oneShot: oneShot, + scheme: scheme, + address: address, streamBase: streamBase{ sourcename: fmt.Sprintf("%s://%s", scheme, address), lines: make(chan *logline.LogLine), @@ -143,9 +137,6 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake total += n //nolint:contextcheck ss.decodeAndSend(ctx, n, b[:n], partial) - ss.mu.Lock() - ss.lastReadTime = time.Now() - ss.mu.Unlock() ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel) // No error implies more to read, so restart the loop. 
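Taken together, patches 01-05 leave all of the shared stream plumbing in `streamBase`: the source name, the outbound `lines` channel, the stale timer, and the decode/send helpers. Below is a rough sketch of the consolidated base as implied by the hunks above - assembled for reference, not a verbatim copy of the final `base.go`/`decode.go`, and with `decodeAndSend` abbreviated to its signature:

package logstream

import (
	"bytes"
	"context"
	"expvar"
	"time"

	"github.com/golang/glog"
	"github.com/google/mtail/internal/logline"
)

var logLines = expvar.NewMap("log_lines_total")

type streamBase struct {
	sourcename string                // human-readable name of the logstream source
	lines      chan *logline.LogLine // outbound channel for lines
	staleTimer *time.Timer           // expire the stream if no read in 24h
}

// Lines returns the output log line channel for this stream. The stream is
// completed when this channel closes.
func (s *streamBase) Lines() <-chan *logline.LogLine {
	return s.lines
}

// sendLine emits the accumulated partial line as a LogLine attributed to the
// stream's sourcename, then resets the accumulator.
func (s *streamBase) sendLine(ctx context.Context, partial *bytes.Buffer) {
	glog.V(2).Infof("sendline")
	logLines.Add(s.sourcename, 1)
	s.lines <- logline.New(ctx, s.sourcename, partial.String())
	partial.Reset()
}

// decodeAndSend (see decode.go above) decodes the n bytes of b into runes,
// calling sendLine on each newline and buffering any trailing partial line:
//
//	func (s *streamBase) decodeAndSend(ctx context.Context, n int, b []byte, partial *bytes.Buffer) int

Each concrete stream (file, pipe, dgram, socket) embeds this struct and fills in `sourcename` and `lines` in its constructor, so the per-stream code is left dealing only with opening the source, reading bytes, and deciding when to exit.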
From 62b2bede0573de8940857eee0dee3a55dfb4eac2 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 8 Jul 2024 14:23:10 +0200 Subject: [PATCH 06/13] test: Check the number of the received lines in `ExpectLinesReceivedNoDiff` --- internal/testutil/lines.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/testutil/lines.go b/internal/testutil/lines.go index d98dfc603..7292e05b5 100644 --- a/internal/testutil/lines.go +++ b/internal/testutil/lines.go @@ -33,6 +33,9 @@ func ExpectLinesReceivedNoDiff(tb testing.TB, wantLines []*logline.LogLine, gotL return func() { tb.Helper() wg.Wait() + if len(received) != len(wantLines) { + tb.Errorf("unexpected line count, want %d got %d", len(wantLines), len(received)) + } ExpectNoDiff(tb, wantLines, received, IgnoreFields(logline.LogLine{}, "Context")) } } From 5d119abc910f8251e60229c84f5ddef8ab01bdae Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 8 Jul 2024 14:23:32 +0200 Subject: [PATCH 07/13] test: Remove the expected and received test output. These are noisy and hard to read. --- internal/testutil/diff.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/testutil/diff.go b/internal/testutil/diff.go index 7feb810f6..d185b9b37 100644 --- a/internal/testutil/diff.go +++ b/internal/testutil/diff.go @@ -39,8 +39,6 @@ func ExpectNoDiff(tb testing.TB, a, b interface{}, opts ...cmp.Option) bool { tb.Helper() if diff := Diff(a, b, opts...); diff != "" { tb.Errorf("Unexpected diff, -want +got:\n%s", diff) - tb.Logf("expected:\n%#v", a) - tb.Logf("received:\n%#v", b) return false } return true From d056dcdad9cdaf18dc219348f24083164b4e28cb Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 8 Jul 2024 14:33:17 +0200 Subject: [PATCH 08/13] refactor: Handle nil errors in `IsExitableError`. --- internal/tailer/logstream/cancel.go | 10 +++++++++- internal/tailer/logstream/dgramstream.go | 2 +- internal/tailer/logstream/pipestream.go | 2 +- internal/tailer/logstream/socketstream.go | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/internal/tailer/logstream/cancel.go b/internal/tailer/logstream/cancel.go index 7fd9053da..5d7cbfba4 100644 --- a/internal/tailer/logstream/cancel.go +++ b/internal/tailer/logstream/cancel.go @@ -11,10 +11,14 @@ import ( "github.com/golang/glog" ) +// ReadDeadliner has a SetReadDeadline function to be used for interrupting reads. type ReadDeadliner interface { SetReadDeadline(t time.Time) error } +// SetReadDeadlineOnDone waits for the context to be done, and then sets an +// immediate read deadline on the flie descriptor `d`. This causes any blocked +// reads on that descriptor to return with an i/o timeout error. func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) { go func() { <-ctx.Done() @@ -25,7 +29,11 @@ func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) { }() } -func IsEndOrCancel(err error) bool { +// IsExitableError returns true if a stream should exit because of this error. 
+func IsExitableError(err error) bool { + if err == nil { + return false + } if errors.Is(err, io.EOF) { return true } diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index ccdc32ea5..bf21ce142 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -119,7 +119,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak } } - if err != nil && IsEndOrCancel(err) { + if IsExitableError(err) { if partial.Len() > 0 { ds.sendLine(ctx, partial) } diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 606bb6d1f..43fd4f74d 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -110,7 +110,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } // Test to see if we should exit. - if err != nil && IsEndOrCancel(err) { + if IsExitableError(err) { if partial.Len() > 0 { ps.sendLine(ctx, partial) } diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 64ba0327e..7efab14bf 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -145,7 +145,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake } } - if err != nil && IsEndOrCancel(err) { + if IsExitableError(err) { if partial.Len() > 0 { ss.sendLine(ctx, partial) } From 5d33f1b2beb387a00bd0b90364de7753e77c4e91 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Wed, 10 Jul 2024 14:11:17 +0200 Subject: [PATCH 09/13] fix: Named pipe streams can exit after a zero byte read. According to `pipe(7)` and `read(2)` the read will return 0 when all writer FDs are closed, with no `errno`, signalling EOF. In order to use the `io.ReadFrom` interface we won't be able to rely on the return of `io.EOF` anymore. --- internal/tailer/logstream/pipestream.go | 13 ++++++++++--- internal/tailer/logstream/pipestream_unix_test.go | 13 +++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 43fd4f74d..394368a8b 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -85,6 +85,9 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.Info(err) } logCloses.Add(ps.pathname, 1) + if partial.Len() > 0 { + ps.sendLine(ctx, partial) + } close(ps.lines) ps.cancel() }() @@ -107,13 +110,17 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if err == nil && ctx.Err() == nil { continue } + } else if n == 0 { + // `pipe(7)` tells us "If all file descriptors referring to the + // write end of a pipe have been closed, then an attempt to + // read(2) from the pipe will see end-of-file (read(2) will + // return 0)." + glog.V(2).Infof("stream(%s): exiting, 0 bytes read", ps.sourcename) + return } // Test to see if we should exit. 
if IsExitableError(err) { - if partial.Len() > 0 { - ps.sendLine(ctx, partial) - } glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) return } diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/pipestream_unix_test.go index a6ed66623..7fa924d6d 100644 --- a/internal/tailer/logstream/pipestream_unix_test.go +++ b/internal/tailer/logstream/pipestream_unix_test.go @@ -116,6 +116,7 @@ func TestPipeStreamReadURL(t *testing.T) { expected := []*logline.LogLine{ {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) @@ -123,6 +124,11 @@ func TestPipeStreamReadURL(t *testing.T) { testutil.FatalIfErr(t, err) testutil.WriteString(t, f, "1\n") + // Give the stream a chance to wake and read + time.Sleep(10 * time.Millisecond) + + testutil.WriteString(t, f, "2\n") + // Pipes need to be closed to signal to the pipeStream to finish up. testutil.FatalIfErr(t, f.Close()) @@ -147,7 +153,7 @@ func TestPipeStreamReadStdin(t *testing.T) { f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) testutil.OverrideStdin(t, f) - testutil.WriteString(t, f, "content\n") + testutil.WriteString(t, f, "1\n") ctx, cancel := context.WithCancel(context.Background()) // The stream is not shut down by cancel in this test. @@ -158,10 +164,13 @@ func TestPipeStreamReadStdin(t *testing.T) { testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: "-", Line: "content"}, + {Context: context.TODO(), Filename: "-", Line: "1"}, + {Context: context.TODO(), Filename: "-", Line: "2"}, } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) + testutil.WriteString(t, f, "2\n") + // Give the stream a chance to wake and read time.Sleep(10 * time.Millisecond) From 180aff9a305a379b8e7c8472260a8151e6be984d Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Wed, 10 Jul 2024 16:03:21 +0200 Subject: [PATCH 10/13] test: Use `NewTestAlways` for a stream waker if the count is 0. --- internal/mtail/testing.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/mtail/testing.go b/internal/mtail/testing.go index edd70f2c5..a8be50214 100644 --- a/internal/mtail/testing.go +++ b/internal/mtail/testing.go @@ -63,7 +63,12 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options tb: tb, cancel: cancel, } - ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams") + if streamWakers == 0 { + ts.streamWaker = waker.NewTestAlways() + ts.AwakenLogStreams = func(int, int) {} + } else { + ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams") + } ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns") options = append(options, LogstreamPollWaker(ts.streamWaker), From 08c5cd28d98539cd61732092913d640da6ea865d Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 12 Jul 2024 13:52:44 +0200 Subject: [PATCH 11/13] refactor: Move error handling of `pipeOpen` into that function. 
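The effect, shown in the diff below, is that `pipeOpen` itself logs the failure and increments the error counter, so its caller only has to propagate the error it receives. A rough sketch of that shape — hypothetical names, with a plain map and the standard library logger standing in for mtail's expvar counter and glog:

package sketch

import (
	"log"
	"os"
	"syscall"
)

// openFailures stands in for the logErrors expvar map in the real code.
var openFailures = make(map[string]int)

// openCounted records and logs its own failures, so callers can simply
// return any error unchanged.
func openCounted(pathname string) (*os.File, error) {
	// Nonblocking, read-only open, as for a fifo whose writer may not have started yet.
	f, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600)
	if err != nil {
		openFailures[pathname]++
		log.Printf("openCounted(%s): open failed: %v", pathname, err)
		return nil, err
	}
	return f, nil
}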
--- internal/tailer/logstream/pipestream.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 394368a8b..f8fbdb572 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -47,7 +47,14 @@ func pipeOpen(pathname string) (*os.File, error) { return os.Stdin, nil } // Open in nonblocking mode because the write end of the pipe may not have started yet. - return os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller + fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller + if err != nil { + glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err) + logErrors.Add(pathname, 1) + return nil, err + } + glog.V(2).Infof("pipeOpen(%s): opened new pipe %v", pathname, fd) + return fd, nil } // The read buffer size for pipes. @@ -66,10 +73,8 @@ const defaultPipeReadBufferSize = 131072 func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error { fd, err := pipeOpen(ps.pathname) if err != nil { - logErrors.Add(ps.pathname, 1) return err } - glog.V(2).Infof("stream(%s): opened new pipe %v", ps.sourcename, fd) b := make([]byte, defaultPipeReadBufferSize) partial := bytes.NewBufferString("") var total int From 2e17520651a2d403f7bbdda1644362b2ef97e401 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 12 Jul 2024 15:20:49 +0200 Subject: [PATCH 12/13] fix: Handle startup conditions for fifos. When opening a fifo in nonblocking mode to allow for delays in the writer, we also turn the Read into a nonblocking read. This is desirable for context cancellation but makes the Read return before bytes may be ready. The `pipe(7)` and `read(2)` POSIX man pages tell us that if there are no writers connected, `read` will return end-of-file. In Go, this means we get a (0, EOF) pair. We shouldn't necessarily exit straight away though, because the writer may not have started writing yet. In tests this is visible as race conditions when the `Read` is scheduled before the test performs a `Write`. We can decide that a stream is active once a single byte has been read, and only allow EOF to trigger shutdown on an active stream. Thus we check here for `total > 0` on 0-byte and EOF responses from `Read` before exiting the stream. This fixes known races in `pipestream` so far. --- internal/tailer/logstream/pipestream.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index f8fbdb572..599119e3c 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -6,6 +6,8 @@ package logstream import ( "bytes" "context" + "errors" + "io" "os" "sync" "syscall" @@ -46,7 +48,7 @@ func pipeOpen(pathname string) (*os.File, error) { if IsStdinPattern(pathname) { return os.Stdin, nil } - // Open in nonblocking mode because the write end of the pipe may not have started yet. + // Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled.
https://github.com/golang/go/issues/24842 fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller if err != nil { glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err) @@ -99,6 +101,10 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake SetReadDeadlineOnDone(ctx, fd) for { + // Because we've opened in nonblocking mode, this Read can return + // straight away. If there are no writers, it'll return EOF (per + // `pipe(7)` and `read(2)`.) This is expected when `mtail` is + // starting at system init as the writer may not be ready yet. n, err := fd.Read(b) glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err) @@ -115,19 +121,23 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if err == nil && ctx.Err() == nil { continue } - } else if n == 0 { + } else if n == 0 && total > 0 { // `pipe(7)` tells us "If all file descriptors referring to the // write end of a pipe have been closed, then an attempt to // read(2) from the pipe will see end-of-file (read(2) will - // return 0)." + // return 0)." To avoid shutting down the stream at startup + // before any writer has connected to the fifo, condition on + // having read any bytes previously. glog.V(2).Infof("stream(%s): exiting, 0 bytes read", ps.sourcename) return } // Test to see if we should exit. if IsExitableError(err) { - glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) - return + if !(errors.Is(err, io.EOF) && total == 0) { + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) + return + } } // Wait for wakeup or termination. From 47e973f6810540fe0eefd443e0460397cf3f493f Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 12 Jul 2024 15:32:33 +0200 Subject: [PATCH 13/13] refactor: Rename pipestream to fifostream. This keeps confusing me, but we're reading unix named pipes aka fifos, not pipes. 
--- .../mtail/read_pipe_integration_unix_test.go | 2 +- .../{pipestream.go => fifostream.go} | 36 +++++++++---------- ...m_unix_test.go => fifostream_unix_test.go} | 0 internal/tailer/logstream/logstream.go | 4 +-- 4 files changed, 21 insertions(+), 21 deletions(-) rename internal/tailer/logstream/{pipestream.go => fifostream.go} (80%) rename internal/tailer/logstream/{pipestream_unix_test.go => fifostream_unix_test.go} (100%) diff --git a/internal/mtail/read_pipe_integration_unix_test.go b/internal/mtail/read_pipe_integration_unix_test.go index 5034dce08..2f34fc89b 100644 --- a/internal/mtail/read_pipe_integration_unix_test.go +++ b/internal/mtail/read_pipe_integration_unix_test.go @@ -17,7 +17,7 @@ import ( "golang.org/x/sys/unix" ) -func TestReadFromPipe(t *testing.T) { +func TestReadFromFifo(t *testing.T) { testutil.SkipIfShort(t) tmpDir := testutil.TestTempDir(t) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/fifostream.go similarity index 80% rename from internal/tailer/logstream/pipestream.go rename to internal/tailer/logstream/fifostream.go index 599119e3c..ec3e8d9f5 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/fifostream.go @@ -18,7 +18,7 @@ import ( "github.com/google/mtail/internal/waker" ) -type pipeStream struct { +type fifoStream struct { streamBase cancel context.CancelFunc @@ -26,11 +26,11 @@ type pipeStream struct { pathname string // Given name for the underlying named pipe on the filesystem } -// newPipeStream creates a new stream reader for Unix Pipes. +// newFifoStream creates a new stream reader for Unix Fifos. // `pathname` must already be verified as clean. -func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { +func newFifoStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - ps := &pipeStream{ + ps := &fifoStream{ cancel: cancel, pathname: pathname, streamBase: streamBase{ @@ -44,40 +44,40 @@ func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p return ps, nil } -func pipeOpen(pathname string) (*os.File, error) { +func fifoOpen(pathname string) (*os.File, error) { if IsStdinPattern(pathname) { return os.Stdin, nil } - // Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842 + // Open in nonblocking mode because the write end of the fifo may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842 fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller if err != nil { - glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err) + glog.Warningf("fifoOpen(%s): open failed: %v", pathname, err) logErrors.Add(pathname, 1) return nil, err } - glog.V(2).Infof("pipeOpen(%s): opened new pipe %v", pathname, fd) + glog.V(2).Infof("fifoOpen(%s): opened new fifo %v", pathname, fd) return fd, nil } -// The read buffer size for pipes. +// The read buffer size for fifos. // -// Before Linux 2.6.11, the capacity of a pipe was the same as the +// Before Linux 2.6.11, the capacity of a fifo was the same as the // system page size (e.g., 4096 bytes on i386). 
Since Linux 2.6.11, -// the pipe capacity is 16 pages (i.e., 65,536 bytes in a system +// the fifo capacity is 16 pages (i.e., 65,536 bytes in a system // with a page size of 4096 bytes). Since Linux 2.6.35, the default -// pipe capacity is 16 pages, but the capacity can be queried and +// fifo capacity is 16 pages, but the capacity can be queried and // set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. // See fcntl(2) for more information. // // https://man7.org/linux/man-pages/man7/pipe.7.html -const defaultPipeReadBufferSize = 131072 +const defaultFifoReadBufferSize = 131072 -func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error { - fd, err := pipeOpen(ps.pathname) +func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, _ os.FileInfo) error { + fd, err := fifoOpen(ps.pathname) if err != nil { return err } - b := make([]byte, defaultPipeReadBufferSize) + b := make([]byte, defaultFifoReadBufferSize) partial := bytes.NewBufferString("") var total int wg.Add(1) @@ -123,8 +123,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } } else if n == 0 && total > 0 { // `pipe(7)` tells us "If all file descriptors referring to the - // write end of a pipe have been closed, then an attempt to - // read(2) from the pipe will see end-of-file (read(2) will + // write end of a fifo have been closed, then an attempt to + // read(2) from the fifo will see end-of-file (read(2) will // return 0)." To avoid shutting down the stream at startup // before any writer has connected to the fifo, condition on // having read any bytes previously. diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/fifostream_unix_test.go similarity index 100% rename from internal/tailer/logstream/pipestream_unix_test.go rename to internal/tailer/logstream/fifostream_unix_test.go diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 520278e86..7dbd5314a 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -92,7 +92,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st logErrors.Add(path, 1) return nil, err } - return newPipeStream(ctx, wg, waker, path, fi) + return newFifoStream(ctx, wg, waker, path, fi) } fi, err := os.Stat(path) if err != nil { @@ -103,7 +103,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st case m.IsRegular(): return newFileStream(ctx, wg, waker, path, fi, oneShot) case m&os.ModeType == os.ModeNamedPipe: - return newPipeStream(ctx, wg, waker, path, fi) + return newFifoStream(ctx, wg, waker, path, fi) // TODO(jaq): in order to listen on an existing socket filepath, we must unlink and recreate it // case m&os.ModeType == os.ModeSocket: // return newSocketStream(ctx, wg, waker, pathname)
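For readers unfamiliar with the fifo behaviour that patches 09 and 12 above work around, it can be observed with a few lines of standalone Go: a nonblocking read-only open succeeds with no writer present, Read returns (0, io.EOF) while no writer is connected, data flows once a writer attaches, and EOF comes back when the writer closes. This is an illustrative sketch only — it is not part of mtail, the paths are invented, and it assumes a platform with `pipe(7)` semantics (Linux, the BSDs, macOS):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

func main() {
	dir, err := os.MkdirTemp("", "fifodemo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "log.fifo")
	if err := syscall.Mkfifo(path, 0o600); err != nil {
		panic(err)
	}

	// A nonblocking open of the read end succeeds even though no writer exists yet.
	r, err := os.OpenFile(path, os.O_RDONLY|syscall.O_NONBLOCK, 0o600)
	if err != nil {
		panic(err)
	}
	defer r.Close()

	buf := make([]byte, 4096)

	// With no writer connected, read(2) reports end-of-file, which Go
	// surfaces as (0, io.EOF) rather than blocking.
	n, err := r.Read(buf)
	fmt.Printf("before any writer: n=%d err=%v\n", n, err)

	// Attach a writer and send a line; the same reader now sees data.
	w, err := os.OpenFile(path, os.O_WRONLY, 0o600)
	if err != nil {
		panic(err)
	}
	if _, err := w.WriteString("hello\n"); err != nil {
		panic(err)
	}
	n, err = r.Read(buf)
	fmt.Printf("after a write: n=%d err=%v data=%q\n", n, err, buf[:n])

	// Close the writer: the reader sees end-of-file again.
	w.Close()
	n, err = r.Read(buf)
	fmt.Printf("writer closed: n=%d err=%v\n", n, err)
}

The first of those (0, io.EOF) results is exactly the startup case the `total > 0` guard addresses: EOF on a stream that has never produced a byte is not taken as the writer having finished.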