diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index f8fbdb572..599119e3c 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -6,6 +6,8 @@ package logstream import ( "bytes" "context" + "errors" + "io" "os" "sync" "syscall" @@ -46,7 +48,7 @@ func pipeOpen(pathname string) (*os.File, error) { if IsStdinPattern(pathname) { return os.Stdin, nil } - // Open in nonblocking mode because the write end of the pipe may not have started yet. + // Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842 fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller if err != nil { glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err) @@ -99,6 +101,10 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake SetReadDeadlineOnDone(ctx, fd) for { + // Because we've opened in nonblocking mode, this Read can return + // straight away. If there are no writers, it'll return EOF (per + // `pipe(7)` and `read(2)`.) This is expected when `mtail` is + // starting at system init as the writer may not be ready yet. n, err := fd.Read(b) glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err) @@ -115,19 +121,23 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if err == nil && ctx.Err() == nil { continue } - } else if n == 0 { + } else if n == 0 && total > 0 { // `pipe(7)` tells us "If all file descriptors referring to the // write end of a pipe have been closed, then an attempt to // read(2) from the pipe will see end-of-file (read(2) will - // return 0)." + // return 0)." To avoid shutting down the stream at startup + // before any writer has connected to the fifo, condition on + // having read any bytes previously. glog.V(2).Infof("stream(%s): exiting, 0 bytes read", ps.sourcename) return } // Test to see if we should exit. if IsExitableError(err) { - glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) - return + if !(errors.Is(err, io.EOF) && total == 0) { + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err) + return + } } // Wait for wakeup or termination.