Skip to content

Commit

Permalink
fix: Handle startup conditions for fifos.
Browse files Browse the repository at this point in the history
When opening a fifo in nonblocking mode to allow for delays in the writer, we
also turn the Read into a nonblocking read.  This is desirable for context
cancellation but makes the Read return before bytes may be ready.

`pipe(7)` and `read(2)` POSIX manpages tell us that if there are no writers
connected, `read` will return end-of-file.  In Go, this means we get a (0,
EOF) pair.  We shouldn't necessarily exit straight away though, because the
writer may not have started writing yet.  In test this is visible as race
conditions when the `Read` is scheduled before the test performs a `Write`.

We can decide that a stream is active once a single byte has been read, and
only allow EOF to trigger shutdown on an active stream.  Thus we check here for
`total > 0` for 0 byte and EOF err responses on `Read` before exiting the
stream.

This fixes known races in `pipestream` so far.
  • Loading branch information
jaqx0r committed Jul 12, 2024
1 parent 08c5cd2 commit 2e17520
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package logstream
import (
"bytes"
"context"
"errors"
"io"
"os"
"sync"
"syscall"
Expand Down Expand Up @@ -46,7 +48,7 @@ func pipeOpen(pathname string) (*os.File, error) {
if IsStdinPattern(pathname) {
return os.Stdin, nil
}
// Open in nonblocking mode because the write end of the pipe may not have started yet.
// Open in nonblocking mode because the write end of the pipe may not have started yet; this also gives us the ability to set a read deadline when the context is cancelled. https://github.com/golang/go/issues/24842
fd, err := os.OpenFile(pathname, os.O_RDONLY|syscall.O_NONBLOCK, 0o600) // #nosec G304 -- path already validated by caller
if err != nil {
glog.Warningf("pipeOpen(%s): open failed: %v", pathname, err)
Expand Down Expand Up @@ -99,6 +101,10 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
SetReadDeadlineOnDone(ctx, fd)

for {
// Because we've opened in nonblocking mode, this Read can return
// straight away. If there are no writers, it'll return EOF (per
// `pipe(7)` and `read(2)`.) This is expected when `mtail` is
// starting at system init as the writer may not be ready yet.
n, err := fd.Read(b)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err)

Expand All @@ -115,19 +121,23 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if err == nil && ctx.Err() == nil {
continue
}
} else if n == 0 {
} else if n == 0 && total > 0 {
// `pipe(7)` tells us "If all file descriptors referring to the
// write end of a pipe have been closed, then an attempt to
// read(2) from the pipe will see end-of-file (read(2) will
// return 0)."
// return 0)." To avoid shutting down the stream at startup
// before any writer has connected to the fifo, condition on
// having read any bytes previously.
glog.V(2).Infof("stream(%s): exiting, 0 bytes read", ps.sourcename)
return
}

// Test to see if we should exit.
if IsExitableError(err) {
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err)
return
if !(errors.Is(err, io.EOF) && total == 0) {
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err)
return
}
}

// Wait for wakeup or termination.
Expand Down

0 comments on commit 2e17520

Please sign in to comment.