From 66c76d47f816de47b74e20b6ae4215f46ef32245 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 24 Sep 2024 11:20:55 -0700 Subject: [PATCH] client: unexport the watcher type and update NewUpdater (#118) The watcher semantics are still useful for plumbing, but it's hard to use correctly, so unexport it. Now that watcher is unexported, move its tests from a separate package into an internal package test. It is now no longer necessary to have the "watcher" method, so remove it. An Updater now no longer accepts a watcher (which is unexported), but instead takes a *Store and the desired secret name directly. Rework the constructor to allow reporting an error. Add a StaticUpdater constructor to make an Updater that vends a static value, analogous to StaticSecret. --- client/setec/fields.go | 4 +- client/setec/fields_test.go | 24 ++--- client/setec/internal_test.go | 78 +++++++++++++++ client/setec/store.go | 177 ++++++++++++++-------------------- client/setec/store_test.go | 158 +++++++++++------------------- client/setec/watcher.go | 63 ++++++++++++ 6 files changed, 282 insertions(+), 222 deletions(-) create mode 100644 client/setec/internal_test.go create mode 100644 client/setec/watcher.go diff --git a/client/setec/fields.go b/client/setec/fields.go index e4edf8c..37d5632 100644 --- a/client/setec/fields.go +++ b/client/setec/fields.go @@ -166,7 +166,7 @@ func (f fieldInfo) apply(ctx context.Context, s *Store, fullName string) error { } if f.vtype == watcherType { - w, err := s.LookupWatcher(ctx, fullName) + w, err := s.lookupWatcher(ctx, fullName) if err != nil { return err } @@ -198,7 +198,7 @@ var ( bytesType = reflect.TypeOf([]byte(nil)) secretType = reflect.TypeOf(Secret(nil)) stringType = reflect.TypeOf(string("")) - watcherType = reflect.TypeOf(Watcher{}) + watcherType = reflect.TypeOf(watcher{}) binaryType = reflect.TypeOf((*encoding.BinaryUnmarshaler)(nil)).Elem() ) diff --git a/client/setec/fields_test.go b/client/setec/fields_test.go index 0a2b50c..7c931cd 100644 --- a/client/setec/fields_test.go +++ b/client/setec/fields_test.go @@ -66,15 +66,14 @@ func TestFields(t *testing.T) { // Verify that if we parse secrets with a store enabled, we correctly plumb // the values from the service into the tagged fields. type testTarget struct { - A string `setec:"apple"` - B binValue `setec:"bin-value"` - BP *binValue `setec:"bin-value-ptr"` - P []byte `setec:"pear"` - L setec.Secret `setec:"plum"` - C setec.Watcher `setec:"cherry"` - X string // untagged, not affected - J testObj `setec:"object-value,json"` - Z int `setec:"int-value,json"` + A string `setec:"apple"` + B binValue `setec:"bin-value"` + BP *binValue `setec:"bin-value-ptr"` + P []byte `setec:"pear"` + L setec.Secret `setec:"plum"` + X string // untagged, not affected + J testObj `setec:"object-value,json"` + Z int `setec:"int-value,json"` } var obj testTarget @@ -98,7 +97,7 @@ func TestFields(t *testing.T) { // Check that secret names respect prefixing. if diff := cmp.Diff(f.Secrets(), []string{ "test/apple", "test/bin-value", "test/bin-value-ptr", - "test/pear", "test/plum", "test/cherry", + "test/pear", "test/plum", "test/object-value", "test/int-value", }); diff != "" { t.Errorf("Prefixed secret names (-got, +want):\n%s", diff) @@ -110,7 +109,7 @@ func TestFields(t *testing.T) { } // Don't try to compare complex plumbing; see below. - opt := cmpopts.IgnoreFields(testTarget{}, "L", "C") + opt := cmpopts.IgnoreFields(testTarget{}, "L") if diff := cmp.Diff(obj, testTarget{ A: secrets["apple"], B: binValue{"kumquat", "quince"}, @@ -126,9 +125,6 @@ func TestFields(t *testing.T) { if got, want := string(obj.L.Get()), secrets["plum"]; got != want { t.Errorf("Secret field: got %q, want %q", got, want) } - if got, want := string(obj.C.Get()), secrets["cherry"]; got != want { - t.Errorf("Secret field: got %q, want %q", got, want) - } } func TestParseErrors(t *testing.T) { diff --git a/client/setec/internal_test.go b/client/setec/internal_test.go new file mode 100644 index 0000000..130ccd5 --- /dev/null +++ b/client/setec/internal_test.go @@ -0,0 +1,78 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package setec + +import ( + "context" + "net/http/httptest" + "testing" + "time" + + "github.com/tailscale/setec/setectest" +) + +func TestWatcher(t *testing.T) { + d := setectest.NewDB(t, nil) + d.MustPut(d.Superuser, "green", "eggs and ham") // active + v2 := d.MustPut(d.Superuser, "green", "grow the rushes oh") + + ts := setectest.NewServer(t, d, nil) + hs := httptest.NewServer(ts.Mux) + defer hs.Close() + + ctx := context.Background() + cli := Client{Server: hs.URL, DoHTTP: hs.Client().Do} + + pollTicker := setectest.NewFakeTicker() + st, err := NewStore(ctx, StoreConfig{ + Client: cli, + Secrets: []string{"green"}, + PollTicker: pollTicker, + }) + if err != nil { + t.Fatalf("NewStore: unexpected error: %v", err) + } + defer st.Close() + + // With lookups disabled, an unknown watcher reports an error. + if w, err := st.lookupWatcher(ctx, "nonesuch"); err == nil { + t.Errorf("Lookup: got %v, want error", w) + } + + // Observe the initial value of the secret. + w, err := st.lookupWatcher(ctx, "green") + if err != nil { + t.Errorf("Initial value: unexpected error: %v", err) + } else if got, want := string(w.Get()), "eggs and ham"; got != want { + t.Errorf("Initial value: got %q, want %q", got, want) + } + + // The secret gets updated... + if err := cli.Activate(ctx, "green", v2); err != nil { + t.Fatalf("Activate to %v: unexpected error: %v", v2, err) + } + + // The next poll occurs... + pollTicker.Poll() + + // The watcher should get notified in a timely manner. + select { + case <-w.Ready(): + t.Logf("✓ A new version of the secret is available") + case <-time.After(5 * time.Second): + t.Fatal("Timed out waiting for a watcher update") + } + + if got, want := string(w.Get()), "grow the rushes oh"; got != want { + t.Errorf("Updated value: got %q, want %q", got, want) + } + + // With no updates, the watchers should not appear ready. + select { + case <-w.Ready(): + t.Error("Watcher is unexpectedly ready after no update") + case <-time.After(100 * time.Millisecond): + // OK + } +} diff --git a/client/setec/store.go b/client/setec/store.go index f5a90bb..c7e5bee 100644 --- a/client/setec/store.go +++ b/client/setec/store.go @@ -10,6 +10,7 @@ import ( "errors" "expvar" "fmt" + "io" "log" "math/rand" "os" @@ -43,7 +44,7 @@ type Store struct { m map[string]*cachedSecret // :: secret name → active value f map[string]Secret // :: secret name → fetch function - w map[string][]Watcher // :: secret name → watchers + w map[string][]watcher // :: secret name → watchers } ctx context.Context // governs the polling task and lookups @@ -92,8 +93,7 @@ type StoreConfig struct { // AllowLookup instructs the store to allow the caller to look up secrets // not known to the store at the time of construction. If false, only // secrets pre-declared in the Secrets and Structs slices can be fetched, - // and the Lookup and LookupWatcher methods will report an error for all - // un-listed secrets. + // and the Lookup method will report an error for all un-listed secrets. AllowLookup bool // Cache, if non-nil, is a cache that persists secrets locally. @@ -198,7 +198,7 @@ func NewStore(ctx context.Context, cfg StoreConfig) (*Store, error) { // Initialize the active versions maps. s.active.m = make(map[string]*cachedSecret) s.active.f = make(map[string]Secret) - s.active.w = make(map[string][]Watcher) + s.active.w = make(map[string][]watcher) // If we have a cache, try to load data from there first. data, err := s.loadCache() @@ -335,7 +335,7 @@ func (s *Store) secretLocked(name string) Secret { // Since the caller is actively requesting the value of the secret, // update the last-accessed timestamp. This also applies to accesses - // via a Watcher, since the watchers wrap the underlying Secret. + // via a watcher, since the watchers wrap the underlying Secret. s.countSecretFetch.Add(1) cs := s.active.m[name] cs.LastAccess = s.timeNow().Unix() @@ -414,57 +414,6 @@ func (s *Store) lookupSecretInternal(ctx context.Context, name string) (Secret, } } -// Watcher returns a watcher for the named secret. -// -// If s has lookups enabled, Watcher returns a zero Watcher for an unknown name. -// Otherwise, Watcher panics for an unknown name. -func (s *Store) Watcher(name string) Watcher { - s.active.Lock() - defer s.active.Unlock() - secret := s.secretLocked(name) - if secret == nil { - if s.allowLookup { - return Watcher{} - } - panic(fmt.Sprintf("secret %q not found in StoreConfig with lookup disabled", name)) - } - w := Watcher{ready: make(chan struct{}, 1), secret: secret} - s.active.w[name] = append(s.active.w[name], w) - return w -} - -// LookupWatcher returns a watcher for the named secret. If name is already -// known by s, this is equivalent to Watcher; otherwise s attempts to fetch the -// latest active version of the secret from the service and either adds it to -// the collection or reports an error. -// LookupWatcher does not automatically retry in case of errors. -func (s *Store) LookupWatcher(ctx context.Context, name string) (Watcher, error) { - s.active.Lock() - defer s.active.Unlock() - var secret Secret - if _, ok := s.active.m[name]; ok { - secret = s.secretLocked(name) // OK, we already have it - } else if !s.allowLookup { - return Watcher{}, errors.New("lookup is not enabled") - } else { - // We must release the lock to fetch from the server; do this in a - // closure to ensure lock discipline is restored in case of a panic. - got, err := func() (Secret, error) { - s.active.Unlock() // NOTE: This order is intended. - defer s.active.Lock() - return s.lookupSecretInternal(ctx, name) - }() - if err != nil { - return Watcher{}, err - } - secret = got - } - - w := Watcher{ready: make(chan struct{}, 1), secret: secret} - s.active.w[name] = append(s.active.w[name], w) - return w, nil -} - // A Secret is a function that fetches the current active value of a secret. // The caller should not cache the value returned; the function does not block // and will always report a valid (if possibly stale) result. @@ -513,6 +462,17 @@ func StaticFile(path string) (Secret, error) { return func() []byte { return bs }, nil } +func panicOnUpdate[T any]([]byte) (T, error) { panic("unexpected value update") } + +// StaticUpdater returns an [Updater] that vends the specified fixed value. +// The value reported by the updater never changes. +func StaticUpdater[T any](fixedValue T) *Updater[T] { + return &Updater[T]{ + newValue: panicOnUpdate[T], + value: fixedValue, + } +} + // StaticTextFile returns a secret that vends the contents of path, which are // treated as text with leading and trailing whitespace trimmed. // @@ -767,78 +727,87 @@ type stdTicker struct{ *time.Ticker } func (s stdTicker) Chan() <-chan time.Time { return s.Ticker.C } func (stdTicker) Done() {} -// A Watcher monitors the current active value of a secret, and allows the user -// to be notified when the value of the secret changes. -type Watcher struct { - ready chan struct{} - secret Secret -} - -// Get returns the current active value of the secret. -// A zero-valued Watcher returns nil. -func (w Watcher) Get() []byte { return w.secret.Get() } - -// Ready returns a channel that delivers a value when the current active -// version of the secret has changed. The channel is never closed. +// NewUpdater creates a new Updater that maintains a value based on the +// specified secret in s. The newValue function constructs a value of type T +// from the bytes of a secret. // -// The ready channel is a level trigger. The Watcher does not queue multiple -// notifications, and if the caller does not drain the channel subsequent -// notifications will be dropped. -func (w Watcher) Ready() <-chan struct{} { return w.ready } - -func (w Watcher) notify() { - select { - case w.ready <- struct{}{}: - default: - } -} - -// IsValid reports whether w is valid, meaning that it has a secret available. -func (w Watcher) IsValid() bool { return w.secret != nil } - -// NewUpdater creates a new Updater that tracks updates to a value based on new -// secret versions delivered to w. The newValue function returns a new value -// of the type based on its argument, a secret value. +// The initial value is constructed using newValue on the current secret +// version when NewUpdater is called. If this initial call reports an error, +// NewUpdater returns nil and that error. Otherwise, the Updater begins with +// that value. // -// The initial value is constructed by calling newValue with the current secret -// version in w at the time NewUpdater is called. Calls to the Get method -// update the value as needed when w changes. +// Once constructed, call the Get method to fetch the current value. It is safe +// to call Get concurrently from multiple goroutines. See [Updater.Get] for +// details of how updates are handled. // -// The updater synchronizes calls to Get and newValue, so the callback can -// safely interact with shared state without additional locking. -func NewUpdater[T any](w Watcher, newValue func([]byte) T) *Updater[T] { +// If s has lookups enabled, NewWatcher will attempt to look up name if it is +// not already declared in s. If lookups are not enabled, or of the secret is +// not found, NewUpdater reports an error. It does not retry in case of lookup +// errors. +func NewUpdater[T any](ctx context.Context, s *Store, name string, newValue func([]byte) (T, error)) (*Updater[T], error) { + w, err := s.lookupWatcher(ctx, name) + if err != nil { + return nil, err + } + init, err := newValue(w.Get()) + if err != nil { + return nil, err + } return &Updater[T]{ newValue: newValue, w: w, - value: newValue(w.Get()), - } + value: init, + logf: s.logf, // same place as the underlying store + }, nil } -// An Updater tracks a value whose state depends on a secret, together with a -// watcher for updates to the secret. The caller provides a function to update -// the value when a new version of the secret is delivered, and the Updater -// manages access and updates to the value. +// An Updater tracks a value whose state depends on a secret. It watches for +// updates to the secret, and invokes a caller-provided function to update the +// value when a new version of the secret is delivered. type Updater[T any] struct { - newValue func([]byte) T - w Watcher + newValue func([]byte) (T, error) + w watcher mu sync.Mutex - value T + value T // the current value + err error // if non-nil, the error from the last update attempt + logf logger.Logf } // Get fetches the current value of u, first updating it if the secret has -// changed. It is safe to call Get concurrently from multiple goroutines. +// changed. It is safe to call Get concurrently from multiple goroutines. +// +// If Get receives an error while trying to update u, it returns the previous +// value. Use the Err method to check for an update error. If T implements the +// [io.Closer] interface, Get calls Close on the old value before updating. func (u *Updater[T]) Get() T { u.mu.Lock() defer u.mu.Unlock() select { case <-u.w.Ready(): - u.value = u.newValue(u.w.Get()) + nv, err := u.newValue(u.w.Get()) + if err != nil { + u.logf("WARNING: Error updating value: %v (keeping old value)", err) + } else { + if c, ok := any(u.value).(io.Closer); ok { + c.Close() + } + u.value = nv + } + u.err = err + return u.value default: // no change, use the existing value } return u.value } +// Err reports the error, if any, from the latest update to the value of u. +func (u *Updater[T]) Err() error { + u.mu.Lock() + defer u.mu.Unlock() + return u.err +} + type cachedSecret struct { Secret *api.SecretValue `json:"secret"` LastAccess int64 `json:"lastAccess,string"` diff --git a/client/setec/store_test.go b/client/setec/store_test.go index 198d4bc..bc76a14 100644 --- a/client/setec/store_test.go +++ b/client/setec/store_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "net/http/httptest" "os" "path/filepath" @@ -115,9 +114,6 @@ func TestStore(t *testing.T) { if s, err := st.LookupSecret(ctx, "bravo"); err == nil { t.Errorf("Lookup(bravo): got %q, want error", s.Get()) } - if w, err := st.LookupWatcher(ctx, "bravo"); err == nil { - t.Errorf("Lookup(bravo): got %q, want error", w.Get()) - } }) } @@ -291,71 +287,6 @@ func TestSlowInit(t *testing.T) { checkSecretValue(t, st, "boo", "go for the eyes") } -func TestWatcher(t *testing.T) { - d := setectest.NewDB(t, nil) - d.MustPut(d.Superuser, "green", "eggs and ham") // active - v2 := d.MustPut(d.Superuser, "green", "grow the rushes oh") - - ts := setectest.NewServer(t, d, nil) - hs := httptest.NewServer(ts.Mux) - defer hs.Close() - - ctx := context.Background() - cli := setec.Client{Server: hs.URL, DoHTTP: hs.Client().Do} - - pollTicker := setectest.NewFakeTicker() - st, err := setec.NewStore(ctx, setec.StoreConfig{ - Client: cli, - Secrets: []string{"green"}, - PollTicker: pollTicker, - }) - if err != nil { - t.Fatalf("NewStore: unexpected error: %v", err) - } - defer st.Close() - - // With lookups disabled, an unknown watcher panics. - mtest.MustPanicf(t, func() { st.Watcher("nonesuch") }, - "Expected panic for an unknown watcher") - - // Observe the initial value of the secret. - w := st.Watcher("green") - if got, want := string(w.Get()), "eggs and ham"; got != want { - t.Errorf("Initial value: got %q, want %q", got, want) - } - if !w.IsValid() { - t.Error("Watcher should be valid, but is not") - } - - // The secret gets updated... - if err := cli.Activate(ctx, "green", v2); err != nil { - t.Fatalf("Activate to %v: unexpected error: %v", v2, err) - } - - // The next poll occurs... - pollTicker.Poll() - - // The watcher should get notified in a timely manner. - select { - case <-w.Ready(): - t.Logf("✓ A new version of the secret is available") - case <-time.After(5 * time.Second): - t.Fatal("Timed out waiting for a watcher update") - } - - if got, want := string(w.Get()), "grow the rushes oh"; got != want { - t.Errorf("Updated value: got %q, want %q", got, want) - } - - // With no updates, the watchers should not appear ready. - select { - case <-w.Ready(): - t.Error("Watcher is unexpectedly ready after no update") - case <-time.After(100 * time.Millisecond): - // OK - } -} - func TestUpdater(t *testing.T) { d := setectest.NewDB(t, nil) d.MustPut(d.Superuser, "label", "malarkey") // active @@ -380,30 +311,57 @@ func TestUpdater(t *testing.T) { defer st.Close() // Set up an updater that tracks a string against the secret named "label". - u := setec.NewUpdater(st.Watcher("label"), func(secret []byte) string { - return fmt.Sprintf("value: %q", secret) - }) - checkValue := func(label, want string) { - if got := u.Get(); got != want { - t.Errorf("%s: got %q, want %q", label, got, want) + t.Run("Dynamic", func(t *testing.T) { + u, err := setec.NewUpdater(ctx, st, "label", func(secret []byte) (*closeable[string], error) { + return &closeable[string]{Value: string(secret)}, nil + }) + if err != nil { + t.Fatalf("NewUpdater: unexpected error: %v", err) + } + checkValue := func(label, want string) { + if got := u.Get().Value; got != want { + t.Errorf("%s: got %q, want %q", label, got, want) + } } - } - checkValue("Initial value", `value: "malarkey"`) + checkValue("Initial value", `malarkey`) + last := u.Get() + if last.Closed { + t.Error("Initial value is closed early") + } - // The secret gets updated... - if err := cli.Activate(ctx, "label", v2); err != nil { - t.Fatalf("Activate to %v: unexpected error: %v", v2, err) - } - pollTicker.Poll() + // The secret gets updated... + if err := cli.Activate(ctx, "label", v2); err != nil { + t.Fatalf("Activate to %v: unexpected error: %v", v2, err) + } + pollTicker.Poll() - // The next get should see the updated value. - checkValue("Updated value", `value: "dog-faced pony soldier"`) + // The next get should see the updated value. + checkValue("Updated value", `dog-faced pony soldier`) - pollTicker.Poll() + // The previous value should have been closed. + if !last.Closed { + t.Errorf("Initial value was not closed: %v", last) + } + + last = u.Get() + pollTicker.Poll() + + // The next get should not see a change. + checkValue("Updated value", `dog-faced pony soldier`) + if last.Closed { + t.Errorf("Update value was closed: %v", last) + } + }) + + t.Run("Static", func(t *testing.T) { + const testValue = "I am the chosen one" + u := setec.StaticUpdater(testValue) - // The next get should not see a change. - checkValue("Updated value", `value: "dog-faced pony soldier"`) + if got := u.Get(); got != testValue { + t.Errorf("Get: got %q, want %q", got, testValue) + } + }) } func TestLookup(t *testing.T) { @@ -453,14 +411,7 @@ func TestLookup(t *testing.T) { t.Errorf("Lookup(green): got %q, want %q", got, want) } - // Case 4: We can look up a watcher for "blue". - if w, err := st.LookupWatcher(ctx, "blue"); err != nil { - t.Errorf("Lookup(blue): unexpected error: %v", err) - } else if got, want := string(w.Get()), "dolphins"; got != want { - t.Errorf("Lookup(blue): got %q, want %q", got, want) - } - - // Case 5: We still can't lookup a non-existent secret. + // Case 4: We still can't lookup a non-existent secret. if s, err := st.LookupSecret(ctx, "orange"); err == nil { t.Errorf("Lookup(orange): got %q, want error", s.Get()) } else { @@ -471,9 +422,6 @@ func TestLookup(t *testing.T) { if f := st.Secret("nonesuch"); f != nil { t.Errorf("Lookup(nonesuch): got %v, want nil", f) } - if w := st.Watcher("nonesuch"); w.IsValid() { - t.Errorf("Watcher(nonesuch): got %v, want invalid", w) - } } func TestCacheExpiry(t *testing.T) { @@ -625,7 +573,6 @@ func TestNewFileCache(t *testing.T) { func TestNilSecret(t *testing.T) { var s setec.Secret - var w setec.Watcher if got := s.Get(); got != nil { t.Errorf("(nil).Get: got %v, want nil", got) @@ -633,12 +580,19 @@ func TestNilSecret(t *testing.T) { if got := s.GetString(); got != "" { t.Errorf(`(nil).GetString: got %q, want ""`, got) } - if got := w.Get(); got != nil { - t.Errorf("(zero).Get: got %v, want nil", got) - } } type badCache struct{} func (badCache) Write([]byte) error { return errors.New("write failed") } func (badCache) Read() ([]byte, error) { return nil, errors.New("read failed") } + +type closeable[T any] struct { + Value T + Closed bool +} + +func (c *closeable[T]) Close() error { + c.Closed = true + return nil +} diff --git a/client/setec/watcher.go b/client/setec/watcher.go new file mode 100644 index 0000000..0b2600f --- /dev/null +++ b/client/setec/watcher.go @@ -0,0 +1,63 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package setec + +import ( + "context" + "errors" +) + +// A watcher monitors the current active value of a secret, and allows the user +// to be notified when the value of the secret changes. +type watcher struct { + ready chan struct{} + Secret +} + +// Ready returns a channel that delivers a value when the current active +// version of the secret has changed. The channel is never closed. +// +// The ready channel is a level trigger. The watcher does not queue multiple +// notifications, and if the caller does not drain the channel subsequent +// notifications will be dropped. +func (w watcher) Ready() <-chan struct{} { return w.ready } + +func (w watcher) notify() { + select { + case w.ready <- struct{}{}: + default: + } +} + +// lookupWatcher returns a watcher for the named secret. If name is already +// known by s, this is equivalent to watcher; otherwise s attempts to fetch the +// latest active version of the secret from the service and either adds it to +// the collection or reports an error. +// lookupWatcher does not automatically retry in case of errors. +func (s *Store) lookupWatcher(ctx context.Context, name string) (watcher, error) { + s.active.Lock() + defer s.active.Unlock() + var secret Secret + if _, ok := s.active.m[name]; ok { + secret = s.secretLocked(name) // OK, we already have it + } else if !s.allowLookup { + return watcher{}, errors.New("lookup is not enabled") + } else { + // We must release the lock to fetch from the server; do this in a + // closure to ensure lock discipline is restored in case of a panic. + got, err := func() (Secret, error) { + s.active.Unlock() // NOTE: This order is intended. + defer s.active.Lock() + return s.lookupSecretInternal(ctx, name) + }() + if err != nil { + return watcher{}, err + } + secret = got + } + + w := watcher{ready: make(chan struct{}, 1), Secret: secret} + s.active.w[name] = append(s.active.w[name], w) + return w, nil +}