diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 09ee5a6..2b0014d 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -65,12 +65,13 @@ to override the gateway address from which to retrieve IPNS Records from. ### `IPNS_RECORD_GATEWAY` Single URL or a comma separated list of Gateway endpoints that support requests for `application/vnd.ipfs.ipns-record`. -This is used for IPNS Record routing. If not set, the value of `PROXY_GATEWAY_URL` will be -used. +This is used for IPNS Record routing. `IPNS_RECORD_GATEWAY` also supports [Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/) for IPNS Record routing ([IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/)). To use it, the provided URL must end with `/routing/v1`. +If not set, the IPNS records will be fetched from `KUBO_RPC_URL`. + ## Saturn Backend ### `STRN_ORCHESTRATOR_URL` diff --git a/handlers.go b/handlers.go index 92f62a1..a09a3ec 100644 --- a/handlers.go +++ b/handlers.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "math/rand" "net/http" @@ -12,6 +13,7 @@ import ( _ "net/http/pprof" "github.com/ipfs/bifrost-gateway/lib" + "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/filecoin-saturn/caboose" @@ -69,8 +71,15 @@ func withRequestLogger(next http.Handler) http.Handler { } func makeGatewayHandler(bs bstore.Blockstore, kuboRPC, gatewayURLs []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) { - // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway. - routing := newProxyRouting(gatewayURLs, cdns) + // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway or kubo RPC. + var routing routing.ValueStore + if len(gatewayURLs) != 0 { + routing = newProxyRouting(gatewayURLs, cdns) + } else if len(kuboRPC) != 0 { + routing = newRPCProxyRouting(kuboRPC, cdns) + } else { + return nil, errors.New("kubo rpc or gateway urls must be provided in order to delegate routing") + } // Sets up a cache to store blocks in cacheBlockStore, err := lib.NewCacheBlockStore(blockCacheSize) diff --git a/main.go b/main.go index 884d9af..4292199 100644 --- a/main.go +++ b/main.go @@ -109,10 +109,6 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`, } ipnsProxyGateway := getEnvs(EnvIPNSRecordGateway, "") - if len(ipnsProxyGateway) == 0 { - ipnsProxyGateway = proxyGateway - } - gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, ipnsProxyGateway, gatewayPort, blockCacheSize, cdns, useGraphBackend) if err != nil { return err diff --git a/routing.go b/routing.go index b906125..9b1b216 100644 --- a/routing.go +++ b/routing.go @@ -1,7 +1,11 @@ package main import ( + "bytes" "context" + "encoding/base64" + "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -14,6 +18,143 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) +type rpcProxyRouting struct { + kuboRPC []string + httpClient *http.Client + rand *rand.Rand +} + +func newRPCProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore { + s := rand.NewSource(time.Now().Unix()) + rand := rand.New(s) + + return &rpcProxyRouting{ + kuboRPC: kuboRPC, + httpClient: &http.Client{ + Transport: otelhttp.NewTransport(&customTransport{ + // Roundtripper with increased defaults than http.Transport such that retrieving + // multiple lookups concurrently is fast. + RoundTripper: &http.Transport{ + MaxIdleConns: 1000, + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DialContext: cdns.dialWithCachedDNS, + ForceAttemptHTTP2: true, + }, + }), + }, + rand: rand, + } +} + +func (ps *rpcProxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error { + return routing.ErrNotSupported +} + +func (ps *rpcProxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) { + return ps.fetch(ctx, k) +} + +func (ps *rpcProxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) { + if !strings.HasPrefix(k, "/ipns/") { + return nil, routing.ErrNotSupported + } + + ch := make(chan []byte) + + go func() { + v, err := ps.fetch(ctx, k) + if err != nil { + close(ch) + } else { + ch <- v + close(ch) + } + }() + + return ch, nil +} + +func (ps *rpcProxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) { + name, err := ipns.NameFromRoutingKey([]byte(key)) + if err != nil { + return nil, err + } + + key = "/ipns/" + name.String() + + urlStr := fmt.Sprintf("%s/api/v0/dht/get?arg=%s", ps.getRandomKuboURL(), key) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, urlStr, nil) + if err != nil { + return nil, err + } + + goLog.Debugw("routing proxy fetch", "key", key, "from", req.URL.String()) + defer func() { + if err != nil { + goLog.Debugw("routing proxy fetch error", "key", key, "from", req.URL.String(), "error", err.Error()) + } + }() + + resp, err := ps.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Read at most 10 KiB (max size of IPNS record). + rb, err = io.ReadAll(io.LimitReader(resp.Body, 10240)) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("routing/get RPC returned unexpected status: %s, body: %q", resp.Status, string(rb)) + } + + parts := bytes.Split(bytes.TrimSpace(rb), []byte("\n")) + var b64 string + + for _, part := range parts { + var evt routing.QueryEvent + err = json.Unmarshal(part, &evt) + if err != nil { + return nil, fmt.Errorf("routing/get RPC response cannot be parsed: %w", err) + } + + if evt.Type == routing.Value { + b64 = evt.Extra + break + } + } + + if b64 == "" { + return nil, errors.New("routing/get RPC returned no value") + } + + rb, err = base64.StdEncoding.DecodeString(b64) + if err != nil { + return nil, err + } + + entry, err := ipns.UnmarshalRecord(rb) + if err != nil { + return nil, err + } + + err = ipns.ValidateWithName(entry, name) + if err != nil { + return nil, err + } + + return rb, nil +} + +func (ps *rpcProxyRouting) getRandomKuboURL() string { + return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))] +} + type proxyRouting struct { gatewayURLs []string httpClient *http.Client