Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of NULL caveats in Postgres watch #1668

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package postgres

import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/samber/lo"
"github.com/scylladb/go-set/strset"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -196,6 +198,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
))

t.Run("TestNullCaveatWatch", createDatastoreTest(
b,
NullCaveatWatchTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))
}

t.Run("OTelTracing", createDatastoreTest(
Expand Down Expand Up @@ -1417,3 +1428,126 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) {
require.NoError(t, err)
require.Greater(t, currentMaximumID, 12345)
}

func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// Run the watch API.
changes, errchan := ds.Watch(ctx, lowestRevision)
require.Zero(len(errchan))

// Manually insert a relationship with a NULL caveat. This is allowed, but can only happen due to
// bulk import (normal write rels will make it empty instead)
pds := ds.(*pgDatastore)
_, err = pds.ReadWriteTx(ctx, func(ctx context.Context, drwt datastore.ReadWriteTransaction) error {
rwt := drwt.(*pgReadWriteTXN)

createInserts := writeTuple
valuesToWrite := []interface{}{
"resource",
"someresourceid",
"somerelation",
"subject",
"somesubject",
"...",
nil, // set explicitly to null
nil, // set explicitly to null
}

query := createInserts.Values(valuesToWrite...)
sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}

_, err = rwt.tx.Exec(ctx, sql, args...)
return err
})
require.NoError(err)

// Verify the relationship create was tracked by the watch.
verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Touch(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")),
},
},
changes,
errchan,
false,
)

// Delete the relationship and ensure it does not raise an error in watch.
deleteUpdate := tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject"))
_, err = common.UpdateTuplesInDatastore(ctx, ds, deleteUpdate)
require.NoError(err)

// Verify the delete.
verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Delete(tuple.Parse("resource:someresourceid#somerelation@subject:somesubject")),
},
},
changes,
errchan,
false,
)
}

const waitForChangesTimeout = 5 * time.Second

// TODO(jschorr): Combine with the same impl in the datastore shared tests
func verifyUpdates(
require *require.Assertions,
testUpdates [][]*core.RelationTupleUpdate,
changes <-chan *datastore.RevisionChanges,
errchan <-chan error,
expectDisconnect bool,
) {
for _, expected := range testUpdates {
changeWait := time.NewTimer(waitForChangesTimeout)
select {
case change, ok := <-changes:
if !ok {
require.True(expectDisconnect, "unexpected disconnect")
errWait := time.NewTimer(waitForChangesTimeout)
select {
case err := <-errchan:
require.True(errors.As(err, &datastore.ErrWatchDisconnected{}))
return
case <-errWait.C:
require.Fail("Timed out waiting for ErrWatchDisconnected")
}
return
}

expectedChangeSet := setOfChanges(expected)
actualChangeSet := setOfChanges(change.Changes)

missingExpected := strset.Difference(expectedChangeSet, actualChangeSet)
unexpected := strset.Difference(actualChangeSet, expectedChangeSet)

require.True(missingExpected.IsEmpty(), "expected changes missing: %s", missingExpected)
require.True(unexpected.IsEmpty(), "unexpected changes: %s", unexpected)

time.Sleep(1 * time.Millisecond)
case <-changeWait.C:
require.Fail("Timed out", "waiting for changes: %s", expected)
}
}

require.False(expectDisconnect, "all changes verified without expected disconnect")
}

func setOfChanges(changes []*core.RelationTupleUpdate) *strset.Set {
changeSet := strset.NewWithSize(len(changes))
for _, change := range changes {
changeSet.Add(fmt.Sprintf("OPERATION_%s(%s)", change.Operation, tuple.StringWithoutCaveat(change.Tuple)))
}
return changeSet
}
6 changes: 3 additions & 3 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit
}

var createdXID, deletedXID xid8
var caveatName string
var caveatName *string
var caveatContext map[string]any
if err := changes.Scan(
&nextTuple.ResourceAndRelation.Namespace,
Expand All @@ -223,13 +223,13 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit
return nil, fmt.Errorf("unable to parse changed tuple: %w", err)
}

if caveatName != "" {
if caveatName != nil && *caveatName != "" {
contextStruct, err := structpb.NewStruct(caveatContext)
if err != nil {
return nil, fmt.Errorf("failed to read caveat context from update: %w", err)
}
nextTuple.Caveat = &core.ContextualizedCaveat{
CaveatName: caveatName,
CaveatName: *caveatName,
Context: contextStruct,
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
t.Run("TestWatchCancel", func(t *testing.T) { WatchCancelTest(t, tester) })
t.Run("TestCaveatedRelationshipWatch", func(t *testing.T) { CaveatedRelationshipWatchTest(t, tester) })
t.Run("TestWatchWithTouch", func(t *testing.T) { WatchWithTouchTest(t, tester) })
t.Run("TestWatchWithDelete", func(t *testing.T) { WatchWithDeleteTest(t, tester) })
}
}

Expand Down
64 changes: 64 additions & 0 deletions pkg/datastore/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,70 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) {
)
}

func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16)
require.NoError(err)

setupDatastore(ds, require)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// TOUCH a relationship and ensure watch sees it.
changes, errchan := ds.Watch(ctx, lowestRevision)
require.Zero(len(errchan))

afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH,
tuple.Parse("document:firstdoc#viewer@user:tom"),
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)
require.NoError(err)

ensureTuples(ctx, require, ds,
tuple.Parse("document:firstdoc#viewer@user:tom"),
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)

verifyUpdates(require, [][]*core.RelationTupleUpdate{
{
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:tom")),
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:sarah")),
tuple.Touch(tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]")),
},
},
changes,
errchan,
false,
)

// DELETE the relationship
changes, errchan = ds.Watch(ctx, afterTouchRevision)
require.Zero(len(errchan))

_, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tuple.Parse("document:firstdoc#viewer@user:tom"))
require.NoError(err)

ensureTuples(ctx, require, ds,
tuple.Parse("document:firstdoc#viewer@user:sarah"),
tuple.Parse("document:firstdoc#viewer@user:fred[thirdcaveat]"),
)

verifyUpdates(require, [][]*core.RelationTupleUpdate{
{tuple.Delete(tuple.Parse("document:firstdoc#viewer@user:tom"))},
},
changes,
errchan,
false,
)
}

func verifyNoUpdates(
require *require.Assertions,
changes <-chan *datastore.RevisionChanges,
Expand Down
Loading