Skip to content

Commit

Permalink
chore: use libp2p logger
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Dec 1, 2023
1 parent bc80a0a commit 147e974
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 40 deletions.
2 changes: 1 addition & 1 deletion packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
"@libp2p/identify": "^1.0.1",
"@libp2p/interface": "^1.0.1",
"@libp2p/kad-dht": "^11.0.2",
"@libp2p/logger": "^4.0.1",
"@libp2p/mdns": "^10.0.2",
"@libp2p/mplex": "^10.0.2",
"@libp2p/ping": "^1.0.1",
Expand Down Expand Up @@ -120,6 +119,7 @@
"uint8arrays": "^4.0.3"
},
"devDependencies": {
"@libp2p/logger": "^4.0.1",
"@multiformats/mafmt": "^12.1.5",
"@multiformats/multiaddr": "^12.1.7",
"@types/sinon": "^17.0.2",
Expand Down
20 changes: 10 additions & 10 deletions packages/helia/src/block-brokers/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { logger } from '@libp2p/logger'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-broker')

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
Expand All @@ -16,8 +14,10 @@ export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]
private readonly log: Logger

constructor (init: TrustlessGatewayBlockBrokerInit = {}) {
constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) {
this.log = components.logger.forComponent('helia:trustless-gateway-block-broker')
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
Expand All @@ -31,30 +31,30 @@ ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
const aggregateErrors: Error[] = []

for (const gateway of sortedGateways) {
log('getting block for %c from %s', cid, gateway.url)
this.log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log.trace('got block for %c from %s', cid, gateway.url)
this.log.trace('got block for %c from %s', cid, gateway.url)
try {
await options.validateFn?.(block)
} catch (err) {
log.error('failed to validate block for %c from %s', cid, gateway.url, err)
this.log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
}

return block
} catch (err: unknown) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
this.log.error('failed to get block for %c from %s', cid, gateway.url, err)
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
this.log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)

Check warning on line 57 in packages/helia/src/block-brokers/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/helia/src/block-brokers/trustless-gateway/broker.ts#L57

Added line #L57 was not covered by tests
break
}
}
Expand Down
9 changes: 7 additions & 2 deletions packages/helia/src/block-brokers/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockRetriever } from '@helia/interface/src/blocks.js'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

export const DEFAULT_TRUSTLESS_GATEWAYS = [
Expand All @@ -23,6 +24,10 @@ export interface TrustlessGatewayBlockBrokerInit {
gateways?: Array<string | URL>
}

export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): () => BlockRetriever {
return () => new TrustlessGatewayBlockBroker(init)
export interface TrustlessGatewayComponents {
logger: ComponentLogger
}

export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockRetriever {
return (components) => new TrustlessGatewayBlockBroker(components, init)
}
22 changes: 12 additions & 10 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { start, stop } from '@libp2p/interface'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import { CustomProgressEvent } from 'progress-events'
import { bitswap, trustlessGateway } from './block-brokers/index.js'
Expand All @@ -11,13 +10,11 @@ import { NetworkedStorage } from './utils/networked-storage.js'
import type { HeliaInit } from '.'
import type { GCOptions, Helia } from '@helia/interface'
import type { Pins } from '@helia/interface/pins'
import type { Libp2p } from '@libp2p/interface'
import type { ComponentLogger, Libp2p, Logger } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'

const log = logger('helia')

interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
Expand All @@ -29,25 +26,30 @@ export class HeliaImpl implements Helia {
public blockstore: BlockStorage
public datastore: Datastore
public pins: Pins
public logger: ComponentLogger
private readonly log: Logger

constructor (init: HeliaImplInit) {
this.logger = init.libp2p.logger
this.log = this.logger.forComponent('helia')
const hashers = defaultHashers(init.hashers)

const components = {
blockstore: init.blockstore,
datastore: init.datastore,
libp2p: init.libp2p,
hashers
hashers,
logger: init.libp2p.logger
}

const blockBrokers = init.blockBrokers?.map((fn) => {
return fn(components)
}) ?? [
bitswap()(components),
trustlessGateway()()
trustlessGateway()(components)
]

const networkedStorage = new NetworkedStorage(init.blockstore, {
const networkedStorage = new NetworkedStorage(components, {
blockBrokers,
hashers
})
Expand Down Expand Up @@ -79,7 +81,7 @@ export class HeliaImpl implements Helia {
const helia = this
const blockstore = this.blockstore.unwrap()

log('gc start')
this.log('gc start')

await drain(blockstore.deleteMany((async function * (): AsyncGenerator<CID> {
for await (const { cid } of blockstore.getAll()) {
Expand All @@ -92,7 +94,7 @@ export class HeliaImpl implements Helia {

options.onProgress?.(new CustomProgressEvent<CID>('helia:gc:deleted', cid))
} catch (err) {
log.error('Error during gc', err)
helia.log.error('Error during gc', err)
options.onProgress?.(new CustomProgressEvent<Error>('helia:gc:error', err))
}
}
Expand All @@ -101,6 +103,6 @@ export class HeliaImpl implements Helia {
releaseLock()
}

log('gc finished')
this.log('gc finished')
}
}
13 changes: 11 additions & 2 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { createLibp2p } from './utils/libp2p.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { ComponentLogger, Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { Libp2pOptions } from 'libp2p'
Expand Down Expand Up @@ -113,6 +113,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* webworker), pass true here to hold the gc lock in this process.
*/
holdGcLock?: boolean

/**
* An optional logging component to pass to libp2p. If not specified the
* default implementation from libp2p will be used.
*/
logger?: ComponentLogger
}

/**
Expand All @@ -129,7 +135,10 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
if (isLibp2p(init.libp2p)) {
libp2p = init.libp2p
} else {
libp2p = await createLibp2p(datastore, init.libp2p)
libp2p = await createLibp2p(datastore, {
logger: init.logger,
...init.libp2p
})
}

const helia = new HeliaImpl({
Expand Down
30 changes: 20 additions & 10 deletions packages/helia/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import { CodeError, start, stop } from '@libp2p/interface'
import { logger } from '@libp2p/logger'
import { anySignal } from 'any-signal'
import filter from 'it-filter'
import forEach from 'it-foreach'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks'
import type { AbortOptions, Startable } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger, LoggerOptions, Startable } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia:networked-storage')

export interface NetworkedStorageStorageInit {
blockBrokers?: BlockBroker[]
hashers?: MultihashHasher[]
Expand All @@ -31,6 +28,11 @@ function isBlockAnnouncer (b: any): b is BlockAnnouncer {
return typeof b.announce === 'function'
}

export interface NetworkedStorageComponents {
blockstore: Blockstore
logger: ComponentLogger
}

/**
* Networked storage wraps a regular blockstore - when getting blocks if the
* blocks are not present Bitswap will be used to fetch them from network peers.
Expand All @@ -41,12 +43,14 @@ export class NetworkedStorage implements Blocks, Startable {
private readonly blockAnnouncers: BlockAnnouncer[]
private readonly hashers: MultihashHasher[]
private started: boolean
private readonly log: Logger

/**
* Create a new BlockStorage
*/
constructor (blockstore: Blockstore, init: NetworkedStorageStorageInit) {
this.child = blockstore
constructor (components: NetworkedStorageComponents, init: NetworkedStorageStorageInit) {
this.log = components.logger.forComponent('helia:networked-storage')
this.child = components.blockstore
this.blockRetrievers = (init.blockBrokers ?? []).filter(isBlockRetriever)
this.blockAnnouncers = (init.blockBrokers ?? []).filter(isBlockAnnouncer)
this.hashers = init.hashers ?? []
Expand Down Expand Up @@ -123,7 +127,10 @@ export class NetworkedStorage implements Blocks, Startable {
if (options.offline !== true && !(await this.child.has(cid))) {
// we do not have the block locally, get it from a block provider
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:providers:get', cid))
const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, options)
const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, {
...options,
log: this.log
})
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:blockstore:put', cid))
await this.child.put(cid, block, options)

Expand Down Expand Up @@ -151,7 +158,10 @@ export class NetworkedStorage implements Blocks, Startable {
if (options.offline !== true && !(await this.child.has(cid))) {
// we do not have the block locally, get it from a block provider
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:providers:get', cid))
const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, options)
const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, {
...options,
log: this.log
})
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:blockstore:put', cid))
await this.child.put(cid, block, options)

Expand Down Expand Up @@ -217,7 +227,7 @@ export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]
* Race block providers cancelling any pending requests once the block has been
* found.
*/
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise<Uint8Array> {
async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions & LoggerOptions): Promise<Uint8Array> {
const validateFn = getCidBlockVerifierFunction(cid, hashers)

const controller = new AbortController()
Expand Down Expand Up @@ -245,7 +255,7 @@ async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashe

return block
} catch (err) {
log.error('could not retrieve verified block for %c', cid, err)
options.log.error('could not retrieve verified block for %c', cid, err)
throw err
}
})
Expand Down
11 changes: 9 additions & 2 deletions packages/helia/test/block-brokers/block-broker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import delay from 'delay'
Expand Down Expand Up @@ -31,7 +32,10 @@ describe('block-broker', () => {
blockstore = new MemoryBlockstore()
bitswapBlockBroker = stubInterface()
gatewayBlockBroker = stubInterface()
storage = new NetworkedStorage(blockstore, {
storage = new NetworkedStorage({
blockstore,
logger: defaultLogger()
}, {
blockBrokers: [
bitswapBlockBroker,
gatewayBlockBroker
Expand Down Expand Up @@ -112,7 +116,10 @@ describe('block-broker', () => {
it('handles incorrect bytes from a gateway', async () => {
const { cid } = blocks[0]
const block = blocks[1].block
storage = new NetworkedStorage(blockstore, {
storage = new NetworkedStorage({
blockstore,
logger: defaultLogger()
}, {
blockBrokers: [
gatewayBlockBroker
],
Expand Down
6 changes: 5 additions & 1 deletion packages/helia/test/block-brokers/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
Expand Down Expand Up @@ -39,7 +41,9 @@ describe('trustless-gateway-block-broker', () => {
stubConstructor(TrustlessGateway, 'http://localhost:8082'),
stubConstructor(TrustlessGateway, 'http://localhost:8083')
]
gatewayBlockBroker = new TrustlessGatewayBlockBroker()
gatewayBlockBroker = new TrustlessGatewayBlockBroker({
logger: defaultLogger()
})
// must copy the array because the broker calls .sort which mutates in-place
;(gatewayBlockBroker as any).gateways = [...gateways]
})
Expand Down
6 changes: 5 additions & 1 deletion packages/helia/test/utils/networked-storage.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import delay from 'delay'
Expand Down Expand Up @@ -30,7 +31,10 @@ describe('networked-storage', () => {

blockstore = new MemoryBlockstore()
bitswap = stubInterface()
storage = new NetworkedStorage(blockstore, {
storage = new NetworkedStorage({
blockstore,
logger: defaultLogger()
}, {
blockBrokers: [
bitswap
],
Expand Down
7 changes: 6 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import type { Blocks } from './blocks.js'
import type { Pins } from './pins.js'
import type { Libp2p, AbortOptions } from '@libp2p/interface'
import type { Libp2p, AbortOptions, ComponentLogger } from '@libp2p/interface'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
Expand Down Expand Up @@ -47,6 +47,11 @@ export interface Helia<T = Libp2p> {
*/
pins: Pins

/**
* A logging component that can be reused by consumers
*/
logger: ComponentLogger

/**
* Starts the Helia node
*/
Expand Down

0 comments on commit 147e974

Please sign in to comment.