diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index b4643a960..058b8174f 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak ds.lastReadTime = time.Now() ds.mu.Unlock() ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel) + + // No error implies more to read, so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && IsEndOrCancel(err) { @@ -125,12 +130,13 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address) select { case <-ctx.Done(): + // Exit after next read attempt. // We may have started waiting here when the stop signal // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the zero byte handler above. - glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address) + glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address) case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address) diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 29a90e60e..c9e2347dd 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Stream is not shut down with cancel in this test defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) @@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // sync past read - // "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done. _, err = s.Write([]byte{}) testutil.FatalIfErr(t, err) @@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) @@ -111,7 +109,8 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Synchronise past read. + // Yield to give the stream a chance to read. + time.Sleep(10 * time.Millisecond) cancel() // This cancellation should cause the stream to shut down. wg.Wait() diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index b8baf97be..ad2807baa 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake fs.lastReadTime = time.Now() fs.mu.Unlock() fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel) + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && err != io.EOF { @@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } } - // No error implies there is more to read in this file so go - // straight back to read unless it looks like context is Done. - if err == nil && ctx.Err() == nil { - continue - } - Sleep: // If we get here it's because we've stalled. First test to see if it's // time to exit. @@ -243,6 +242,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): waiting", fs.pathname) select { case <-ctx.Done(): + // Exit after next read attempt. // We may have started waiting here when the cancellation // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so @@ -251,7 +251,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // could argue exiting immediately is less surprising. // Assumption is that this doesn't make a difference in // production. - glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname) + glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", fs.pathname) case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s): Wake received", fs.pathname) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 67237e262..240ae99f1 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } logCloses.Add(ps.pathname, 1) close(ps.lines) + ps.cancel() }() - ctx, cancel := context.WithCancel(ctx) - defer cancel() SetReadDeadlineOnDone(ctx, fd) for { @@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake ps.lastReadTime = time.Now() ps.mu.Unlock() ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel) + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } // Test to see if we should exit. @@ -120,10 +124,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.V(2).Infof("stream(%s): waiting", ps.pathname) select { case <-ctx.Done(): - // Exit immediately; cancelled context is going to cause the - // next read to be interrupted and exit, so don't bother going - // around the loop again. - return + // Exit after next read attempt. + glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", ps.pathname) case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s): Wake received", ps.pathname) diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/pipestream_unix_test.go index b55ea086f..a6ed66623 100644 --- a/internal/tailer/logstream/pipestream_unix_test.go +++ b/internal/tailer/logstream/pipestream_unix_test.go @@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666)) ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) @@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { testutil.WriteString(t, f, "1\n") - // Avoid a race with cancellation if we can synchronise with waker.Wake() - awaken(0, 0) - cancel() // Cancellation here should cause the stream to shut down. wg.Wait() @@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // The stream is not shut down by cancel in this test. defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled) testutil.FatalIfErr(t, err) @@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) { } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) - awaken(0, 0) + // Give the stream a chance to wake and read + time.Sleep(10 * time.Millisecond) testutil.FatalIfErr(t, f.Close()) diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 6f275b67f..81b30d2d8 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake ss.lastReadTime = time.Now() ss.mu.Unlock() ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel) + + // No error implies more to read, so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && IsEndOrCancel(err) { @@ -150,7 +155,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { case <-ctx.Done(): - // Cancelled context will cause the next read to be interrupted and exit. + // Exit after next read attempt. glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index e89d18bc6..68ce5f9c8 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // The stream is not shut down with cancel in this test. defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) @@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Sync past read - // Close the socket to signal to the socketStream to shut down. testutil.FatalIfErr(t, s.Close()) @@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) @@ -108,7 +106,8 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Sync past read to ensure we read + // Yield to give the stream a chance to read. + time.Sleep(10 * time.Millisecond) cancel() // This cancellation should cause the stream to shut down immediately. wg.Wait()