Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change feature detection for CRDB watch to not require waiting #2205

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,24 +577,26 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
return nil, err
}

// streams don't return at all if they succeed, so the only way to know
// it was created successfully is to wait a bit and then cancel
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
time.AfterFunc(1*time.Second, cancel)

_ = cds.writePool.ExecFunc(streamCtx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
if err != nil && errors.Is(err, context.Canceled) {
features.Watch.Status = datastore.FeatureSupported
features.Watch.Reason = ""
} else if err != nil {
features.Watch.Status = datastore.FeatureUnsupported
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
// Start a changefeed with an invalid value. If we get back an invalid value error (SQLSTATE 22023)
// then we know that the datastore supports watch. If we get back any other error, then we know that
// the datastore does not support watch emits or there is a permissions issue.
_ = cds.writePool.ExecFunc(ctx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changestream does not run through any connection pools based on a recommendation from CRL; why is it being done that way here?

not sure if this runs in a loop or a certain cadence, but that would cause connections to be terminated (because of the error)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because its going to fail either way; the reason to not use a pool is because it can be a long-running operation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that it is using a pool and I don't think it should. It could cause connection churn on a pool that is serving write traffic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its only called on startup, so it should be fine

if err == nil {
return spiceerrors.MustBugf("expected an error, but got none")
}
return nil
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.schema.RelationshipTableName, head, "1s"))

<-streamCtx.Done()
var pgerr *pgconn.PgError
if errors.As(err, &pgerr) {
if pgerr.Code == "22023" {
features.Watch.Status = datastore.FeatureSupported
return nil
}
}

features.Watch.Status = datastore.FeatureUnsupported
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
return nil
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.schema.RelationshipTableName, head, "-1s"))
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved

return &features, nil
}
Expand Down
Loading