Skip to content

Commit

Permalink
Merge pull request #902 from google/decode-fix
Browse files Browse the repository at this point in the history
fix: Correctly handle lines that cross buffer boundaries.
  • Loading branch information
jaqx0r authored Jul 17, 2024
2 parents fa07f61 + b35d739 commit abe92eb
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 167 deletions.
4 changes: 0 additions & 4 deletions internal/tailer/logstream/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@
package logstream

import (
"time"

"github.com/google/mtail/internal/logline"
)

type streamBase struct {
sourcename string // human readable name of the logstream source

lines chan *logline.LogLine // outbound channel for lines

staleTimer *time.Timer // Expire the stream if no read in 24h.
}

// Lines returns the output log line channel for this stream. The stream is
Expand Down
68 changes: 0 additions & 68 deletions internal/tailer/logstream/decode.go

This file was deleted.

35 changes: 14 additions & 21 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
package logstream

import (
"bytes"
"context"
"fmt"
"net"
"sync"
"time"

"github.com/golang/glog"
"github.com/google/mtail/internal/logline"
Expand Down Expand Up @@ -55,8 +53,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
return err
}
glog.V(2).Infof("stream(%s): opened new datagram socket %v", ds.sourcename, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
lr := NewLineReader(ds.sourcename, ds.lines, &dgramConn{c}, datagramReadBufferSize, ds.cancel)
var total int
wg.Add(1)
go func() {
Expand All @@ -70,6 +67,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.Info(err)
}
logCloses.Add(ds.address, 1)
lr.Finish(ctx)
close(ds.lines)
ds.cancel()
}()
Expand All @@ -78,40 +76,27 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
SetReadDeadlineOnDone(ctx, c)

for {
n, _, err := c.ReadFrom(b)
n, err := lr.ReadAndSend(ctx)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ds.sourcename, n, err)

if ds.staleTimer != nil {
ds.staleTimer.Stop()
}

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s): exiting because zero byte read and one shot", ds.sourcename)
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s): exiting because zero byte read after cancellation", ds.sourcename)
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
return
default:
}
}

if n > 0 {
total += n
//nolint:contextcheck
ds.decodeAndSend(ctx, n, b[:n], partial)
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
Expand All @@ -120,9 +105,6 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}

if IsExitableError(err) {
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ds.sourcename, err)
return
}
Expand All @@ -146,3 +128,14 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}()
return nil
}

// dgramConn wraps a PacketConn to add a Read method.
type dgramConn struct {
net.PacketConn
}

// Read satisfies io.Reader
func (d *dgramConn) Read(p []byte) (count int, err error) {
count, _, err = d.ReadFrom(p)
return
}
26 changes: 7 additions & 19 deletions internal/tailer/logstream/fifostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
package logstream

import (
"bytes"
"context"
"errors"
"io"
"os"
"sync"
"syscall"
"time"

"github.com/golang/glog"
"github.com/google/mtail/internal/logline"
Expand Down Expand Up @@ -77,8 +75,7 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if err != nil {
return err
}
b := make([]byte, defaultFifoReadBufferSize)
partial := bytes.NewBufferString("")
lr := NewLineReader(ps.sourcename, ps.lines, fd, defaultFifoReadBufferSize, ps.cancel)
var total int
wg.Add(1)
go func() {
Expand All @@ -92,30 +89,17 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.Info(err)
}
logCloses.Add(ps.pathname, 1)
if partial.Len() > 0 {
ps.sendLine(ctx, partial)
}
lr.Finish(ctx)
close(ps.lines)
ps.cancel()
}()
SetReadDeadlineOnDone(ctx, fd)

for {
// Because we've opened in nonblocking mode, this Read can return
// straight away. If there are no writers, it'll return EOF (per
// `pipe(7)` and `read(2)`.) This is expected when `mtail` is
// starting at system init as the writer may not be ready yet.
n, err := fd.Read(b)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err)

if ps.staleTimer != nil {
ps.staleTimer.Stop()
}
n, err := lr.ReadAndSend(ctx)

if n > 0 {
total += n
ps.decodeAndSend(ctx, n, b[:n], partial)
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
Expand All @@ -134,6 +118,10 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake

// Test to see if we should exit.
if IsExitableError(err) {
// Because we've opened in nonblocking mode, this Read can return
// straight away. If there are no writers, it'll return EOF (per
// `pipe(7)` and `read(2)`.) This is expected when `mtail` is
// starting at system init as the writer may not be ready yet.
if !(errors.Is(err, io.EOF) && total == 0) {
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err)
return
Expand Down
8 changes: 4 additions & 4 deletions internal/tailer/logstream/fifostream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"golang.org/x/sys/unix"
)

func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {
func TestFifoStreamReadCompletedBecauseClosed(t *testing.T) {
testutil.TimeoutTest(1*time.Second, func(t *testing.T) { //nolint:thelper
var wg sync.WaitGroup

Expand Down Expand Up @@ -63,7 +63,7 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {
})(t)
}

func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
func TestFifoStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.TimeoutTest(1*time.Second, func(t *testing.T) { // nolint:thelper
var wg sync.WaitGroup

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
})(t)
}

func TestPipeStreamReadURL(t *testing.T) {
func TestFifoStreamReadURL(t *testing.T) {
var wg sync.WaitGroup

tmpDir := testutil.TestTempDir(t)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestPipeStreamReadURL(t *testing.T) {
cancel() // no-op for pipes
}

func TestPipeStreamReadStdin(t *testing.T) {
func TestFifoStreamReadStdin(t *testing.T) {
var wg sync.WaitGroup

tmpDir := testutil.TestTempDir(t)
Expand Down
Loading

0 comments on commit abe92eb

Please sign in to comment.