Skip to content

Commit

Permalink
feat: sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Dec 11, 2023
1 parent 7e1cbf9 commit ec78be3
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 75 deletions.
6 changes: 3 additions & 3 deletions client/car/sharding/sharding.go → car/sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// https://observablehq.com/@gozala/w3up-shard-size
const shardSize = 133_169_152
const ShardSize = 133_169_152

/** Byte length of a CBOR encoded CAR header with zero roots. */
const noRootsHeaderLen = 17
Expand Down Expand Up @@ -42,7 +42,7 @@ func NewSharderFromCAR(reader io.Reader) (iterable.Iterator[io.Reader], error) {
}

func NewSharder(roots []ipld.Link, blocks iterable.Iterator[block.Block], options ...Option) (iterable.Iterator[io.Reader], error) {
cfg := sharderConfig{shdsize: shardSize}
cfg := sharderConfig{shdsize: ShardSize}
for _, opt := range options {
if err := opt(&cfg); err != nil {
return nil, err
Expand All @@ -66,7 +66,7 @@ func NewSharder(roots []ipld.Link, blocks iterable.Iterator[block.Block], option
return nil, io.EOF
}

clen := hdrlen
clen := 0
return car.Encode(roots, iterable.NewIterator(func() (block.Block, error) {
var blk ipld.Block
if nxt != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/web3-storage/go-ucanto/core/ipld/block"
"github.com/web3-storage/go-ucanto/core/ipld/hash/sha256"
"github.com/web3-storage/go-ucanto/core/iterable"
"github.com/web3-storage/go-w3up/client/car/sharding"
"github.com/web3-storage/go-w3up/car/sharding"
)

func randomRawBlock(t testing.TB, size int) ipld.Block {
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions cmd/util.go → cmd/util/util.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package util

import (
_ "embed"
Expand Down Expand Up @@ -27,7 +27,7 @@ type configurationModel struct {
Signer []byte
}

func mustGetSigner() principal.Signer {
func MustGetSigner() principal.Signer {
str := os.Getenv("W3UP_PRIVATE_KEY") // use env var preferably
if str != "" {
s, err := signer.Parse(str)
Expand Down Expand Up @@ -92,7 +92,7 @@ func mustReadConfig() *configurationModel {
return &conf
}

func mustGetConnection() client.Connection {
func MustGetConnection() client.Connection {
// service URL & DID
serviceURL, err := url.Parse("https://up.web3.storage")
if err != nil {
Expand All @@ -116,15 +116,15 @@ func mustGetConnection() client.Connection {
return conn
}

func mustParseDID(str string) did.DID {
func MustParseDID(str string) did.DID {
did, err := did.Parse(str)
if err != nil {
log.Fatalf("parsing DID: %s", err)
}
return did
}

func mustGetProof(path string) delegation.Delegation {
func MustGetProof(path string) delegation.Delegation {
b, err := os.ReadFile(path)
if err != nil {
log.Fatalf("reading proof file: %s", err)
Expand Down
160 changes: 97 additions & 63 deletions cmd/w3.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"os"
Expand All @@ -13,10 +15,14 @@ import (
"github.com/urfave/cli/v2"
"github.com/web3-storage/go-ucanto/core/car"
"github.com/web3-storage/go-ucanto/core/delegation"
"github.com/web3-storage/go-ucanto/did"
"github.com/web3-storage/go-ucanto/principal"
"github.com/web3-storage/go-w3up/capability/storeadd"
"github.com/web3-storage/go-w3up/capability/uploadadd"
"github.com/web3-storage/go-w3up/capability/uploadlist"
"github.com/web3-storage/go-w3up/car/sharding"
"github.com/web3-storage/go-w3up/client"
"github.com/web3-storage/go-w3up/cmd/util"
)

func main() {
Expand Down Expand Up @@ -48,7 +54,7 @@ func main() {
Name: "car",
Aliases: []string{"c"},
Value: "",
Usage: "Path to CAR file to upload (max 4GB).",
Usage: "Path to CAR file to upload.",
},
},
Action: up,
Expand Down Expand Up @@ -85,16 +91,16 @@ func main() {
}

func whoami(cCtx *cli.Context) error {
s := mustGetSigner()
s := util.MustGetSigner()
fmt.Println(s.DID())
return nil
}

func up(cCtx *cli.Context) error {
signer := mustGetSigner()
conn := mustGetConnection()
space := mustParseDID(cCtx.String("space"))
proof := mustGetProof(cCtx.String("proof"))
signer := util.MustGetSigner()
conn := util.MustGetConnection()
space := util.MustParseDID(cCtx.String("space"))
proofs := []delegation.Delegation{util.MustGetProof(cCtx.String("proof"))}

f0, err := os.Open(cCtx.String("car"))
if err != nil {
Expand All @@ -106,57 +112,118 @@ func up(cCtx *cli.Context) error {
log.Fatalf("stat file: %s", err)
}

mh, err := multihash.SumStream(f0, multihash.SHA2_256, -1)
if err != nil {
log.Fatalf("hashing CAR: %s", err)
var shdlnks []ipld.Link

defer f0.Close()
if stat.Size() < sharding.ShardSize {
link := storeShard(signer, space, f0, proofs)
fmt.Println(link.String())
shdlnks = append(shdlnks, link)
} else {
_, blocks, err := car.Decode(f0)
if err != nil {
log.Fatalf("decoding CAR: %s", err)
}
shds, err := sharding.NewSharder([]ipld.Link{}, blocks)
if err != nil {
log.Fatalf("sharding CAR: %s", err)
}

for {
shd, err := shds.Next()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
link := storeShard(signer, space, shd, proofs)
fmt.Println(link.String())
shdlnks = append(shdlnks, link)
}
}

err = f0.Close()
f3, err := os.Open(cCtx.String("car"))
if err != nil {
log.Fatalf("opening file: %s", err)
}
roots, _, err := car.Decode(f3)
if err != nil {
log.Fatalf("reading roots: %s", err)
}
err = f3.Close()
if err != nil {
log.Fatalf("closing file: %s", err)
}
if len(roots) > 0 {
rcpt, err := client.UploadAdd(
signer,
space,
&uploadadd.Caveat{
Root: roots[0],
Shards: shdlnks,
},
client.WithConnection(conn),
client.WithProofs(proofs),
)
if err != nil {
return err
}
if rcpt.Out().Error() != nil {
log.Fatalf("%+v\n", rcpt.Out().Error())
}

fmt.Printf("⁂ https://w3s.link/ipfs/%s\n", roots[0])
}

return nil
}

func storeShard(issuer principal.Signer, space did.DID, shard io.Reader, proofs []delegation.Delegation) ipld.Link {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(shard)
if err != nil {
log.Fatalf("reading CAR: %s", err)
}

mh, err := multihash.Sum(buf.Bytes(), multihash.SHA2_256, -1)
if err != nil {
log.Fatalf("hashing CAR: %s", err)
}

link := cidlink.Link{Cid: cid.NewCidV1(0x0202, mh)}
fmt.Println(link.String())

rcpt, err := client.StoreAdd(
signer,
issuer,
space,
&storeadd.Caveat{
Link: link,
Size: uint64(stat.Size()),
Size: uint64(buf.Len()),
},
client.WithConnection(conn),
client.WithProofs([]delegation.Delegation{proof}),
client.WithConnection(util.MustGetConnection()),
client.WithProofs(proofs),
)
if err != nil {
return err
log.Fatalf("store/add %s: %s", link, err)
}

if rcpt.Out().Error() != nil {
log.Fatalf("%+v\n", rcpt.Out().Error())
}

if rcpt.Out().Ok().Status == "upload" {
f2, err := os.Open(cCtx.String("car"))
if err != nil {
log.Fatalf("opening file: %s", err)
}

fmt.Println(*rcpt.Out().Ok().Url)
hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, f2)
hr, err := http.NewRequest("PUT", *rcpt.Out().Ok().Url, bytes.NewReader(buf.Bytes()))
if err != nil {
log.Fatalf("creating HTTP request: %s", err)
}

hdr := map[string][]string{}
for k, v := range rcpt.Out().Ok().Headers.Values {
fmt.Printf("%s: %s\n", k, v)
hdr[k] = []string{v}
}

hr.Header = hdr
hr.ContentLength = stat.Size()
hr.ContentLength = int64(buf.Len())
httpClient := http.Client{}
res, err := httpClient.Do(hr)
if err != nil {
Expand All @@ -171,47 +238,14 @@ func up(cCtx *cli.Context) error {
}
}

f3, err := os.Open(cCtx.String("car"))
if err != nil {
log.Fatalf("opening file: %s", err)
}
roots, _, err := car.Decode(f3)
if err != nil {
log.Fatalf("reading roots: %s", err)
}
err = f3.Close()
if err != nil {
log.Fatalf("closing file: %s", err)
}
if len(roots) > 0 {
rcpt, err := client.UploadAdd(
signer,
space,
&uploadadd.Caveat{
Root: roots[0],
Shards: []ipld.Link{link},
},
client.WithConnection(conn),
client.WithProofs([]delegation.Delegation{proof}),
)
if err != nil {
return err
}
if rcpt.Out().Error() != nil {
log.Fatalf("%+v\n", rcpt.Out().Error())
}

fmt.Printf("⁂ https://w3s.link/ipfs/%s\n", roots[0])
}

return nil
return link
}

func ls(cCtx *cli.Context) error {
signer := mustGetSigner()
conn := mustGetConnection()
space := mustParseDID(cCtx.String("space"))
proof := mustGetProof(cCtx.String("proof"))
signer := util.MustGetSigner()
conn := util.MustGetConnection()
space := util.MustParseDID(cCtx.String("space"))
proof := util.MustGetProof(cCtx.String("proof"))

rcpt, err := client.UploadList(
signer,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/urfave/cli/v2 v2.25.7
github.com/web3-storage/go-ucanto v0.0.0-20231211123132-55806bb24d2e
github.com/web3-storage/go-ucanto v0.1.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsX
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/web3-storage/go-ucanto v0.0.0-20231211123132-55806bb24d2e h1:4Ag+MHo9809f4HBEtiZ/ung6iwhjflttjiBlBo23DL0=
github.com/web3-storage/go-ucanto v0.0.0-20231211123132-55806bb24d2e/go.mod h1:XD6zahQ8HLh8Z2CSK7xYcTR0A7oCVYXTZB7fFtYqGF8=
github.com/web3-storage/go-ucanto v0.1.0 h1:Hg6jO7OLLeDLtmseQZWQGBFJMp+5t5OWAFVL5Y3LsOs=
github.com/web3-storage/go-ucanto v0.1.0/go.mod h1:XD6zahQ8HLh8Z2CSK7xYcTR0A7oCVYXTZB7fFtYqGF8=
github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 h1:yVYDLoN2gmB3OdBXFW8e1UwgVbmCvNlnAKhvHPaNARI=
github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
Expand Down

0 comments on commit ec78be3

Please sign in to comment.