Merge remote-tracking branch 'origin/master' into feat/libp2p-sharedtcp
lidel committed Dec 19, 2024
2 parents 6caf78c + 519ae27 commit 2919d6d
Showing 17 changed files with 255 additions and 72 deletions.
23 changes: 19 additions & 4 deletions config/datastore.go
@@ -4,8 +4,21 @@ import (
"encoding/json"
)

// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
const DefaultDataStoreDirectory = "datastore"
const (
// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
DefaultDataStoreDirectory = "datastore"

// DefaultBlockKeyCacheSize is the size for the blockstore two-queue
// cache which caches block keys and sizes.
DefaultBlockKeyCacheSize = 64 << 10

// DefaultWriteThrough specifies whether to use a "write-through"
// Blockstore and Blockservice. This means that they will write
// without performing any reads to check if the incoming blocks are
// already present in the datastore. Enable for datastores with fast
// writes and slower reads.
DefaultWriteThrough bool = true
)

// Datastore tracks the configuration of the datastore.
type Datastore struct {
@@ -21,8 +34,10 @@ type Datastore struct {

Spec map[string]interface{}

HashOnRead bool
BloomFilterSize int
HashOnRead bool
BloomFilterSize int
BlockKeyCacheSize OptionalInteger `json:",omitempty"`
WriteThrough Flag `json:",omitempty"`
}

// DataStorePath returns the default data store path given a configuration root
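Both new fields are optional types read through their WithDefault helpers; a minimal consumption sketch (assuming a loaded *config.Config named cfg, mirroring the hunks further down):

    writeThrough := cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)
    keyCacheSize := int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
    _, _ = writeThrough, keyCacheSize // `json:",omitempty"` keeps unset fields out of the config file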
11 changes: 11 additions & 0 deletions config/import.go
@@ -5,6 +5,15 @@ const (
DefaultUnixFSRawLeaves = false
DefaultUnixFSChunker = "size-262144"
DefaultHashFunction = "sha2-256"

// DefaultBatchMaxNodes controls the maximum number of nodes in a
// write-batch. The total size of the batch is limited by
// BatchMaxNodes and BatchMaxSize.
DefaultBatchMaxNodes = 128
// DefaultBatchMaxSize controls the maximum size of a single
// write-batch. The total size of the batch is limited by
// BatchMaxNodes and BatchMaxSize.
DefaultBatchMaxSize = 100 << 20 // 100MiB
)

// Import configures the default options for ingesting data. This affects commands
@@ -14,4 +23,6 @@ type Import struct {
UnixFSRawLeaves Flag
UnixFSChunker OptionalString
HashFunction OptionalString
BatchMaxNodes OptionalInteger
BatchMaxSize OptionalInteger
}
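Whichever limit trips first flushes the batch; a worked sketch of how the two defaults interact (numbers only, assuming the default "size-262144" chunker):

    // 128 nodes * 256 KiB = 32 MiB, so BatchMaxNodes trips before the
    // 100 MiB byte limit; with 1 MiB blocks, ~100 nodes hit BatchMaxSize first.
    const defaultChunkSize = 256 << 10                               // bytes, per "size-262144"
    const nodeLimitedBytes = DefaultBatchMaxNodes * defaultChunkSize // 33554432 = 32 MiB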
15 changes: 14 additions & 1 deletion core/commands/dag/import.go
@@ -11,6 +11,7 @@ import (
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/coreiface/options"
gocarv2 "github.com/ipld/go-car/v2"

@@ -24,6 +25,11 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

cfg, err := node.Repo.Config()
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
@@ -55,7 +61,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())
batch := ipld.NewBatch(req.Context, api.Dag(),
// Default: 128. Means 128 file descriptors needed in flatfs
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
// Default 100MiB. When setting block size to 1MiB, we can add
// ~100 nodes maximum. With default 256KiB block size, we will
// hit the max nodes limit at 32MiB.
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
)

roots := cid.NewSet()
var blockCount, blockBytesCount uint64
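The batch acts as a buffered writer rather than a transaction; a hedged sketch of its lifecycle (ctx, dagService, and the nodes channel are assumptions for illustration):

    batch := ipld.NewBatch(ctx, dagService,
        ipld.MaxNodesBatchOption(config.DefaultBatchMaxNodes),
        ipld.MaxSizeBatchOption(config.DefaultBatchMaxSize),
    )
    for nd := range nodes { // assumed stream of ipld.Node decoded from the CAR
        if err := batch.Add(ctx, nd); err != nil { // flushes automatically when a limit trips
            return err
        }
    }
    return batch.Commit() // writes whatever remains buffered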
82 changes: 64 additions & 18 deletions core/commands/files.go
@@ -59,16 +59,18 @@ Content added with "ipfs add" (which by default also becomes pinned) is not
added to MFS. Any content can be lazily referenced from MFS with the command
"ipfs files cp /ipfs/<cid> /some/path/" (see ipfs files cp --help).
NOTE:
Most of the subcommands of 'ipfs files' accept the '--flush' flag. It defaults
to true. Use caution when setting this flag to false. It will improve
NOTE: Most of the subcommands of 'ipfs files' accept the '--flush' flag. It
defaults to true and ensures two things: 1) that the changes are reflected in
the full MFS structure (updated CIDs), and 2) that the parent folder's cache
is cleared. Use caution when setting this flag to false. It will improve
performance for large numbers of file operations, but it does so at the cost
of consistency guarantees. If the daemon is unexpectedly killed before running
'ipfs files flush' on the files in question, then data may be lost. This also
applies to run 'ipfs repo gc' concurrently with '--flush=false'
operations.
`,
of consistency guarantees and unbounded growth of the directories' in-memory
caches. If the daemon is unexpectedly killed before running 'ipfs files
flush' on the files in question, then data may be lost. This also applies to
running 'ipfs repo gc' concurrently with '--flush=false' operations. We
recommend flushing paths regularly with 'ipfs files flush', especially
folders that receive many write operations, as a way to clear the directory
cache, free memory, and speed up read operations.`,
},
Options: []cmds.Option{
cmds.BoolOption(filesFlushOptionName, "f", "Flush target and ancestors after write.").WithDefault(true),
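The recommended periodic flush can be done from the CLI ('ipfs files flush /some/path') or programmatically through mfs.FlushPath, which the hunks below rely on; a minimal sketch (nd and ctx are assumed to be an initialized node and context):

    // Flushing a directory propagates its updated CID up to the MFS root
    // and clears that directory's in-memory cache.
    if _, err := mfs.FlushPath(ctx, nd.FilesRoot, "/my/active/folder"); err != nil {
        return err
    }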
@@ -491,10 +493,14 @@ being GC'ed.
}

if flush {
_, err := mfs.FlushPath(req.Context, nd.FilesRoot, dst)
if err != nil {
if _, err := mfs.FlushPath(req.Context, nd.FilesRoot, dst); err != nil {
return fmt.Errorf("cp: cannot flush the created file %s: %s", dst, err)
}
// Flush parent to clear directory cache and free memory.
parent := gopath.Dir(dst)
if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, parent); err != nil {
return fmt.Errorf("cp: cannot flush the created file's parent folder %s: %s", dst, err)
}
}

return nil
@@ -792,10 +798,30 @@ Example:
}

err = mfs.Mv(nd.FilesRoot, src, dst)
if err == nil && flush {
_, err = mfs.FlushPath(req.Context, nd.FilesRoot, "/")
if err != nil {
return err
}
return err
if flush {
parentSrc := gopath.Dir(src)
parentDst := gopath.Dir(dst)
// Flush parent to clear directory cache and free memory.
if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, parentDst); err != nil {
return fmt.Errorf("cp: cannot flush the destination file's parent folder %s: %s", dst, err)
}

// Avoid re-flushing when moving within the same folder.
if parentSrc != parentDst {
if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, parentSrc); err != nil {
return fmt.Errorf("cp: cannot flush the source's file's parent folder %s: %s", dst, err)
}
}

if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, "/"); err != nil {
return err
}
}

return nil
},
}

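The parentSrc != parentDst guard above exists because renaming within one directory yields identical parents; a tiny illustration with hypothetical paths:

    parentSrc := gopath.Dir("/docs/a.txt") // "/docs"
    parentDst := gopath.Dir("/docs/b.txt") // "/docs"
    // equal parents: the second FlushPath would be redundant, so it is skipped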
@@ -943,6 +969,17 @@ See '--to-files' in 'ipfs add --help' for more information.
flog.Error("files: error closing file mfs file descriptor", err)
}
}
if flush {
// Flush parent to clear directory cache and free memory.
parent := gopath.Dir(path)
if _, err := mfs.FlushPath(req.Context, nd.FilesRoot, parent); err != nil {
if retErr == nil {
retErr = err
} else {
flog.Error("files: flushing the parent folder", err)
}
}
}
}()

if trunc {
@@ -1105,11 +1142,20 @@ Change the CID version or hash function of the root node of a given path.
return err
}

err = updatePath(nd.FilesRoot, path, prefix)
if err == nil && flush {
_, err = mfs.FlushPath(req.Context, nd.FilesRoot, path)
if err := updatePath(nd.FilesRoot, path, prefix); err != nil {
return err
}
return err
if flush {
if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, path); err != nil {
return err
}
// Flush parent to clear directory cache and free memory.
parent := gopath.Dir(path)
if _, err = mfs.FlushPath(req.Context, nd.FilesRoot, parent); err != nil {
return err
}
}
return nil
},
}

14 changes: 8 additions & 6 deletions core/coreapi/coreapi.go
@@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
return nil
}

if settings.Offline {
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}

if settings.Offline {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = node.DefaultIpnsCacheSize
@@ -244,7 +244,9 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange,
bserv.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
)
subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

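A write-through blockservice simply skips the Has() lookup that normally precedes each write; a standalone sketch (the in-memory datastore and nil exchange are assumptions for illustration):

    dstore := dssync.MutexWrap(ds.NewMapDatastore())
    bs := blockstore.NewBlockstore(dstore)
    bsvc := blockservice.New(bs, nil, // nil exchange: purely local
        blockservice.WriteThrough(true), // write blocks without checking whether they already exist
    )
    defer bsvc.Close()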
13 changes: 8 additions & 5 deletions core/coreapi/unixfs.go
@@ -21,6 +21,7 @@ import (
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/kubo/config"
coreiface "github.com/ipfs/kubo/core/coreiface"
options "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/coreunix"
@@ -85,13 +86,15 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
if settings.OnlyHash {
// setup a /dev/null pipeline to simulate adding the data
dstore := dssync.MutexWrap(ds.NewNullDatastore())
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true))
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
exch = nil // exchange will never be used
pinning = nil // pinner will never be used
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true)) // we use NewNullDatastore, so ok to always WriteThrough when OnlyHash
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
exch = nil // exchange will never be used
pinning = nil // pinner will never be used
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
bserv := blockservice.New(addblockstore, exch,
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
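The OnlyHash branch above can hard-code WriteThrough(true) because the null datastore discards every write anyway; a small sketch of that behavior (ctx assumed):

    null := dssync.MutexWrap(ds.NewNullDatastore())
    _ = null.Put(ctx, ds.NewKey("/k"), []byte("v")) // accepted, then dropped
    has, _ := null.Has(ctx, ds.NewKey("/k"))        // always false
    _ = has // the pre-write Has() check would never save work here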
25 changes: 15 additions & 10 deletions core/node/core.go
@@ -24,21 +24,26 @@ import (
dagpb "github.com/ipld/go-codec-dagpb"
"go.uber.org/fx"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem,
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
}
}

// Pinning creates new pinner which tells GC which blocks should be kept
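BlockService now takes the config at wiring time and returns a constructor whose remaining dependencies fx injects; a condensed sketch of the pattern (cfg, bstore, and exch are assumed to exist):

    app := fx.New(
        fx.Provide(func() blockstore.Blockstore { return bstore }), // assumed blockstore
        fx.Provide(func() exchange.Interface { return exch }),      // assumed exchange
        fx.Provide(BlockService(cfg)),                              // yields blockservice.BlockService
        fx.Invoke(func(bsvc blockservice.BlockService) { /* bsvc honors cfg's WriteThrough */ }),
    )
    _ = app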
6 changes: 3 additions & 3 deletions core/node/groups.go
@@ -230,6 +230,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
@@ -242,7 +243,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough))),
finalBstore,
)
}
@@ -373,7 +374,6 @@ func Offline(cfg *config.Config) fx.Option {

// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(BlockService),
fx.Provide(Dag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
@@ -428,7 +428,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
Identity(cfg),
IPNS,
Networked(bcfg, cfg, userResourceOverrides),

fx.Provide(BlockService(cfg)),
Core,
)
}
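For reference, the two blockstore cache knobs Storage() now wires (shown with their defaults; the bloom filter stays disabled for non-permanent nodes):

    cacheOpts := blockstore.DefaultCacheOpts()
    cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize // bloom filter: fast "definitely absent" answers
    cacheOpts.HasTwoQueueCacheSize = 64 << 10                    // DefaultBlockKeyCacheSize: 2Q cache of keys and sizes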
6 changes: 4 additions & 2 deletions core/node/storage.go
@@ -27,10 +27,12 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore

// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
// hash security
bs = blockstore.NewBlockstore(repo.Datastore())
bs = blockstore.NewBlockstore(repo.Datastore(),
blockstore.WriteThrough(writeThrough),
)
bs = &verifbs.VerifBS{Blockstore: bs}
bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
if err != nil {