From b4ae896b0e251df4efd967f3e045796c72cdaa3c Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 13 Sep 2023 11:16:44 +0200 Subject: [PATCH] feat: compute piece for uploaded cars --- .gitignore | 1 + filecoin/errors.js | 53 ++++++ filecoin/functions/piece-cid-compute.js | 97 ++++++++++ filecoin/index.js | 85 +++++++++ filecoin/package.json | 23 +++ filecoin/tables/index.js | 16 ++ filecoin/tables/piece.js | 64 +++++++ filecoin/test/compute-piece-cid.test.js | 127 +++++++++++++ filecoin/test/helpers/context.js | 16 ++ filecoin/test/helpers/resources.js | 126 +++++++++++++ filecoin/test/helpers/tables.js | 43 +++++ filecoin/types.ts | 81 ++++++++ package-lock.json | 240 +++++++++++------------- package.json | 3 +- stacks/filecoin-stack.js | 88 +++++++++ stacks/index.js | 2 + stacks/upload-db-stack.js | 10 + test/helpers/context.js | 2 + test/helpers/table.js | 40 +++- test/integration.test.js | 35 +++- tsconfig.json | 2 +- upload-api/package.json | 2 +- 22 files changed, 1015 insertions(+), 141 deletions(-) create mode 100644 filecoin/errors.js create mode 100644 filecoin/functions/piece-cid-compute.js create mode 100644 filecoin/index.js create mode 100644 filecoin/package.json create mode 100644 filecoin/tables/index.js create mode 100644 filecoin/tables/piece.js create mode 100644 filecoin/test/compute-piece-cid.test.js create mode 100644 filecoin/test/helpers/context.js create mode 100644 filecoin/test/helpers/resources.js create mode 100644 filecoin/test/helpers/tables.js create mode 100644 filecoin/types.ts create mode 100644 stacks/filecoin-stack.js diff --git a/.gitignore b/.gitignore index a0d08aa9..00f59fa4 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ dist # misc .DS_Store .vscode +cdk.context.json # local env files .env*.local diff --git a/filecoin/errors.js b/filecoin/errors.js new file mode 100644 index 00000000..4a7bd2b1 --- /dev/null +++ b/filecoin/errors.js @@ -0,0 +1,53 @@ +export class Failure extends Error { + describe() { + return this.toString() + } + + get message() { + return this.describe() + } + + toJSON() { + const { name, message, stack } = this + return { name, message, stack } + } +} + +export const DatabaseOperationErrorName = /** @type {const} */ ( + 'DatabaseOperationFailed' +) +export class DatabaseOperationFailed extends Failure { + get reason() { + return this.message + } + + get name() { + return DatabaseOperationErrorName + } +} + +export const GetCarErrorName = /** @type {const} */ ( + 'GetCarFailed' +) +export class GetCarFailed extends Failure { + get reason() { + return this.message + } + + get name() { + return GetCarErrorName + } +} + +export const ComputePieceErrorName = /** @type {const} */ ( + 'ComputePieceFailed' +) +export class ComputePieceFailed extends Failure { + get reason() { + return this.message + } + + get name() { + return ComputePieceErrorName + } +} diff --git a/filecoin/functions/piece-cid-compute.js b/filecoin/functions/piece-cid-compute.js new file mode 100644 index 00000000..2639e270 --- /dev/null +++ b/filecoin/functions/piece-cid-compute.js @@ -0,0 +1,97 @@ +import { S3Client } from '@aws-sdk/client-s3' +import * as Sentry from '@sentry/serverless' + +import { computePieceCid } from '../index.js' +import { createPieceTable } from '../tables/piece.js' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +const AWS_REGION = process.env.AWS_REGION || 'us-west-2' + +/** + * Get EventRecord from the SQS Event triggering the handler + * + * @param {import('aws-lambda').SQSEvent} event + */ +async function computeHandler (event) { + const { + pieceTableName, + } = getEnv() + + const record = parseEvent(event) + if (!record) { + throw new Error('Unexpected sqs record format') + } + + const s3Client = new S3Client({ region: record.bucketRegion }) + const pieceTable = createPieceTable(AWS_REGION, pieceTableName) + + const { error, ok } = await computePieceCid({ + record, + s3Client, + pieceTable, + }) + + if (error) { + return { + statusCode: 500, + body: error.message + } + } + + return { + statusCode: 200, + body: ok + } +} + +export const handler = Sentry.AWSLambda.wrapHandler(computeHandler) + +/** + * Get Env validating it is set. + */ +function getEnv () { + return { + pieceTableName: mustGetEnv('PIECE_TABLE_NAME'), + } +} + +/** + * @param {string} name + * @returns {string} + */ +function mustGetEnv (name) { + const value = process.env[name] + if (!value) throw new Error(`Missing env var: ${name}`) + return value +} + +/** + * Extract an EventRecord from the passed SQS Event + * + * @param {import('aws-lambda').SQSEvent} sqsEvent + * @returns {import('../index.js').EventRecord | undefined} + */ +function parseEvent (sqsEvent) { + if (sqsEvent.Records.length !== 1) { + throw new Error( + `Expected 1 CAR per invocation but received ${sqsEvent.Records.length} CARs` + ) + } + + const body = sqsEvent.Records[0].body + if (!body) { + return + } + const { key, bucketName, bucketRegion } = JSON.parse(body) + + return { + bucketRegion, + bucketName, + key, + } +} diff --git a/filecoin/index.js b/filecoin/index.js new file mode 100644 index 00000000..71527e0b --- /dev/null +++ b/filecoin/index.js @@ -0,0 +1,85 @@ +import { + GetObjectCommand, +} from '@aws-sdk/client-s3' +// @@ts-expect-error needs final dep +// import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' +// import * as Digest from 'multiformats/hashes/digest' +import { Piece } from '@web3-storage/data-segment' +import { CID } from 'multiformats/cid' + +// import { GetCarFailed, ComputePieceFailed } from './errors.js' +import { GetCarFailed } from './errors.js' + +/** + * @typedef {object} EventRecord + * @property {string} bucketRegion + * @property {string} bucketName + * @property {string} key + * + * @typedef {import('@aws-sdk/client-s3').S3Client} S3Client + * @typedef {import('@aws-sdk/client-dynamodb').DynamoDBClient} DynamoDBClient + * + * @param {DynamoDBClient} props.dynamoClient + * @param {string} props.pieceTableName + */ + +/** + * Create CAR side index and write it to Satnav bucket if non existing. + * + * @param {object} props + * @param {EventRecord} props.record + * @param {S3Client} props.s3Client + * @param {import('./types').PieceTable} props.pieceTable + */ +export async function computePieceCid({ + record, + s3Client, + pieceTable +}) { + const key = record.key + // CIDs in carpark are in format `${carCid}/${carCid}.car` + const cidString = key.split('/')[0] + + const getCmd = new GetObjectCommand({ + Bucket: record.bucketName, + Key: key, + }) + const res = await s3Client.send(getCmd) + if (!res.Body) { + return { + error: new GetCarFailed(`failed to get CAR file with key ${key} in bucket ${record.bucketName}`) + } + } + + // let piece + // try { + // const hasher = Hasher.create() + // const digestBytes = new Uint8Array(36) + + // // @ts-expect-error aws Readable stream types are not good + // for await (const chunk of res.Body.transformToWebStream()) { + // hasher.write(chunk) + // } + // hasher.digestInto(digestBytes, 0, true) + + // const digest = Digest.decode(digestBytes) + // // @ts-expect-error some properties from PieceDigest are not present in MultihashDigest + // piece = Piece.fromDigest(digest) + // } catch (/** @type {any} */ error) { + // return { + // error: new ComputePieceFailed(error.cause) + // } + // } + const piece = Piece.fromPayload(await res.Body.transformToByteArray()) + + // Write to table + const { ok, error } = await pieceTable.insert({ + link: CID.parse(cidString), + piece: piece.link, + }) + + return { + ok, + error + } +} diff --git a/filecoin/package.json b/filecoin/package.json new file mode 100644 index 00000000..c02725b2 --- /dev/null +++ b/filecoin/package.json @@ -0,0 +1,23 @@ +{ + "name": "@web3-storage/w3infra-filecoin", + "version": "0.0.0", + "type": "module", + "scripts": { + "test": "ava --verbose --timeout=60s **/*.test.js" + }, + "dependencies": { + "@aws-sdk/client-dynamodb": "^3.211.0", + "@aws-sdk/client-s3": "^3.211.0", + "@aws-sdk/client-sqs": "^3.226.0", + "@sentry/serverless": "^7.22.0", + "@web3-storage/data-segment": "^3.0.1", + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", + "multiformats": "^12.1.1" + }, + "devDependencies": { + "@serverless-stack/resources": "*", + "ava": "^4.3.3", + "nanoid": "^4.0.0", + "testcontainers": "^8.13.0" + } +} diff --git a/filecoin/tables/index.js b/filecoin/tables/index.js new file mode 100644 index 00000000..bdc2d1be --- /dev/null +++ b/filecoin/tables/index.js @@ -0,0 +1,16 @@ +/** @typedef {import('@serverless-stack/resources').TableProps} TableProps */ + +/** @type TableProps */ +export const pieceTableProps = { + fields: { + piece: 'string', // `baga...1` + link: 'string', // `bagy...1` + aggregate: 'string', // `bagy...9` + inclusion: 'string', // TODO: Inclusion? + insertedAt: 'string', // `2022-12-24T...` + }, + primaryIndex: { partitionKey: 'piece', sortKey: 'insertedAt' }, + globalIndexes: { + link: { partitionKey: 'link', projection: 'all' } + } +} diff --git a/filecoin/tables/piece.js b/filecoin/tables/piece.js new file mode 100644 index 00000000..8f268d41 --- /dev/null +++ b/filecoin/tables/piece.js @@ -0,0 +1,64 @@ +import { + DynamoDBClient, + PutItemCommand, +} from '@aws-sdk/client-dynamodb' +import { marshall } from '@aws-sdk/util-dynamodb' + +import { DatabaseOperationFailed } from '../errors.js' + +/** + * Abstraction layer to handle operations on Piece Table. + * + * @param {string} region + * @param {string} tableName + * @param {object} [options] + * @param {string} [options.endpoint] + * @returns {import('../types').PieceTable} + */ +export function createPieceTable (region, tableName, options = {}) { + const dynamoDb = new DynamoDBClient({ + region, + endpoint: options.endpoint + }) + + return usePieceTable(dynamoDb, tableName) +} + +/** + * @param {DynamoDBClient} dynamoDb + * @param {string} tableName + * @returns {import('../types').PieceTable} + */ +export function usePieceTable(dynamoDb, tableName) { + return { + /** + * Bind a link CID to Piece CID. + * + * @param {import('../types').PieceInsertInput} input + */ + insert: async (input) => { + const insertedAt = new Date().toISOString() + + const cmd = new PutItemCommand({ + TableName: tableName, + Item: marshall({ + link: input.link.toString(), + piece: input.piece.toString(), + insertedAt + }), + }) + + try { + await dynamoDb.send(cmd) + } catch (/** @type {any} */ error) { + return { + error: new DatabaseOperationFailed(error.cause) + } + } + + return { + ok: {} + } + }, + } +} diff --git a/filecoin/test/compute-piece-cid.test.js b/filecoin/test/compute-piece-cid.test.js new file mode 100644 index 00000000..b9f21964 --- /dev/null +++ b/filecoin/test/compute-piece-cid.test.js @@ -0,0 +1,127 @@ +import { test } from './helpers/context.js' + +import { PutObjectCommand } from '@aws-sdk/client-s3' +import { encode } from 'multiformats/block' +import { identity } from 'multiformats/hashes/identity' +import { sha256 as hasher } from 'multiformats/hashes/sha2' +import * as pb from '@ipld/dag-pb' +import { CarBufferWriter } from '@ipld/car' +import { toString } from 'uint8arrays' +import { Piece } from '@web3-storage/data-segment' + +import { createS3, createBucket, createDynamodDb } from './helpers/resources.js' +import { createDynamoTable, getItemsFromTable } from './helpers/tables.js' + +import { computePieceCid } from '../index.js' +import { pieceTableProps } from '../tables/index.js' +import { createPieceTable } from '../tables/piece.js' + +const AWS_REGION = 'us-west-2' + +test.before(async t => { + // S3 + const { client } = await createS3({ port: 9000 }) + // DynamoDB + const { + client: dynamoClient, + endpoint: dbEndpoint + } = await createDynamodDb({ port: 8000 }) + + Object.assign(t.context, { + s3Client: client, + dbEndpoint, + dynamoClient + }) +}) + +test('computes piece cid from a CAR file in the bucket', async t => { + const { tableName, bucketName } = await prepareResources(t.context.dynamoClient, t.context.s3Client) + const { body, checksum, key, piece, link } = await createCar() + const pieceTable = createPieceTable(AWS_REGION, tableName, { + endpoint: t.context.dbEndpoint + }) + + await t.context.s3Client.send( + new PutObjectCommand({ + Bucket: bucketName, + Key: key, + Body: body, + ChecksumSHA256: checksum, + }) + ) + const record = { + bucketName, + bucketRegion: 'us-west-2', + key, + } + + const { ok, error } = await computePieceCid({ + record, + s3Client: t.context.s3Client, + pieceTable + }) + t.truthy(ok) + t.falsy(error) + + const storedItems = await getItemsFromTable(t.context.dynamoClient, tableName, { + link: { + ComparisonOperator: 'EQ', + AttributeValueList: [{ S: link.toString() }] + } + }, { + indexName: 'link' + }) + + t.truthy(storedItems) + t.is(storedItems?.length, 1) + t.is(storedItems?.[0].piece, piece.toString()) +}) + +async function createCar () { + const id = await encode({ + value: pb.prepare({ Data: 'a red car on the street!' }), + codec: pb, + hasher: identity, + }) + + const parent = await encode({ + value: pb.prepare({ Links: [id.cid] }), + codec: pb, + hasher, + }) + const car = CarBufferWriter.createWriter(Buffer.alloc(1000), { + roots: [parent.cid], + }) + car.write(parent) + + const body = car.close() + const digest = await hasher.digest(body) + const checksum = toString(digest.digest, 'base64pad') + + const key = `${parent.cid.toString()}/${parent.cid.toString()}` + const piece = Piece.fromPayload(body) + + return { + body, + checksum, + key, + link: parent.cid, + piece: piece.link + } +} + +/** + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient + * @param {import("@aws-sdk/client-s3").S3Client} s3Client + */ +async function prepareResources (dynamoClient, s3Client) { + const [ tableName, bucketName ] = await Promise.all([ + createDynamoTable(dynamoClient, pieceTableProps), + createBucket(s3Client) + ]) + + return { + tableName, + bucketName + } +} diff --git a/filecoin/test/helpers/context.js b/filecoin/test/helpers/context.js new file mode 100644 index 00000000..f09a883c --- /dev/null +++ b/filecoin/test/helpers/context.js @@ -0,0 +1,16 @@ +import anyTest from 'ava' + +/** + * @typedef {object} S3Context + * @property {import('@aws-sdk/client-s3').S3Client} s3Client + * @property {import('@aws-sdk/client-s3').ServiceInputTypes} s3Opts + * + * @typedef {object} DynamoContext + * @property {string} dbEndpoint + * @property {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient + * + * @typedef {import("ava").TestFn>} TestAnyFn + */ + +// eslint-disable-next-line unicorn/prefer-export-from +export const test = /** @type {TestAnyFn} */ (anyTest) diff --git a/filecoin/test/helpers/resources.js b/filecoin/test/helpers/resources.js new file mode 100644 index 00000000..67165a3e --- /dev/null +++ b/filecoin/test/helpers/resources.js @@ -0,0 +1,126 @@ +import { customAlphabet } from 'nanoid' +import { GenericContainer as Container } from 'testcontainers' +import { S3Client, CreateBucketCommand } from '@aws-sdk/client-s3' +import { DynamoDBClient } from '@aws-sdk/client-dynamodb' + +/** + * @param {object} [opts] + * @param {number} [opts.port] + * @param {string} [opts.region] + */ +export async function createDynamodDb(opts = {}) { + const port = opts.port || 8000 + const region = opts.region || 'us-west-2' + const dbContainer = await new Container('amazon/dynamodb-local:latest') + .withExposedPorts(port) + .start() + + const endpoint = `http://${dbContainer.getHost()}:${dbContainer.getMappedPort(8000)}` + return { + client: new DynamoDBClient({ + region, + endpoint + }), + endpoint + } +} + +/** + * Convert SST TableProps to DynamoDB `CreateTableCommandInput` config + * + * @typedef {import('@aws-sdk/client-dynamodb').CreateTableCommandInput} CreateTableCommandInput + * @typedef {import('@serverless-stack/resources').TableProps} TableProps + * + * @param {TableProps} props + * @returns {Pick} + */ +export function dynamoDBTableConfig ({ fields, primaryIndex, globalIndexes = {} }) { + if (!primaryIndex || !fields) throw new Error('Expected primaryIndex and fields on TableProps') + const globalIndexValues = Object.values(globalIndexes) + const attributes = [ + ...Object.values(primaryIndex), + ...globalIndexValues.map((value) => value.partitionKey), + ...globalIndexValues.map((value) => value.sortKey) + ] + + const AttributeDefinitions = Object.entries(fields) + .filter(([k]) => attributes.includes(k)) // 'The number of attributes in key schema must match the number of attributes defined in attribute definitions' + .map(([k, v]) => ({ + AttributeName: k, + AttributeType: v[0].toUpperCase() + })) + const KeySchema = toKeySchema(primaryIndex) + const GlobalSecondaryIndexes = Object.entries(globalIndexes) + .map(([IndexName, val]) => ({ + IndexName, + KeySchema: toKeySchema(val), + Projection: { ProjectionType: 'ALL' }, + ProvisionedThroughput: { + ReadCapacityUnits: 5, + WriteCapacityUnits: 5 + } + })) + + return { + AttributeDefinitions, + KeySchema, + GlobalSecondaryIndexes: GlobalSecondaryIndexes.length ? GlobalSecondaryIndexes : undefined + } +} + +/** + * @param {object} index + * @param {string} index.partitionKey + * @param {string} [index.sortKey] + */ +function toKeySchema ({partitionKey, sortKey}) { + const KeySchema = [ + { AttributeName: partitionKey, KeyType: 'HASH' } + ] + if (sortKey) { + KeySchema.push( + { AttributeName: sortKey, KeyType: 'RANGE' } + ) + } + return KeySchema +} + +/** + * @param {object} [opts] + * @param {number} [opts.port] + * @param {string} [opts.region] + */ + export async function createS3(opts = {}) { + const region = opts.region || 'us-west-2' + const port = opts.port || 9000 + + const minio = await new Container('quay.io/minio/minio') + .withCmd(['server', '/data']) + .withExposedPorts(port) + .start() + + const clientOpts = { + endpoint: `http://${minio.getHost()}:${minio.getMappedPort(port)}`, + forcePathStyle: true, + region, + credentials: { + accessKeyId: 'minioadmin', + secretAccessKey: 'minioadmin', + }, + } + + return { + client: new S3Client(clientOpts), + clientOpts + } +} + +/** + * @param {S3Client} s3 + */ +export async function createBucket(s3) { + const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10) + const Bucket = id() + await s3.send(new CreateBucketCommand({ Bucket })) + return Bucket +} diff --git a/filecoin/test/helpers/tables.js b/filecoin/test/helpers/tables.js new file mode 100644 index 00000000..246e83c2 --- /dev/null +++ b/filecoin/test/helpers/tables.js @@ -0,0 +1,43 @@ +import { customAlphabet } from 'nanoid' + +import { CreateTableCommand, QueryCommand } from '@aws-sdk/client-dynamodb' +import { unmarshall } from '@aws-sdk/util-dynamodb' + +import { dynamoDBTableConfig } from './resources.js' + +/** + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo + * @param {import("@serverless-stack/resources").TableProps} tableProps + */ +export async function createDynamoTable(dynamo, tableProps) { + const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10) + const tableName = id() + + await dynamo.send(new CreateTableCommand({ + TableName: tableName, + ...dynamoDBTableConfig(tableProps), + ProvisionedThroughput: { + ReadCapacityUnits: 1, + WriteCapacityUnits: 1 + } + })) + + return tableName +} + +/** + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo + * @param {string} tableName + * @param {Record} keyConditions + * @param {object} [options] + * @param {string} [options.indexName] + */ +export async function getItemsFromTable(dynamo, tableName, keyConditions, options = {}) { + const params = { + TableName: tableName, + KeyConditions: keyConditions, + IndexName: options.indexName, + } + const response = await dynamo.send(new QueryCommand(params)) + return response?.Items && response?.Items.map(i => unmarshall(i)) +} diff --git a/filecoin/types.ts b/filecoin/types.ts new file mode 100644 index 00000000..4b86adb7 --- /dev/null +++ b/filecoin/types.ts @@ -0,0 +1,81 @@ +import { UnknownLink } from 'multiformats' +import { PieceLink } from '@web3-storage/data-segment' + +export interface PieceTable { + insert: (item: PieceInsertInput) => Promise> +} + +export interface PieceInsertInput { + link: UnknownLink + piece: PieceLink +} + +export type PieceInsertError = DatabaseOperationError | GetCarError | ComputePieceError + +export interface DatabaseOperationError extends Error { + name: 'DatabaseOperationFailed' +} +export interface GetCarError extends Error { + name: 'GetCarFailed' +} +export interface ComputePieceError extends Error { + name: 'ComputePieceFailed' +} + +export class Failure extends Error { + describe() { + return this.toString() + } + get message() { + return this.describe() + } + toJSON() { + const { name, message, stack } = this + return { name, message, stack } + } +} + +export type Result = Variant<{ + ok: T + error: X +}> + +/** + * Utility type for defining a [keyed union] type as in IPLD Schema. In practice + * this just works around typescript limitation that requires discriminant field + * on all variants. + * + * ```ts + * type Result = + * | { ok: T } + * | { error: X } + * + * const demo = (result: Result) => { + * if (result.ok) { + * // ^^^^^^^^^ Property 'ok' does not exist on type '{ error: Error; }` + * } + * } + * ``` + * + * Using `Variant` type we can define same union type that works as expected: + * + * ```ts + * type Result = Variant<{ + * ok: T + * error: X + * }> + * + * const demo = (result: Result) => { + * if (result.ok) { + * result.ok.toUpperCase() + * } + * } + * ``` + * + * [keyed union]:https://ipld.io/docs/schemas/features/representation-strategies/#union-keyed-representation + */ +export type Variant> = { + [Key in keyof U]: { [K in Exclude]?: never } & { + [K in Key]: U[Key] + } +}[keyof U] diff --git a/package-lock.json b/package-lock.json index 8035444c..9a0f0dd7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "satnav", "ucan-invocation", "upload-api", + "filecoin", "tools" ], "dependencies": { @@ -35,7 +36,7 @@ "@ucanto/validator": "^8.0.0", "@web-std/blob": "^3.0.4", "@web-std/fetch": "^4.1.0", - "@web3-storage/capabilities": "^6.0.1", + "@web3-storage/capabilities": "^9.3.0", "@web3-storage/w3up-client": "^6.0.0", "ava": "^4.3.3", "dotenv": "^16.0.3", @@ -96,6 +97,34 @@ "testcontainers": "^8.13.0" } }, + "filecoin": { + "name": "@web3-storage/w3infra-filecoin", + "version": "0.0.0", + "dependencies": { + "@aws-sdk/client-dynamodb": "^3.211.0", + "@aws-sdk/client-s3": "^3.211.0", + "@aws-sdk/client-sqs": "^3.226.0", + "@sentry/serverless": "^7.22.0", + "@web3-storage/data-segment": "^3.0.1", + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", + "multiformats": "^12.1.1" + }, + "devDependencies": { + "@serverless-stack/resources": "*", + "ava": "^4.3.3", + "nanoid": "^4.0.0", + "testcontainers": "^8.13.0" + } + }, + "filecoin/node_modules/multiformats": { + "version": "12.1.1", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.1.tgz", + "integrity": "sha512-GBSToTmri2vJYs8wqcZQ8kB21dCaeTOzHTIAlr8J06C1eL6UbzqURXFZ5Fl0EYm9GAFz1IlYY8SxGOs9G9NJRg==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "resolved": "https://registry.npmjs.org/@aashutoshrathi/word-wrap/-/word-wrap-1.2.6.tgz", @@ -4774,6 +4803,18 @@ "w3access": "src/cli/index.js" } }, + "node_modules/@web3-storage/access/node_modules/@web3-storage/capabilities": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-6.0.1.tgz", + "integrity": "sha512-rHJMR/0LOO4azVYjZyWml5qv0e5LzvaQ8K2Riy0xJcrxBsADxdhlBdfoG1hhT/R1qLQ0PzTlNUs2iDgZZHDdIA==", + "dependencies": { + "@ucanto/core": "^8.0.0", + "@ucanto/interface": "^8.0.0", + "@ucanto/principal": "^8.0.0", + "@ucanto/transport": "^8.0.0", + "@ucanto/validator": "^8.0.0" + } + }, "node_modules/@web3-storage/access/node_modules/kysely": { "version": "0.23.5", "resolved": "https://registry.npmjs.org/kysely/-/kysely-0.23.5.tgz", @@ -4783,21 +4824,22 @@ } }, "node_modules/@web3-storage/capabilities": { - "version": "6.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-6.0.1.tgz", - "integrity": "sha512-rHJMR/0LOO4azVYjZyWml5qv0e5LzvaQ8K2Riy0xJcrxBsADxdhlBdfoG1hhT/R1qLQ0PzTlNUs2iDgZZHDdIA==", + "version": "9.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.3.0.tgz", + "integrity": "sha512-z78CrQNyUfOr0+ZgV+lRs31DcrfRjFVLNOPqV7a1JVL42/j++ZpFVWRWdV1bvV7HFjx5WyT7GjXQULPpMF5GQQ==", "dependencies": { "@ucanto/core": "^8.0.0", "@ucanto/interface": "^8.0.0", "@ucanto/principal": "^8.0.0", "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0" + "@ucanto/validator": "^8.0.0", + "@web3-storage/data-segment": "^3.0.1" } }, "node_modules/@web3-storage/data-segment": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-2.2.0.tgz", - "integrity": "sha512-kzCDxgIb6j9rUZTe/BzS4bnG+SO32s5WWJoUAoB34wPNeBPAX/35SyUBYJrh7TLwjoDbGTL89BgGJi0xV25vcA==", + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", + "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", "dependencies": { "multiformats": "^11.0.2", "sync-multihash-sha2": "^1.0.0" @@ -4880,28 +4922,6 @@ "w3access": "src/cli/index.js" } }, - "node_modules/@web3-storage/upload-api/node_modules/@web3-storage/capabilities": { - "version": "9.2.1", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.2.1.tgz", - "integrity": "sha512-NBXm9320grYtKYV7g8nI7zzzS6GfOPp7EmDeWxDwOZ3nHvXDuiR77e3Sf7rRY265rMyJCtRUJ7/qRk7unQSfmA==", - "dependencies": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^3.0.1" - } - }, - "node_modules/@web3-storage/upload-api/node_modules/@web3-storage/data-segment": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", - "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", - "dependencies": { - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - }, "node_modules/@web3-storage/upload-api/node_modules/kysely": { "version": "0.23.5", "resolved": "https://registry.npmjs.org/kysely/-/kysely-0.23.5.tgz", @@ -4946,6 +4966,10 @@ "resolved": "carpark", "link": true }, + "node_modules/@web3-storage/w3infra-filecoin": { + "resolved": "filecoin", + "link": true + }, "node_modules/@web3-storage/w3infra-replicator": { "resolved": "replicator", "link": true @@ -9237,6 +9261,10 @@ "node": ">= 0.6" } }, + "node_modules/fr32-sha2-256-trunc254-padded-binary-tree-multihash": { + "version": "0.0.1", + "resolved": "git+ssh://git@github.com/web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash.git#98fe584ae5ae2c3790ec72c530aff5b36e2b1b92" + }, "node_modules/fresh": { "version": "0.5.2", "resolved": "https://registry.npmjs.org/fresh/-/fresh-0.5.2.tgz", @@ -15723,19 +15751,6 @@ "testcontainers": "^8.13.0" } }, - "ucan-invocation/node_modules/@web3-storage/capabilities": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.0.0.tgz", - "integrity": "sha512-2ql9MKdZxYaW2QjLJSzDo8r5brGpOWWeQa7YFkLVfX+5nyN/UIoJ+GYwSW0JZ7mNzQuIow8qPIsSNONGowT2ig==", - "dependencies": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^2.2.0" - } - }, "ucan-invocation/node_modules/nanoid": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-4.0.0.tgz", @@ -15769,7 +15784,7 @@ "@ucanto/validator": "^8.0.0", "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^14.0.0", - "@web3-storage/capabilities": "^9.0.0", + "@web3-storage/capabilities": "^9.3.0", "@web3-storage/upload-api": "^5.3.1", "@web3-storage/w3infra-ucan-invocation": "*", "multiformats": "^11.0.1", @@ -15793,28 +15808,6 @@ "engines": { "node": ">=16.15" } - }, - "upload-api/node_modules/@web3-storage/capabilities": { - "version": "9.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.2.0.tgz", - "integrity": "sha512-Qm2OpH/zIRECrlrsq8ZMh9W8qtZecn+CmB6fbkDYL9CEWdm28s+UC+Dxt9yIPUCZZMC+eTrxwS7jUwalB7T+VA==", - "dependencies": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^3.0.1" - } - }, - "upload-api/node_modules/@web3-storage/data-segment": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", - "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", - "dependencies": { - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } } }, "dependencies": { @@ -19738,6 +19731,18 @@ "zod": "^3.20.2" }, "dependencies": { + "@web3-storage/capabilities": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-6.0.1.tgz", + "integrity": "sha512-rHJMR/0LOO4azVYjZyWml5qv0e5LzvaQ8K2Riy0xJcrxBsADxdhlBdfoG1hhT/R1qLQ0PzTlNUs2iDgZZHDdIA==", + "requires": { + "@ucanto/core": "^8.0.0", + "@ucanto/interface": "^8.0.0", + "@ucanto/principal": "^8.0.0", + "@ucanto/transport": "^8.0.0", + "@ucanto/validator": "^8.0.0" + } + }, "kysely": { "version": "0.23.5", "resolved": "https://registry.npmjs.org/kysely/-/kysely-0.23.5.tgz", @@ -19746,21 +19751,22 @@ } }, "@web3-storage/capabilities": { - "version": "6.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-6.0.1.tgz", - "integrity": "sha512-rHJMR/0LOO4azVYjZyWml5qv0e5LzvaQ8K2Riy0xJcrxBsADxdhlBdfoG1hhT/R1qLQ0PzTlNUs2iDgZZHDdIA==", + "version": "9.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.3.0.tgz", + "integrity": "sha512-z78CrQNyUfOr0+ZgV+lRs31DcrfRjFVLNOPqV7a1JVL42/j++ZpFVWRWdV1bvV7HFjx5WyT7GjXQULPpMF5GQQ==", "requires": { "@ucanto/core": "^8.0.0", "@ucanto/interface": "^8.0.0", "@ucanto/principal": "^8.0.0", "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0" + "@ucanto/validator": "^8.0.0", + "@web3-storage/data-segment": "^3.0.1" } }, "@web3-storage/data-segment": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-2.2.0.tgz", - "integrity": "sha512-kzCDxgIb6j9rUZTe/BzS4bnG+SO32s5WWJoUAoB34wPNeBPAX/35SyUBYJrh7TLwjoDbGTL89BgGJi0xV25vcA==", + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", + "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", "requires": { "multiformats": "^11.0.2", "sync-multihash-sha2": "^1.0.0" @@ -19834,28 +19840,6 @@ "zod": "^3.20.2" } }, - "@web3-storage/capabilities": { - "version": "9.2.1", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.2.1.tgz", - "integrity": "sha512-NBXm9320grYtKYV7g8nI7zzzS6GfOPp7EmDeWxDwOZ3nHvXDuiR77e3Sf7rRY265rMyJCtRUJ7/qRk7unQSfmA==", - "requires": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^3.0.1" - } - }, - "@web3-storage/data-segment": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", - "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", - "requires": { - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - }, "kysely": { "version": "0.23.5", "resolved": "https://registry.npmjs.org/kysely/-/kysely-0.23.5.tgz", @@ -19910,6 +19894,29 @@ "testcontainers": "^8.13.0" } }, + "@web3-storage/w3infra-filecoin": { + "version": "file:filecoin", + "requires": { + "@aws-sdk/client-dynamodb": "^3.211.0", + "@aws-sdk/client-s3": "^3.211.0", + "@aws-sdk/client-sqs": "^3.226.0", + "@sentry/serverless": "^7.22.0", + "@serverless-stack/resources": "*", + "@web3-storage/data-segment": "^3.0.1", + "ava": "^4.3.3", + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", + "multiformats": "^12.1.1", + "nanoid": "^4.0.0", + "testcontainers": "^8.13.0" + }, + "dependencies": { + "multiformats": { + "version": "12.1.1", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.1.tgz", + "integrity": "sha512-GBSToTmri2vJYs8wqcZQ8kB21dCaeTOzHTIAlr8J06C1eL6UbzqURXFZ5Fl0EYm9GAFz1IlYY8SxGOs9G9NJRg==" + } + } + }, "@web3-storage/w3infra-replicator": { "version": "file:replicator", "requires": { @@ -20005,19 +20012,6 @@ "uint8arrays": "^4.0.2" }, "dependencies": { - "@web3-storage/capabilities": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.0.0.tgz", - "integrity": "sha512-2ql9MKdZxYaW2QjLJSzDo8r5brGpOWWeQa7YFkLVfX+5nyN/UIoJ+GYwSW0JZ7mNzQuIow8qPIsSNONGowT2ig==", - "requires": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^2.2.0" - } - }, "nanoid": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-4.0.0.tgz", @@ -20050,7 +20044,7 @@ "@web-std/blob": "3.0.4", "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^14.0.0", - "@web3-storage/capabilities": "^9.0.0", + "@web3-storage/capabilities": "^9.3.0", "@web3-storage/sigv4": "^1.0.2", "@web3-storage/upload-api": "^5.3.1", "@web3-storage/w3infra-ucan-invocation": "*", @@ -20063,30 +20057,6 @@ "prom-client": "^14.2.0", "testcontainers": "^8.13.0", "uint8arrays": "^4.0.2" - }, - "dependencies": { - "@web3-storage/capabilities": { - "version": "9.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-9.2.0.tgz", - "integrity": "sha512-Qm2OpH/zIRECrlrsq8ZMh9W8qtZecn+CmB6fbkDYL9CEWdm28s+UC+Dxt9yIPUCZZMC+eTrxwS7jUwalB7T+VA==", - "requires": { - "@ucanto/core": "^8.0.0", - "@ucanto/interface": "^8.0.0", - "@ucanto/principal": "^8.0.0", - "@ucanto/transport": "^8.0.0", - "@ucanto/validator": "^8.0.0", - "@web3-storage/data-segment": "^3.0.1" - } - }, - "@web3-storage/data-segment": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.0.1.tgz", - "integrity": "sha512-+e0KeqofejZ/1JLCdNmlEMCCSNf0PeJFXA/2Vw5+vWj4Nfko8sVLIGva86L8hXzm4dZGhWYU5m6JhX2U2Wn5Dg==", - "requires": { - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - } } }, "@web3-storage/w3up-client": { @@ -23193,6 +23163,10 @@ "integrity": "sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==", "dev": true }, + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": { + "version": "git+ssh://git@github.com/web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash.git#98fe584ae5ae2c3790ec72c530aff5b36e2b1b92", + "from": "fr32-sha2-256-trunc254-padded-binary-tree-multihash@github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash" + }, "fresh": { "version": "0.5.2", "resolved": "https://registry.npmjs.org/fresh/-/fresh-0.5.2.tgz", diff --git a/package.json b/package.json index 3a02c986..d23643b6 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "@ucanto/validator": "^8.0.0", "@web-std/blob": "^3.0.4", "@web-std/fetch": "^4.1.0", - "@web3-storage/capabilities": "^6.0.1", + "@web3-storage/capabilities": "^9.3.0", "@web3-storage/w3up-client": "^6.0.0", "ava": "^4.3.3", "dotenv": "^16.0.3", @@ -99,6 +99,7 @@ "satnav", "ucan-invocation", "upload-api", + "filecoin", "tools" ], "prettier": { diff --git a/stacks/filecoin-stack.js b/stacks/filecoin-stack.js new file mode 100644 index 00000000..f0b9598e --- /dev/null +++ b/stacks/filecoin-stack.js @@ -0,0 +1,88 @@ +import { + Function, + Queue, + use, +} from '@serverless-stack/resources' +import { Duration, aws_events as awsEvents } from 'aws-cdk-lib' + +import { BusStack } from './bus-stack.js' +import { CarparkStack } from './carpark-stack.js' +import { UploadDbStack } from './upload-db-stack.js' +import { setupSentry } from './config.js' +import { CARPARK_EVENT_BRIDGE_SOURCE_EVENT } from '../carpark/event-bus/source.js' + +/** + * @param {import('@serverless-stack/resources').StackContext} properties + */ +export function FilecoinStack({ stack, app }) { + stack.setDefaultFunctionProps({ + srcPath: 'filecoin' + }) + + // Setup app monitoring with Sentry + setupSentry(app, stack) + + // Get carpark reference + const { carparkBucket } = use(CarparkStack) + // Get eventBus reference + const { eventBus } = use(BusStack) + // Get store table reference + const { pieceTable } = use(UploadDbStack) + + // piece-cid compute + const pieceCidComputeHandler = new Function( + stack, + 'piece-cid-compute-handler', + { + environment : { + PIECE_TABLE_NAME: pieceTable.tableName, + }, + permissions: [pieceTable, carparkBucket], + handler: 'functions/piece-cid-compute.handler', + timeout: 15 * 60, + }, + ) + + const pieceCidComputeQueue = new Queue(stack, 'piece-cid-compute-queue', { + consumer: { + function: pieceCidComputeHandler, + cdk: { + eventSource: { + batchSize: 1, + }, + } + }, + cdk: { + queue: { + // Needs to be set as less or equal than consumer function + visibilityTimeout: Duration.seconds(15 * 60), + }, + }, + }) + + /** @type {import('@serverless-stack/resources').EventBusQueueTargetProps} */ + const targetPieceCidComputeQueue = { + type: 'queue', + queue: pieceCidComputeQueue, + cdk: { + target: { + message: awsEvents.RuleTargetInput.fromObject({ + bucketRegion: awsEvents.EventField.fromPath('$.detail.region'), + bucketName: awsEvents.EventField.fromPath('$.detail.bucketName'), + key: awsEvents.EventField.fromPath('$.detail.key') + }), + }, + } + } + + eventBus.addRules(stack, { + newCarToComputePiece: { + pattern: { + source: [CARPARK_EVENT_BRIDGE_SOURCE_EVENT], + }, + targets: { + targetPieceCidComputeQueue + } + } + }) +} diff --git a/stacks/index.js b/stacks/index.js index cf93554d..dffb9b91 100644 --- a/stacks/index.js +++ b/stacks/index.js @@ -5,6 +5,7 @@ import { UploadDbStack } from './upload-db-stack.js' import { UcanInvocationStack } from './ucan-invocation-stack.js' import { BusStack } from './bus-stack.js' import { CarparkStack } from './carpark-stack.js' +import { FilecoinStack } from './filecoin-stack.js' import { SatnavStack } from './satnav-stack.js' import { ReplicatorStack } from './replicator-stack.js' import { RoundaboutStack } from './roundabout-stack.js' @@ -32,6 +33,7 @@ export default function (app) { app.stack(UploadDbStack) app.stack(CarparkStack) app.stack(UcanInvocationStack) + app.stack(FilecoinStack) app.stack(SatnavStack) app.stack(UploadApiStack) app.stack(ReplicatorStack) diff --git a/stacks/upload-db-stack.js b/stacks/upload-db-stack.js index 5ce1c708..5d1d7285 100644 --- a/stacks/upload-db-stack.js +++ b/stacks/upload-db-stack.js @@ -12,6 +12,9 @@ import { adminMetricsTableProps, spaceMetricsTableProps } from '../ucan-invocation/tables/index.js' +import { + pieceTableProps +} from '../filecoin/tables/index.js' import { setupSentry, getBucketConfig } from './config.js' /** @@ -34,6 +37,12 @@ export function UploadDbStack({ stack, app }) { */ const uploadTable = new Table(stack, 'upload', uploadTableProps) + /** + * This table takes a stored CAR and makes an entry in the piece table + * Used by the filecoin/* service capabilities. // TODO + */ + const pieceTable = new Table(stack, 'piece', pieceTableProps) + /** * This table tracks the relationship between customers and providers. */ @@ -77,6 +86,7 @@ export function UploadDbStack({ stack, app }) { return { storeTable, uploadTable, + pieceTable, consumerTable, subscriptionTable, rateLimitTable, diff --git a/test/helpers/context.js b/test/helpers/context.js index 01fc65bf..8a4248a5 100644 --- a/test/helpers/context.js +++ b/test/helpers/context.js @@ -16,6 +16,8 @@ dotenv.config({ * @property {string} apiEndpoint * @property {Dynamo} metricsDynamo * @property {Dynamo} spaceMetricsDynamo + * @property {Dynamo} pieceDynamo + * @property {Dynamo} rateLimitsDynamo * * @typedef {object} RoundaboutContext * @property {string} roundaboutEndpoint diff --git a/test/helpers/table.js b/test/helpers/table.js index a703e6a9..9be6ad74 100644 --- a/test/helpers/table.js +++ b/test/helpers/table.js @@ -1,5 +1,6 @@ -import { GetItemCommand, ScanCommand } from '@aws-sdk/client-dynamodb' +import { GetItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb' import { marshall, unmarshall } from '@aws-sdk/util-dynamodb' +import pRetry from 'p-retry' /** * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo @@ -17,7 +18,42 @@ export async function getTableItem (dynamo, tableName, key) { } /** - * @param {import("@aws-sdk/client-dynamodb").DynamoDBClient} dynamo + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo + * @param {string} tableName + * @param {Record} keyConditions + * @param {object} [options] + * @param {string} [options.indexName] + */ +export async function pollQueryTable (dynamo, tableName, keyConditions, options = {}) { + const cmd = new QueryCommand({ + TableName: tableName, + KeyConditions: keyConditions, + IndexName: options.indexName, + }) + + let response + try { + response = await pRetry(async () => { + const r = await dynamo.send(cmd) + console.log('r', r) + if (r.$metadata.httpStatusCode === 404 || !r.Count) { + throw new Error('not found in dynamoDB yet') + } + return r + }, { + maxTimeout: 2000, + minTimeout: 1000, + retries: 100 + }) + } catch {} + + console.log('items', response?.Items) + + return response?.Items && response?.Items.map(i => unmarshall(i)) +} + +/** + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo * @param {string} tableName * @param {object} [options] * @param {number} [options.limit] diff --git a/test/integration.test.js b/test/integration.test.js index 5d64e628..8e53db48 100644 --- a/test/integration.test.js +++ b/test/integration.test.js @@ -20,13 +20,14 @@ import { } from './helpers/deployment.js' import { createNewClient, setupNewClient } from './helpers/up-client.js' import { randomFile } from './helpers/random.js' -import { getTableItem, getAllTableRows } from './helpers/table.js' +import { getTableItem, getAllTableRows, pollQueryTable } from './helpers/table.js' test.before(t => { t.context = { apiEndpoint: getApiEndpoint(), metricsDynamo: getDynamoDb('admin-metrics'), spaceMetricsDynamo: getDynamoDb('space-metrics'), + pieceDynamo: getDynamoDb('piece'), rateLimitsDynamo: getDynamoDb('rate-limit') } }) @@ -236,6 +237,12 @@ test('w3infra integration flow', async t => { interval: 100, }) + // Check filecoin piece computed after leaving queue + const pieces = await getPieces(t, shards[0].toString()) + t.assert(pieces) + t.is(pieces?.length, 1) + t.truthy(pieces?.[0].piece) + // Check metrics were updated if (beforeStoreAddSizeTotal && spaceBeforeUploadAddMetrics && spaceBeforeStoreAddSizeMetrics && beforeUploadAddTotal) { await pWaitFor(async () => { @@ -275,7 +282,7 @@ test('w3infra integration flow', async t => { TableName: t.context.rateLimitsDynamo.tableName, Item: marshall({ id: Math.random().toString(10), - subject: client.currentSpace().did(), + subject: client.currentSpace()?.did(), rate: 0 }) })) @@ -283,7 +290,7 @@ test('w3infra integration flow', async t => { await client.uploadFile(await randomFile(100)) }) - t.is(uploadError.message, 'failed store/add invocation') + t.is(uploadError?.message, 'failed store/add invocation') }) /** @@ -312,3 +319,25 @@ async function getSpaceMetrics (t, spaceDid, name) { return item } + +/** + * @param {import("ava").ExecutionContext} t + * @param {string} link + */ +async function getPieces (t, link) { + const item = await pollQueryTable( + t.context.pieceDynamo.client, + t.context.pieceDynamo.tableName, + { + link: { + ComparisonOperator: 'EQ', + AttributeValueList: [{ S: link }] + } + }, + { + indexName: 'link' + } + ) + + return item +} diff --git a/tsconfig.json b/tsconfig.json index 35920afd..72c6fcd0 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -35,5 +35,5 @@ "stripInternal": true, "resolveJsonModule": true, }, - "include": [ "upload-api", "stacks", "carpark", "replicator", "satnav", "ucan-invocation", "tools" ] + "include": [ "upload-api", "stacks", "carpark", "replicator", "satnav", "ucan-invocation", "filecoin", "tools" ] } \ No newline at end of file diff --git a/upload-api/package.json b/upload-api/package.json index 0c4c701c..9a073af9 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -23,7 +23,7 @@ "@ucanto/validator": "^8.0.0", "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^14.0.0", - "@web3-storage/capabilities": "^9.0.0", + "@web3-storage/capabilities": "^9.3.0", "@web3-storage/upload-api": "^5.3.1", "@web3-storage/w3infra-ucan-invocation": "*", "multiformats": "^11.0.1",