From f76f19a2dd2b760d0dbc3c5ea192cc49d49fe845 Mon Sep 17 00:00:00 2001 From: Mark Tully Date: Fri, 10 Jan 2020 17:44:14 +0000 Subject: [PATCH] added ScanNow() ScanNow() causes the file watch loop to run once, and ensures all events have been posted before returning. It allows the caller to "synchronize" with the pending watcher events, so is useful for batch updating after it is known file events have probably occurred. --- watcher.go | 61 +++++++++++++++++++++++++++++++++-- watcher_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 3 deletions(-) diff --git a/watcher.go b/watcher.go index 4da4dfe..ff73df4 100644 --- a/watcher.go +++ b/watcher.go @@ -22,6 +22,11 @@ var ( // from previously calling Start and not yet calling Close. ErrWatcherRunning = errors.New("error: watcher is already running") + // ErrWatcherNotRunning occurs when trying to perform a ScanNow + // when the watcher is not running. It will also occur if Close is called + // whilst a ScanNow() is running / pending. + ErrWatcherNotRunning = errors.New("error: watcher is not running") + // ErrWatchedFileDeleted is an error that occurs when a file or folder that was // being watched has been deleted. ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted") @@ -129,6 +134,7 @@ type Watcher struct { ops map[Op]struct{} // Op filtering. ignoreHidden bool // ignore hidden files or not. maxEvents int // max sent events per cycle + scanNow chan chan struct{} // allows requests for immediate synchronous scans } // New creates a new Watcher. @@ -147,6 +153,7 @@ func New() *Watcher { files: make(map[string]os.FileInfo), ignored: make(map[string]struct{}), names: make(map[string]bool), + scanNow: make(chan chan struct{}), } } @@ -550,6 +557,8 @@ func (w *Watcher) Start(d time.Duration) error { // Unblock w.Wait(). w.wg.Done() + var scanNowRequest chan struct{} + for { // done lets the inner polling cycle loop know when the // current cycle's method has finished executing. @@ -589,7 +598,7 @@ func (w *Watcher) Start(d time.Duration) error { } } numEvents++ - if w.maxEvents > 0 && numEvents > w.maxEvents { + if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents { close(cancel) break inner } @@ -604,8 +613,52 @@ func (w *Watcher) Start(d time.Duration) error { w.files = fileList w.mu.Unlock() + if scanNowRequest != nil { + close(scanNowRequest) + scanNowRequest = nil + } + // Sleep and then continue to the next loop iteration. - time.Sleep(d) + // If a request to do a full scan is received, handle it and then signal to the requester it is complete. + select { + case <-w.close: // break out of wait early if we get a Close + case scanNowRequest = <-w.scanNow: // sync scan request received + case <-time.After(d): // periodic re-roll time elapsed + } + } +} + +// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files +// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files +// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events +// for the changes have been gathered before continuing, for example, to better process batched updates to groups of +// files. +// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan +// before going back to sleep. +func (w *Watcher) ScanNow() error { + w.mu.Lock() + if !w.running { + w.mu.Unlock() + return ErrWatcherNotRunning + } + w.mu.Unlock() + + scanComplete := make(chan struct{}) + select { + case w.scanNow <- scanComplete: + case <-w.close: + // if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return + // an error + return ErrWatcherNotRunning + } + + select { + case <-w.close: + // if the watcher is closed whilst we're waiting for our scan to complete, return an error + return ErrWatcherNotRunning + case <-scanComplete: + // scan completed ok + return nil } } @@ -700,6 +753,7 @@ func (w *Watcher) Wait() { } // Close stops a Watcher and unlocks its mutex, then sends a close signal. +// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher. func (w *Watcher) Close() { w.mu.Lock() if !w.running { @@ -711,5 +765,6 @@ func (w *Watcher) Close() { w.names = make(map[string]bool) w.mu.Unlock() // Send a close signal to the Start method. - w.close <- struct{}{} + // Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also. + close(w.close) } diff --git a/watcher_test.go b/watcher_test.go index 4453b99..7d484f4 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -539,6 +539,90 @@ func TestTriggerEvent(t *testing.T) { wg.Wait() } +func TestScanNow(t *testing.T) { + testDir, teardown := setup(t) + defer teardown() + + w := New() + w.FilterOps(Create) + + // Add the testDir to the watchlist. + if err := w.AddRecursive(testDir); err != nil { + t.Fatal(err) + } + + // should not be able to ScanNow() before the watcher is started + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } + + testFilePath := filepath.Join(testDir, "test_file1.txt") + done := make(chan struct{}) + go func() { + evt := <-w.Event + if evt.Op == Create && evt.Path == testFilePath { + close(done) + } else { + t.Fatal("unexpected event") + } + }() + + // Start scanning with a very long poll duration + go func() { + if err := w.Start(time.Hour); err != nil { + t.Fatal(err) + } + }() + + w.Wait() + defer w.Close() + + // perform initial scan, which should yield no changes + // this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed") + } + + // create the test file, we will not receive events due to the 1hr poll duration + if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive now we've changed a file + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as a poll duration of 1 hour is used") + } + + // issue a scan now, and we will receive the events while ScanNow() is running. + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // all events should have been received *whilst* ScanNow() was running, so the done channel should already be + // closed + select { + case <-done: + default: + t.Fatal("events from ScanNow() should have been received before ScanNow() returned") + } + + w.Close() + + // issue a scan now after closing, should error + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } +} + func TestEventAddFile(t *testing.T) { testDir, teardown := setup(t) defer teardown()