Skip to content

Commit

Permalink
feat: expose configured dag walkers and hashers on helia interface
Browse files Browse the repository at this point in the history
To allow modules like `@helia/car` to re-use configured dag walkers
instead of having to bundle their own, expose configured dag walkers
and hashers on the helia interface and have them consume those.

Fixes #375
  • Loading branch information
achingbrain committed Jan 17, 2024
1 parent 76220cd commit bc0c3ae
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 277 deletions.
14 changes: 10 additions & 4 deletions packages/block-brokers/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { ProgressOptions } from 'progress-events'
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: MultihashHasher[]
hashers: Record<string, MultihashHasher>
}

export interface BitswapInit extends BitswapOptions {
Expand All @@ -29,9 +29,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})
let hasher: MultihashHasher | undefined

if (typeof codecOrName === 'string') {
hasher = Object.values(hashers).find(hasher => {
return hasher.name === codecOrName
})
} else {
hasher = hashers[codecOrName]
}

if (hasher != null) {
return hasher
Expand Down
3 changes: 1 addition & 2 deletions packages/car/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@
"dependencies": {
"@helia/interface": "^3.0.1",
"@ipld/car": "^5.1.1",
"@ipld/dag-pb": "^4.0.6",
"@libp2p/interfaces": "^3.3.1",
"cborg": "^4.0.3",
"it-drain": "^3.0.5",
"it-map": "^3.0.3",
"multiformats": "^13.0.0",
Expand All @@ -153,6 +151,7 @@
},
"devDependencies": {
"@helia/unixfs": "^2.0.1",
"@ipld/dag-pb": "^4.0.8",
"aegir": "^42.1.0",
"blockstore-core": "^4.3.10",
"interface-blockstore": "^5.2.9",
Expand Down
42 changes: 6 additions & 36 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import drain from 'it-drain'
import map from 'it-map'
import defer from 'p-defer'
import PQueue from 'p-queue'
import { cborWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js'
import type { DAGWalker } from '@helia/interface'
import type { Blocks, GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader, CarWriter } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
Expand All @@ -71,23 +71,7 @@ import type { ProgressOptions } from 'progress-events'

export interface CarComponents {
blockstore: Blocks
}

export interface CarInit {
/**
* In order to export CIDs that correspond to a DAG, it's necessary to know
* how to traverse that DAG. DAGWalkers take a block and yield any CIDs
* encoded within that block.
*/
dagWalkers?: DAGWalker[]
}

/**
* DAGWalkers take a block and yield CIDs encoded in that block
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): AsyncGenerator<CID, void, undefined>
dagWalkers: Record<number, DAGWalker>
}

/**
Expand Down Expand Up @@ -146,27 +130,13 @@ export interface Car {
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>
}

const DEFAULT_DAG_WALKERS = [
rawWalker,
dagPbWalker,
cborWalker,
jsonWalker
]

const DAG_WALK_QUEUE_CONCURRENCY = 1

class DefaultCar implements Car {
private readonly components: CarComponents
private dagWalkers: Record<number, DAGWalker>

constructor (components: CarComponents, init: CarInit) {
constructor (components: CarComponents, init: any) {
this.components = components

this.dagWalkers = {}

;[...DEFAULT_DAG_WALKERS, ...(init.dagWalkers ?? [])].forEach(dagWalker => {
this.dagWalkers[dagWalker.codec] = dagWalker
})
}

async import (reader: Pick<CarReader, 'blocks'>, options?: AbortOptions & ProgressOptions<PutManyBlocksProgressEvents>): Promise<void> {
Expand All @@ -188,7 +158,7 @@ class DefaultCar implements Car {
deferred.resolve()
})
queue.on('error', (err) => {
deferred.resolve(err)
deferred.reject(err)

Check warning on line 161 in packages/car/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/car/src/index.ts#L161

Added line #L161 was not covered by tests
})

for (const root of roots) {
Expand All @@ -212,7 +182,7 @@ class DefaultCar implements Car {
* and update the pin count for them
*/
async #walkDag (cid: CID, queue: PQueue, withBlock: (cid: CID, block: Uint8Array) => Promise<void>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const dagWalker = this.dagWalkers[cid.code]
const dagWalker = this.components.dagWalkers[cid.code]

if (dagWalker == null) {
throw new Error(`No dag walker found for cid codec ${cid.code}`)
Expand All @@ -234,6 +204,6 @@ class DefaultCar implements Car {
/**
* Create a {@link Car} instance for use with {@link https://github.com/ipfs/helia Helia}
*/
export function car (helia: { blockstore: Blocks }, init: CarInit = {}): Car {
export function car (helia: { blockstore: Blocks, dagWalkers: Record<number, DAGWalker> }, init: any = {}): Car {
return new DefaultCar(helia, init)
}
176 changes: 0 additions & 176 deletions packages/car/src/utils/dag-walkers.ts

This file was deleted.

37 changes: 32 additions & 5 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,58 @@

import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import * as dagPb from '@ipld/dag-pb'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import * as raw from 'multiformats/codecs/raw'
import { car, type Car } from '../src/index.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { DAGWalker } from '@helia/interface'
import type { Blockstore } from 'interface-blockstore'

/**
* Dag walker for dag-pb CIDs
*/
const dagPbWalker: DAGWalker = {
codec: dagPb.code,
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
}
}

const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

describe('import', () => {
let blockstore: Blockstore
let c: Car
let u: UnixFS
let dagWalkers: Record<number, DAGWalker>

beforeEach(async () => {
blockstore = new MemoryBlockstore()
dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}

c = car({ blockstore })
c = car({ blockstore, dagWalkers })
u = unixfs({ blockstore })
})

it('exports and imports a car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid = await otherUnixFS.addBytes(smallFile)

const writer = memoryCarWriter(cid)
Expand All @@ -46,7 +73,7 @@ describe('import', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid1 = await otherUnixFS.addBytes(fileData1)
const cid2 = await otherUnixFS.addBytes(fileData2)
const cid3 = await otherUnixFS.addBytes(fileData3)
Expand All @@ -66,7 +93,7 @@ describe('import', () => {
it('exports and imports a multiple block car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid = await otherUnixFS.addBytes(largeFile, {
chunker: fixedSize({
chunkSize: 1024
Expand All @@ -90,7 +117,7 @@ describe('import', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid1 = await otherUnixFS.addBytes(fileData1, {
chunker: fixedSize({
chunkSize: 2
Expand Down
Loading

0 comments on commit bc0c3ae

Please sign in to comment.