Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ScanNow() #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ var (
// from previously calling Start and not yet calling Close.
ErrWatcherRunning = errors.New("error: watcher is already running")

// ErrWatcherNotRunning occurs when trying to perform a ScanNow
// when the watcher is not running. It will also occur if Close is called
// whilst a ScanNow() is running / pending.
ErrWatcherNotRunning = errors.New("error: watcher is not running")

// ErrWatchedFileDeleted is an error that occurs when a file or folder that was
// being watched has been deleted.
ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted")
Expand Down Expand Up @@ -129,6 +134,7 @@ type Watcher struct {
ops map[Op]struct{} // Op filtering.
ignoreHidden bool // ignore hidden files or not.
maxEvents int // max sent events per cycle
scanNow chan chan struct{} // allows requests for immediate synchronous scans
}

// New creates a new Watcher.
Expand All @@ -147,6 +153,7 @@ func New() *Watcher {
files: make(map[string]os.FileInfo),
ignored: make(map[string]struct{}),
names: make(map[string]bool),
scanNow: make(chan chan struct{}),
}
}

Expand Down Expand Up @@ -550,6 +557,8 @@ func (w *Watcher) Start(d time.Duration) error {
// Unblock w.Wait().
w.wg.Done()

var scanNowRequest chan struct{}

for {
// done lets the inner polling cycle loop know when the
// current cycle's method has finished executing.
Expand Down Expand Up @@ -589,7 +598,7 @@ func (w *Watcher) Start(d time.Duration) error {
}
}
numEvents++
if w.maxEvents > 0 && numEvents > w.maxEvents {
if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents {
close(cancel)
break inner
}
Expand All @@ -604,8 +613,52 @@ func (w *Watcher) Start(d time.Duration) error {
w.files = fileList
w.mu.Unlock()

if scanNowRequest != nil {
close(scanNowRequest)
scanNowRequest = nil
}

// Sleep and then continue to the next loop iteration.
time.Sleep(d)
// If a request to do a full scan is received, handle it and then signal to the requester it is complete.
select {
case <-w.close: // break out of wait early if we get a Close
case scanNowRequest = <-w.scanNow: // sync scan request received
case <-time.After(d): // periodic re-roll time elapsed
}
}
}

// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files
// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files
// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events
// for the changes have been gathered before continuing, for example, to better process batched updates to groups of
// files.
// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan
// before going back to sleep.
func (w *Watcher) ScanNow() error {
w.mu.Lock()
if !w.running {
w.mu.Unlock()
return ErrWatcherNotRunning
}
w.mu.Unlock()

scanComplete := make(chan struct{})
select {
case w.scanNow <- scanComplete:
case <-w.close:
// if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return
// an error
return ErrWatcherNotRunning
}

select {
case <-w.close:
// if the watcher is closed whilst we're waiting for our scan to complete, return an error
return ErrWatcherNotRunning
case <-scanComplete:
// scan completed ok
return nil
}
}

Expand Down Expand Up @@ -700,6 +753,7 @@ func (w *Watcher) Wait() {
}

// Close stops a Watcher and unlocks its mutex, then sends a close signal.
// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher.
func (w *Watcher) Close() {
w.mu.Lock()
if !w.running {
Expand All @@ -711,5 +765,6 @@ func (w *Watcher) Close() {
w.names = make(map[string]bool)
w.mu.Unlock()
// Send a close signal to the Start method.
w.close <- struct{}{}
// Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also.
close(w.close)
}
84 changes: 84 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,90 @@ func TestTriggerEvent(t *testing.T) {
wg.Wait()
}

func TestScanNow(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()

w := New()
w.FilterOps(Create)

// Add the testDir to the watchlist.
if err := w.AddRecursive(testDir); err != nil {
t.Fatal(err)
}

// should not be able to ScanNow() before the watcher is started
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}

testFilePath := filepath.Join(testDir, "test_file1.txt")
done := make(chan struct{})
go func() {
evt := <-w.Event
if evt.Op == Create && evt.Path == testFilePath {
close(done)
} else {
t.Fatal("unexpected event")
}
}()

// Start scanning with a very long poll duration
go func() {
if err := w.Start(time.Hour); err != nil {
t.Fatal(err)
}
}()

w.Wait()
defer w.Close()

// perform initial scan, which should yield no changes
// this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed")
}

// create the test file, we will not receive events due to the 1hr poll duration
if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive now we've changed a file
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as a poll duration of 1 hour is used")
}

// issue a scan now, and we will receive the events while ScanNow() is running.
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// all events should have been received *whilst* ScanNow() was running, so the done channel should already be
// closed
select {
case <-done:
default:
t.Fatal("events from ScanNow() should have been received before ScanNow() returned")
}

w.Close()

// issue a scan now after closing, should error
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}
}

func TestEventAddFile(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()
Expand Down