Skip to content

Commit

Permalink
Merge branch 'master' into feat/config-bitswap-wanthave-replace
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Sep 28, 2024
2 parents fda5f6e + ca4f486 commit 06c2e97
Show file tree
Hide file tree
Showing 16 changed files with 679 additions and 434 deletions.
20 changes: 1 addition & 19 deletions .github/workflows/interop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,9 @@ jobs:
restore-keys: ${{ runner.os }}-${{ github.job }}-helia-
- run: sudo apt update
- run: sudo apt install -y libxkbcommon0 libxdamage1 libgbm1 libpango-1.0-0 libcairo2 # dependencies for playwright

# TODO: for now run against version from https://github.com/ipfs/helia/pull/584
- name: Checkout helia-interop with provisional fix
uses: actions/checkout@v4
with:
repository: ipfs/helia
ref: ab6b385787075ad9932f24362293b3bb82ff1d96
path: helia
- name: Run provisional build of helia
run: npm i && npm run build
working-directory: helia
- name: Run provisional build of helia-interop
run: npm i && npm run build && npx .
working-directory: helia/packages/interop
- run: npx --package @helia/interop helia-interop
env:
KUBO_BINARY: ${{ github.workspace }}/cmd/ipfs/ipfs

# TODO: switch back to release once https://github.com/ipfs/helia/pull/584 ships
#- run: npx --package @helia/interop helia-interop
# env:
# KUBO_BINARY: ${{ github.workspace }}/cmd/ipfs/ipfs
ipfs-webui:
needs: [interop-prep]
runs-on: ${{ fromJSON(github.repository == 'ipfs/kubo' && '["self-hosted", "linux", "x64", "2xlarge"]' || '"ubuntu-latest"') }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Kubo Changelogs

- [v0.31](docs/changelogs/v0.31.md)
- [v0.30](docs/changelogs/v0.30.md)
- [v0.29](docs/changelogs/v0.29.md)
- [v0.28](docs/changelogs/v0.28.md)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/kubo/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ take effect.
prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node})

// start MFS pinning thread
startPinMFS(daemonConfigPollInterval, cctx, &ipfsPinMFSNode{node})
startPinMFS(cctx, daemonConfigPollInterval, &ipfsPinMFSNode{node})

// The daemon is *finally* ready.
fmt.Printf("Daemon is ready\n")
Expand Down
167 changes: 65 additions & 102 deletions cmd/ipfs/kubo/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
pinclient "github.com/ipfs/boxo/pinning/remote/client"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"

config "github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core"
Expand Down Expand Up @@ -40,6 +40,7 @@ func init() {
d, err := time.ParseDuration(pollDurStr)
if err != nil {
mfslog.Error("error parsing MFS_PIN_POLL_INTERVAL, using default:", err)
return
}
daemonConfigPollInterval = d
}
Expand Down Expand Up @@ -74,87 +75,58 @@ func (x *ipfsPinMFSNode) PeerHost() host.Host {
return x.node.PeerHost
}

func startPinMFS(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode) {
errCh := make(chan error)
go pinMFSOnChange(configPollInterval, cctx, node, errCh)
go func() {
for {
select {
case err, isOpen := <-errCh:
if !isOpen {
return
}
mfslog.Errorf("%v", err)
case <-cctx.Context().Done():
return
}
}
}()
func startPinMFS(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
go pinMFSOnChange(cctx, configPollInterval, node)
}

func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode, errCh chan<- error) {
defer close(errCh)

var tmo *time.Timer
defer func() {
if tmo != nil {
tmo.Stop()
}
}()
func pinMFSOnChange(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
tmo := time.NewTimer(configPollInterval)
defer tmo.Stop()

lastPins := map[string]lastPin{}
for {
// polling sleep
if tmo == nil {
tmo = time.NewTimer(configPollInterval)
} else {
tmo.Reset(configPollInterval)
}
select {
case <-cctx.Context().Done():
return
case <-tmo.C:
tmo.Reset(configPollInterval)
}

// reread the config, which may have changed in the meantime
cfg, err := cctx.GetConfig()
if err != nil {
select {
case errCh <- fmt.Errorf("pinning reading config (%v)", err):
case <-cctx.Context().Done():
return
}
mfslog.Errorf("pinning reading config (%v)", err)
continue
}
mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices))

// get the most recent MFS root cid
rootNode, err := node.RootNode()
if err != nil {
select {
case errCh <- fmt.Errorf("pinning reading MFS root (%v)", err):
case <-cctx.Context().Done():
return
}
mfslog.Errorf("pinning reading MFS root (%v)", err)
continue
}
rootCid := rootNode.Cid()

// pin to all remote services in parallel
pinAllMFS(cctx.Context(), node, cfg, rootCid, lastPins, errCh)
pinAllMFS(cctx.Context(), node, cfg, rootNode.Cid(), lastPins)
}
}

// pinAllMFS pins on all remote services in parallel to overcome DoS attacks.
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin, errCh chan<- error) {
ch := make(chan lastPin, len(cfg.Pinning.RemoteServices))
for svcName_, svcConfig_ := range cfg.Pinning.RemoteServices {
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin) {
ch := make(chan lastPin)
var started int

for svcName, svcConfig := range cfg.Pinning.RemoteServices {
if ctx.Err() != nil {
break
}

// skip services where MFS is not enabled
svcName, svcConfig := svcName_, svcConfig_
mfslog.Debugf("pinning MFS root considering service %q", svcName)
if !svcConfig.Policies.MFS.Enable {
mfslog.Debugf("pinning service %q is not enabled", svcName)
ch <- lastPin{}
continue
}
// read mfs pin interval for this service
Expand All @@ -165,11 +137,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
var err error
repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval)
if err != nil {
select {
case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err):
case <-ctx.Done():
}
ch <- lastPin{}
mfslog.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err)
continue
}
}
Expand All @@ -182,38 +150,30 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
} else {
mfslog.Debugf("pinning MFS root to %q: skipped due to MFS.RepinInterval=%s (remaining: %s)", svcName, repinInterval.String(), (repinInterval - time.Since(last.Time)).String())
}
ch <- lastPin{}
continue
}
}

mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName)
go func() {
if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil {
select {
case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err):
case <-ctx.Done():
}
ch <- lastPin{}
} else {
ch <- r
go func(svcName string, svcConfig config.RemotePinningService) {
r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig)
if err != nil {
mfslog.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err)
}
}()
ch <- r
}(svcName, svcConfig)
started++
}
for i := 0; i < len(cfg.Pinning.RemoteServices); i++ {

// Collect results from all started goroutines.
for i := 0; i < started; i++ {
if x := <-ch; x.IsValid() {
lastPins[x.ServiceName] = x
}
}
}

func pinMFS(
ctx context.Context,
node pinMFSNode,
cid cid.Cid,
svcName string,
svcConfig config.RemotePinningService,
) (lastPin, error) {
func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, svcConfig config.RemotePinningService) (lastPin, error) {
c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key)

pinName := svcConfig.Policies.MFS.PinName
Expand Down Expand Up @@ -243,43 +203,46 @@ func pinMFS(
}
for range lsPinCh { // in case the prior loop exits early
}
if err := <-lsErrCh; err != nil {
err := <-lsErrCh
if err != nil {
return lastPin{}, fmt.Errorf("error while listing remote pins: %v", err)
}

// CID of the current MFS root is already being pinned, nothing to do
if pinning {
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil
}

// Prepare Pin.name
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}
if !pinning {
// Prepare Pin.name
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}

// Prepare Pin.origins
// Add own multiaddrs to the 'origins' array, so Pinning Service can
// use that as a hint and connect back to us (if possible)
if node.PeerHost() != nil {
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
if err != nil {
return lastPin{}, err
// Prepare Pin.origins
// Add own multiaddrs to the 'origins' array, so Pinning Service can
// use that as a hint and connect back to us (if possible)
if node.PeerHost() != nil {
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
if err != nil {
return lastPin{}, err
}
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
}
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
}

// Create or replace pin for MFS root
if existingRequestID != "" {
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
_, err := c.Replace(ctx, existingRequestID, cid, addOpts...)
if err != nil {
return lastPin{}, err
// Create or replace pin for MFS root
if existingRequestID != "" {
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
if _, err = c.Replace(ctx, existingRequestID, cid, addOpts...); err != nil {
return lastPin{}, err
}
} else {
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
if _, err = c.Add(ctx, cid, addOpts...); err != nil {
return lastPin{}, err
}
}
} else {
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
_, err := c.Add(ctx, cid, addOpts...)
if err != nil {
return lastPin{}, err
}
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
}
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil

return lastPin{
Time: pinTime,
ServiceName: svcName,
ServiceConfig: svcConfig,
CID: cid,
}, nil
}
Loading

0 comments on commit 06c2e97

Please sign in to comment.