Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trigger aggregator from pieces computed #229

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ EIPFS_INDEXER_SQS_URL = 'https://sqs.us-west-2.amazonaws.com/505595374361/stagin
PROVIDERS = ''
UPLOAD_API_DID = ''
ACCESS_SERVICE_URL = ''
AGGREGATOR_DID = ''
AGGREGATOR_URL = ''

POSTMARK_TOKEN = ''
R2_ACCESS_KEY_ID = ''
R2_CARPARK_BUCKET_NAME = ''
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ The domain to deploy the roundabout API to. e.g `roundabout.web3.storage`. The v

URL of the w3access service.

#### `AGGREGATOR_SERVICE_DID`

DID of the filecoin aggregator service.

#### `AGGREGATOR_SERVICE_URL`

URL of the filecoin aggregator service.

#### `UPLOAD_API_DID`

[DID](https://www.w3.org/TR/did-core/) of the upload-api ucanto server. e.g. `did:web:up.web3.storage`. Optional: if omitted, a `did:key` will be derrived from `PRIVATE_KEY`
Expand Down
11 changes: 1 addition & 10 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'

import { computePieceCid } from '../index.js'
import { mustGetEnv } from './utils.js'
import { createPieceTable } from '../tables/piece.js'

Sentry.AWSLambda.init({
Expand Down Expand Up @@ -62,16 +63,6 @@ function getEnv () {
}
}

/**
* @param {string} name
* @returns {string}
*/
function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}

/**
* Extract an EventRecord from the passed SQS Event
*
Expand Down
87 changes: 87 additions & 0 deletions filecoin/functions/piece-cid-report.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import * as Sentry from '@sentry/serverless'
import { Config } from '@serverless-stack/node/config/index.js'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Piece } from '@web3-storage/data-segment'

import { reportPieceCid } from '../index.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function pieceCidReport (event) {
const { aggregatorDid, aggregatorUrl } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one ferry to update')
}

// @ts-expect-error can't figure out type of new
const pieceRecord = unmarshall(records[0].new)
const piece = Piece.fromString(pieceRecord.piece).link

const aggregateServiceConnection = getServiceConnection({
did: aggregatorDid,
url: aggregatorUrl
})
const issuer = getServiceSigner({
privateKey
})
const audience = aggregateServiceConnection.id
/** @type {import('@web3-storage/filecoin-client/types').InvocationConfig} */
const invocationConfig = {
issuer,
audience,
with: issuer.did(),
}

const { ok, error } = await reportPieceCid({
piece,
group: issuer.did(),
aggregateServiceConnection,
invocationConfig
})

if (error) {
return {
statusCode: 500,
body: error.message || 'failed to add aggregate'
}
}

return {
statusCode: 200,
body: ok
}
}

export const handler = Sentry.AWSLambda.wrapHandler(pieceCidReport)

/**
* Get Env validating it is set.
*/
function getEnv() {
return {
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
aggregatorUrl: mustGetEnv('AGGREGATOR_URL'),
}
}

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
function parseDynamoDbEvent (event) {
return event.Records.map(r => ({
new: r.dynamodb?.NewImage,
old: r.dynamodb?.OldImage
}))
}
9 changes: 9 additions & 0 deletions filecoin/functions/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* @param {string} name
* @returns {string}
*/
export function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}
32 changes: 32 additions & 0 deletions filecoin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import { Aggregator } from '@web3-storage/filecoin-client'

import { GetCarFailed, ComputePieceFailed } from './errors.js'

Expand Down Expand Up @@ -81,3 +82,34 @@ export async function computePieceCid({
error
}
}

/**
* @param {object} props
* @param {import('@web3-storage/data-segment').PieceLink} props.piece
* @param {string} props.group
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} props.invocationConfig
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.aggregateServiceConnection
*/
export async function reportPieceCid ({
piece,
group,
invocationConfig,
aggregateServiceConnection
}) {
// Add piece for aggregation
const aggregateQueue = await Aggregator.aggregateQueue(
invocationConfig,
piece,
group,
{ connection: aggregateServiceConnection }
)

if (aggregateQueue.out.error) {
return {
error: aggregateQueue.out.error
}
}
return {
ok: {},
}
}
5 changes: 5 additions & 0 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@ucanto/client": "^8.0.1",
"@ucanto/principal": "^8.1.0",
"@ucanto/transport": "^8.0.0",
"@web3-storage/data-segment": "^3.0.1",
"@web3-storage/filecoin-client": "^1.3.0",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^1.0.0",
"multiformats": "^12.1.1"
},
"devDependencies": {
"@serverless-stack/resources": "*",
"ava": "^4.3.3",
"nanoid": "^4.0.0",
"p-defer": "^4.0.0",
"testcontainers": "^8.13.0"
}
}
36 changes: 36 additions & 0 deletions filecoin/service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as ed25519 from '@ucanto/principal/ed25519'
import * as DID from '@ipld/dag-ucan/did'
import { CAR, HTTP } from '@ucanto/transport'
import { connect } from '@ucanto/client'

/**
* Given a config, return a ucanto Signer object representing the service
*
* @param {object} config
* @param {string} config.privateKey - multiformats private key of primary signing key
* @returns {import('@ucanto/principal/ed25519').Signer.Signer}
*/
export function getServiceSigner(config) {
return ed25519.parse(config.privateKey)
}

/**
*
* @param {{ did: string, url: string }} config
* @returns
*/
export function getServiceConnection (config) {
const servicePrincipal = DID.parse(config.did) // 'did:web:filecoin.web3.storage'
const serviceURL = new URL(config.url) // 'https://filecoin.web3.storage'

const serviceConnection = connect({
id: servicePrincipal,
codec: CAR.outbound,
channel: HTTP.open({
url: serviceURL,
method: 'POST',
}),
})

return serviceConnection
}
41 changes: 1 addition & 40 deletions filecoin/test/compute-piece-cid.test.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
import { test } from './helpers/context.js'

import { PutObjectCommand } from '@aws-sdk/client-s3'
import { encode } from 'multiformats/block'
import { identity } from 'multiformats/hashes/identity'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import * as pb from '@ipld/dag-pb'
import { CarBufferWriter } from '@ipld/car'
import { toString } from 'uint8arrays'
import { Piece } from '@web3-storage/data-segment'

import { createS3, createBucket, createDynamodDb } from './helpers/resources.js'
import { createDynamoTable, getItemsFromTable } from './helpers/tables.js'
import { createCar } from './helpers/car.js'

import { computePieceCid } from '../index.js'
import { pieceTableProps } from '../tables/index.js'
Expand Down Expand Up @@ -77,39 +71,6 @@ test('computes piece CID from a CAR file in the bucket', async t => {
t.is(storedItems?.[0].piece, piece.toString())
})

async function createCar () {
const id = await encode({
value: pb.prepare({ Data: 'a red car on the street!' }),
codec: pb,
hasher: identity,
})

const parent = await encode({
value: pb.prepare({ Links: [id.cid] }),
codec: pb,
hasher,
})
const car = CarBufferWriter.createWriter(Buffer.alloc(1000), {
roots: [parent.cid],
})
car.write(parent)

const body = car.close()
const digest = await hasher.digest(body)
const checksum = toString(digest.digest, 'base64pad')

const key = `${parent.cid.toString()}/${parent.cid.toString()}`
const piece = Piece.fromPayload(body)

return {
body,
checksum,
key,
link: parent.cid,
piece: piece.link
}
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
* @param {import("@aws-sdk/client-s3").S3Client} s3Client
Expand Down
40 changes: 40 additions & 0 deletions filecoin/test/helpers/car.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { encode } from 'multiformats/block'
import { identity } from 'multiformats/hashes/identity'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import * as pb from '@ipld/dag-pb'
import { CarBufferWriter } from '@ipld/car'
import { toString } from 'uint8arrays'
import { Piece } from '@web3-storage/data-segment'

export async function createCar () {
const id = await encode({
value: pb.prepare({ Data: 'a red car on the street!' }),
codec: pb,
hasher: identity,
})

const parent = await encode({
value: pb.prepare({ Links: [id.cid] }),
codec: pb,
hasher,
})
const car = CarBufferWriter.createWriter(Buffer.alloc(1000), {
roots: [parent.cid],
})
car.write(parent)

const body = car.close()
const digest = await hasher.digest(body)
const checksum = toString(digest.digest, 'base64pad')

const key = `${parent.cid.toString()}/${parent.cid.toString()}`
const piece = Piece.fromPayload(body)

return {
body,
checksum,
key,
link: parent.cid,
piece: piece.link
}
}
21 changes: 21 additions & 0 deletions filecoin/test/helpers/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as Server from '@ucanto/server'

export const OperationErrorName = /** @type {const} */ ('OperationFailed')
export class OperationFailed extends Server.Failure {
/**
* @param {string} message
* @param {import('@web3-storage/data-segment').PieceLink} piece
*/
constructor(message, piece) {
super(message)
this.piece = piece
}

get reason() {
return this.message
}

get name() {
return OperationErrorName
}
}
Loading