-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: compute piece for uploaded cars
- Loading branch information
1 parent
3756f02
commit 48d8bb6
Showing
21 changed files
with
1,029 additions
and
142 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
export class Failure extends Error { | ||
describe() { | ||
return this.toString() | ||
} | ||
|
||
get message() { | ||
return this.describe() | ||
} | ||
|
||
toJSON() { | ||
const { name, message, stack } = this | ||
return { name, message, stack } | ||
} | ||
} | ||
|
||
export const DatabaseOperationErrorName = /** @type {const} */ ( | ||
'DatabaseOperationFailed' | ||
) | ||
export class DatabaseOperationFailed extends Failure { | ||
get reason() { | ||
return this.message | ||
} | ||
|
||
get name() { | ||
return DatabaseOperationErrorName | ||
} | ||
} | ||
|
||
export const GetCarErrorName = /** @type {const} */ ( | ||
'GetCarFailed' | ||
) | ||
export class GetCarFailed extends Failure { | ||
get reason() { | ||
return this.message | ||
} | ||
|
||
get name() { | ||
return GetCarErrorName | ||
} | ||
} | ||
|
||
export const ComputePieceErrorName = /** @type {const} */ ( | ||
'ComputePieceFailed' | ||
) | ||
export class ComputePieceFailed extends Failure { | ||
get reason() { | ||
return this.message | ||
} | ||
|
||
get name() { | ||
return ComputePieceErrorName | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
import { S3Client } from '@aws-sdk/client-s3' | ||
import * as Sentry from '@sentry/serverless' | ||
|
||
import { computePieceCid } from '../index.js' | ||
import { createPieceTable } from '../tables/piece.js' | ||
|
||
Sentry.AWSLambda.init({ | ||
environment: process.env.SST_STAGE, | ||
dsn: process.env.SENTRY_DSN, | ||
tracesSampleRate: 1.0, | ||
}) | ||
|
||
const AWS_REGION = process.env.AWS_REGION || 'us-west-2' | ||
|
||
/** | ||
* Get EventRecord from the SQS Event triggering the handler | ||
* | ||
* @param {import('aws-lambda').SQSEvent} event | ||
*/ | ||
async function computeHandler (event) { | ||
const { | ||
pieceTableName, | ||
} = getEnv() | ||
|
||
const record = parseEvent(event) | ||
if (!record) { | ||
throw new Error('Unexpected sqs record format') | ||
} | ||
|
||
const s3Client = new S3Client({ region: record.bucketRegion }) | ||
const pieceTable = createPieceTable(AWS_REGION, pieceTableName) | ||
|
||
const { error, ok } = await computePieceCid({ | ||
record, | ||
s3Client, | ||
pieceTable, | ||
}) | ||
|
||
if (error) { | ||
return { | ||
statusCode: 500, | ||
body: error.message | ||
} | ||
} | ||
|
||
return { | ||
statusCode: 200, | ||
body: ok | ||
} | ||
} | ||
|
||
export const handler = Sentry.AWSLambda.wrapHandler(computeHandler) | ||
|
||
/** | ||
* Get Env validating it is set. | ||
*/ | ||
function getEnv () { | ||
return { | ||
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'), | ||
} | ||
} | ||
|
||
/** | ||
* @param {string} name | ||
* @returns {string} | ||
*/ | ||
function mustGetEnv (name) { | ||
const value = process.env[name] | ||
if (!value) throw new Error(`Missing env var: ${name}`) | ||
return value | ||
} | ||
|
||
/** | ||
* Extract an EventRecord from the passed SQS Event | ||
* | ||
* @param {import('aws-lambda').SQSEvent} sqsEvent | ||
* @returns {import('../index.js').EventRecord | undefined} | ||
*/ | ||
function parseEvent (sqsEvent) { | ||
if (sqsEvent.Records.length !== 1) { | ||
throw new Error( | ||
`Expected 1 CAR per invocation but received ${sqsEvent.Records.length} CARs` | ||
) | ||
} | ||
|
||
const body = sqsEvent.Records[0].body | ||
if (!body) { | ||
return | ||
} | ||
const { key, bucketName, bucketRegion } = JSON.parse(body) | ||
|
||
return { | ||
bucketRegion, | ||
bucketName, | ||
key, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import { | ||
GetObjectCommand, | ||
} from '@aws-sdk/client-s3' | ||
// @@ts-expect-error needs final dep | ||
// import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' | ||
// import * as Digest from 'multiformats/hashes/digest' | ||
import { Piece } from '@web3-storage/data-segment' | ||
import { CID } from 'multiformats/cid' | ||
|
||
// import { GetCarFailed, ComputePieceFailed } from './errors.js' | ||
import { GetCarFailed } from './errors.js' | ||
|
||
/** | ||
* @typedef {object} EventRecord | ||
* @property {string} bucketRegion | ||
* @property {string} bucketName | ||
* @property {string} key | ||
* | ||
* @typedef {import('@aws-sdk/client-s3').S3Client} S3Client | ||
* @typedef {import('@aws-sdk/client-dynamodb').DynamoDBClient} DynamoDBClient | ||
* | ||
* @param {DynamoDBClient} props.dynamoClient | ||
* @param {string} props.pieceTableName | ||
*/ | ||
|
||
/** | ||
* Create CAR side index and write it to Satnav bucket if non existing. | ||
* | ||
* @param {object} props | ||
* @param {EventRecord} props.record | ||
* @param {S3Client} props.s3Client | ||
* @param {import('./types').PieceTable} props.pieceTable | ||
*/ | ||
export async function computePieceCid({ | ||
record, | ||
s3Client, | ||
pieceTable | ||
}) { | ||
const key = record.key | ||
// CIDs in carpark are in format `${carCid}/${carCid}.car` | ||
const cidString = key.split('/')[0] | ||
|
||
const getCmd = new GetObjectCommand({ | ||
Bucket: record.bucketName, | ||
Key: key, | ||
}) | ||
const res = await s3Client.send(getCmd) | ||
if (!res.Body) { | ||
return { | ||
error: new GetCarFailed(`failed to get CAR file with key ${key} in bucket ${record.bucketName}`) | ||
} | ||
} | ||
|
||
// let piece | ||
// try { | ||
// const hasher = Hasher.create() | ||
// const digestBytes = new Uint8Array(36) | ||
|
||
// // @ts-expect-error aws Readable stream types are not good | ||
// for await (const chunk of res.Body.transformToWebStream()) { | ||
// hasher.write(chunk) | ||
// } | ||
// hasher.digestInto(digestBytes, 0, true) | ||
|
||
// const digest = Digest.decode(digestBytes) | ||
// // @ts-expect-error some properties from PieceDigest are not present in MultihashDigest | ||
// piece = Piece.fromDigest(digest) | ||
// } catch (/** @type {any} */ error) { | ||
// return { | ||
// error: new ComputePieceFailed(error.cause) | ||
// } | ||
// } | ||
const piece = Piece.fromPayload(await res.Body.transformToByteArray()) | ||
|
||
// Write to table | ||
const { ok, error } = await pieceTable.insert({ | ||
link: CID.parse(cidString), | ||
piece: piece.link, | ||
}) | ||
|
||
return { | ||
ok, | ||
error | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
{ | ||
"name": "@web3-storage/w3infra-filecoin", | ||
"version": "0.0.0", | ||
"type": "module", | ||
"scripts": { | ||
"test": "ava --verbose --timeout=60s **/*.test.js" | ||
}, | ||
"dependencies": { | ||
"@aws-sdk/client-dynamodb": "^3.211.0", | ||
"@aws-sdk/client-s3": "^3.211.0", | ||
"@aws-sdk/client-sqs": "^3.226.0", | ||
"@sentry/serverless": "^7.22.0", | ||
"@web3-storage/data-segment": "^3.0.1", | ||
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", | ||
"multiformats": "^12.1.1" | ||
}, | ||
"devDependencies": { | ||
"@serverless-stack/resources": "*", | ||
"ava": "^4.3.3", | ||
"nanoid": "^4.0.0", | ||
"testcontainers": "^8.13.0" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/** @typedef {import('@serverless-stack/resources').TableProps} TableProps */ | ||
|
||
/** @type TableProps */ | ||
export const pieceTableProps = { | ||
fields: { | ||
piece: 'string', // `baga...1` | ||
link: 'string', // `bagy...1` | ||
aggregate: 'string', // `bagy...9` | ||
inclusion: 'string', // TODO: Inclusion? | ||
insertedAt: 'string', // `2022-12-24T...` | ||
}, | ||
primaryIndex: { partitionKey: 'piece', sortKey: 'insertedAt' }, | ||
globalIndexes: { | ||
link: { partitionKey: 'link', projection: 'all' } | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import { | ||
DynamoDBClient, | ||
PutItemCommand, | ||
} from '@aws-sdk/client-dynamodb' | ||
import { marshall } from '@aws-sdk/util-dynamodb' | ||
|
||
import { DatabaseOperationFailed } from '../errors.js' | ||
|
||
/** | ||
* Abstraction layer to handle operations on Piece Table. | ||
* | ||
* @param {string} region | ||
* @param {string} tableName | ||
* @param {object} [options] | ||
* @param {string} [options.endpoint] | ||
* @returns {import('../types').PieceTable} | ||
*/ | ||
export function createPieceTable (region, tableName, options = {}) { | ||
const dynamoDb = new DynamoDBClient({ | ||
region, | ||
endpoint: options.endpoint | ||
}) | ||
|
||
return usePieceTable(dynamoDb, tableName) | ||
} | ||
|
||
/** | ||
* @param {DynamoDBClient} dynamoDb | ||
* @param {string} tableName | ||
* @returns {import('../types').PieceTable} | ||
*/ | ||
export function usePieceTable(dynamoDb, tableName) { | ||
return { | ||
/** | ||
* Check if the given link CID is bound to the uploader account | ||
* | ||
* @param {import('../types').PieceInsertInput} input | ||
*/ | ||
insert: async (input) => { | ||
const insertedAt = new Date().toISOString() | ||
|
||
const cmd = new PutItemCommand({ | ||
TableName: tableName, | ||
Item: marshall({ | ||
link: input.link.toString(), | ||
piece: input.piece.toString(), | ||
insertedAt | ||
}), | ||
}) | ||
|
||
try { | ||
await dynamoDb.send(cmd) | ||
} catch (/** @type {any} */ error) { | ||
return { | ||
error: new DatabaseOperationFailed(error.cause) | ||
} | ||
} | ||
|
||
return { | ||
ok: {} | ||
} | ||
}, | ||
} | ||
} |
Oops, something went wrong.