From e0ae6642766a955cd4603bf749e010c7c689e0c9 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Wed, 25 Oct 2023 21:18:19 +0000 Subject: [PATCH] Fix issue with gap fill records clobbering inserts under load on pgsql Signed-off-by: Brad Davidson --- main.go | 4 ++++ pkg/drivers/generic/generic.go | 25 ++++++++++++++++++++++--- pkg/drivers/pgsql/pgsql.go | 8 ++++++++ pkg/logstructured/sqllog/sql.go | 6 ++++++ pkg/server/types.go | 1 + 5 files changed, 41 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 12f52f56..5b4f944f 100644 --- a/main.go +++ b/main.go @@ -108,6 +108,10 @@ func main() { } func run(c *cli.Context) error { + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: time.RFC3339Nano, + }) if c.Bool("debug") { logrus.SetLevel(logrus.TraceLevel) } diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index 54e71344..43b53712 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -102,8 +102,10 @@ type Generic struct { InsertLastInsertIDSQL string GetSizeSQL string Retry ErrRetry + InsertRetry ErrRetry TranslateErr TranslateErr ErrCode ErrCode + FillRetryDuration time.Duration } func q(sql, param string, numbered bool) string { @@ -422,9 +424,22 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c return row.LastInsertId() } - row := d.queryRow(ctx, d.InsertSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue) - err = row.Scan(&id) - return id, err + // Drivers without LastInsertID support may conflict on the serial id key when inserting rows, + // as the ID is reserved at the beginning of the implicit transaction, but does not become + // visible until the transaction completes, at which point we may have already created a gap fill record. + // Retry the insert if the driver indicates a retriable insert error, to avoid presenting a spurious + // duplicate key error to the client. 
+ wait := strategy.Backoff(backoff.Linear(100 * time.Millisecond)) + for i := uint(0); i < 20; i++ { + row := d.queryRow(ctx, d.InsertSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue) + err = row.Scan(&id) + if err != nil && d.InsertRetry != nil && d.InsertRetry(err) { + wait(i) + continue + } + return id, err + } + return } func (d *Generic) GetSize(ctx context.Context) (int64, error) { @@ -438,3 +453,7 @@ func (d *Generic) GetSize(ctx context.Context) (int64, error) { } return size, nil } + +func (d *Generic) FillRetryDelay(ctx context.Context) { + time.Sleep(d.FillRetryDuration) +} diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 4781f087..d3524272 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -7,6 +7,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" @@ -80,6 +81,13 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo kd.id <= $2 ) AS ks WHERE kv.id = ks.id` + dialect.FillRetryDuration = 5 * time.Millisecond + dialect.InsertRetry = func(err error) bool { + if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == "kine_pkey" { + return true + } + return false + } dialect.TranslateErr = func(err error) error { if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation { return server.ErrKeyExists diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index 928526f0..05fcf10f 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -443,6 +443,12 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { case s.notify <- next: default: } + // Some drivers increment the revision sequence at the start of the insert + // transaction, but the row does not become visible to us until the transaction + // completes. 
This looks like a skip, but creating a fill record too quickly + // will cause the insert to fail and the transaction to roll back. Allow the + // driver to inject an extra delay into the retry before filling. + s.d.FillRetryDelay(s.ctx) break } else { if err := s.d.Fill(s.ctx, next); err == nil { diff --git a/pkg/server/types.go b/pkg/server/types.go index aad34313..c3f28dbb 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -41,6 +41,7 @@ type Dialect interface { IsFill(key string) bool BeginTx(ctx context.Context, opts *sql.TxOptions) (Transaction, error) GetSize(ctx context.Context) (int64, error) + FillRetryDelay(ctx context.Context) } type Transaction interface {