Skip to content

Commit

Permalink
db: protect against concurrent database creations (#581)
Browse files Browse the repository at this point in the history
This could previously cause panics of the type "duplicate metrics registration
attempted".
  • Loading branch information
asubiotto authored Nov 2, 2023
1 parent ea05d56 commit 6d50083
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 18 deletions.
58 changes: 40 additions & 18 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ const (
)

type ColumnStore struct {
mtx *sync.RWMutex
mtx sync.RWMutex
dbs map[string]*DB
dbReplaysInProgress map[string]chan struct{}
reg prometheus.Registerer
logger log.Logger
tracer trace.Tracer
Expand Down Expand Up @@ -89,16 +90,16 @@ func New(
options ...Option,
) (*ColumnStore, error) {
s := &ColumnStore{
mtx: &sync.RWMutex{},
dbs: map[string]*DB{},
reg: prometheus.NewRegistry(),
logger: log.NewNopLogger(),
tracer: trace.NewNoopTracerProvider().Tracer(""),
indexConfig: DefaultIndexConfig(),
indexDegree: 2,
splitSize: 2,
granuleSizeBytes: 1 * MiB,
activeMemorySize: 512 * MiB,
dbs: make(map[string]*DB),
dbReplaysInProgress: make(map[string]chan struct{}),
reg: prometheus.NewRegistry(),
logger: log.NewNopLogger(),
tracer: trace.NewNoopTracerProvider().Tracer(""),
indexConfig: DefaultIndexConfig(),
indexDegree: 2,
splitSize: 2,
granuleSizeBytes: 1 * MiB,
activeMemorySize: 512 * MiB,
}

for _, option := range options {
Expand Down Expand Up @@ -443,12 +444,26 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D

// Need to double-check that in the meantime a database with the same name
// wasn't concurrently created.
db, ok = s.dbs[name]
if ok {
if err := applyOptsToDB(db); err != nil {
return nil, err
for {
db, ok = s.dbs[name]
if ok {
if err := applyOptsToDB(db); err != nil {
return nil, err
}
return db, nil
}
return db, nil

// DB has not yet been created. However, another goroutine might be
// replaying the WAL in the background (the store mutex is released
// during replay).
waitForReplay, ok := s.dbReplaysInProgress[name]
if !ok {
// No replay in progress, it is safe to create the DB.
break
}
s.mtx.Unlock()
<-waitForReplay
s.mtx.Lock()
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"db": name}, s.reg)
Expand Down Expand Up @@ -502,9 +517,16 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D
if err := func() error {
// Unlock the store mutex while the WAL is replayed, otherwise
// if multiple DBs are opened in parallel, WAL replays will not
// happen in parallel.
// happen in parallel. However, create a channel for any
// goroutines that might concurrently try to open the same DB
// to listen on.
s.dbReplaysInProgress[name] = make(chan struct{})
s.mtx.Unlock()
defer s.mtx.Lock()
defer func() {
s.mtx.Lock()
close(s.dbReplaysInProgress[name])
delete(s.dbReplaysInProgress, name)
}()
var err error
db.wal, err = db.openWAL(ctx)
return err
Expand Down
36 changes: 36 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/polarsignals/frostdb/dynparquet"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/polarsignals/frostdb/query"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
"github.com/polarsignals/frostdb/recovery"
)

func TestDBWithWALAndBucket(t *testing.T) {
Expand Down Expand Up @@ -2220,3 +2222,37 @@ func Test_DB_PrehashedStorage(t *testing.T) {
})
require.NoError(t, err)
}

// TestDBConcurrentOpen opens the same database from several goroutines at
// once and requires every open to succeed. It guards against panics during
// concurrent opens (most likely caused by duplicate metrics registration).
func TestDBConcurrentOpen(t *testing.T) {
	const (
		dbName  = "test"
		openers = 16
	)

	memBucket := objstore.NewInMemBucket()
	storage := NewDefaultObjstoreBucket(memBucket)
	testLogger := newTestLogger(t)
	walDir := t.TempDir()

	store, err := New(
		WithLogger(testLogger),
		WithReadWriteStorage(storage),
		WithWAL(),
		WithStoragePath(walDir),
	)
	require.NoError(t, err)
	defer store.Close()

	// Every goroutine runs the same open of dbName, wrapped in recovery.Do
	// (presumably so that a panic surfaces as an error — confirm against the
	// recovery package).
	openDB := func() error {
		return recovery.Do(func() error {
			_, openErr := store.DB(context.Background(), dbName)
			return openErr
		})()
	}

	var g errgroup.Group
	for remaining := openers; remaining > 0; remaining-- {
		g.Go(openDB)
	}
	require.NoError(t, g.Wait())
}

0 comments on commit 6d50083

Please sign in to comment.