diff --git a/watcher.go b/watcher.go index 4da4dfe..ff73df4 100644 --- a/watcher.go +++ b/watcher.go @@ -22,6 +22,11 @@ var ( // from previously calling Start and not yet calling Close. ErrWatcherRunning = errors.New("error: watcher is already running") + // ErrWatcherNotRunning occurs when trying to perform a ScanNow + // when the watcher is not running. It will also occur if Close is called + // whilst a ScanNow() is running / pending. + ErrWatcherNotRunning = errors.New("error: watcher is not running") + // ErrWatchedFileDeleted is an error that occurs when a file or folder that was // being watched has been deleted. ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted") @@ -129,6 +134,7 @@ type Watcher struct { ops map[Op]struct{} // Op filtering. ignoreHidden bool // ignore hidden files or not. maxEvents int // max sent events per cycle + scanNow chan chan struct{} // allows requests for immediate synchronous scans } // New creates a new Watcher. @@ -147,6 +153,7 @@ func New() *Watcher { files: make(map[string]os.FileInfo), ignored: make(map[string]struct{}), names: make(map[string]bool), + scanNow: make(chan chan struct{}), } } @@ -550,6 +557,8 @@ func (w *Watcher) Start(d time.Duration) error { // Unblock w.Wait(). w.wg.Done() + var scanNowRequest chan struct{} + for { // done lets the inner polling cycle loop know when the // current cycle's method has finished executing. @@ -589,7 +598,7 @@ func (w *Watcher) Start(d time.Duration) error { } } numEvents++ - if w.maxEvents > 0 && numEvents > w.maxEvents { + if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents { close(cancel) break inner } @@ -604,8 +613,52 @@ func (w *Watcher) Start(d time.Duration) error { w.files = fileList w.mu.Unlock() + if scanNowRequest != nil { + close(scanNowRequest) + scanNowRequest = nil + } + // Sleep and then continue to the next loop iteration. - time.Sleep(d) + // If a request to do a full scan is received, handle it and then signal to the requester it is complete. + select { + case <-w.close: // break out of wait early if we get a Close + case scanNowRequest = <-w.scanNow: // sync scan request received + case <-time.After(d): // periodic re-roll time elapsed + } + } +} + +// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files +// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files +// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events +// for the changes have been gathered before continuing, for example, to better process batched updates to groups of +// files. +// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan +// before going back to sleep. +func (w *Watcher) ScanNow() error { + w.mu.Lock() + if !w.running { + w.mu.Unlock() + return ErrWatcherNotRunning + } + w.mu.Unlock() + + scanComplete := make(chan struct{}) + select { + case w.scanNow <- scanComplete: + case <-w.close: + // if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return + // an error + return ErrWatcherNotRunning + } + + select { + case <-w.close: + // if the watcher is closed whilst we're waiting for our scan to complete, return an error + return ErrWatcherNotRunning + case <-scanComplete: + // scan completed ok + return nil } } @@ -700,6 +753,7 @@ func (w *Watcher) Wait() { } // Close stops a Watcher and unlocks its mutex, then sends a close signal. +// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher. func (w *Watcher) Close() { w.mu.Lock() if !w.running { @@ -711,5 +765,6 @@ func (w *Watcher) Close() { w.names = make(map[string]bool) w.mu.Unlock() // Send a close signal to the Start method. - w.close <- struct{}{} + // Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also. + close(w.close) } diff --git a/watcher_test.go b/watcher_test.go index 4453b99..7d484f4 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -539,6 +539,90 @@ func TestTriggerEvent(t *testing.T) { wg.Wait() } +func TestScanNow(t *testing.T) { + testDir, teardown := setup(t) + defer teardown() + + w := New() + w.FilterOps(Create) + + // Add the testDir to the watchlist. + if err := w.AddRecursive(testDir); err != nil { + t.Fatal(err) + } + + // should not be able to ScanNow() before the watcher is started + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } + + testFilePath := filepath.Join(testDir, "test_file1.txt") + done := make(chan struct{}) + go func() { + evt := <-w.Event + if evt.Op == Create && evt.Path == testFilePath { + close(done) + } else { + t.Fatal("unexpected event") + } + }() + + // Start scanning with a very long poll duration + go func() { + if err := w.Start(time.Hour); err != nil { + t.Fatal(err) + } + }() + + w.Wait() + defer w.Close() + + // perform initial scan, which should yield no changes + // this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed") + } + + // create the test file, we will not receive events due to the 1hr poll duration + if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil { + t.Error(err) + } + + // wait for a short period just to ensure no unexpected events arrive now we've changed a file + select { + case <-time.After(time.Millisecond * 100): + case <-done: + t.Fatal("should not have received an event as a poll duration of 1 hour is used") + } + + // issue a scan now, and we will receive the events while ScanNow() is running. + if err := w.ScanNow(); err != nil { + t.Error(err) + } + + // all events should have been received *whilst* ScanNow() was running, so the done channel should already be + // closed + select { + case <-done: + default: + t.Fatal("events from ScanNow() should have been received before ScanNow() returned") + } + + w.Close() + + // issue a scan now after closing, should error + if err := w.ScanNow(); err != ErrWatcherNotRunning { + t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one") + } +} + func TestEventAddFile(t *testing.T) { testDir, teardown := setup(t) defer teardown()