feat: blockstore: GetMany blockstore method #492

Open: wants to merge 12 commits into base: main
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -15,7 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added

* `boxo/blockstore`:
  * [GetMany blockstore implementation](https://github.com/ipfs/boxo/pull/492)
* `boxo/gateway`:
  * A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
* `boxo/bitswap/client`:
228 changes: 228 additions & 0 deletions blockstore/blockstore.go
@@ -5,6 +5,7 @@ package blockstore
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

@@ -64,6 +65,12 @@ type Blockstore interface {
HashOnRead(enabled bool)
}

// GetManyBlockstore is a blockstore interface that supports a GetMany method
type GetManyBlockstore interface {
Blockstore
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
Contributor:

Two recommendations:

  1. There seems to be no need to return both []blocks.Block and []cid.Cid, since Block has a .Cid() method: https://github.com/ipfs/go-block-format/blob/v0.2.0/blocks.go#L19-L25
  2. You may want to consider a streaming interface so that you don't have to buffer all the blocks in memory.

If you return an asynchronous object (e.g. a channel or an iterator), it might be worth taking a look at ipfs/kubo#4592 to make sure you don't run into some common pitfalls. With Go generics, iterators may also make this easier than it used to be.

}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
@@ -310,6 +317,227 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return output, nil
}

// GetManyOption is a getManyBlockStore option implementation
type GetManyOption struct {
f func(bs *getManyBlockStore)
}

// NewGetManyBlockstore returns a default GetManyBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewGetManyBlockstore(d ds.TxnDatastore, opts ...GetManyOption) GetManyBlockstore {
bs := &getManyBlockStore{
datastore: d,
}

for _, o := range opts {
o.f(bs)
}

if !bs.noPrefix {
bs.datastore = dsns.WrapTxnDatastore(bs.datastore, BlockPrefix)
}
return bs
}

type getManyBlockStore struct {
datastore ds.TxnDatastore

rehash atomic.Bool
writeThrough bool
noPrefix bool
}

func (bs *getManyBlockStore) HashOnRead(enabled bool) {
bs.rehash.Store(enabled)
}

func (bs *getManyBlockStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
Contributor:

I am surprised this function needs to be duplicated. I think you could use embedding here.

Author (@i-norden, Oct 18, 2023):

This is possible, but it would require passing a ds.Batching to the constructor in addition to the ds.TxnDatastore (or some composite interface of the two). A ds.TxnDatastore can't be used to construct a regular Blockstore for embedding because it doesn't satisfy the ds.Batching interface, even though the Put and Get methods we would like to fall through to don't use the parts of that interface that it doesn't support.

Author (@i-norden, Oct 18, 2023):

Another option would be to break the normal Blockstore up into the components that need ds.Batching and the ones that just need ds.Datastore. ds.TxnDatastore satisfies ds.Datastore, and we only need ds.Datastore to fulfill the Get and Put methods on either blockstore.

Author:

Let me know what you think the best/cleanest approach is here

Contributor:

I would make it take the union of the batching and txn datastores so it has access to everything it needs.
You could also implement a stub: a batch is strictly weaker than a transaction, which provides the same features and more:

Batching datastores support deferred, grouped updates to the database. Batches do NOT have transactional semantics: updates to the underlying datastore are not guaranteed to occur in the same iota of time. Similarly, batched updates will not be flushed to the underlying datastore until Commit has been called. Txns from a TxnDatastore have all the capabilities of a Batch, but the reverse is NOT true.

https://pkg.go.dev/github.com/ipfs/go-datastore#Batching

So if you accept a TxnDatastore, you can use a simple stub that implements batches using transactions. This would make the code work in the only place it's used right now.

if !k.Defined() {
logger.Error("undefined cid in blockstore")
return nil, ipld.ErrNotFound{Cid: k}
}
bdata, err := bs.datastore.Get(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return nil, ipld.ErrNotFound{Cid: k}
}
if err != nil {
return nil, err
}
if bs.rehash.Load() {
rbcid, err := k.Prefix().Sum(bdata)
if err != nil {
return nil, err
}

if !rbcid.Equals(k) {
return nil, ErrHashMismatch
}

return blocks.NewBlockWithCid(bdata, rbcid)
}
return blocks.NewBlockWithCid(bdata, k)
}

func (bs *getManyBlockStore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
Contributor:

It doesn't make sense to not return the CID here, given that it's in the signature. That said, it doesn't seem like []cid.Cid needs to be in the return signature at all.
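
To illustrate the inconsistency, here is a small sketch in which the single-key fast path follows the same contract as the general path: a missing key goes into the missing list and is not reported as an error. The `store` type, `getMany` and `errNotFound` are hypothetical, and string keys stand in for CIDs; this is not the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// store is a toy in-memory backend used only for this sketch.
type store map[string][]byte

func (s store) get(k string) ([]byte, error) {
	v, ok := s[k]
	if !ok {
		return nil, errNotFound
	}
	return v, nil
}

// getMany returns the values that were found and the keys that were missing.
// The fast path for a single key reports results the same way the loop does.
func (s store) getMany(keys []string) ([][]byte, []string, error) {
	if len(keys) == 1 {
		v, err := s.get(keys[0])
		switch {
		case errors.Is(err, errNotFound):
			return nil, []string{keys[0]}, nil
		case err != nil:
			return nil, nil, err
		}
		return [][]byte{v}, nil, nil
	}

	var found [][]byte
	var missing []string
	for _, k := range keys {
		v, err := s.get(k)
		if errors.Is(err, errNotFound) {
			missing = append(missing, k)
			continue
		}
		if err != nil {
			return nil, nil, err
		}
		found = append(found, v)
	}
	return found, missing, nil
}

func main() {
	s := store{"a": []byte("1")}
	found, missing, err := s.getMany([]string{"b"})
	fmt.Println(len(found), missing, err)
}
```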

}

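t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}
// Release the transaction on every return path; Discard has no effect after a successful Commit.
defer t.Discard(ctx)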
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}
} else {
if bs.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *getManyBlockStore) Put(ctx context.Context, block blocks.Block) error {
k := dshelp.MultihashToDsKey(block.Cid().Hash())

// Has is cheaper than Put, so see if we already have it
if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
return nil // already stored.
}
}
return bs.datastore.Put(ctx, k, block.RawData())
}

func (bs *getManyBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

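t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}
// Release the transaction on every return path; Discard has no effect after a successful Commit.
defer t.Discard(ctx)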
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}
}
return t.Commit(ctx)
}

func (bs *getManyBlockStore) Has(ctx context.Context, k cid.Cid) (bool, error) {
return bs.datastore.Has(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

func (bs *getManyBlockStore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
size, err := bs.datastore.GetSize(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return -1, ipld.ErrNotFound{Cid: k}
}
return size, err
}

func (bs *getManyBlockStore) DeleteBlock(ctx context.Context, k cid.Cid) error {
return bs.datastore.Delete(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
func (bs *getManyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
res, err := bs.datastore.Query(ctx, q)
if err != nil {
return nil, err
}

output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
close(output)
}()

for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
logger.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
return
}

// need to convert to key.Key using key.KeyFromDsKey.
bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
if err != nil {
logger.Warnf("error parsing key from binary: %s", err)
continue
}
k := cid.NewCidV1(cid.Raw, bk)
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()

return output, nil
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
104 changes: 104 additions & 0 deletions blockstore/blockstore_test.go
@@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
@@ -72,6 +74,108 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))