diff --git a/exchange/tx.go b/exchange/tx.go index 4d881142..f3a73a5e 100644 --- a/exchange/tx.go +++ b/exchange/tx.go @@ -32,6 +32,7 @@ import ( "github.com/myelnet/pop/retrieval" "github.com/myelnet/pop/retrieval/client" "github.com/myelnet/pop/retrieval/deal" + "github.com/rs/zerolog/log" ) // DefaultHashFunction used for generating CIDs of imported data @@ -44,11 +45,11 @@ var ErrNoStrategy = errors.New("no strategy") // Entry represents a link to an item in the DAG map type Entry struct { // Key is string name of the entry - Key string + Key string `json:"key"` // Value is the CID of the represented content - Value cid.Cid + Value cid.Cid `json:"value"` // Size is the original file size. Not encoded in the DAG - Size int64 + Size int64 `json:"size"` } // TxResult returns metadata about the transaction including a potential error if something failed @@ -231,6 +232,14 @@ func (tx *Tx) assembleEntries() (ipld.Node, error) { if err != nil { return nil, err } + sas, err := mas.AssembleEntry("Size") + if err != nil { + return nil, err + } + err = sas.AssignInt(int(v.Size)) + if err != nil { + return nil, err + } err = mas.Finish() if err != nil { return nil, err @@ -363,6 +372,9 @@ func (tx *Tx) IsLocal(key string) bool { if err != nil { return false } + if ref != nil && key == "" { + return true + } if ref != nil { return ref.Has(key) } @@ -370,8 +382,8 @@ func (tx *Tx) IsLocal(key string) bool { return false } -// GetEntries retrieves all the entries associated with the root of this transaction -func (tx *Tx) GetEntries() ([]string, error) { +// Keys lists the keys for all the entries in the root map of this transaction +func (tx *Tx) Keys() ([]string, error) { // If this transaction has entries we just return them otherwise // we're looking at a different transaction if len(tx.entries) > 0 { @@ -384,14 +396,89 @@ func (tx *Tx) GetEntries() ([]string, error) { return entries, nil } - if ref, err := tx.index.GetRef(tx.root); err == nil { - return utils.MapKeys( - tx.ctx, - ref.PayloadCID, - storeutil.LoaderForBlockstore(tx.bs), - ) + loader := storeutil.LoaderForBlockstore(tx.bs) + if _, err := tx.index.GetRef(tx.root); err != nil { + // Keys might still be in multistore + loader = tx.store.Loader } - return nil, fmt.Errorf("failed to get entried") + + keys, err := utils.MapKeys( + tx.ctx, + tx.root, + loader, + ) + if err != nil { + return nil, fmt.Errorf("failed to get keys: %w", err) + } + return keys, nil +} + +// Entries returns all the entries in the root map of this transaction +func (tx *Tx) Entries() ([]Entry, error) { + loader := storeutil.LoaderForBlockstore(tx.bs) + if _, err := tx.index.GetRef(tx.root); err != nil { + // Keys might still be in multistore + loader = tx.store.Loader + } + + lk := cidlink.Link{Cid: tx.root} + // Create an instance of map builder as we're looking to extract all the keys from an IPLD map + nb := basicnode.Prototype.Map.NewBuilder() + // Use a loader from the link to read all the children blocks from the global store + err := lk.Load(tx.ctx, ipld.LinkContext{}, nb, loader) + if err != nil { + return nil, err + } + // load the IPLD tree + nd := nb.Build() + // Gather the keys in an array + entries := make([]Entry, nd.Length()) + it := nd.MapIterator() + i := 0 + // Iterate over all the map entries + for !it.Done() { + k, v, err := it.Next() + // all succeed or fail + if err != nil { + return nil, err + } + + key, err := k.AsString() + if err != nil { + return nil, err + } + + // An entry with no value should fail + vn, err := v.LookupByString("Value") + if err != nil { + return nil, err + } + l, err := vn.AsLink() + if err != nil { + return nil, err + } + + entries[i] = Entry{ + Key: key, + Value: l.(cidlink.Link).Cid, + } + i++ + + // An entry with no size is still fine + sn, err := v.LookupByString("Size") + if err != nil { + log.Debug().Str("key", key).Msg("no size present in entry") + continue + } + size, err := sn.AsInt() + if err != nil { + continue + } + entries[i-1].Size = int64(size) + + } + return entries, nil + } func (tx *Tx) loadFileEntry(k string, store *multistore.Store) (files.Node, error) { diff --git a/exchange/tx_test.go b/exchange/tx_test.go index 16e802e2..cea9d186 100644 --- a/exchange/tx_test.go +++ b/exchange/tx_test.go @@ -305,7 +305,7 @@ func TestMapFieldSelector(t *testing.T) { stat, err := utils.Stat(ctx, tx.Store(), tx.Root(), sel.Key("line2.txt")) require.NoError(t, err) require.Equal(t, 2, stat.NumBlocks) - require.Equal(t, 627, stat.Size) + require.Equal(t, 683, stat.Size) // Close the transaction require.NoError(t, tx.Close()) @@ -429,7 +429,7 @@ loop2: } func TestTxGetEntries(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second) defer cancel() mn := mocknet.New(ctx) @@ -450,13 +450,55 @@ func TestTxGetEntries(t *testing.T) { require.NoError(t, tx.Put(KeyFromPath(p), rootCid, int64(len(bytes)))) } + fname := n1.CreateRandomFile(t, 56000) + link, bytes := n1.LoadFileToStore(ctx, t, tx.Store(), fname) + rootCid := link.(cidlink.Link).Cid + require.NoError(t, tx.Put(KeyFromPath(fname), rootCid, int64(len(bytes)))) + require.NoError(t, tx.Commit()) require.NoError(t, pn.Index().SetRef(tx.Ref())) require.NoError(t, tx.Close()) // Fresh new tx based on the root of the previous one ntx := pn.Tx(ctx, WithRoot(tx.Root())) - entries, err := ntx.GetEntries() + keys, err := ntx.Keys() + require.NoError(t, err) + require.Equal(t, len(filepaths)+1, len(keys)) + + // A client enters the scene + n2 := testutil.NewTestNode(mn, t) + opts2 := Options{ + RepoPath: n2.DTTmpDir, + } + cn, err := New(ctx, n2.Host, n2.Ds, opts2) + require.NoError(t, err) + + require.NoError(t, mn.LinkAll()) + require.NoError(t, mn.ConnectAllButSelf()) + time.Sleep(time.Second) + + gtx := cn.Tx(ctx, WithRoot(tx.Root()), WithStrategy(SelectFirst)) + require.NoError(t, gtx.Query(sel.Entries())) + +loop: + for { + select { + case <-ctx.Done(): + t.Fatal("could not finish gtx1") + case <-gtx.Ongoing(): + case <-gtx.Done(): + break loop + } + } + + // @NOTE: even when selecting a specific key the operation will retrieve all other the entries + // without the linked data. We may need to alter this behavior in cases where there is a large + // number of entries + keys, err = gtx.Keys() + require.NoError(t, err) + require.Equal(t, len(filepaths)+1, len(keys)) + + entries, err := gtx.Entries() require.NoError(t, err) - require.Equal(t, len(filepaths), len(entries)) + require.Equal(t, len(filepaths)+1, len(entries)) } diff --git a/internal/utils/stat.go b/internal/utils/stat.go index 2f4f7e8f..c5d97def 100644 --- a/internal/utils/stat.go +++ b/internal/utils/stat.go @@ -113,7 +113,7 @@ func MapKeys(ctx context.Context, root cid.Cid, loader ipld.Loader) (KeyList, er if err != nil { return nil, err } - // The key IPLD node needs to be decoded as bytes + // The key IPLD node needs to be decoded as a string key, err := k.AsString() if err != nil { return nil, err diff --git a/node/server.go b/node/server.go index ddebbb00..936f04cb 100644 --- a/node/server.go +++ b/node/server.go @@ -170,19 +170,23 @@ func (s *server) getHandler(w http.ResponseWriter, r *http.Request) { return } + log.Debug().Msg("retrieved blocks") + s.addUserHeaders(w) tx := s.node.exch.Tx(r.Context(), exchange.WithRoot(root)) if key == "" { - // If there is no key we return all the keys - keys, err := tx.GetEntries() + // If there is no key we return all the entries as a JSON file detailing information + // about each entry. This allows clients to inspec the content in a transaction before + // fetching all of it. + entries, err := tx.Entries() if err != nil { http.Error(w, "Failed to get entries", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(keys) + json.NewEncoder(w).Encode(entries) return } fnd, err := tx.GetFile(segs[0]) @@ -266,6 +270,11 @@ func (s *server) postHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "failed to commit tx", http.StatusInternalServerError) return } + err = s.node.exch.Index().SetRef(tx.Ref()) + if err != nil { + http.Error(w, "failed to set new ref", http.StatusInternalServerError) + return + } root = tx.Root() } else { c, err := s.node.Add(r.Context(), s.node.dag, files.NewReaderFile(r.Body)) diff --git a/selectors/sel.go b/selectors/sel.go index d1d177a1..15d6d185 100644 --- a/selectors/sel.go +++ b/selectors/sel.go @@ -14,6 +14,14 @@ func All() ipld.Node { ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() } +// Entries selects all entries of an IPLD collection without traversing any data linked in the entries +// Limitting the recursion depth to 1 will reach all the entries of a map but not beyond. +func Entries() ipld.Node { + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + return ssb.ExploreRecursive(selector.RecursionLimitDepth(1), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() +} + // Key selects the link and all the children associated with a given key in a Map func Key(key string) ipld.Node { ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)