Skip to content

Commit

Permalink
fix: races
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw committed Oct 29, 2024
1 parent a2eccab commit 2417fe4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
22 changes: 12 additions & 10 deletions internal/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
httpfind "github.com/alanshaw/storetheindex/server/find"
httpingest "github.com/alanshaw/storetheindex/server/ingest"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipni/go-indexer-core/engine"
"github.com/ipni/go-indexer-core/store/memory"
"github.com/ipni/go-libipni/maurl"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/storacha/storage/pkg/service/storage"
"github.com/storacha/storage/pkg/store/blobstore"
"github.com/storacha/testthenetwork/internal/redis"
rsync "github.com/storacha/testthenetwork/internal/redis/sync"
"github.com/storacha/testthenetwork/internal/testutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -41,7 +43,7 @@ func StartIPNIService(
reg, err := registry.New(
context.Background(),
config.NewDiscovery(),
datastore.NewMapDatastore(),
dssync.MutexWrap(datastore.NewMapDatastore()),
)
require.NoError(t, err)

Expand All @@ -55,8 +57,8 @@ func StartIPNIService(
p2pHost,
indexerCore,
reg,
datastore.NewMapDatastore(),
datastore.NewMapDatastore(),
dssync.MutexWrap(datastore.NewMapDatastore()),
dssync.MutexWrap(datastore.NewMapDatastore()),
)
require.NoError(t, err)

Expand Down Expand Up @@ -104,10 +106,10 @@ func StartIndexingService(
indexer, err := construct.Construct(
cfg,
construct.WithStartIPNIServer(true),
construct.WithDatastore(datastore.NewMapDatastore()),
construct.WithProvidersClient(redis.NewMapRedis()),
construct.WithClaimsClient(redis.NewMapRedis()),
construct.WithIndexesClient(redis.NewMapRedis()),
construct.WithDatastore(dssync.MutexWrap(datastore.NewMapDatastore())),
construct.WithProvidersClient(rsync.MutexWrap(redis.NewMapRedis())),
construct.WithClaimsClient(rsync.MutexWrap(redis.NewMapRedis())),
construct.WithIndexesClient(rsync.MutexWrap(redis.NewMapRedis())),
)
require.NoError(t, err)

Expand Down Expand Up @@ -135,9 +137,9 @@ func StartStorageNode(
svc, err := storage.New(
storage.WithIdentity(id),
storage.WithBlobstore(blobstore.NewMapBlobstore()),
storage.WithAllocationDatastore(datastore.NewMapDatastore()),
storage.WithClaimDatastore(datastore.NewMapDatastore()),
storage.WithPublisherDatastore(datastore.NewMapDatastore()),
storage.WithAllocationDatastore(dssync.MutexWrap(datastore.NewMapDatastore())),
storage.WithClaimDatastore(dssync.MutexWrap(datastore.NewMapDatastore())),
storage.WithPublisherDatastore(dssync.MutexWrap(datastore.NewMapDatastore())),
storage.WithPublicURL(publicURL),
storage.WithPublisherDirectAnnounce(announceURL),
storage.WithPublisherIndexingServiceConfig(indexingServiceDID, *indexingServiceURL.JoinPath("claims")),
Expand Down
45 changes: 45 additions & 0 deletions internal/redis/sync/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package sync

import (
"context"
"sync"
"time"

goredis "github.com/redis/go-redis/v9"
"github.com/storacha/indexing-service/pkg/redis"
)

type MutexRedis struct {
sync.RWMutex
child redis.Client
}

func (m *MutexRedis) Expire(ctx context.Context, key string, expiration time.Duration) *goredis.BoolCmd {
m.Lock()
defer m.Unlock()
return m.child.Expire(ctx, key, expiration)
}

func (m *MutexRedis) Get(ctx context.Context, key string) *goredis.StringCmd {
m.RLock()
defer m.RUnlock()
return m.child.Get(ctx, key)
}

func (m *MutexRedis) Persist(ctx context.Context, key string) *goredis.BoolCmd {
m.Lock()
defer m.Unlock()
return m.child.Persist(ctx, key)
}

func (m *MutexRedis) Set(ctx context.Context, key string, value any, expiration time.Duration) *goredis.StatusCmd {
m.Lock()
defer m.Unlock()
return m.child.Set(ctx, key, value, expiration)
}

var _ redis.Client = (*MutexRedis)(nil)

func MutexWrap(c redis.Client) *MutexRedis {
return &MutexRedis{child: c}
}

0 comments on commit 2417fe4

Please sign in to comment.