Skip to content

Commit

Permalink
Merge pull request #868 from google/tailer-shutdown
Browse files Browse the repository at this point in the history
feat: Handle `stdin` closure and shut down the tailer.
  • Loading branch information
jaqx0r authored May 26, 2024
2 parents d8f9c09 + e99f7e5 commit 291f5a1
Show file tree
Hide file tree
Showing 34 changed files with 487 additions and 432 deletions.
23 changes: 12 additions & 11 deletions .github/workflows/automerge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ jobs:
enable-automerge:
if: github.event.pull_request.user.login == 'dependabot[bot]' && contains(github.event.pull_request.labels.*.name, 'dependencies')
runs-on: ubuntu-latest
# https://github.com/orgs/community/discussions/24686
permissions:
# enable-automerge is a graphql query, not REST, so isn't documented,
# except in a mention in
# https://github.blog/changelog/2021-02-04-pull-request-auto-merge-is-now-generally-available/
# which says "can only be enabled by users with permissino to merge"; the
# REST documentation says you need contents: write to perform a merge.
# https://github.community/t/what-permission-does-a-github-action-need-to-call-graphql-enablepullrequestautomerge/197708
# says this is it
contents: write
pull-requests: write
steps:
# Enable auto-merge *before* issuing an approval.
- uses: alexwilson/enable-github-automerge-action@main
with:
github-token: "${{ secrets.GITHUB_TOKEN }}"
- run: |
gh api graphql -f pullRequestId="${{ github.event.pull_request.node_id }}" -f query='
mutation EnablePullRequestAutoMerge($pullRequestId: ID!) {
enablePullRequestAutoMerge(input: {pullRequestId: $pullRequestId}) {
clientMutationId
}
}
'
env:
GH_TOKEN: ${{ github.token }}
wait-on-checks:
needs: enable-automerge
Expand Down
2 changes: 1 addition & 1 deletion cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func main() {
}
if *staleLogGcTickInterval > 0 {
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
}
if *pollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
Expand Down
13 changes: 7 additions & 6 deletions internal/mtail/basic_tail_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
func TestBasicTail(t *testing.T) {
testutil.SkipIfShort(t)
if testing.Verbose() {
testutil.SetFlag(t, "vmodule", "tail=2,log_watcher=2")
testutil.SetFlag(t, "vmodule", "tail=2,filestream=2")
}
logDir := testutil.TestTempDir(t)

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
defer stopM()

logFile := filepath.Join(logDir, "log")
Expand All @@ -31,12 +31,13 @@ func TestBasicTail(t *testing.T) {

f := testutil.TestOpenFile(t, logFile)
defer f.Close()
m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1) // Find `logFile`
m.AwakenLogStreams(1, 1) // Force a sync to EOF

for i := 1; i <= 3; i++ {
testutil.WriteString(t, f, fmt.Sprintf("%d\n", i))
}
m.PollWatched(1) // Expect to read 3 lines here.
m.AwakenLogStreams(1, 1) // Expect to read 3 lines here.

var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -57,7 +58,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {

// Start mtail
logFilepath := filepath.Join(workdir, "log")
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 0)
Expand All @@ -68,7 +69,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {
logFile, err := os.Create(newLogFilepath)
testutil.FatalIfErr(t, err)
defer logFile.Close()
m.PollWatched(0) // No streams so don't wait for any.
// No streams so don't wait for any.

logCountCheck()
}
6 changes: 3 additions & 3 deletions internal/mtail/examples_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestExamplePrograms(t *testing.T) {
t.Run(fmt.Sprintf("%s on %s", tc.programfile, tc.logfile),
testutil.TimeoutTest(exampleTimeout, func(t *testing.T) { //nolint:thelper
ctx, cancel := context.WithCancel(context.Background())
waker, _ := waker.NewTest(ctx, 0) // oneshot means we should never need to wake the stream
waker, _ := waker.NewTest(ctx, 0, "waker") // oneshot means we should never need to wake the stream
store := metrics.NewStore()
programFile := filepath.Join("../..", tc.programfile)
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(tc.logfile), mtail.OneShot, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.LogPatternPollWaker(waker), mtail.LogstreamPollWaker(waker))
Expand Down Expand Up @@ -155,7 +155,7 @@ func BenchmarkProgram(b *testing.B) {
logFile := filepath.Join(logDir, "test.log")
log := testutil.TestOpenFile(b, logFile)
ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1)
waker, awaken := waker.NewTest(ctx, 1, "streams")
store := metrics.NewStore()
programFile := filepath.Join("../..", bm.programfile)
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(log.Name()), mtail.LogstreamPollWaker(waker))
Expand All @@ -176,7 +176,7 @@ func BenchmarkProgram(b *testing.B) {
count, err := io.Copy(log, l)
testutil.FatalIfErr(b, err)
total += count
awaken(1)
awaken(1, 1)
}
cancel()
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion internal/mtail/examples_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestFileSocketStreamComparison(t *testing.T) {
defer wg.Done()
source, err := os.OpenFile(tc.logfile, os.O_RDONLY, 0)
testutil.FatalIfErr(t, err)
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{sockName, scheme})
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{Name: sockName, Net: scheme})
testutil.FatalIfErr(t, err)
n, err := io.Copy(s, source)
testutil.FatalIfErr(t, err)
Expand Down
10 changes: 6 additions & 4 deletions internal/mtail/log_deletion_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ func TestLogDeletion(t *testing.T) {
logFile := testutil.TestOpenFile(t, logFilepath)
defer logFile.Close()

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCloseCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)

m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1) // Force read to EOF

glog.Info("remove")
testutil.FatalIfErr(t, os.Remove(logFilepath))

m.PollWatched(0) // one pass to stop
m.AwakenLogStreams(1, 0) // run stream to observe it's missing
logCloseCheck()
m.PollWatched(0) // one pass to remove completed stream
m.AwakenGcPoller(1, 1)
logCountCheck()
}
33 changes: 19 additions & 14 deletions internal/mtail/log_glob_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGlobBeforeStart(t *testing.T) {
testutil.WriteString(t, log, "\n")
log.Close()
}
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
stopM()

if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
Expand Down Expand Up @@ -75,10 +75,10 @@ func TestGlobAfterStart(t *testing.T) {
false,
},
}
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
defer stopM()

m.PollWatched(0) // Force sync to EOF
m.AwakenPatternPollers(1, 1)

var count int64
for _, tt := range globTests {
Expand All @@ -90,9 +90,8 @@ func TestGlobAfterStart(t *testing.T) {
for _, tt := range globTests {
log := testutil.TestOpenFile(t, tt.name)
defer log.Close()
m.PollWatched(0) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
}
// m.PollWatched(2)
logCountCheck()
}

Expand Down Expand Up @@ -142,7 +141,7 @@ func TestGlobIgnoreFolder(t *testing.T) {
testutil.WriteString(t, log, "\n")
}

m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()

Expand Down Expand Up @@ -184,7 +183,7 @@ func TestFilenameRegexIgnore(t *testing.T) {
testutil.WriteString(t, log, "\n")
}

m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()

Expand All @@ -207,7 +206,7 @@ func TestGlobRelativeAfterStart(t *testing.T) {
// Move to logdir to make relative paths
testutil.Chdir(t, logDir)

m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
defer stopM()

{
Expand All @@ -217,9 +216,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1) // Force read to EOF

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

logCountCheck()
}
Expand All @@ -232,9 +233,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(2)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 2) // Force read to EOF

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(2)
m.AwakenLogStreams(2, 2)

logCountCheck()
}
Expand All @@ -245,9 +248,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(2)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(2, 2) // Force read to EOF

testutil.WriteString(t, f, "line 2\n")
m.PollWatched(2)
m.AwakenLogStreams(2, 2)

logCountCheck()
}
Expand Down
31 changes: 20 additions & 11 deletions internal/mtail/log_rotation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/google/mtail/internal/testutil"
)

func TestLogSoftLinkChange(t *testing.T) {
func TestLogRotationBySoftLinkChange(t *testing.T) {
testutil.SkipIfShort(t)

for _, tc := range []bool{false, true} {
Expand All @@ -29,7 +29,7 @@ func TestLogSoftLinkChange(t *testing.T) {

logFilepath := filepath.Join(workdir, "log")

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 1)
Expand All @@ -40,33 +40,42 @@ func TestLogSoftLinkChange(t *testing.T) {

testutil.FatalIfErr(t, os.Symlink(logFilepath+".true1", logFilepath))
glog.Info("symlinked")
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

inputLines := []string{"hi1", "hi2", "hi3"}
for _, x := range inputLines {
testutil.WriteString(t, trueLog1, x+"\n")
}
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

trueLog2 := testutil.TestOpenFile(t, logFilepath+".true2")
defer trueLog2.Close()
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)
m.AwakenGcPoller(1, 1)
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
testutil.FatalIfErr(t, os.Remove(logFilepath))
if tc {
m.PollWatched(0) // simulate race condition with this poll.
logClosedCheck() // sync when filestream closes fd
m.PollWatched(0) // invoke the GC
logCompletedCheck() // sync to when the logstream is removed from tailer
// Simulate a race where we poll for a pattern and remove the
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1)

for _, x := range inputLines {
testutil.WriteString(t, trueLog2, x+"\n")
}
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

var wg sync.WaitGroup
wg.Add(2)
Expand Down
33 changes: 19 additions & 14 deletions internal/mtail/log_rotation_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/google/mtail/internal/testutil"
)

// TestLogRotation is a unix-specific test because on Windows, files cannot be removed
// or renamed while there is an open read handle on them. Instead, log rotation would
// have to be implemented by copying and then truncating the original file. That test
// case is already covered by TestLogTruncation.
func TestLogRotation(t *testing.T) {
// TestLogRotationByRename is a unix-specific test because on Windows, files
// cannot be removed or renamed while there is an open read handle on
// them. Instead, log rotation would have to be implemented by copying and then
// truncating the original file. That test case is already covered by
// TestLogTruncation.
func TestLogRotationByRename(t *testing.T) {
testutil.SkipIfShort(t)

for _, tc := range []bool{false, true} {
Expand All @@ -45,34 +46,38 @@ func TestLogRotation(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
defer stopM()

logOpensTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_opens_total", logFile, 1)
logLinesTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_lines_total", logFile, 3)

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

testutil.WriteString(t, f, "line 2\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFile, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
glog.Info("rename")
err = os.Rename(logFile, logFile+".1")
testutil.FatalIfErr(t, err)
if tc {
m.PollWatched(0) // simulate race condition with this poll.
logClosedCheck() // sync when filestream closes fd
m.PollWatched(0) // invoke the GC
logCompletedCheck() // sync to when the logstream is removed from tailer
// Simulate a race where we poll for a pattern and remove the
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
glog.Info("create")
f = testutil.TestOpenFile(t, logFile)
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1)
testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

var wg sync.WaitGroup
wg.Add(2)
Expand Down
Loading

0 comments on commit 291f5a1

Please sign in to comment.