Skip to content

Commit

Permalink
Merge pull request #897 from google/logstream-refactors
Browse files Browse the repository at this point in the history
refactor: Clean up internals of the logstreams
  • Loading branch information
jaqx0r authored Jul 12, 2024
2 parents 92215e6 + 47e973f commit 0494356
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 274 deletions.
2 changes: 1 addition & 1 deletion internal/mtail/read_pipe_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"golang.org/x/sys/unix"
)

func TestReadFromPipe(t *testing.T) {
func TestReadFromFifo(t *testing.T) {
testutil.SkipIfShort(t)
tmpDir := testutil.TestTempDir(t)

Expand Down
7 changes: 6 additions & 1 deletion internal/mtail/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options
tb: tb,
cancel: cancel,
}
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
if streamWakers == 0 {
ts.streamWaker = waker.NewTestAlways()
ts.AwakenLogStreams = func(int, int) {}
} else {
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
}
ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns")
options = append(options,
LogstreamPollWaker(ts.streamWaker),
Expand Down
24 changes: 24 additions & 0 deletions internal/tailer/logstream/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2024 Google Inc. All Rights Reserved.
// This file is available under the Apache license.

package logstream

import (
"time"

"github.com/google/mtail/internal/logline"
)

type streamBase struct {
sourcename string // human readable name of the logstream source

lines chan *logline.LogLine // outbound channel for lines

staleTimer *time.Timer // Expire the stream if no read in 24h.
}

// Lines returns the output log line channel for this stream. The stream is
// completed when this channel closes.
func (s *streamBase) Lines() <-chan *logline.LogLine {
return s.lines
}
10 changes: 9 additions & 1 deletion internal/tailer/logstream/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import (
"github.com/golang/glog"
)

// ReadDeadliner has a SetReadDeadline function to be used for interrupting reads.
type ReadDeadliner interface {
SetReadDeadline(t time.Time) error
}

// SetReadDeadlineOnDone waits for the context to be done, and then sets an
// immediate read deadline on the flie descriptor `d`. This causes any blocked
// reads on that descriptor to return with an i/o timeout error.
func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) {
go func() {
<-ctx.Done()
Expand All @@ -25,7 +29,11 @@ func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) {
}()
}

func IsEndOrCancel(err error) bool {
// IsExitableError returns true if a stream should exit because of this error.
func IsExitableError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.EOF) {
return true
}
Expand Down
10 changes: 5 additions & 5 deletions internal/tailer/logstream/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var logLines = expvar.NewMap("log_lines_total")

// decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded.
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int {
func (s *streamBase) decodeAndSend(ctx context.Context, n int, b []byte, partial *bytes.Buffer) int {
var (
r rune
width int
Expand Down Expand Up @@ -50,7 +50,7 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
case r == '\r':
// nom
case r == '\n':
sendLine(ctx, pathname, partial, lines)
s.sendLine(ctx, partial)
default:
partial.WriteRune(r)
}
Expand All @@ -60,9 +60,9 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
return count
}

func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
func (s *streamBase) sendLine(ctx context.Context, partial *bytes.Buffer) {
glog.V(2).Infof("sendline")
logLines.Add(pathname, 1)
lines <- logline.New(ctx, pathname, partial.String())
logLines.Add(s.sourcename, 1)
s.lines <- logline.New(ctx, s.sourcename, partial.String())
partial.Reset()
}
56 changes: 26 additions & 30 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logstream
import (
"bytes"
"context"
"fmt"
"net"
"sync"
"time"
Expand All @@ -16,25 +17,28 @@ import (
)

type dgramStream struct {
cancel context.CancelFunc
streamBase

lines chan *logline.LogLine
cancel context.CancelFunc

scheme string // Datagram scheme, either "unixgram" or "udp".
address string // Given name for the underlying socket path on the filesystem or hostport.

mu sync.RWMutex // protects following fields
lastReadTime time.Time // Last time a log line was read from this named pipe

staleTimer *time.Timer // Expire the stream if no read in 24h
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
ss := &dgramStream{
cancel: cancel,
scheme: scheme,
address: address,
streamBase: streamBase{
sourcename: fmt.Sprintf("%s://%s", scheme, address),
lines: make(chan *logline.LogLine),
},
}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
Expand All @@ -50,16 +54,16 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
logErrors.Add(ds.address, 1)
return err
}
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c)
glog.V(2).Infof("stream(%s): opened new datagram socket %v", ds.sourcename, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): read total %d bytes", ds.sourcename, total)
glog.V(2).Infof("stream(%s): closing connection", ds.sourcename)
err := c.Close()
if err != nil {
logErrors.Add(ds.address, 1)
Expand All @@ -75,7 +79,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak

for {
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ds.sourcename, n, err)

if ds.staleTimer != nil {
ds.staleTimer.Stop()
Expand All @@ -86,17 +90,17 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): exiting because zero byte read and one shot", ds.sourcename)
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): exiting because zero byte read after cancellation", ds.sourcename)
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
return
default:
Expand All @@ -106,10 +110,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial)
ds.mu.Lock()
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.decodeAndSend(ctx, n, b[:n], partial)
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
Expand All @@ -118,16 +119,16 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}
}

if err != nil && IsEndOrCancel(err) {
if IsExitableError(err) {
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err)
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ds.sourcename, err)
return
}

// Yield and wait
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): waiting", ds.sourcename)
select {
case <-ctx.Done():
// Exit after next read attempt.
Expand All @@ -139,14 +140,9 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): Wake received", ds.sourcename)
}
}
}()
return nil
}

// Lines implements the LogStream interface, returning the output lines channel.
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
return ds.lines
}
4 changes: 2 additions & 2 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
{Context: context.TODO(), Filename: sockName, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
{Context: context.TODO(), Filename: sockName, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

Expand Down
Loading

0 comments on commit 0494356

Please sign in to comment.