Skip to content
This repository has been archived by the owner on Nov 22, 2023. It is now read-only.

Commit

Permalink
feat: fetch and list transaction entries without retrieving linked da…
Browse files Browse the repository at this point in the history
…ta (#121)

* retrieve and return info about all entries in a tx

* fix local tx and server post tx
  • Loading branch information
tchardin authored Jun 21, 2021
1 parent 902b9f6 commit 580a109
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 20 deletions.
111 changes: 99 additions & 12 deletions exchange/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/myelnet/pop/retrieval"
"github.com/myelnet/pop/retrieval/client"
"github.com/myelnet/pop/retrieval/deal"
"github.com/rs/zerolog/log"
)

// DefaultHashFunction used for generating CIDs of imported data
Expand All @@ -44,11 +45,11 @@ var ErrNoStrategy = errors.New("no strategy")
// Entry represents a link to an item in the DAG map
type Entry struct {
// Key is string name of the entry
Key string
Key string `json:"key"`
// Value is the CID of the represented content
Value cid.Cid
Value cid.Cid `json:"value"`
// Size is the original file size. Not encoded in the DAG
Size int64
Size int64 `json:"size"`
}

// TxResult returns metadata about the transaction including a potential error if something failed
Expand Down Expand Up @@ -231,6 +232,14 @@ func (tx *Tx) assembleEntries() (ipld.Node, error) {
if err != nil {
return nil, err
}
sas, err := mas.AssembleEntry("Size")
if err != nil {
return nil, err
}
err = sas.AssignInt(int(v.Size))
if err != nil {
return nil, err
}
err = mas.Finish()
if err != nil {
return nil, err
Expand Down Expand Up @@ -363,15 +372,18 @@ func (tx *Tx) IsLocal(key string) bool {
if err != nil {
return false
}
if ref != nil && key == "" {
return true
}
if ref != nil {
return ref.Has(key)
}

return false
}

// GetEntries retrieves all the entries associated with the root of this transaction
func (tx *Tx) GetEntries() ([]string, error) {
// Keys lists the keys for all the entries in the root map of this transaction
func (tx *Tx) Keys() ([]string, error) {
// If this transaction has entries we just return them otherwise
// we're looking at a different transaction
if len(tx.entries) > 0 {
Expand All @@ -384,14 +396,89 @@ func (tx *Tx) GetEntries() ([]string, error) {
return entries, nil
}

if ref, err := tx.index.GetRef(tx.root); err == nil {
return utils.MapKeys(
tx.ctx,
ref.PayloadCID,
storeutil.LoaderForBlockstore(tx.bs),
)
loader := storeutil.LoaderForBlockstore(tx.bs)
if _, err := tx.index.GetRef(tx.root); err != nil {
// Keys might still be in multistore
loader = tx.store.Loader
}
return nil, fmt.Errorf("failed to get entried")

keys, err := utils.MapKeys(
tx.ctx,
tx.root,
loader,
)
if err != nil {
return nil, fmt.Errorf("failed to get keys: %w", err)
}
return keys, nil
}

// Entries returns all the entries in the root map of this transaction
func (tx *Tx) Entries() ([]Entry, error) {
loader := storeutil.LoaderForBlockstore(tx.bs)
if _, err := tx.index.GetRef(tx.root); err != nil {
// Keys might still be in multistore
loader = tx.store.Loader
}

lk := cidlink.Link{Cid: tx.root}
// Create an instance of map builder as we're looking to extract all the keys from an IPLD map
nb := basicnode.Prototype.Map.NewBuilder()
// Use a loader from the link to read all the children blocks from the global store
err := lk.Load(tx.ctx, ipld.LinkContext{}, nb, loader)
if err != nil {
return nil, err
}
// load the IPLD tree
nd := nb.Build()
// Gather the keys in an array
entries := make([]Entry, nd.Length())
it := nd.MapIterator()
i := 0
// Iterate over all the map entries
for !it.Done() {
k, v, err := it.Next()
// all succeed or fail
if err != nil {
return nil, err
}

key, err := k.AsString()
if err != nil {
return nil, err
}

// An entry with no value should fail
vn, err := v.LookupByString("Value")
if err != nil {
return nil, err
}
l, err := vn.AsLink()
if err != nil {
return nil, err
}

entries[i] = Entry{
Key: key,
Value: l.(cidlink.Link).Cid,
}
i++

// An entry with no size is still fine
sn, err := v.LookupByString("Size")
if err != nil {
log.Debug().Str("key", key).Msg("no size present in entry")
continue
}
size, err := sn.AsInt()
if err != nil {
continue
}
entries[i-1].Size = int64(size)

}
return entries, nil

}

func (tx *Tx) loadFileEntry(k string, store *multistore.Store) (files.Node, error) {
Expand Down
50 changes: 46 additions & 4 deletions exchange/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestMapFieldSelector(t *testing.T) {
stat, err := utils.Stat(ctx, tx.Store(), tx.Root(), sel.Key("line2.txt"))
require.NoError(t, err)
require.Equal(t, 2, stat.NumBlocks)
require.Equal(t, 627, stat.Size)
require.Equal(t, 683, stat.Size)

// Close the transaction
require.NoError(t, tx.Close())
Expand Down Expand Up @@ -429,7 +429,7 @@ loop2:
}

func TestTxGetEntries(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()
mn := mocknet.New(ctx)

Expand All @@ -450,13 +450,55 @@ func TestTxGetEntries(t *testing.T) {
require.NoError(t, tx.Put(KeyFromPath(p), rootCid, int64(len(bytes))))
}

fname := n1.CreateRandomFile(t, 56000)
link, bytes := n1.LoadFileToStore(ctx, t, tx.Store(), fname)
rootCid := link.(cidlink.Link).Cid
require.NoError(t, tx.Put(KeyFromPath(fname), rootCid, int64(len(bytes))))

require.NoError(t, tx.Commit())
require.NoError(t, pn.Index().SetRef(tx.Ref()))
require.NoError(t, tx.Close())

// Fresh new tx based on the root of the previous one
ntx := pn.Tx(ctx, WithRoot(tx.Root()))
entries, err := ntx.GetEntries()
keys, err := ntx.Keys()
require.NoError(t, err)
require.Equal(t, len(filepaths)+1, len(keys))

// A client enters the scene
n2 := testutil.NewTestNode(mn, t)
opts2 := Options{
RepoPath: n2.DTTmpDir,
}
cn, err := New(ctx, n2.Host, n2.Ds, opts2)
require.NoError(t, err)

require.NoError(t, mn.LinkAll())
require.NoError(t, mn.ConnectAllButSelf())
time.Sleep(time.Second)

gtx := cn.Tx(ctx, WithRoot(tx.Root()), WithStrategy(SelectFirst))
require.NoError(t, gtx.Query(sel.Entries()))

loop:
for {
select {
case <-ctx.Done():
t.Fatal("could not finish gtx1")
case <-gtx.Ongoing():
case <-gtx.Done():
break loop
}
}

// @NOTE: even when selecting a specific key the operation will retrieve all other the entries
// without the linked data. We may need to alter this behavior in cases where there is a large
// number of entries
keys, err = gtx.Keys()
require.NoError(t, err)
require.Equal(t, len(filepaths)+1, len(keys))

entries, err := gtx.Entries()
require.NoError(t, err)
require.Equal(t, len(filepaths), len(entries))
require.Equal(t, len(filepaths)+1, len(entries))
}
2 changes: 1 addition & 1 deletion internal/utils/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func MapKeys(ctx context.Context, root cid.Cid, loader ipld.Loader) (KeyList, er
if err != nil {
return nil, err
}
// The key IPLD node needs to be decoded as bytes
// The key IPLD node needs to be decoded as a string
key, err := k.AsString()
if err != nil {
return nil, err
Expand Down
15 changes: 12 additions & 3 deletions node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,23 @@ func (s *server) getHandler(w http.ResponseWriter, r *http.Request) {
return
}

log.Debug().Msg("retrieved blocks")

s.addUserHeaders(w)

tx := s.node.exch.Tx(r.Context(), exchange.WithRoot(root))

if key == "" {
// If there is no key we return all the keys
keys, err := tx.GetEntries()
// If there is no key we return all the entries as a JSON file detailing information
// about each entry. This allows clients to inspec the content in a transaction before
// fetching all of it.
entries, err := tx.Entries()
if err != nil {
http.Error(w, "Failed to get entries", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(keys)
json.NewEncoder(w).Encode(entries)
return
}
fnd, err := tx.GetFile(segs[0])
Expand Down Expand Up @@ -266,6 +270,11 @@ func (s *server) postHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "failed to commit tx", http.StatusInternalServerError)
return
}
err = s.node.exch.Index().SetRef(tx.Ref())
if err != nil {
http.Error(w, "failed to set new ref", http.StatusInternalServerError)
return
}
root = tx.Root()
} else {
c, err := s.node.Add(r.Context(), s.node.dag, files.NewReaderFile(r.Body))
Expand Down
8 changes: 8 additions & 0 deletions selectors/sel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ func All() ipld.Node {
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
}

// Entries selects all entries of an IPLD collection without traversing any data linked in the entries
// Limitting the recursion depth to 1 will reach all the entries of a map but not beyond.
func Entries() ipld.Node {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
return ssb.ExploreRecursive(selector.RecursionLimitDepth(1),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
}

// Key selects the link and all the children associated with a given key in a Map
func Key(key string) ipld.Node {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
Expand Down

0 comments on commit 580a109

Please sign in to comment.