Skip to content

Commit

Permalink
Fix issue with gap fill records clobbering inserts under load on pgsql
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Oct 30, 2023
1 parent 6d65d05 commit e0ae664
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 3 deletions.
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func main() {
}

func run(c *cli.Context) error {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: time.RFC3339Nano,
})
if c.Bool("debug") {
logrus.SetLevel(logrus.TraceLevel)
}
Expand Down
25 changes: 22 additions & 3 deletions pkg/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ type Generic struct {
InsertLastInsertIDSQL string
GetSizeSQL string
Retry ErrRetry
InsertRetry ErrRetry
TranslateErr TranslateErr
ErrCode ErrCode
FillRetryDuration time.Duration
}

func q(sql, param string, numbered bool) string {
Expand Down Expand Up @@ -422,9 +424,22 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c
return row.LastInsertId()
}

row := d.queryRow(ctx, d.InsertSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue)
err = row.Scan(&id)
return id, err
// Drivers without LastInsertID support may conflict on the serial id key when inserting rows,
// as the ID is reserved at the begining of the implicit transaction, but does not become
// visible until the transaction completes, at which point we may have already created a gap fill record.
// Retry the insert if the driver indicates a retriable insert error, to avoid presenting a spurious
// duplicate key error to the client.
wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ {
row := d.queryRow(ctx, d.InsertSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue)
err = row.Scan(&id)
if err != nil && d.InsertRetry != nil && d.InsertRetry(err) {
wait(i)
continue
}
return id, err
}
return
}

func (d *Generic) GetSize(ctx context.Context) (int64, error) {
Expand All @@ -438,3 +453,7 @@ func (d *Generic) GetSize(ctx context.Context) (int64, error) {
}
return size, nil
}

func (d *Generic) FillRetryDelay(ctx context.Context) {
time.Sleep(d.FillRetryDuration)
}
8 changes: 8 additions & 0 deletions pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -80,6 +81,13 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
kd.id <= $2
) AS ks
WHERE kv.id = ks.id`
dialect.FillRetryDuration = time.Millisecond + 5
dialect.InsertRetry = func(err error) bool {
if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == "kine_pkey" {
return true
}
return false
}
dialect.TranslateErr = func(err error) error {
if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation {
return server.ErrKeyExists
Expand Down
6 changes: 6 additions & 0 deletions pkg/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
case s.notify <- next:
default:
}
// Some drivers increment the revision sequence at the start of the insert
// transaction, but the row does not become visible to us until the transaction
// completes. This looks like a skip, but creating a fill record too quickly
// will cause the insert to fail and the transaction to roll back. Allow the
// driver to inject an extra delay into the retry before filling.
s.d.FillRetryDelay(s.ctx)
break
} else {
if err := s.d.Fill(s.ctx, next); err == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Dialect interface {
IsFill(key string) bool
BeginTx(ctx context.Context, opts *sql.TxOptions) (Transaction, error)
GetSize(ctx context.Context) (int64, error)
FillRetryDelay(ctx context.Context)
}

type Transaction interface {
Expand Down

0 comments on commit e0ae664

Please sign in to comment.