Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Handle stdin closure and shut down the tailer. #868

Merged
merged 25 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
aa0b787
refactor: Extract a stdin-pattern predicate.
jaqx0r Apr 27, 2024
9188e91
fix: Don't add stdin to the glob patterns.
jaqx0r May 8, 2024
45d2de8
refactor: Extract `doPatternGlob` from `PollLogPatterns`.
jaqx0r May 8, 2024
926ef7b
refactor: Store the log pattern poll waker in the Tailer struct.
jaqx0r May 8, 2024
76abfd9
chore: Appease linter, name struct fields.
jaqx0r May 8, 2024
01db877
chore: Make some log messages more useful for debugging.
jaqx0r May 8, 2024
7c233c4
refactor: Immediately tail sockets when discovered.
jaqx0r May 8, 2024
ae2de42
test: Rename log rotation integration tests for clarity
jaqx0r May 8, 2024
2ec6596
refactor: Rename fields in preparation for other wakers in test.
jaqx0r May 8, 2024
4536f9d
test: Improve debug log messages for the TestWaker.
jaqx0r May 8, 2024
3fdefe4
test: Remove `LoadAllPrograms` from `TestServer.PollWatched`.
jaqx0r May 13, 2024
83bd57d
refactor: Extract a separate routine for polling for completed logstr…
jaqx0r May 13, 2024
ff649a5
test: Remove `ExpireStaleLogStreams` from test poll action.
jaqx0r May 13, 2024
dd11ff8
test: Extract `AwakenLogStreams` from `PollWatched` and rename the la…
jaqx0r May 13, 2024
29b9e02
ci: Update the automerge workflow based on the community discussion.
jaqx0r May 25, 2024
ff635f9
Extract pattern polling into a goroutine per pattern.
jaqx0r May 25, 2024
c172b6c
Move the stale log gc off the subroutine waitgroup.
jaqx0r May 25, 2024
e06012d
Use a cancelfunc instead of another channel to shut down the Tailer.
jaqx0r May 25, 2024
455da56
Cancel the `mtail` context at shutdown, to shut down child goroutines.
jaqx0r May 26, 2024
6e3f971
Move the stream cleanup onto the GC poller.
jaqx0r May 26, 2024
eb29578
Downgrade some log messages.
jaqx0r May 26, 2024
820798f
chore: Lint fixes.
jaqx0r May 26, 2024
95a23a6
Remove context from struct.
jaqx0r May 26, 2024
5e470ac
chore: Fix linter warning.
jaqx0r May 26, 2024
e99f7e5
test: Fix a race condition waiting for the pattern waker attached to …
jaqx0r May 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions .github/workflows/automerge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ jobs:
enable-automerge:
if: github.event.pull_request.user.login == 'dependabot[bot]' && contains(github.event.pull_request.labels.*.name, 'dependencies')
runs-on: ubuntu-latest
# https://github.com/orgs/community/discussions/24686
permissions:
# enable-automerge is a graphql query, not REST, so isn't documented,
# except in a mention in
# https://github.blog/changelog/2021-02-04-pull-request-auto-merge-is-now-generally-available/
# which says "can only be enabled by users with permission to merge"; the
# REST documentation says you need contents: write to perform a merge.
# https://github.community/t/what-permission-does-a-github-action-need-to-call-graphql-enablepullrequestautomerge/197708
# says this is it
contents: write
pull-requests: write
steps:
# Enable auto-merge *before* issuing an approval.
- uses: alexwilson/enable-github-automerge-action@main
with:
github-token: "${{ secrets.GITHUB_TOKEN }}"
- run: |
gh api graphql -f pullRequestId="${{ github.event.pull_request.node_id }}" -f query='
mutation EnablePullRequestAutoMerge($pullRequestId: ID!) {
enablePullRequestAutoMerge(input: {pullRequestId: $pullRequestId}) {
clientMutationId
}
}
'
env:
GH_TOKEN: ${{ github.token }}

wait-on-checks:
needs: enable-automerge
Expand Down
2 changes: 1 addition & 1 deletion cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func main() {
}
if *staleLogGcTickInterval > 0 {
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
}
if *pollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
Expand Down
13 changes: 7 additions & 6 deletions internal/mtail/basic_tail_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
func TestBasicTail(t *testing.T) {
testutil.SkipIfShort(t)
if testing.Verbose() {
testutil.SetFlag(t, "vmodule", "tail=2,log_watcher=2")
testutil.SetFlag(t, "vmodule", "tail=2,filestream=2")
}
logDir := testutil.TestTempDir(t)

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
defer stopM()

logFile := filepath.Join(logDir, "log")
Expand All @@ -31,12 +31,13 @@ func TestBasicTail(t *testing.T) {

f := testutil.TestOpenFile(t, logFile)
defer f.Close()
m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1) // Find `logFile`
m.AwakenLogStreams(1, 1) // Force a sync to EOF

for i := 1; i <= 3; i++ {
testutil.WriteString(t, f, fmt.Sprintf("%d\n", i))
}
m.PollWatched(1) // Expect to read 3 lines here.
m.AwakenLogStreams(1, 1) // Expect to read 3 lines here.

var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -57,7 +58,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {

// Start mtail
logFilepath := filepath.Join(workdir, "log")
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 0)
Expand All @@ -68,7 +69,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {
logFile, err := os.Create(newLogFilepath)
testutil.FatalIfErr(t, err)
defer logFile.Close()
m.PollWatched(0) // No streams so don't wait for any.
// No streams so don't wait for any.

logCountCheck()
}
6 changes: 3 additions & 3 deletions internal/mtail/examples_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestExamplePrograms(t *testing.T) {
t.Run(fmt.Sprintf("%s on %s", tc.programfile, tc.logfile),
testutil.TimeoutTest(exampleTimeout, func(t *testing.T) { //nolint:thelper
ctx, cancel := context.WithCancel(context.Background())
waker, _ := waker.NewTest(ctx, 0) // oneshot means we should never need to wake the stream
waker, _ := waker.NewTest(ctx, 0, "waker") // oneshot means we should never need to wake the stream
store := metrics.NewStore()
programFile := filepath.Join("../..", tc.programfile)
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(tc.logfile), mtail.OneShot, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.LogPatternPollWaker(waker), mtail.LogstreamPollWaker(waker))
Expand Down Expand Up @@ -155,7 +155,7 @@ func BenchmarkProgram(b *testing.B) {
logFile := filepath.Join(logDir, "test.log")
log := testutil.TestOpenFile(b, logFile)
ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1)
waker, awaken := waker.NewTest(ctx, 1, "streams")
store := metrics.NewStore()
programFile := filepath.Join("../..", bm.programfile)
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(log.Name()), mtail.LogstreamPollWaker(waker))
Expand All @@ -176,7 +176,7 @@ func BenchmarkProgram(b *testing.B) {
count, err := io.Copy(log, l)
testutil.FatalIfErr(b, err)
total += count
awaken(1)
awaken(1, 1)
}
cancel()
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion internal/mtail/examples_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestFileSocketStreamComparison(t *testing.T) {
defer wg.Done()
source, err := os.OpenFile(tc.logfile, os.O_RDONLY, 0)
testutil.FatalIfErr(t, err)
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{sockName, scheme})
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{Name: sockName, Net: scheme})
testutil.FatalIfErr(t, err)
n, err := io.Copy(s, source)
testutil.FatalIfErr(t, err)
Expand Down
10 changes: 6 additions & 4 deletions internal/mtail/log_deletion_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ func TestLogDeletion(t *testing.T) {
logFile := testutil.TestOpenFile(t, logFilepath)
defer logFile.Close()

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCloseCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)

m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1) // Force read to EOF

glog.Info("remove")
testutil.FatalIfErr(t, os.Remove(logFilepath))

m.PollWatched(0) // one pass to stop
m.AwakenLogStreams(1, 0) // run stream to observe it's missing
logCloseCheck()
m.PollWatched(0) // one pass to remove completed stream
m.AwakenGcPoller(1, 1)
logCountCheck()
}
33 changes: 19 additions & 14 deletions internal/mtail/log_glob_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGlobBeforeStart(t *testing.T) {
testutil.WriteString(t, log, "\n")
log.Close()
}
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
stopM()

if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
Expand Down Expand Up @@ -75,10 +75,10 @@ func TestGlobAfterStart(t *testing.T) {
false,
},
}
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
defer stopM()

m.PollWatched(0) // Force sync to EOF
m.AwakenPatternPollers(1, 1)

var count int64
for _, tt := range globTests {
Expand All @@ -90,9 +90,8 @@ func TestGlobAfterStart(t *testing.T) {
for _, tt := range globTests {
log := testutil.TestOpenFile(t, tt.name)
defer log.Close()
m.PollWatched(0) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
}
// m.PollWatched(2)
logCountCheck()
}

Expand Down Expand Up @@ -142,7 +141,7 @@ func TestGlobIgnoreFolder(t *testing.T) {
testutil.WriteString(t, log, "\n")
}

m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()

Expand Down Expand Up @@ -184,7 +183,7 @@ func TestFilenameRegexIgnore(t *testing.T) {
testutil.WriteString(t, log, "\n")
}

m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()

Expand All @@ -207,7 +206,7 @@ func TestGlobRelativeAfterStart(t *testing.T) {
// Move to logdir to make relative paths
testutil.Chdir(t, logDir)

m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
defer stopM()

{
Expand All @@ -217,9 +216,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(1) // Force sync to EOF
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1) // Force read to EOF

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

logCountCheck()
}
Expand All @@ -232,9 +233,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(2)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 2) // Force read to EOF

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(2)
m.AwakenLogStreams(2, 2)

logCountCheck()
}
Expand All @@ -245,9 +248,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m.PollWatched(2)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(2, 2) // Force read to EOF

testutil.WriteString(t, f, "line 2\n")
m.PollWatched(2)
m.AwakenLogStreams(2, 2)

logCountCheck()
}
Expand Down
31 changes: 20 additions & 11 deletions internal/mtail/log_rotation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/google/mtail/internal/testutil"
)

func TestLogSoftLinkChange(t *testing.T) {
func TestLogRotationBySoftLinkChange(t *testing.T) {
testutil.SkipIfShort(t)

for _, tc := range []bool{false, true} {
Expand All @@ -29,7 +29,7 @@ func TestLogSoftLinkChange(t *testing.T) {

logFilepath := filepath.Join(workdir, "log")

m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
defer stopM()

logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 1)
Expand All @@ -40,33 +40,42 @@ func TestLogSoftLinkChange(t *testing.T) {

testutil.FatalIfErr(t, os.Symlink(logFilepath+".true1", logFilepath))
glog.Info("symlinked")
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

inputLines := []string{"hi1", "hi2", "hi3"}
for _, x := range inputLines {
testutil.WriteString(t, trueLog1, x+"\n")
}
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

trueLog2 := testutil.TestOpenFile(t, logFilepath+".true2")
defer trueLog2.Close()
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)
m.AwakenGcPoller(1, 1)
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
testutil.FatalIfErr(t, os.Remove(logFilepath))
if tc {
m.PollWatched(0) // simulate race condition with this poll.
logClosedCheck() // sync when filestream closes fd
m.PollWatched(0) // invoke the GC
logCompletedCheck() // sync to when the logstream is removed from tailer
// Simulate a race where we poll for a pattern and remove the
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1)

for _, x := range inputLines {
testutil.WriteString(t, trueLog2, x+"\n")
}
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)

var wg sync.WaitGroup
wg.Add(2)
Expand Down
33 changes: 19 additions & 14 deletions internal/mtail/log_rotation_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/google/mtail/internal/testutil"
)

// TestLogRotation is a unix-specific test because on Windows, files cannot be removed
// or renamed while there is an open read handle on them. Instead, log rotation would
// have to be implemented by copying and then truncating the original file. That test
// case is already covered by TestLogTruncation.
func TestLogRotation(t *testing.T) {
// TestLogRotationByRename is a unix-specific test because on Windows, files
// cannot be removed or renamed while there is an open read handle on
// them. Instead, log rotation would have to be implemented by copying and then
// truncating the original file. That test case is already covered by
// TestLogTruncation.
func TestLogRotationByRename(t *testing.T) {
testutil.SkipIfShort(t)

for _, tc := range []bool{false, true} {
Expand All @@ -45,34 +46,38 @@ func TestLogRotation(t *testing.T) {
f := testutil.TestOpenFile(t, logFile)
defer f.Close()

m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
defer stopM()

logOpensTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_opens_total", logFile, 1)
logLinesTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_lines_total", logFile, 3)

testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

testutil.WriteString(t, f, "line 2\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFile, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
glog.Info("rename")
err = os.Rename(logFile, logFile+".1")
testutil.FatalIfErr(t, err)
if tc {
m.PollWatched(0) // simulate race condition with this poll.
logClosedCheck() // sync when filestream closes fd
m.PollWatched(0) // invoke the GC
logCompletedCheck() // sync to when the logstream is removed from tailer
// Simulate a race where we poll for a pattern and remove the
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
glog.Info("create")
f = testutil.TestOpenFile(t, logFile)
m.PollWatched(1)
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(0, 1)
testutil.WriteString(t, f, "line 1\n")
m.PollWatched(1)
m.AwakenLogStreams(1, 1)

var wg sync.WaitGroup
wg.Add(2)
Expand Down
Loading
Loading