Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fsnotify error monitoring #1191

Merged
14 changes: 14 additions & 0 deletions pkg/workceptor/mock_workceptor/workunitbase.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
type WatcherWrapper interface {
Add(name string) error
Close() error
ErrorChannel() chan error
EventChannel() chan fsnotify.Event
}

Expand All @@ -52,6 +53,10 @@ func (rw *RealWatcher) Close() error {
return rw.watcher.Close()
}

func (rw *RealWatcher) ErrorChannel() chan error {
return rw.watcher.Errors
}

func (rw *RealWatcher) EventChannel() chan fsnotify.Event {
return rw.watcher.Events
}
Expand Down Expand Up @@ -126,6 +131,7 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs
if err == nil {
bwu.watcher = &RealWatcher{watcher: watcher}
} else {
bwu.w.nc.GetLogger().Info("fsnotify.NewWatcher returned %s", err)
bwu.watcher = nil
}
}
Expand Down Expand Up @@ -392,16 +398,20 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() {
var watcherEvents chan fsnotify.Event
watcherEvents = make(chan fsnotify.Event)

var watcherErrors chan error
watcherErrors = make(chan error)

if bwu.watcher != nil {
err := bwu.watcher.Add(statusFile)
if err == nil {
defer func() {
werr := bwu.watcher.Close()
if werr != nil {
bwu.w.nc.GetLogger().Error("Error closing %s: %s", statusFile, err)
bwu.w.nc.GetLogger().Error("Error in defer closing %s: %s", statusFile, err)
}
}()
watcherEvents = bwu.watcher.EventChannel()
watcherErrors = bwu.watcher.ErrorChannel()
} else {
werr := bwu.watcher.Close()
if werr != nil {
Expand All @@ -412,6 +422,7 @@ func (bwu *BaseWorkUnit) MonitorLocalStatus() {
}
fi, err := bwu.fs.Stat(statusFile)
if err != nil {
bwu.w.nc.GetLogger().Error("Error retrieving stat for %s: %s", statusFile, err)
fi = nil
}

Expand All @@ -424,7 +435,7 @@ loop:
if event.Op&fsnotify.Write == fsnotify.Write {
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
bwu.w.nc.GetLogger().Error("Watcher Events Error reading %s: %s", statusFile, err)
}
}
case <-time.After(time.Second):
Expand All @@ -433,9 +444,14 @@ loop:
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
bwu.w.nc.GetLogger().Error("Work unit load Error reading %s: %s", statusFile, err)
}
}
case err, ok := <-watcherErrors:
if !ok {
return
}
bwu.w.nc.GetLogger().Error("fsnotify Error reading %s: %s", statusFile, err)
}
complete := IsComplete(bwu.Status().State)
if complete {
Expand Down
4 changes: 4 additions & 0 deletions pkg/workceptor/workunitbase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func TestMonitorLocalStatus(t *testing.T) {
eventCh := make(chan fsnotify.Event, 1)
mockWatcher.EXPECT().EventChannel().Return(eventCh).AnyTimes()
go func() { eventCh <- *tc.fsNotifyEvent }()

errorCh := make(chan error, 1)
mockWatcher.EXPECT().ErrorChannel().Return(errorCh).AnyTimes()
// go func() { errorCh <- nil }()
}

go bwu.MonitorLocalStatus()
Expand Down