diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 56819e281..acce398f6 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -16,7 +16,8 @@ import ( ) type dgramStream struct { - ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine scheme string // Datagram scheme, either "unixgram" or "udp". @@ -25,17 +26,15 @@ type dgramStream struct { mu sync.RWMutex // protects following fields completed bool // This pipestream is completed and can no longer be used. lastReadTime time.Time // Last time a log line was read from this named pipe - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } -func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine) (LogStream, error) { +func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } - ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := ss.stream(ctx, wg, waker); err != nil { + ctx, cancel := context.WithCancel(ctx) + ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } return ss, nil @@ -50,13 +49,13 @@ func (ss *dgramStream) LastReadTime() time.Time { // The read buffer size for datagrams. const datagramReadBufferSize = 131072 -func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error { +func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { c, err := net.ListenPacket(ss.scheme, ss.address) if err != nil { logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("opened new datagram socket %v", c) + glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c) b := make([]byte, datagramReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -64,8 +63,8 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address) - glog.V(2).Infof("%v: closing connection", c) + glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total) + glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address) err := c.Close() if err != nil { logErrors.Add(ss.address, 1) @@ -83,15 +82,25 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak for { n, _, err := c.ReadFrom(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err) + glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) // This is a test-only trick that says if we've already put this // logstream in graceful shutdown, then a zero-byte read is // equivalent to an "EOF" in connection and file oriented streams. if n == 0 { + if oneShot { + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address) + if partial.Len() > 0 { + sendLine(ctx, ss.address, partial, ss.lines) + } + return + } select { - case <-ss.stopChan: - glog.V(2).Infof("%v: exiting because zero byte read after Stop", c) + case <-ctx.Done(): + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address) + if partial.Len() > 0 { + sendLine(ctx, ss.address, partial, ss.lines) + } return default: } @@ -100,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial) + decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() @@ -110,28 +119,23 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if partial.Len() > 0 { sendLine(ctx, ss.address, partial, ss.lines) } - glog.V(2).Infof("%v: exiting, stream has error %s", c, err) + glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err) return } // Yield and wait - glog.V(2).Infof("%v: waiting", c) + glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { - case <-ss.stopChan: + case <-ctx.Done(): // We may have started waiting here when the stop signal // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the zero byte handler above. - glog.V(2).Infof("%v: Stopping after next zero byte read", c) - case <-ctx.Done(): - // Exit immediately; a cancelled context will set an immediate - // deadline on the next read which will cause us to exit then, - // so don't bother going around the loop again. - return + glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", c) + glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) } } }() @@ -145,8 +149,6 @@ func (ss *dgramStream) IsComplete() bool { } func (ss *dgramStream) Stop() { - glog.V(2).Infof("Stop received on datagram stream.") - ss.stopOnce.Do(func() { - close(ss.stopChan) - }) + glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address) + ss.cancel() } diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 29cf41b1e..3bb89abc2 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -43,7 +43,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) s, err := net.Dial(scheme, addr) @@ -54,9 +54,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { awaken(0, 0) // sync past read - ss.Stop() - - // "Close" the socket by sending zero bytes, which after Stop tells the stream to act as if we're done. + // "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done. _, err = s.Write([]byte{}) testutil.FatalIfErr(t, err) @@ -65,7 +63,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -118,7 +116,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 708e062dc..0a2ab9317 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -34,7 +34,8 @@ var fileTruncates = expvar.NewMap("file_truncates_total") // a new goroutine and closes itself down. The shared context is used for // cancellation. type fileStream struct { - ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine pathname string // Given name for the underlying file on the filesystem @@ -42,15 +43,15 @@ type fileStream struct { mu sync.RWMutex // protects following fields. lastReadTime time.Time // Last time a log line was read from this file completed bool // The filestream is completed and can no longer be used. - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } // newFileStream creates a new log stream from a regular file. -func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, streamFromStart OneShotMode) (LogStream, error) { - fs := &fileStream{ctx: ctx, pathname: pathname, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := fs.stream(ctx, wg, waker, fi, streamFromStart); err != nil { +func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { + ctx, cancel := context.WithCancel(ctx) + fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines} + // Stream from the start of the file when in one shot mode. + streamFromStart := oneShot == OneShotEnabled + if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { return nil, err } return fs, nil @@ -62,24 +63,26 @@ func (fs *fileStream) LastReadTime() time.Time { return fs.lastReadTime } -func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, streamFromStart OneShotMode) error { +func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, oneShot OneShotMode, streamFromStart bool) error { fd, err := os.OpenFile(fs.pathname, os.O_RDONLY, 0o600) if err != nil { logErrors.Add(fs.pathname, 1) return err } logOpens.Add(fs.pathname, 1) - glog.V(2).Infof("%v: opened new file", fd) + glog.V(2).Infof("stream(%s): opened new file", fs.pathname) if !streamFromStart { + // Normal operation for first stream is to ignore the past, and seek to + // EOF immediately to start tailing. if _, err := fd.Seek(0, io.SeekEnd); err != nil { logErrors.Add(fs.pathname, 1) if err := fd.Close(); err != nil { logErrors.Add(fs.pathname, 1) - glog.Info(err) + glog.Infof("stream(%s): closing file: %v", fs.pathname, err) } return err } - glog.V(2).Infof("%v: seeked to end", fd) + glog.V(2).Infof("stream(%s): seeked to end", fs.pathname) } b := make([]byte, defaultReadBufferSize) var lastBytes []byte @@ -90,11 +93,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", fd, total, fs.pathname) - glog.V(2).Infof("%v: closing file descriptor", fd) + glog.V(2).Infof("stream(%s): read total %d bytes", fs.pathname, total) + glog.V(2).Infof("stream(%s): closing file descriptor", fs.pathname) if err := fd.Close(); err != nil { logErrors.Add(fs.pathname, 1) - glog.Info(err) + glog.Infof("stream(%s): closing file: %v", fs.pathname, err) } logCloses.Add(fs.pathname, 1) }() @@ -102,11 +105,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake for { // Blocking read but regular files will return EOF straight away. count, err := fd.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", fd, count, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.pathname, count, err) if count > 0 { total += count - glog.V(2).Infof("%v: decode and send", fd) + glog.V(2).Infof("stream(%s): decode and send", fs.pathname) needSend := lastBytes needSend = append(needSend, b[:count]...) sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) @@ -126,34 +129,35 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // errors, and end on unretriables; e.g. ESTALE looks // retryable. if errors.Is(err, syscall.ESTALE) { - glog.Infof("%v: reopening stream due to %s", fd, err) - if nerr := fs.stream(ctx, wg, waker, fi, true); nerr != nil { - glog.Info(nerr) + glog.Infof("stream(%s): reopening stream due to %s", fs.pathname, err) + // streamFromStart always true on a stream reopen + if nerr := fs.stream(ctx, wg, waker, fi, oneShot, true); nerr != nil { + glog.Infof("stream(%s): new stream: %v", fs.pathname, nerr) } // Close this stream. return } - glog.Info(err) + glog.Infof("stream(%s): read error: %v", fs.pathname, err) } // If we have read no bytes and are at EOF, check for truncation and rotation. if err == io.EOF && count == 0 { - glog.V(2).Infof("%v: eof an no bytes", fd) + glog.V(2).Infof("stream(%s): eof an no bytes", fs.pathname) // Both rotation and truncation need to stat, so check for // rotation first. It is assumed that rotation is the more // common change pattern anyway. newfi, serr := os.Stat(fs.pathname) if serr != nil { - glog.Info(serr) + glog.Infof("stream(%s): stat error: %v", serr) // If this is a NotExist error, then we should wrap up this // goroutine. The Tailer will create a new logstream if the // file is in the middle of a rotation and gets recreated // in the next moment. We can't rely on the Tailer to tell // us we're deleted because the tailer can only tell us to - // Stop, which ends up causing us to race here against + // cancel, which ends up causing us to race here against // detection of IsCompleted. if os.IsNotExist(serr) { - glog.V(2).Infof("%v: source no longer exists, exiting", fd) + glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -166,9 +170,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake goto Sleep } if !os.SameFile(fi, newfi) { - glog.V(2).Infof("%v: adding a new file routine", fd) - if err := fs.stream(ctx, wg, waker, newfi, true); err != nil { - glog.Info(err) + glog.V(2).Infof("stream(%s): adding a new file routine", fs.pathname) + // Stream from start always true on a stream reopen + if err := fs.stream(ctx, wg, waker, newfi, oneShot, true); err != nil { + glog.Info("stream(%s): new stream: %v", fs.pathname, err) } // We're at EOF so there's nothing left to read here. return @@ -179,8 +184,8 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.Info(serr) continue } - glog.V(2).Infof("%v: current seek is %d", fd, currentOffset) - glog.V(2).Infof("%v: new size is %d", fd, newfi.Size()) + glog.V(2).Infof("stream(%s): current seek is %d", fs.pathname, currentOffset) + glog.V(2).Infof("stream(%s): new size is %d", fs.pathname, newfi.Size()) // We know that newfi is from the current file. Truncation can // only be detected if the new file is currently shorter than // the current seek offset. In test this can be a race, but in @@ -188,7 +193,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // than the previous after rotation in the time it takes for // mtail to notice. if newfi.Size() < currentOffset { - glog.V(2).Infof("%v: truncate? currentoffset is %d and size is %d", fd, currentOffset, newfi.Size()) + glog.V(2).Infof("stream(%s): truncate? currentoffset is %d and size is %d", fs.pathname, currentOffset, newfi.Size()) // About to lose all remaining data because of the truncate so flush the accumulator. if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) @@ -196,9 +201,9 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake p, serr := fd.Seek(0, io.SeekStart) if serr != nil { logErrors.Add(fs.pathname, 1) - glog.Info(serr) + glog.Infof("stream(%s): seek: %v", fs.pathname, serr) } - glog.V(2).Infof("%v: Seeked to %d", fd, p) + glog.V(2).Infof("stream(%s): Seeked to %d", fs.pathname, p) fileTruncates.Add(fs.pathname, 1) continue } @@ -213,10 +218,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake Sleep: // If we get here it's because we've stalled. First test to see if it's // time to exit. - if err == io.EOF || ctx.Err() != nil { - select { - case <-fs.stopChan: - glog.V(2).Infof("%v: stream has been stopped, exiting", fd) + if err == io.EOF { + if oneShot == OneShotEnabled { + // Exit now, because oneShot means read only to EOF. + glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -224,8 +229,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake fs.completed = true fs.mu.Unlock() return + } + select { case <-ctx.Done(): - glog.V(2).Infof("%v: stream has been cancelled, exiting", fd) + glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -240,24 +247,21 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Don't exit, instead yield and wait for a termination signal or // wakeup. - glog.V(2).Infof("%v: waiting", fd) + glog.V(2).Infof("stream(%s): waiting", fs.pathname) select { - case <-fs.stopChan: - // We may have started waiting here when the stop signal + case <-ctx.Done(): + // We may have started waiting here when the cancellation // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit - // the stream in the select stanza above. - glog.V(2).Infof("%v: Stopping after next read", fd) - case <-ctx.Done(): - // Same for cancellation; this makes tests stable, but + // the stream in the select stanza above. This makes tests stable, but // could argue exiting immediately is less surprising. // Assumption is that this doesn't make a difference in // production. - glog.V(2).Infof("%v: Cancelled after next read", fd) + glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", fd) + glog.V(2).Infof("stream(%s): Wake received", fs.pathname) } } }() @@ -274,8 +278,5 @@ func (fs *fileStream) IsComplete() bool { // Stop implements the LogStream interface. func (fs *fileStream) Stop() { - fs.stopOnce.Do(func() { - glog.Info("signalling stop at next EOF") - close(fs.stopChan) - }) + fs.cancel() } diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index f672a236c..f3ab76d98 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -27,19 +27,48 @@ func TestFileStreamRead(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) + awaken(1, 1) // synchronise past first read testutil.WriteString(t, f, "yo\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, + } + testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + + if !fs.IsComplete() { + t.Errorf("expecting filestream to be complete because stopped") + } +} + +func TestFileStreamReadOneShot(t *testing.T) { + var wg sync.WaitGroup + + tmpDir := testutil.TestTempDir(t) + + name := filepath.Join(tmpDir, "log") + f := testutil.TestOpenFile(t, name) + defer f.Close() + testutil.WriteString(t, f, "yo\n") + + lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) + waker := waker.NewTestAlways() + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + testutil.FatalIfErr(t, err) + + wg.Wait() + close(lines) + received := testutil.LinesReceived(lines) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -62,7 +91,7 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) @@ -80,7 +109,7 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, s}, + {Context: context.TODO(), Filename: name, Line: s}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -103,7 +132,7 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) @@ -126,7 +155,7 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, s[1:]}, + {Context: context.TODO(), Filename: name, Line: s[1:]}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -149,7 +178,7 @@ func TestFileStreamTruncation(t *testing.T) { lines := make(chan *logline.LogLine, 3) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) // fs.Stop() is also called explicitly further down but a failed test // and early return would lead to the handle staying open defer fs.Stop() @@ -174,9 +203,9 @@ func TestFileStreamTruncation(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "1"}, - {context.TODO(), name, "2"}, - {context.TODO(), name, "3"}, + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, + {Context: context.TODO(), Filename: name, Line: "3"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -184,7 +213,7 @@ func TestFileStreamTruncation(t *testing.T) { wg.Wait() } -func TestFileStreamFinishedBecauseCancel(t *testing.T) { +func TestFileStreamPartialRead(t *testing.T) { var wg sync.WaitGroup tmpDir := testutil.TestTempDir(t) @@ -197,30 +226,34 @@ func TestFileStreamFinishedBecauseCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) // Synchronise past first read after seekToEnd + awaken(1, 1) - testutil.WriteString(t, f, "yo\n") + testutil.WriteString(t, f, "yo") + awaken(1, 1) + + testutil.WriteString(t, f, "\n") awaken(1, 1) cancel() wg.Wait() - close(lines) // Signal it's time to go. + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { - t.Errorf("expecting filestream to be complete because stream was cancelled") + t.Errorf("expecting filestream to be complete because cancellation") } } -func TestFileStreamPartialRead(t *testing.T) { +func TestFileStreamReadToEOFOnCancel(t *testing.T) { var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) tmpDir := testutil.TestTempDir(t) @@ -228,37 +261,30 @@ func TestFileStreamPartialRead(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) - ctx, cancel := context.WithCancel(context.Background()) + lines := make(chan *logline.LogLine, 2) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) - testutil.WriteString(t, f, "yo") + testutil.WriteString(t, f, "line 1\n") awaken(1, 1) - // received := testutil.LinesReceived(lines) - // expected := []*logline.LogLine{} - // testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - - testutil.WriteString(t, f, "\n") - awaken(1, 1) + testutil.WriteString(t, f, "line 2\n") + cancel() // cancel wakes the stream - fs.Stop() wg.Wait() - close(lines) + + close(lines) // Signal it's time to go. received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "line 1"}, + {Context: context.TODO(), Filename: name, Line: "line 2"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because cancellation") } - - cancel() - wg.Wait() } diff --git a/internal/tailer/logstream/filestream_unix_test.go b/internal/tailer/logstream/filestream_unix_test.go index 9b6cc9ae3..ae63173e5 100644 --- a/internal/tailer/logstream/filestream_unix_test.go +++ b/internal/tailer/logstream/filestream_unix_test.go @@ -37,13 +37,11 @@ func TestFileStreamRotation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) - // fs.Stop() is also called explicitly further down but a failed test - // and early return would lead to the handle staying open - defer fs.Stop() + // OneShotDisabled because we hit EOF and need to wait. + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) + awaken(1, 1) // sync to eof glog.Info("write 1") testutil.WriteString(t, f, "1\n") @@ -61,19 +59,20 @@ func TestFileStreamRotation(t *testing.T) { testutil.WriteString(t, f, "2\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() - close(lines) + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "1"}, - {context.TODO(), name, "2"}, + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - cancel() - wg.Wait() + if !fs.IsComplete() { + t.Errorf("expecting filestream to be complete because stopped") + } } func TestFileStreamURL(t *testing.T) { @@ -88,27 +87,26 @@ func TestFileStreamURL(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) testutil.WriteString(t, f, "yo\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") } - cancel() - wg.Wait() } // TestFileStreamOpenFailure is a unix-specific test because on Windows, it is not possible to create a file diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 54b926257..a60a9f416 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -80,13 +80,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st default: glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pathname) case "unixgram": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) case "unix": return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) case "tcp": return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) case "udp": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) case "", "file": path = u.Path } diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index f9493bfb8..f362c2d80 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -69,7 +69,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake logErrors.Add(ps.pathname, 1) return err } - glog.V(2).Infof("opened new pipe %v", fd) + glog.V(2).Infof("stream(%s): opened new pipe %v", ps.pathname, fd) b := make([]byte, defaultPipeReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -77,8 +77,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", fd, total, ps.pathname) - glog.V(2).Infof("%v: closing file descriptor", fd) + glog.V(2).Infof("stream(%s): read total %d bytes", ps.pathname, fd, total) + glog.V(2).Infof("stream(%s): closing file descriptor %v", ps.pathname, fd) err := fd.Close() if err != nil { logErrors.Add(ps.pathname, 1) @@ -95,7 +95,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake for { n, err := fd.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", fd, n, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.pathname, n, err) if n > 0 { total += n @@ -111,12 +111,12 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if partial.Len() > 0 { sendLine(ctx, ps.pathname, partial, ps.lines) } - glog.V(2).Infof("%v: exiting, stream has error %s", fd, err) + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.pathname, err) return } // Wait for wakeup or termination. - glog.V(2).Infof("%v: waiting", fd) + glog.V(2).Infof("stream(%s): waiting", ps.pathname) select { case <-ctx.Done(): // Exit immediately; cancelled context is going to cause the @@ -125,7 +125,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake return case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", fd) + glog.V(2).Infof("stream(%s): Wake received", ps.pathname) } } }() diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 2e7f860d3..f50d1ef05 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -16,7 +16,8 @@ import ( ) type socketStream struct { - ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine oneShot OneShotMode @@ -26,16 +27,14 @@ type socketStream struct { mu sync.RWMutex // protects following fields completed bool // This socketStream is completed and can no longer be used. lastReadTime time.Time // Last time a log line was read from this socket - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } - ss := &socketStream{ctx: ctx, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} + ctx, cancel := context.WithCancel(ctx) + ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} if err := ss.stream(ctx, wg, waker); err != nil { return nil, err } @@ -55,7 +54,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("opened new socket listener %v", l) + glog.V(2).Infof("stream(%s:%s): opened new socket listener %v", ss.scheme, ss.address, l) initDone := make(chan struct{}) // Set up for shutdown @@ -67,10 +66,9 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if !ss.oneShot { select { case <-ctx.Done(): - case <-ss.stopChan: } } - glog.V(2).Infof("%v: closing listener", l) + glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address, l) err := l.Close() if err != nil { glog.Info(err) @@ -86,7 +84,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa glog.Info(err) return err } - glog.V(2).Infof("%v: got new conn %v", l, c) + glog.V(2).Infof("stream(%s:%s): got new conn %v", ss.scheme, ss.address, c) wg.Add(1) go ss.handleConn(ctx, wg, waker, c) return nil @@ -99,7 +97,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if err := acceptConn(); err != nil { glog.Info(err) } - glog.Info("oneshot mode, retuning") + glog.Info("stream(%s:%s): oneshot mode, returning", ss.scheme, ss.address) close(initDone) }() return nil @@ -124,8 +122,8 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake partial := bytes.NewBufferString("") var total int defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address) - glog.V(2).Infof("%v: closing connection", c) + glog.V(2).Infof("stream(%s:%s): read total %d bytes from %s", ss.scheme, ss.address, c, total) + glog.V(2).Infof("stream(%s:%s): closing connection, %v", ss.scheme, ss.address, c) err := c.Close() if err != nil { logErrors.Add(ss.address, 1) @@ -139,12 +137,12 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake for { n, err := c.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err) + glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial) + decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() @@ -154,23 +152,20 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if partial.Len() > 0 { sendLine(ctx, ss.address, partial, ss.lines) } - glog.V(2).Infof("%v: exiting, conn has error %s", c, err) + glog.V(2).Infof("stream(%s:%s): exiting, conn has error %s", ss.scheme, ss.address, err) return } // Yield and wait - glog.V(2).Infof("%v: waiting", c) + glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { case <-ctx.Done(): - // Exit immediately; cancelled context will cause the next read to be interrupted and exit anyway, so no point waiting to loop. - return - case <-ss.stopChan: - // Stop after connection is closed. - glog.V(2).Infof("%v: stopchan closed, exiting after next read timeout", c) + // Cancelled context will cause the next read to be interrupted and exit. + glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", c) + glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) } } } @@ -184,8 +179,5 @@ func (ss *socketStream) IsComplete() bool { // Stop implements the Logstream interface. // Stop will close the listener so no new connections will be accepted, and close all current connections once they have been closed by their peers. func (ss *socketStream) Stop() { - ss.stopOnce.Do(func() { - glog.Info("signalling stop at next EOF") - close(ss.stopChan) - }) + ss.cancel() } diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index e2b6c9a20..d6f5f3c24 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -41,7 +41,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) s, err := net.Dial(scheme, addr) @@ -55,22 +55,21 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { // Close the socket to signal to the socketStream to shut down. testutil.FatalIfErr(t, s.Close()) - ss.Stop() // stop after connection closes - wg.Wait() + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - cancel() - if !ss.IsComplete() { t.Errorf("expecting socketstream to be complete because socket closed") } + + cancel() // stop after connection closes })) } } @@ -108,13 +107,13 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { awaken(0, 0) // Sync past read to ensure we read cancel() // This cancellation should cause the stream to shut down immediately. - wg.Wait() + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index f1e59c2e4..f60e69694 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -287,10 +287,6 @@ func (t *Tailer) TailPath(pathname string) error { if err != nil { return err } - if t.oneShot { - glog.V(2).Infof("Starting oneshot read at startup of %q", pathname) - l.Stop() - } t.logstreams[pathname] = l glog.Infof("Tailing %s", pathname) logCount.Add(1)