From ff2598782be39609f9b8298a9b849938b910bbdf Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Thu, 17 Oct 2024 10:30:36 -0300 Subject: [PATCH 01/15] feat: egress traffic tracker --- billing/data/egress.js | 40 ++++++ billing/functions/egress-traffic-handler.js | 131 ++++++++++++++++++ billing/lib/api.ts | 21 ++- billing/queues/egress-traffic.js | 9 ++ .../functions/ucan-invocation-router.js | 6 +- upload-api/stores/usage.js | 28 +++- 6 files changed, 231 insertions(+), 4 deletions(-) create mode 100644 billing/data/egress.js create mode 100644 billing/functions/egress-traffic-handler.js create mode 100644 billing/queues/egress-traffic.js diff --git a/billing/data/egress.js b/billing/data/egress.js new file mode 100644 index 00000000..b7593ac7 --- /dev/null +++ b/billing/data/egress.js @@ -0,0 +1,40 @@ +import { DecodeFailure, EncodeFailure, Schema } from './lib.js' + +export const egressSchema = Schema.struct({ + customer: Schema.did({ method: 'mailto' }), + resource: Schema.link(), + bytes: Schema.bigint(), + servedAt: Schema.date(), +}) + +/** @type {import('../lib/api').Validator} */ +export const validate = input => egressSchema.read(input) + +/** @type {import('../lib/api').Encoder} */ +export const encode = input => { + try { + return { ok: JSON.stringify(input) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decode = input => { + try { + return { + ok: { + customer: Schema.did({ method: 'mailto' }).from(input.customer), + resource: Schema.link().from(input.resourceId), + bytes: Schema.bigint().from(input.bytes), + servedAt: Schema.date().from(input.servedAt), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) + } + } +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js new file mode 100644 index 00000000..79cfa6ba --- /dev/null +++ b/billing/functions/egress-traffic-handler.js @@ -0,0 +1,131 @@ +import * as Sentry from '@sentry/serverless' +import { expect } from './lib.js' +import { decode } from '../data/egress.js' +import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs' +import { mustGetEnv } from '../../lib/env.js' +import { createCustomerStore } from '../tables/customer.js' +import Stripe from 'stripe' +import { Config } from 'sst/node/config' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 +}) + +/** + * @typedef {{ + * region?: 'us-west-2'|'us-east-2' + * queueUrl?: string + * customerTable?: string + * billingMeterName?: string + * stripeSecretKey?: string + * }} CustomHandlerContext + */ + +/** + * AWS Lambda handler to process egress events from the egress traffic queue. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `timestamp`. + * The message is then deleted from the queue when successful. + */ +export const handler = Sentry.AWSLambda.wrapHandler( + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') + const sqsClient = new SQSClient({ region }) + const customerStore = createCustomerStore({ region }, { tableName: customerTable }) + const billingMeterName = customContext?.billingMeterName ?? 'gateway_egress_traffic' + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + + for (const record of event.Records) { + try { + const messageBody = JSON.parse(record.body) + const decoded = decode(messageBody) + const egressEvent = expect(decoded, 'Failed to decode egress message') + + expect( + await recordEgress(customerStore, stripe, billingMeterName, egressEvent), + `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` + ) + + /** + * SQS requires explicit acknowledgment that a message has been successfully processed. + * This is done by deleting the message from the queue using its ReceiptHandle + */ + await sqsClient.send(new DeleteMessageCommand({ + QueueUrl: queueUrl, + ReceiptHandle: record.receiptHandle + })) + } catch (error) { + console.error('Error processing egress event:', error) + } + } + + return { + statusCode: 200, + body: 'Egress events processed successfully' + } + }, +) + +/** + * Finds the Stripe customer ID for the given customer and records the egress traffic data in the Stripe Billing Meter API. + * + * @param {import('../lib/api.ts').CustomerStore} customerStore + * @param {import('stripe').Stripe} stripe + * @param {string} billingMeterName + * @param {import('../lib/api.ts').EgressTrafficData} egressEventData + */ +async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) { + const response = await customerStore.get({ customer: egressEventData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressEventData.customer}`, + cause: response.error + } + } + } + const stripeCustomerId = response.ok.account.slice('stripe:'.length) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } + } + } + + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressEventData.bytes.toString(), + }, + timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) + }) + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, + } + } +} \ No newline at end of file diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 22df39de..c8bebd92 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -1,4 +1,4 @@ -import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface' +import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface' // Billing stores ///////////////////////////////////////////////////////////// @@ -158,6 +158,25 @@ export interface CustomerBillingInstruction { export type CustomerBillingQueue = QueueAdder +/** + * Captures details about egress traffic that should be billed for a given period + */ +export interface EgressTrafficData { + /** Customer DID (did:mailto:...). */ + customer: CustomerDID + /** Resource that was served. */ + resource: UnknownLink + /** Number of bytes that were served. */ + bytes: bigint + /** Time the egress traffic was served at. */ + servedAt: Date +} + +/** + * Queue for egress traffic data. + */ +export type EgressTrafficQueue = QueueAdder + /** * Captures details about a space that should be billed for a given customer in * the given period of usage. diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js new file mode 100644 index 00000000..95f79737 --- /dev/null +++ b/billing/queues/egress-traffic.js @@ -0,0 +1,9 @@ +import { createQueueAdderClient } from './client.js' +import { encode, validate } from '../data/egress.js' + +/** + * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf + * @param {{ url: URL }} context + */ +export const createEgressTrafficQueue = (conf, { url }) => + createQueueAdderClient(conf, { url, encode, validate }) \ No newline at end of file diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index 16a3a6f6..0d0735a1 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -41,6 +41,7 @@ import { createStripeBillingProvider } from '../billing.js' import { createIPNIService } from '../external-services/ipni-service.js' import * as UploadAPI from '@web3-storage/upload-api' import { mustGetEnv } from '../../lib/env.js' +import { createEgressTrafficQueue } from '@web3-storage/w3infra-billing/queues/egress-traffic.js' Sentry.AWSLambda.init({ environment: process.env.SST_STAGE, @@ -120,6 +121,7 @@ export async function ucanInvocationRouter(request) { dealTrackerUrl, pieceOfferQueueUrl, filecoinSubmitQueueUrl, + egressTrafficQueueUrl, requirePaymentPlan, // set for testing dbEndpoint, @@ -201,7 +203,8 @@ export async function ucanInvocationRouter(request) { const revocationsStorage = createRevocationsTable(AWS_REGION, revocationTableName) const spaceDiffStore = createSpaceDiffStore({ region: AWS_REGION }, { tableName: spaceDiffTableName }) const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName }) - const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore }) + const egressTrafficQueue = createEgressTrafficQueue({ region: AWS_REGION }, { url: new URL(egressTrafficQueueUrl) }) + const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore, egressTrafficQueue }) const dealTrackerConnection = getServiceConnection({ did: dealTrackerDid, @@ -346,6 +349,7 @@ function getLambdaEnv () { spaceSnapshotTableName: mustGetEnv('SPACE_SNAPSHOT_TABLE_NAME'), pieceOfferQueueUrl: mustGetEnv('PIECE_OFFER_QUEUE_URL'), filecoinSubmitQueueUrl: mustGetEnv('FILECOIN_SUBMIT_QUEUE_URL'), + egressTrafficQueueUrl: mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL'), r2DelegationBucketEndpoint: mustGetEnv('R2_ENDPOINT'), r2DelegationBucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'), r2DelegationBucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'), diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index 45720254..c4d38059 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -4,15 +4,16 @@ import { iterateSpaceDiffs } from '@web3-storage/w3infra-billing/lib/space-billi * @param {object} conf * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceSnapshotStore} conf.spaceSnapshotStore * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceDiffStore} conf.spaceDiffStore + * @param {import('@web3-storage/w3infra-billing/lib/api').EgressTrafficQueue} conf.egressTrafficQueue */ -export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { +export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTrafficQueue }) { return { /** * @param {import('@web3-storage/upload-api').ProviderDID} provider * @param {import('@web3-storage/upload-api').SpaceDID} space * @param {{ from: Date, to: Date }} period */ - async report (provider, space, period) { + async report(provider, space, period) { const snapResult = await spaceSnapshotStore.get({ provider, space, @@ -57,6 +58,29 @@ export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { events, } return { ok: report } + }, + + /** + * Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API. + * + * @param {import('@web3-storage/upload-api').AccountDID} customer + * @param {import('@web3-storage/upload-api').UnknownLink} resource + * @param {bigint} bytes + * @param {Date} servedAt + * @returns {Promise>} + */ + async record(customer, resource, bytes, servedAt) { + const record = { + customer, + resource, + bytes, + servedAt + } + + const result = await egressTrafficQueue.add(record) + if (result.error) return result + + return { ok: record } } } } From 2c671846d5e4a6a6acc6e34fe8715050be07700c Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Thu, 17 Oct 2024 15:57:47 -0300 Subject: [PATCH 02/15] tests: egress traffic --- billing/data/egress.js | 13 ++++- billing/functions/egress-traffic-handler.js | 5 +- billing/package.json | 1 + billing/test/helpers/context.js | 59 ++++++++++++++++++++ billing/test/helpers/egress.js | 14 +++++ billing/test/lib.egress-traffic.spec.js | 4 ++ billing/test/lib/api.ts | 18 +++++- billing/test/lib/egress-traffic.js | 62 +++++++++++++++++++++ filecoin/test/filecoin-events.test.js | 13 +++-- filecoin/test/filecoin-service.test.js | 13 +++-- 10 files changed, 187 insertions(+), 15 deletions(-) create mode 100644 billing/test/helpers/egress.js create mode 100644 billing/test/lib.egress-traffic.spec.js create mode 100644 billing/test/lib/egress-traffic.js diff --git a/billing/data/egress.js b/billing/data/egress.js index b7593ac7..9093f3e7 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -37,4 +37,15 @@ export const decode = input => { error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) } } -} \ No newline at end of file +} + +/** @type {import('../lib/api').Decoder} */ +export const decodeStr = input => { + try { + return decode(JSON.parse(input)) + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) + } + } +} diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index 79cfa6ba..8c54c6f4 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -33,7 +33,6 @@ export const handler = Sentry.AWSLambda.wrapHandler( * @param {import('aws-lambda').SQSEvent} event * @param {import('aws-lambda').Context} context */ - async (event, context) => { /** @type {CustomHandlerContext|undefined} */ const customContext = context?.clientContext?.Custom @@ -82,10 +81,10 @@ export const handler = Sentry.AWSLambda.wrapHandler( /** * Finds the Stripe customer ID for the given customer and records the egress traffic data in the Stripe Billing Meter API. * - * @param {import('../lib/api.ts').CustomerStore} customerStore + * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe * @param {string} billingMeterName - * @param {import('../lib/api.ts').EgressTrafficData} egressEventData + * @param {import('../lib/api.js').EgressTrafficData} egressEventData */ async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) { const response = await customerStore.get({ customer: egressEventData.customer }) diff --git a/billing/package.json b/billing/package.json index b0a96027..48177251 100644 --- a/billing/package.json +++ b/billing/package.json @@ -4,6 +4,7 @@ "type": "module", "scripts": { "test": "entail '**/*.spec.js'", + "test-only": "entail", "coverage": "c8 -r text -r html npm test" }, "dependencies": { diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 478c5d1f..6e061217 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -1,3 +1,6 @@ +import dotenv from 'dotenv' +import path from 'node:path' +import Stripe from 'stripe' import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js' import { createCustomerStore, customerTableProps } from '../../tables/customer.js' import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js' @@ -6,6 +9,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' +import { decodeStr as decodeEgressTrafficEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -16,6 +20,10 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js' import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' +import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' + +dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) /** * @typedef {{ @@ -137,6 +145,57 @@ export const createUCANStreamTestContext = async () => { return { consumerStore, spaceDiffStore } } +/** + * @returns {Promise} + */ +export const createEgressTrafficTestContext = async () => { + await createAWSServices() + const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY + if (!stripeSecretKey) { + throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') + } + + const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-')) + const egressTrafficQueue = { + add: createEgressTrafficQueue(awsServices.sqs.client, { url: egressQueueURL }).add, + remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressTrafficEvent }).remove, + } + + const accountId = (await awsServices.sqs.client.config.credentials()).accountId + const region = await awsServices.sqs.client.config.region() + + return { + egressTrafficQueue, + egressTrafficQueueUrl: egressQueueURL.toString(), + egressTrafficHandler: createEgressTrafficHandler, + accountId: accountId ?? '', + region: region ?? '', + stripeSecretKey, + stripe: new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }), + // Add mock properties for default Context + callbackWaitsForEmptyEventLoop: false, + functionName: 'egress-traffic-handler', + functionVersion: '1', + invokedFunctionArn: `arn:aws:lambda:${region}:${accountId}:function:egress-traffic-handler`, + memoryLimitInMB: '128', + awsRequestId: 'mockRequestId', + logGroupName: 'mockLogGroup', + logStreamName: 'mockLogStream', + identity: undefined, + clientContext: undefined, + getRemainingTimeInMillis: () => 30000, // mock implementation + done: () => { + console.log('Egress traffic handler done') + }, + fail: () => { + console.log('Egress traffic handler fail') + }, + succeed: () => { + console.log('Egress traffic handler succeed') + } + } +} + /** * @template C * @param {import('../lib/api').TestSuite} suite diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js new file mode 100644 index 00000000..71b0c876 --- /dev/null +++ b/billing/test/helpers/egress.js @@ -0,0 +1,14 @@ +import { randomDIDMailto } from './did.js' +import { randomLink } from './dag.js' + +/** + * @param {Partial} [base] + * @returns {Promise} + */ +export const randomEgressEvent = async (base = {}) => ({ + customer: await randomDIDMailto(), + resource: randomLink(), + bytes: BigInt(Math.floor(Math.random() * 1000000)), + servedAt: new Date(), + ...base +}) \ No newline at end of file diff --git a/billing/test/lib.egress-traffic.spec.js b/billing/test/lib.egress-traffic.spec.js new file mode 100644 index 00000000..306b57a5 --- /dev/null +++ b/billing/test/lib.egress-traffic.spec.js @@ -0,0 +1,4 @@ +import * as EgressTrafficSuite from './lib/egress-traffic.js' +import { bindTestContext, createEgressTrafficTestContext } from './helpers/context.js' + +export const test = bindTestContext(EgressTrafficSuite.test, createEgressTrafficTestContext) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 68814ca9..fbee21d4 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -18,8 +18,12 @@ import { SpaceSnapshotStore, UsageStore, UsageListKey, - Usage + Usage, + EgressTrafficQueue, + EgressTrafficData } from '../../lib/api.js' +import { Context, Handler, SQSEvent } from 'aws-lambda' +import Stripe from 'stripe' export interface BillingCronTestContext { customerStore: CustomerStore & StorePutter @@ -47,12 +51,24 @@ export interface UCANStreamTestContext { consumerStore: ConsumerStore & StorePutter } + +export interface EgressTrafficTestContext extends Context { + egressTrafficQueue: EgressTrafficQueue & QueueRemover + egressTrafficQueueUrl: string + egressTrafficHandler: Handler + accountId: string + region: string + stripeSecretKey: string + stripe: Stripe +} + export type TestContext = & BillingCronTestContext & CustomerBillingQueueTestContext & SpaceBillingQueueTestContext & StripeTestContext & UCANStreamTestContext + & EgressTrafficTestContext /** QueueRemover can remove items from the head of the queue. */ export interface QueueRemover { diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js new file mode 100644 index 00000000..ceb26cfa --- /dev/null +++ b/billing/test/lib/egress-traffic.js @@ -0,0 +1,62 @@ +import { randomEgressEvent } from '../helpers/egress.js' + +/** @type {import('./api').TestSuite} */ +export const test = { + 'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => { + const maxEvents = 100 + const events = await Promise.all( + Array.from({ length: maxEvents }, () => randomEgressEvent()) + ) + + // 1. Add egress events to the queue to simulate events from the Freeway worker + for (const e of events) { + console.log(`Adding egress event to the queue: CustomerId: ${e.customer}, ResourceId: ${e.resource}, ServedAt: ${e.servedAt.toISOString()}`) + await ctx.egressTrafficQueue.add(e) + } + + + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: JSON.stringify(e), + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) + } + + // 3. Process the SQS event to trigger the handler + await ctx.egressTrafficHandler(sqsEventBatch, ctx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) + + // 4. Ensure we got a billing meter event or each egress event in the queue + // query stripe for the billing meter events + // const billingMeterEvents = await ctx.stripe.billing.meterEvents.list({ + // limit: maxEvents, + // }) + // assert.equal(billingMeterEvents.data.length, events.length) + // FIXME (fforbeck): how to check we send the events to stripe? + // we need to mock the stripe client + // and check that the correct events are sent to stripe + } +} \ No newline at end of file diff --git a/filecoin/test/filecoin-events.test.js b/filecoin/test/filecoin-events.test.js index dfd5663a..5added75 100644 --- a/filecoin/test/filecoin-events.test.js +++ b/filecoin/test/filecoin-events.test.js @@ -90,12 +90,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) diff --git a/filecoin/test/filecoin-service.test.js b/filecoin/test/filecoin-service.test.js index 7fc853e0..54d60ea1 100644 --- a/filecoin/test/filecoin-service.test.js +++ b/filecoin/test/filecoin-service.test.js @@ -91,12 +91,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.service.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) From 95d5f14ccc08a374a719fa02833c7acd965b5a9c Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Fri, 18 Oct 2024 15:03:11 -0300 Subject: [PATCH 03/15] tests: egress traffic --- billing/data/egress.js | 16 ++- billing/functions/egress-traffic-handler.js | 38 ++--- billing/test/helpers/context.js | 68 +++++---- billing/test/helpers/did.js | 2 +- billing/test/helpers/egress.js | 13 +- billing/test/lib/api.ts | 4 + billing/test/lib/egress-traffic.js | 147 +++++++++++++------- 7 files changed, 180 insertions(+), 108 deletions(-) diff --git a/billing/data/egress.js b/billing/data/egress.js index 9093f3e7..d44bbd3c 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -1,3 +1,4 @@ +import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' export const egressSchema = Schema.struct({ @@ -13,7 +14,14 @@ export const validate = input => egressSchema.read(input) /** @type {import('../lib/api').Encoder} */ export const encode = input => { try { - return { ok: JSON.stringify(input) } + return { + ok: JSON.stringify({ + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: input.bytes.toString(), + servedAt: input.servedAt.toISOString(), + }) + } } catch (/** @type {any} */ err) { return { error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) @@ -27,9 +35,9 @@ export const decode = input => { return { ok: { customer: Schema.did({ method: 'mailto' }).from(input.customer), - resource: Schema.link().from(input.resourceId), - bytes: Schema.bigint().from(input.bytes), - servedAt: Schema.date().from(input.servedAt), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: BigInt(input.bytes), + servedAt: new Date(input.servedAt), } } } catch (/** @type {any} */ err) { diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index 8c54c6f4..cb5d4cfa 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -1,7 +1,6 @@ import * as Sentry from '@sentry/serverless' import { expect } from './lib.js' -import { decode } from '../data/egress.js' -import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs' +import { decodeStr } from '../data/egress.js' import { mustGetEnv } from '../../lib/env.js' import { createCustomerStore } from '../tables/customer.js' import Stripe from 'stripe' @@ -16,10 +15,11 @@ Sentry.AWSLambda.init({ /** * @typedef {{ * region?: 'us-west-2'|'us-east-2' - * queueUrl?: string + * egressTrafficQueueUrl?: string * customerTable?: string * billingMeterName?: string * stripeSecretKey?: string + * customerStore?: import('../lib/api').CustomerStore * }} CustomHandlerContext */ @@ -37,21 +37,23 @@ export const handler = Sentry.AWSLambda.wrapHandler( /** @type {CustomHandlerContext|undefined} */ const customContext = context?.clientContext?.Custom const region = customContext?.region ?? mustGetEnv('AWS_REGION') - const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + // const queueUrl = customContext?.egressTrafficQueueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + // const sqsClient = new SQSClient({ region }) const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') - const sqsClient = new SQSClient({ region }) - const customerStore = createCustomerStore({ region }, { tableName: customerTable }) - const billingMeterName = customContext?.billingMeterName ?? 'gateway_egress_traffic' - const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) for (const record of event.Records) { try { - const messageBody = JSON.parse(record.body) - const decoded = decode(messageBody) - const egressEvent = expect(decoded, 'Failed to decode egress message') + const decoded = decodeStr(record.body) + const egressEvent = expect(decoded, 'Failed to decode egress event') expect( await recordEgress(customerStore, stripe, billingMeterName, egressEvent), @@ -62,10 +64,10 @@ export const handler = Sentry.AWSLambda.wrapHandler( * SQS requires explicit acknowledgment that a message has been successfully processed. * This is done by deleting the message from the queue using its ReceiptHandle */ - await sqsClient.send(new DeleteMessageCommand({ - QueueUrl: queueUrl, - ReceiptHandle: record.receiptHandle - })) + // await sqsClient.send(new DeleteMessageCommand({ + // QueueUrl: queueUrl, + // ReceiptHandle: record.receiptHandle + // })) } catch (error) { console.error('Error processing egress event:', error) } @@ -83,10 +85,10 @@ export const handler = Sentry.AWSLambda.wrapHandler( * * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe - * @param {string} billingMeterName + * @param {string} billingMeterEventName * @param {import('../lib/api.js').EgressTrafficData} egressEventData */ -async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) { +async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { const response = await customerStore.get({ customer: egressEventData.customer }) if (response.error) { return { @@ -111,7 +113,7 @@ async function recordEgress(customerStore, stripe, billingMeterName, egressEvent /** @type {import('stripe').Stripe.Billing.MeterEvent} */ const meterEvent = await stripe.billing.meterEvents.create({ - event_name: billingMeterName, + event_name: billingMeterEventName, payload: { stripe_customer_id: stripeCustomerId, value: egressEventData.bytes.toString(), diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 6e061217..0ee2a2cc 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -1,6 +1,5 @@ import dotenv from 'dotenv' import path from 'node:path' -import Stripe from 'stripe' import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js' import { createCustomerStore, customerTableProps } from '../../tables/customer.js' import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js' @@ -22,6 +21,7 @@ import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' +import Stripe from 'stripe' dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) @@ -41,6 +41,26 @@ const createAWSServices = async () => { } } +/** + * @returns {{ stripe: Stripe, stripeSecretKey: string, billingMeterEventName: string, billingMeterId: string }} + */ +const createStripeService = () => { + const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY + if (!stripeSecretKey) { + throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') + } + const billingMeterEventName = process.env.STRIPE_BILLING_METER_EVENT_NAME + if (!billingMeterEventName) { + throw new Error('STRIPE_BILLING_METER_EVENT_NAME environment variable is not set') + } + const billingMeterId = process.env.STRIPE_BILLING_METER_ID + if (!billingMeterId) { + throw new Error('STRIPE_BILLING_METER_ID environment variable is not set') + } + const stripe = new Stripe(stripeSecretKey, { apiVersion: "2023-10-16" }) + return { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } +} + export const createBillingCronTestContext = async () => { await createAWSServices() @@ -150,10 +170,6 @@ export const createUCANStreamTestContext = async () => { */ export const createEgressTrafficTestContext = async () => { await createAWSServices() - const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY - if (!stripeSecretKey) { - throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') - } const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-')) const egressTrafficQueue = { @@ -162,37 +178,33 @@ export const createEgressTrafficTestContext = async () => { } const accountId = (await awsServices.sqs.client.config.credentials()).accountId - const region = await awsServices.sqs.client.config.region() + const region = 'us-west-2' + + const customerTable = await createTable(awsServices.dynamo.client, customerTableProps, 'customer-') + const customerStore = { + ...createCustomerStore(awsServices.dynamo.client, { tableName: customerTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: customerTable, + validate: validateCustomer, // assume test data is valid + encode: encodeCustomer + }) + } + + const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService() + // @ts-expect-error -- Don't need to initialize the full lambda context for testing return { egressTrafficQueue, egressTrafficQueueUrl: egressQueueURL.toString(), egressTrafficHandler: createEgressTrafficHandler, accountId: accountId ?? '', region: region ?? '', + customerTable, + customerStore, + billingMeterEventName, + billingMeterId, stripeSecretKey, - stripe: new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }), - // Add mock properties for default Context - callbackWaitsForEmptyEventLoop: false, - functionName: 'egress-traffic-handler', - functionVersion: '1', - invokedFunctionArn: `arn:aws:lambda:${region}:${accountId}:function:egress-traffic-handler`, - memoryLimitInMB: '128', - awsRequestId: 'mockRequestId', - logGroupName: 'mockLogGroup', - logStreamName: 'mockLogStream', - identity: undefined, - clientContext: undefined, - getRemainingTimeInMillis: () => 30000, // mock implementation - done: () => { - console.log('Egress traffic handler done') - }, - fail: () => { - console.log('Egress traffic handler fail') - }, - succeed: () => { - console.log('Egress traffic handler succeed') - } + stripe, } } diff --git a/billing/test/helpers/did.js b/billing/test/helpers/did.js index c58fb9e7..3f0bc759 100644 --- a/billing/test/helpers/did.js +++ b/billing/test/helpers/did.js @@ -8,7 +8,7 @@ const randomDomain = () => `${randomAlphas(randomInteger(1, 32))}.${tlds[randomInteger(0, tlds.length)]}` /** @returns {import("@ucanto/interface").DID<'mailto'>} */ -export const randomDIDMailto = () => +export const randomDIDMailto = () => `did:mailto:${randomDomain()}:${randomAlphas(randomInteger(1, 16))}` /** @returns {Promise} */ diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index 71b0c876..f2110081 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -1,14 +1,13 @@ -import { randomDIDMailto } from './did.js' import { randomLink } from './dag.js' /** - * @param {Partial} [base] - * @returns {Promise} + * @param {import('../../lib/api').Customer} customer + * @returns {import('../../lib/api').EgressTrafficData} */ -export const randomEgressEvent = async (base = {}) => ({ - customer: await randomDIDMailto(), +export const randomEgressEvent = (customer) => ({ + customer: customer.customer, resource: randomLink(), bytes: BigInt(Math.floor(Math.random() * 1000000)), - servedAt: new Date(), - ...base + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), }) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index fbee21d4..081e1186 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -58,6 +58,10 @@ export interface EgressTrafficTestContext extends Context { egressTrafficHandler: Handler accountId: string region: string + customerTable: string + customerStore: CustomerStore + billingMeterEventName: string + billingMeterId: string stripeSecretKey: string stripe: Stripe } diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js index ceb26cfa..3d3cabab 100644 --- a/billing/test/lib/egress-traffic.js +++ b/billing/test/lib/egress-traffic.js @@ -1,62 +1,109 @@ +import { encode } from '../../data/egress.js' +import { randomCustomer } from '../helpers/customer.js' +import { randomDIDMailto } from '../helpers/did.js' import { randomEgressEvent } from '../helpers/egress.js' +import * as DidMailto from '@web3-storage/did-mailto' /** @type {import('./api').TestSuite} */ export const test = { - 'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => { - const maxEvents = 100 - const events = await Promise.all( - Array.from({ length: maxEvents }, () => randomEgressEvent()) - ) + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id - // 1. Add egress events to the queue to simulate events from the Freeway worker - for (const e of events) { - console.log(`Adding egress event to the queue: CustomerId: ${e.customer}, ResourceId: ${e.resource}, ServedAt: ${e.servedAt.toISOString()}`) - await ctx.egressTrafficQueue.add(e) - } + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + () => randomEgressEvent(customer) + ) + ) - // 2. Create a SQS event batch - // @type {import('aws-lambda').SQSEvent} - const sqsEventBatch = { - Records: events.map(e => ({ - // @type {import('aws-lambda').SQSRecord} - body: JSON.stringify(e), - messageId: Math.random().toString(), - receiptHandle: Math.random().toString(), - awsRegion: ctx.region, - eventSource: 'aws:sqs', - eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, - awsAccountId: ctx.accountId, - md5OfBody: '', - md5OfMessageAttributes: '', - attributes: { - ApproximateReceiveCount: '1', - SentTimestamp: e.servedAt.getTime().toString(), - SenderId: ctx.accountId, - ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), - }, - messageAttributes: {}, - })) - } + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } - // 3. Process the SQS event to trigger the handler - await ctx.egressTrafficHandler(sqsEventBatch, ctx, (err, res) => { - if (err) { - assert.fail(err) + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encode(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) } - assert.ok(res) - assert.equal(res.statusCode, 200) - assert.equal(res.body, 'Egress events processed successfully') - }) - // 4. Ensure we got a billing meter event or each egress event in the queue - // query stripe for the billing meter events - // const billingMeterEvents = await ctx.stripe.billing.meterEvents.list({ - // limit: maxEvents, - // }) - // assert.equal(billingMeterEvents.data.length, events.length) - // FIXME (fforbeck): how to check we send the events to stripe? - // we need to mock the stripe client - // and check that the correct events are sent to stripe + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) + + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), + } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } + } } } \ No newline at end of file From d1ed98b1d7df981e8f49f6a2bfe3a89959698a85 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Fri, 18 Oct 2024 15:53:53 -0300 Subject: [PATCH 04/15] chore: reading repo variables --- .github/workflows/test.yaml | 2 ++ billing/functions/egress-traffic-handler.js | 14 ++-------- stacks/billing-stack.js | 31 +++++++++++++++++++++ 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b968740b..773a4ec4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -25,3 +25,5 @@ jobs: AWS_ACCESS_KEY_ID: 'NOSUCH' AWS_SECRET_ACCESS_KEY: 'NOSUCH' STRIPE_TEST_SECRET_KEY: ${{ secrets.STRIPE_TEST_SECRET_KEY }} + STRIPE_BILLING_METER_ID: ${{ vars.STRIPE_BILLING_METER_ID }} + STRIPE_BILLING_METER_EVENT_NAME: ${{ vars.STRIPE_BILLING_METER_EVENT_NAME }} diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index cb5d4cfa..fe9a29c9 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -25,7 +25,7 @@ Sentry.AWSLambda.init({ /** * AWS Lambda handler to process egress events from the egress traffic queue. - * Each event is a JSON object with `customer`, `resource`, `bytes` and `timestamp`. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `se`. * The message is then deleted from the queue when successful. */ export const handler = Sentry.AWSLambda.wrapHandler( @@ -37,8 +37,6 @@ export const handler = Sentry.AWSLambda.wrapHandler( /** @type {CustomHandlerContext|undefined} */ const customContext = context?.clientContext?.Custom const region = customContext?.region ?? mustGetEnv('AWS_REGION') - // const queueUrl = customContext?.egressTrafficQueueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') - // const sqsClient = new SQSClient({ region }) const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) @@ -60,14 +58,7 @@ export const handler = Sentry.AWSLambda.wrapHandler( `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` ) - /** - * SQS requires explicit acknowledgment that a message has been successfully processed. - * This is done by deleting the message from the queue using its ReceiptHandle - */ - // await sqsClient.send(new DeleteMessageCommand({ - // QueueUrl: queueUrl, - // ReceiptHandle: record.receiptHandle - // })) + // TODO: delete the message from the queue? } catch (error) { console.error('Error processing egress event:', error) } @@ -111,6 +102,7 @@ async function recordEgress(customerStore, stripe, billingMeterEventName, egress } } + // TODO (fforbeck): implement some retry logic in case rate limiting errors /** @type {import('stripe').Stripe.Billing.MeterEvent} */ const meterEvent = await stripe.billing.meterEvents.create({ event_name: billingMeterEventName, diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 13011b88..669198e7 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -179,5 +179,36 @@ export function BillingStack ({ stack, app }) { CustomDomain: customDomain ? `https://${customDomain.domainName}` : 'Set BILLING_HOSTED_ZONE in env to deploy to a custom domain' }) + // Lambda that handles egress traffic tracking + const egressTrafficQueueHandler = new Function(stack, 'egress-traffic-queue-handler', { + permissions: [customerTable], + handler: 'billing/functions/egress-traffic-queue.handler', + timeout: '15 minutes', + bind: [stripeSecretKey], + environment: { + AWS_REGION: stack.region, + CUSTOMER_TABLE_NAME: customerTable.tableName, + // TODO (fforbeck): make this a config based on the env: local, staging, prod + STRIPE_BILLING_METER_EVENT_NAME: 'test-gateway-egress-traffic' + } + }) + + // Queue for egress traffic tracking + const egressTrafficDLQ = new Queue(stack, 'egress-traffic-dlq', { + cdk: { queue: { retentionPeriod: Duration.days(14) } } + }) + const egressTrafficQueue = new Queue(stack, 'egress-traffic-queue', { + consumer: { + function: egressTrafficQueueHandler, + deadLetterQueue: egressTrafficDLQ.cdk.queue, + cdk: { eventSource: { batchSize: 1 } } + }, + cdk: { queue: { visibilityTimeout: Duration.seconds(60) } } + }) + + stack.addOutputs({ + EgressTrafficQueueURL: egressTrafficQueue.queueUrl + }) + return { billingCron } } From f2803a5a5fd6be5aa6cd5e3b77b338c2b4a5b5e8 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 12:30:19 -0300 Subject: [PATCH 05/15] fix: get consumer response must include customer --- upload-api/stores/provisions.js | 3 ++- upload-api/tables/consumer.js | 3 ++- upload-api/types.ts | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/upload-api/stores/provisions.js b/upload-api/stores/provisions.js index bff7987d..19630e4e 100644 --- a/upload-api/stores/provisions.js +++ b/upload-api/stores/provisions.js @@ -109,7 +109,8 @@ export function useProvisionStore (subscriptionTable, consumerTable, spaceMetric did: consumer, allocated, limit: 1_000_000_000, // set to an arbitrarily high number because we currently don't enforce any limits - subscription: consumerRecord.subscription + subscription: consumerRecord.subscription, + customer: consumerRecord.customer } }) : ( { error: { name: 'ConsumerNotFound', message: `could not find ${consumer}` } } diff --git a/upload-api/tables/consumer.js b/upload-api/tables/consumer.js index 0cf48b8d..b2036c5d 100644 --- a/upload-api/tables/consumer.js +++ b/upload-api/tables/consumer.js @@ -98,7 +98,8 @@ export function useConsumerTable (dynamoDb, tableName) { // provider/consumer pair, but I suspect we'll never get there const record = response.Items?.map(i => unmarshall(i)).find(i => i.provider === provider) return record ? { - subscription: record.subscription + subscription: record.subscription, + customer: record.customer } : null }, diff --git a/upload-api/types.ts b/upload-api/types.ts index 20eab517..f7a5e5b7 100644 --- a/upload-api/types.ts +++ b/upload-api/types.ts @@ -184,7 +184,7 @@ export interface ConsumerListRecord { export interface ConsumerTable { /** get a consumer record for a given provider */ - get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string } | null> + get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string, customer: AccountDID } | null> /** get a consumer record for a given subscription */ getBySubscription: (provider: ProviderDID, subscription: string) => Promise<{ consumer: DID } | null> /** add a consumer - a relationship between a provider, subscription and consumer */ From 5240ea7947b53dfc4406b036effafd3c435172c3 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 15:55:21 -0300 Subject: [PATCH 06/15] implemented suggested changes --- billing/data/egress.js | 74 ++++---- billing/functions/egress-traffic-handler.js | 159 +++++++++-------- billing/queues/egress-traffic.js | 2 +- billing/test/helpers/egress.js | 12 +- billing/test/lib/egress-traffic.js | 184 ++++++++++---------- billing/test/utils/stripe.js | 17 +- billing/utils/stripe.js | 14 ++ 7 files changed, 248 insertions(+), 214 deletions(-) diff --git a/billing/data/egress.js b/billing/data/egress.js index d44bbd3c..37d68c3d 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -2,10 +2,10 @@ import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' export const egressSchema = Schema.struct({ - customer: Schema.did({ method: 'mailto' }), - resource: Schema.link(), - bytes: Schema.bigint(), - servedAt: Schema.date(), + customer: Schema.did({ method: 'mailto' }), + resource: Schema.link(), + bytes: Schema.bigint(), + servedAt: Schema.date(), }) /** @type {import('../lib/api').Validator} */ @@ -13,47 +13,47 @@ export const validate = input => egressSchema.read(input) /** @type {import('../lib/api').Encoder} */ export const encode = input => { - try { - return { - ok: JSON.stringify({ - customer: input.customer.toString(), - resource: input.resource.toString(), - bytes: input.bytes.toString(), - servedAt: input.servedAt.toISOString(), - }) - } - } catch (/** @type {any} */ err) { - return { - error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) - } + try { + return { + ok: JSON.stringify({ + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: input.bytes.toString(), + servedAt: input.servedAt.toISOString(), + }) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } } /** @type {import('../lib/api').Decoder} */ export const decode = input => { - try { - return { - ok: { - customer: Schema.did({ method: 'mailto' }).from(input.customer), - resource: Link.parse(/** @type {string} */(input.resource)), - bytes: BigInt(input.bytes), - servedAt: new Date(input.servedAt), - } - } - } catch (/** @type {any} */ err) { - return { - error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) - } + try { + return { + ok: { + customer: Schema.did({ method: 'mailto' }).from(input.customer), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: BigInt(input.bytes), + servedAt: new Date(input.servedAt), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) } + } } /** @type {import('../lib/api').Decoder} */ export const decodeStr = input => { - try { - return decode(JSON.parse(input)) - } catch (/** @type {any} */ err) { - return { - error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) - } + try { + return decode(JSON.parse(input)) + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) } -} + } +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index fe9a29c9..6d4c2b4d 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -5,11 +5,13 @@ import { mustGetEnv } from '../../lib/env.js' import { createCustomerStore } from '../tables/customer.js' import Stripe from 'stripe' import { Config } from 'sst/node/config' +import { accountIDToStripeCustomerID } from '../utils/stripe.js' + Sentry.AWSLambda.init({ - environment: process.env.SST_STAGE, - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 1.0 + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 }) /** @@ -25,50 +27,51 @@ Sentry.AWSLambda.init({ /** * AWS Lambda handler to process egress events from the egress traffic queue. - * Each event is a JSON object with `customer`, `resource`, `bytes` and `se`. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`. * The message is then deleted from the queue when successful. */ export const handler = Sentry.AWSLambda.wrapHandler( - /** - * @param {import('aws-lambda').SQSEvent} event - * @param {import('aws-lambda').Context} context - */ - async (event, context) => { - /** @type {CustomHandlerContext|undefined} */ - const customContext = context?.clientContext?.Custom - const region = customContext?.region ?? mustGetEnv('AWS_REGION') - const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') - const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) - - const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY - if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') - - const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') - if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) - const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') - for (const record of event.Records) { - try { - const decoded = decodeStr(record.body) - const egressEvent = expect(decoded, 'Failed to decode egress event') + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') - expect( - await recordEgress(customerStore, stripe, billingMeterName, egressEvent), - `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` - ) + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const batchItemFailures = [] + for (const record of event.Records) { + try { + const decoded = decodeStr(record.body) + const egressEvent = expect(decoded, 'Failed to decode egress event') - // TODO: delete the message from the queue? - } catch (error) { - console.error('Error processing egress event:', error) - } - } + expect( + await recordEgress(customerStore, stripe, billingMeterName, egressEvent), + `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` + ) + } catch (error) { + console.error('Error processing egress event:', error) + batchItemFailures.push({ itemIdentifier: record.messageId }) + } + } - return { - statusCode: 200, - body: 'Egress events processed successfully' - } - }, + return { + statusCode: 200, + body: 'Egress events processed successfully', + // Return the failed records so they can be retried + batchItemFailures + } + }, ) /** @@ -77,48 +80,50 @@ export const handler = Sentry.AWSLambda.wrapHandler( * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe * @param {string} billingMeterEventName - * @param {import('../lib/api.js').EgressTrafficData} egressEventData + * @param {import('../lib/api').EgressTrafficData} egressEventData */ async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { - const response = await customerStore.get({ customer: egressEventData.customer }) - if (response.error) { - return { - error: { - name: 'CustomerNotFound', - message: `Error getting customer ${egressEventData.customer}`, - cause: response.error - } - } + const response = await customerStore.get({ customer: egressEventData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressEventData.customer}`, + cause: response.error + } } - const stripeCustomerId = response.ok.account.slice('stripe:'.length) - /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ - const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) - if (stripeCustomer.deleted) { - return { - error: { - name: 'StripeCustomerNotFound', - message: `Customer ${stripeCustomerId} has been deleted from Stripe`, - } - } + } + + const stripeCustomerId = accountIDToStripeCustomerID(response.ok.account) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } } + } - // TODO (fforbeck): implement some retry logic in case rate limiting errors - /** @type {import('stripe').Stripe.Billing.MeterEvent} */ - const meterEvent = await stripe.billing.meterEvents.create({ - event_name: billingMeterEventName, - payload: { - stripe_customer_id: stripeCustomerId, - value: egressEventData.bytes.toString(), - }, - timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) - }) - if (meterEvent.identifier) { - return { ok: { meterEvent } } - } - return { - error: { - name: 'StripeBillingMeterEventCreationFailed', - message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, - } + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterEventName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressEventData.bytes.toString(), + }, + timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) + }) + + // Identifier is only set if the event was successfully created + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, } + } } \ No newline at end of file diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js index 95f79737..5add3f7c 100644 --- a/billing/queues/egress-traffic.js +++ b/billing/queues/egress-traffic.js @@ -6,4 +6,4 @@ import { encode, validate } from '../data/egress.js' * @param {{ url: URL }} context */ export const createEgressTrafficQueue = (conf, { url }) => - createQueueAdderClient(conf, { url, encode, validate }) \ No newline at end of file + createQueueAdderClient(conf, { url, encode, validate }) diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index f2110081..01321ddf 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -5,9 +5,9 @@ import { randomLink } from './dag.js' * @returns {import('../../lib/api').EgressTrafficData} */ export const randomEgressEvent = (customer) => ({ - customer: customer.customer, - resource: randomLink(), - bytes: BigInt(Math.floor(Math.random() * 1000000)), - // Random timestamp within the last 1 hour - servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), -}) \ No newline at end of file + customer: customer.customer, + resource: randomLink(), + bytes: BigInt(Math.floor(Math.random() * 1000000)), + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)) +}) diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js index 3d3cabab..189bb250 100644 --- a/billing/test/lib/egress-traffic.js +++ b/billing/test/lib/egress-traffic.js @@ -6,104 +6,104 @@ import * as DidMailto from '@web3-storage/did-mailto' /** @type {import('./api').TestSuite} */ export const test = { - /** - * @param {import('entail').assert} assert - * @param {import('./api').EgressTrafficTestContext} ctx - */ - 'should process all the egress traffic events from the queue': async (assert, ctx) => { - let stripeCustomerId; - try { - // 0. Create a test customer email, add it to stripe and to the customer store - const didMailto = randomDIDMailto() - const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) - const stripeCustomer = await ctx.stripe.customers.create({ email }) - assert.ok(stripeCustomer.id, 'Error adding customer to stripe') - stripeCustomerId = stripeCustomer.id + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id - const customer = randomCustomer({ - customer: didMailto, - /** @type {`stripe:${string}`} */ - account: `stripe:${stripeCustomerId}` - }) - const { error } = await ctx.customerStore.put(customer) - assert.ok(!error, 'Error adding customer') + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') - // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker - const maxEvents = 10 - /** @type {import('../../lib/api').EgressTrafficData[]} */ - const events = await Promise.all( - Array.from( - { length: maxEvents }, - () => randomEgressEvent(customer) - ) - ) + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + () => randomEgressEvent(customer) + ) + ) - for (const e of events) { - console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) - const result = await ctx.egressTrafficQueue.add(e) - assert.ok(!result.error, 'Error adding egress event to the queue') - } + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } - // 2. Create a SQS event batch - // @type {import('aws-lambda').SQSEvent} - const sqsEventBatch = { - Records: events.map(e => ({ - // @type {import('aws-lambda').SQSRecord} - body: encode(e).ok ?? '', - messageId: Math.random().toString(), - receiptHandle: Math.random().toString(), - awsRegion: ctx.region, - eventSource: 'aws:sqs', - eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, - awsAccountId: ctx.accountId, - md5OfBody: '', - md5OfMessageAttributes: '', - attributes: { - ApproximateReceiveCount: '1', - SentTimestamp: e.servedAt.getTime().toString(), - SenderId: ctx.accountId, - ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), - }, - messageAttributes: {}, - })) - } + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encode(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) + } - // 3. Process the SQS event to trigger the handler using the custom context - const customCtx = { - clientContext: { - Custom: ctx, - }, - } - // @ts-expect-error -- Don't need to initialize the full lambda context for testing - await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { - if (err) { - assert.fail(err) - } - assert.ok(res) - assert.equal(res.statusCode, 200) - assert.equal(res.body, 'Egress events processed successfully') - }) + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) - // 4. Check if the aggregated meter event exists and has a value greater than 0 - const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( - ctx.billingMeterId, - { - customer: stripeCustomerId, - start_time: Math.floor(events[0].servedAt.getTime() / 1000), - end_time: Math.floor(Date.now() / 1000), - } - ) - assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') - assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') - // We can't verify the total bytes served because the meter events are not immediately available in stripe - // and the test would fail intermittently - assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') - } finally { - if (stripeCustomerId) { - // 5. Delete the test customer from stripe - const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); - assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') - } + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } } + } } \ No newline at end of file diff --git a/billing/test/utils/stripe.js b/billing/test/utils/stripe.js index 72828296..2b7c6206 100644 --- a/billing/test/utils/stripe.js +++ b/billing/test/utils/stripe.js @@ -1,4 +1,7 @@ -import { handleCustomerSubscriptionCreated } from '../../utils/stripe.js' +import { + accountIDToStripeCustomerID, + stripeIDToAccountID, + handleCustomerSubscriptionCreated } from '../../utils/stripe.js' import * as DidMailto from '@web3-storage/did-mailto' @@ -44,5 +47,17 @@ export const test = { assert.ok(result.ok) const customerRecord = await ctx.customerStore.get({ customer }) assert.equal(customerRecord.ok?.product, product) + }, + + 'should convert an account ID to a stripe customer ID': (/** @type {import('entail').assert} */ assert) => { + const accountID = 'stripe:cus_1234567890' + const stripeCustomerId = accountIDToStripeCustomerID(accountID) + assert.equal(stripeCustomerId, 'cus_1234567890') + }, + + 'should convert a stripe customer ID to an account ID': (/** @type {import('entail').assert} */ assert) => { + const stripeCustomerId = 'cus_1234567890' + const accountID = stripeIDToAccountID(stripeCustomerId) + assert.equal(accountID, 'stripe:cus_1234567890') } } diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index b47b3ded..5fe8124e 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -7,6 +7,9 @@ import * as DidMailto from '@web3-storage/did-mailto' */ /** + * Converts a Stripe customer ID to an Account ID. + * e.g: + * cus_1234567890 -> stripe:cus_1234567890 * * @param {string} stripeID * @returns {AccountID} @@ -15,6 +18,17 @@ export function stripeIDToAccountID(stripeID) { return /** @type {AccountID} */(`stripe:${stripeID}`) } +/** + * Converts an Account ID to a Stripe customer ID. + * e.g: + * stripe:cus_1234567890 -> cus_1234567890 + * + * @param {AccountID} accountID + * @returns {string} + */ +export const accountIDToStripeCustomerID = (accountID) => accountID.slice('stripe:'.length) + + /** * * @param {Stripe} stripe From 9952ff834e7e3fb060da75c43a3e2304459e5e52 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 16:00:30 -0300 Subject: [PATCH 07/15] removed reserved env var --- stacks/billing-stack.js | 1 - 1 file changed, 1 deletion(-) diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 669198e7..5d6d1a09 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -186,7 +186,6 @@ export function BillingStack ({ stack, app }) { timeout: '15 minutes', bind: [stripeSecretKey], environment: { - AWS_REGION: stack.region, CUSTOMER_TABLE_NAME: customerTable.tableName, // TODO (fforbeck): make this a config based on the env: local, staging, prod STRIPE_BILLING_METER_EVENT_NAME: 'test-gateway-egress-traffic' From 5f14548f8c9143ef17f2208f45ef2ffe40f07b90 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 16:41:07 -0300 Subject: [PATCH 08/15] fix function handler name --- .../{egress-traffic-handler.js => egress-traffic-queue.js} | 4 ++-- billing/test/helpers/context.js | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename billing/functions/{egress-traffic-handler.js => egress-traffic-queue.js} (97%) diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-queue.js similarity index 97% rename from billing/functions/egress-traffic-handler.js rename to billing/functions/egress-traffic-queue.js index 6d4c2b4d..a34eae8e 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-queue.js @@ -21,7 +21,7 @@ Sentry.AWSLambda.init({ * customerTable?: string * billingMeterName?: string * stripeSecretKey?: string - * customerStore?: import('../lib/api').CustomerStore + * customerStore?: import('../lib/api.js').CustomerStore * }} CustomHandlerContext */ @@ -80,7 +80,7 @@ export const handler = Sentry.AWSLambda.wrapHandler( * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe * @param {string} billingMeterEventName - * @param {import('../lib/api').EgressTrafficData} egressEventData + * @param {import('../lib/api.js').EgressTrafficData} egressEventData */ async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { const response = await customerStore.get({ customer: egressEventData.customer }) diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 0ee2a2cc..3cf5de4d 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -20,7 +20,7 @@ import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/ import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' -import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-queue.js' import Stripe from 'stripe' dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) From ff0f52abc1c4a6b120811ab961858fb1ed4fa76b Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 22 Oct 2024 16:48:51 -0300 Subject: [PATCH 09/15] fix visibility timeout --- stacks/billing-stack.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 5d6d1a09..c34d6bc6 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -202,7 +202,7 @@ export function BillingStack ({ stack, app }) { deadLetterQueue: egressTrafficDLQ.cdk.queue, cdk: { eventSource: { batchSize: 1 } } }, - cdk: { queue: { visibilityTimeout: Duration.seconds(60) } } + cdk: { queue: { visibilityTimeout: Duration.minutes(15) } } }) stack.addOutputs({ From a8776a713ed0698d195338d9e215efca12acbfb7 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Wed, 23 Oct 2024 09:38:00 -0300 Subject: [PATCH 10/15] fix billing meter event name --- stacks/billing-stack.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index c34d6bc6..fab37ba9 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -187,8 +187,8 @@ export function BillingStack ({ stack, app }) { bind: [stripeSecretKey], environment: { CUSTOMER_TABLE_NAME: customerTable.tableName, - // TODO (fforbeck): make this a config based on the env: local, staging, prod - STRIPE_BILLING_METER_EVENT_NAME: 'test-gateway-egress-traffic' + // Billing Meter Event Name for Stripe Test and Production APIs + STRIPE_BILLING_METER_EVENT_NAME: 'gateway-egress-traffic' } }) From a7a531dcf82c54b764aed7f731aea49ef3859bfd Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Wed, 23 Oct 2024 17:06:26 -0300 Subject: [PATCH 11/15] saving egress traffic data into dynamo --- billing/data/egress.js | 57 +++++++++++++- billing/functions/egress-traffic-queue.js | 90 +++++++---------------- billing/lib/api.ts | 14 ++++ billing/queues/egress-traffic.js | 4 +- billing/tables/egress-traffic.js | 42 +++++++++++ billing/test/helpers/context.js | 15 +++- billing/test/helpers/egress.js | 9 ++- billing/test/lib/api.ts | 5 +- billing/test/lib/egress-traffic.js | 6 +- billing/utils/stripe.js | 47 ++++++++++++ upload-api/stores/usage.js | 8 +- 11 files changed, 220 insertions(+), 77 deletions(-) create mode 100644 billing/tables/egress-traffic.js diff --git a/billing/data/egress.js b/billing/data/egress.js index 37d68c3d..66d68ade 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -1,27 +1,49 @@ import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' +/** + * @typedef { import('../types').InferStoreRecord } EgressTrafficStoreRecord + * @typedef { import('../types').InferStoreRecord } EgressTrafficKeyStoreRecord + */ + export const egressSchema = Schema.struct({ + space: Schema.did({ method: 'key' }), customer: Schema.did({ method: 'mailto' }), resource: Schema.link(), bytes: Schema.bigint(), servedAt: Schema.date(), + cause: Schema.link(), }) /** @type {import('../lib/api').Validator} */ export const validate = input => egressSchema.read(input) -/** @type {import('../lib/api').Encoder} */ +/** @type {import('../lib/api').Encoder} */ export const encode = input => { try { return { - ok: JSON.stringify({ + ok: { + space: input.space.toString(), customer: input.customer.toString(), resource: input.resource.toString(), bytes: input.bytes.toString(), servedAt: input.servedAt.toISOString(), - }) + cause: input.cause.toString(), + } + } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) } + } +} + +/** @type {import('../lib/api').Encoder} */ +export const encodeStr = input => { + try { + const data = encode(input) + if (data.error) throw data.error + return { ok: JSON.stringify(data.ok) } } catch (/** @type {any} */ err) { return { error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) @@ -34,10 +56,12 @@ export const decode = input => { try { return { ok: { + space: Schema.did({ method: 'key' }).from(input.space), customer: Schema.did({ method: 'mailto' }).from(input.customer), resource: Link.parse(/** @type {string} */(input.resource)), bytes: BigInt(input.bytes), servedAt: new Date(input.servedAt), + cause: Link.parse(/** @type {string} */(input.cause)), } } } catch (/** @type {any} */ err) { @@ -56,4 +80,31 @@ export const decodeStr = input => { error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) } } +} + +export const lister = { + /** @type {import('../lib/api').Encoder} */ + encodeKey: input => ({ + ok: { + space: input.space.toString(), + customer: input.customer.toString(), + from: input.from.toISOString() + } + }), + /** @type {import('../lib/api').Decoder} */ + decodeKey: input => { + try { + return { + ok: { + space: Schema.did({ method: 'key' }).from(input.space), + customer: Schema.did({ method: 'mailto' }).from(input.customer), + from: new Date(input.from) + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress traffic event list key: ${err.message}`, { cause: err }) + } + } + } } \ No newline at end of file diff --git a/billing/functions/egress-traffic-queue.js b/billing/functions/egress-traffic-queue.js index a34eae8e..34a565f0 100644 --- a/billing/functions/egress-traffic-queue.js +++ b/billing/functions/egress-traffic-queue.js @@ -5,7 +5,8 @@ import { mustGetEnv } from '../../lib/env.js' import { createCustomerStore } from '../tables/customer.js' import Stripe from 'stripe' import { Config } from 'sst/node/config' -import { accountIDToStripeCustomerID } from '../utils/stripe.js' +import { recordBillingMeterEvent } from '../utils/stripe.js' +import { createEgressTrafficEventStore } from '../tables/egress-traffic.js' Sentry.AWSLambda.init({ @@ -22,6 +23,8 @@ Sentry.AWSLambda.init({ * billingMeterName?: string * stripeSecretKey?: string * customerStore?: import('../lib/api.js').CustomerStore + * egressTrafficTable?: string + * egressTrafficEventStore?: import('../lib/api.js').EgressTrafficEventStore * }} CustomHandlerContext */ @@ -41,7 +44,9 @@ export const handler = Sentry.AWSLambda.wrapHandler( const region = customContext?.region ?? mustGetEnv('AWS_REGION') const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) - + const egressTrafficTable = customContext?.egressTrafficTable ?? mustGetEnv('EGRESS_TRAFFIC_TABLE_NAME') + const egressTrafficEventStore = customContext?.egressTrafficEventStore ?? createEgressTrafficEventStore({ region }, { tableName: egressTrafficTable }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') @@ -49,19 +54,34 @@ export const handler = Sentry.AWSLambda.wrapHandler( if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) - const batchItemFailures = [] + const batchItemFailures = [] for (const record of event.Records) { try { const decoded = decodeStr(record.body) - const egressEvent = expect(decoded, 'Failed to decode egress event') + const egressData = expect(decoded, 'Failed to decode egress event') + + const putResult = await egressTrafficEventStore.put(egressData) + if (putResult.error) throw putResult.error + + const response = await customerStore.get({ customer: egressData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressData.customer}`, + cause: response.error + } + } + } + const customerAccount = response.ok.account expect( - await recordEgress(customerStore, stripe, billingMeterName, egressEvent), - `Failed to send record usage to Stripe for customer: ${egressEvent.customer}, resource: ${egressEvent.resource}, servedAt: ${egressEvent.servedAt.toISOString()}` + await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount), + `Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}` ) } catch (error) { console.error('Error processing egress event:', error) - batchItemFailures.push({ itemIdentifier: record.messageId }) + batchItemFailures.push({ itemIdentifier: record.messageId }) } } @@ -72,58 +92,4 @@ export const handler = Sentry.AWSLambda.wrapHandler( batchItemFailures } }, -) - -/** - * Finds the Stripe customer ID for the given customer and records the egress traffic data in the Stripe Billing Meter API. - * - * @param {import('../lib/api.js').CustomerStore} customerStore - * @param {import('stripe').Stripe} stripe - * @param {string} billingMeterEventName - * @param {import('../lib/api.js').EgressTrafficData} egressEventData - */ -async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { - const response = await customerStore.get({ customer: egressEventData.customer }) - if (response.error) { - return { - error: { - name: 'CustomerNotFound', - message: `Error getting customer ${egressEventData.customer}`, - cause: response.error - } - } - } - - const stripeCustomerId = accountIDToStripeCustomerID(response.ok.account) - /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ - const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) - if (stripeCustomer.deleted) { - return { - error: { - name: 'StripeCustomerNotFound', - message: `Customer ${stripeCustomerId} has been deleted from Stripe`, - } - } - } - - /** @type {import('stripe').Stripe.Billing.MeterEvent} */ - const meterEvent = await stripe.billing.meterEvents.create({ - event_name: billingMeterEventName, - payload: { - stripe_customer_id: stripeCustomerId, - value: egressEventData.bytes.toString(), - }, - timestamp: Math.floor(egressEventData.servedAt.getTime() / 1000) - }) - - // Identifier is only set if the event was successfully created - if (meterEvent.identifier) { - return { ok: { meterEvent } } - } - return { - error: { - name: 'StripeBillingMeterEventCreationFailed', - message: `Error creating meter event for egress traffic in Stripe for customer ${egressEventData.customer} @ ${egressEventData.servedAt.toISOString()}`, - } - } -} \ No newline at end of file +) \ No newline at end of file diff --git a/billing/lib/api.ts b/billing/lib/api.ts index c8bebd92..3aeb5e38 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -133,6 +133,11 @@ export interface UsageListKey { customer: CustomerDID, from: Date } export type UsageStore = StorePutter +/** + * Store for egress traffic data. + */ +export type EgressTrafficEventStore = StorePutter & StoreLister + // Billing queues ///////////////////////////////////////////////////////////// /** @@ -162,6 +167,8 @@ export type CustomerBillingQueue = QueueAdder * Captures details about egress traffic that should be billed for a given period */ export interface EgressTrafficData { + /** Space DID (did:key:...). */ + space: ConsumerDID /** Customer DID (did:mailto:...). */ customer: CustomerDID /** Resource that was served. */ @@ -170,6 +177,8 @@ export interface EgressTrafficData { bytes: bigint /** Time the egress traffic was served at. */ servedAt: Date + /** UCAN invocation IDthat caused the egress traffic. */ + cause: UnknownLink } /** @@ -177,6 +186,11 @@ export interface EgressTrafficData { */ export type EgressTrafficQueue = QueueAdder +/** + * List key for egress traffic data. + */ +export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date } + /** * Captures details about a space that should be billed for a given customer in * the given period of usage. diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js index 5add3f7c..173b79d2 100644 --- a/billing/queues/egress-traffic.js +++ b/billing/queues/egress-traffic.js @@ -1,9 +1,9 @@ import { createQueueAdderClient } from './client.js' -import { encode, validate } from '../data/egress.js' +import { encodeStr, validate } from '../data/egress.js' /** * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf * @param {{ url: URL }} context */ export const createEgressTrafficQueue = (conf, { url }) => - createQueueAdderClient(conf, { url, encode, validate }) + createQueueAdderClient(conf, { url, encode: encodeStr, validate }) diff --git a/billing/tables/egress-traffic.js b/billing/tables/egress-traffic.js new file mode 100644 index 00000000..fadd160c --- /dev/null +++ b/billing/tables/egress-traffic.js @@ -0,0 +1,42 @@ +import { createStorePutterClient, createStoreListerClient } from './client.js' +import { validate, encode, lister, decode } from '../data/egress.js' + +/** + * Source of truth for egress traffic data. + * + * @type {import('sst/constructs').TableProps} + */ +export const egressTrafficTableProps = { + fields: { + /** Space DID (did:key:...). */ + space: 'string', + /** Customer DID (did:mailto:...). */ + customer: 'string', + /** Resource CID. */ + resource: 'string', + /** ISO timestamp of the event. */ + servedAt: 'string', + /** Bytes served. */ + bytes: 'number', + /** UCAN invocation ID that caused the egress traffic. */ + cause: 'string', + }, + primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' }, + globalIndexes: { + customer: { + partitionKey: 'customer', + sortKey: 'servedAt', + projection: ['space', 'resource', 'bytes', 'cause', 'servedAt'] + } + } +} + +/** + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {{ tableName: string }} context + * @returns {import('../lib/api.js').EgressTrafficEventStore} + */ +export const createEgressTrafficEventStore = (conf, { tableName }) => ({ + ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) +}) \ No newline at end of file diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 3cf5de4d..ea321a73 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -8,7 +8,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' -import { decodeStr as decodeEgressTrafficEvent } from '../../data/egress.js' +import { decodeStr as decodeEgressTrafficEvent, validate as validateEgressTrafficEvent, encode as encodeEgressTrafficEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -22,6 +22,7 @@ import { createQueueRemoverClient } from './queue.js' import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-queue.js' import Stripe from 'stripe' +import { createEgressTrafficEventStore, egressTrafficTableProps } from '../../tables/egress-traffic.js' dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) @@ -190,6 +191,16 @@ export const createEgressTrafficTestContext = async () => { }) } + const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-') + const egressTrafficEventStore = { + ...createEgressTrafficEventStore(awsServices.dynamo.client, { tableName: egressTrafficTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: egressTrafficTable, + validate: validateEgressTrafficEvent, // assume test data is valid + encode: encodeEgressTrafficEvent + }) + } + const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService() // @ts-expect-error -- Don't need to initialize the full lambda context for testing @@ -201,6 +212,8 @@ export const createEgressTrafficTestContext = async () => { region: region ?? '', customerTable, customerStore, + egressTrafficTable, + egressTrafficEventStore, billingMeterEventName, billingMeterId, stripeSecretKey, diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index 01321ddf..9baa94fd 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -1,13 +1,16 @@ import { randomLink } from './dag.js' +import { randomDID } from './did.js' /** * @param {import('../../lib/api').Customer} customer - * @returns {import('../../lib/api').EgressTrafficData} + * @returns {Promise} */ -export const randomEgressEvent = (customer) => ({ +export const randomEgressEvent = async (customer) => ({ + space: await randomDID(), customer: customer.customer, resource: randomLink(), bytes: BigInt(Math.floor(Math.random() * 1000000)), // Random timestamp within the last 1 hour - servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)) + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), + cause: randomLink() }) diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 081e1186..e89ea108 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -20,7 +20,8 @@ import { UsageListKey, Usage, EgressTrafficQueue, - EgressTrafficData + EgressTrafficData, + EgressTrafficEventStore } from '../../lib/api.js' import { Context, Handler, SQSEvent } from 'aws-lambda' import Stripe from 'stripe' @@ -60,6 +61,8 @@ export interface EgressTrafficTestContext extends Context { region: string customerTable: string customerStore: CustomerStore + egressTrafficTable: string + egressTrafficEventStore: EgressTrafficEventStore billingMeterEventName: string billingMeterId: string stripeSecretKey: string diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js index 189bb250..a5e27009 100644 --- a/billing/test/lib/egress-traffic.js +++ b/billing/test/lib/egress-traffic.js @@ -1,4 +1,4 @@ -import { encode } from '../../data/egress.js' +import { encodeStr } from '../../data/egress.js' import { randomCustomer } from '../helpers/customer.js' import { randomDIDMailto } from '../helpers/did.js' import { randomEgressEvent } from '../helpers/egress.js' @@ -34,7 +34,7 @@ export const test = { const events = await Promise.all( Array.from( { length: maxEvents }, - () => randomEgressEvent(customer) + async () => await randomEgressEvent(customer) ) ) @@ -49,7 +49,7 @@ export const test = { const sqsEventBatch = { Records: events.map(e => ({ // @type {import('aws-lambda').SQSRecord} - body: encode(e).ok ?? '', + body: encodeStr(e).ok ?? '', messageId: Math.random().toString(), receiptHandle: Math.random().toString(), awsRegion: ctx.region, diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index 5fe8124e..5f05934d 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -64,3 +64,50 @@ export async function handleCustomerSubscriptionCreated(stripe, event, customerS }) } } + +/** + * Records an egress traffic event in the Stripe Billing Meter API for the given customer account. + * + * @param {import('stripe').Stripe} stripe + * @param {string} billingMeterEventName + * @param {import('../lib/api.js').EgressTrafficData} egressData + * @param {AccountID} customerAccount + */ +export async function recordBillingMeterEvent(stripe, billingMeterEventName, egressData, customerAccount) { + const stripeCustomerId = accountIDToStripeCustomerID(customerAccount) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } + } + } + + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterEventName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressData.bytes.toString(), + }, + timestamp: Math.floor(egressData.servedAt.getTime() / 1000), + }, + { + idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}` + } + ) + + // Identifier is only set if the event was successfully created + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressData.customer} @ ${egressData.servedAt.toISOString()}`, + } + } +} \ No newline at end of file diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index c4d38059..6617f0df 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -63,18 +63,22 @@ export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTraffi /** * Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API. * + * @param {import('@web3-storage/upload-api').SpaceDID} space * @param {import('@web3-storage/upload-api').AccountDID} customer * @param {import('@web3-storage/upload-api').UnknownLink} resource * @param {bigint} bytes * @param {Date} servedAt + * @param {import('@web3-storage/upload-api').UnknownLink} cause * @returns {Promise>} */ - async record(customer, resource, bytes, servedAt) { + async record(space, customer, resource, bytes, servedAt, cause) { const record = { + space, customer, resource, bytes, - servedAt + servedAt, + cause } const result = await egressTrafficQueue.add(record) From 7ae26c470b7b8346271acbf5957a3d2141c08906 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Thu, 24 Oct 2024 09:53:56 -0300 Subject: [PATCH 12/15] egress-traffic table config --- stacks/billing-db-stack.js | 7 +++++-- stacks/billing-stack.js | 2 ++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/stacks/billing-db-stack.js b/stacks/billing-db-stack.js index f731a423..42712a2b 100644 --- a/stacks/billing-db-stack.js +++ b/stacks/billing-db-stack.js @@ -3,6 +3,7 @@ import { customerTableProps } from '../billing/tables/customer.js' import { spaceDiffTableProps } from '../billing/tables/space-diff.js' import { spaceSnapshotTableProps } from '../billing/tables/space-snapshot.js' import { usageTableProps } from '../billing/tables/usage.js' +import { egressTrafficTableProps } from '../billing/tables/egress-traffic.js' /** * @param {import('sst/constructs').StackContext} properties @@ -15,15 +16,17 @@ export const BillingDbStack = ({ stack }) => { ...usageTableProps, stream: 'new_image' }) + const egressTrafficTable = new Table(stack, 'egress-traffic', egressTrafficTableProps) stack.addOutputs({ customerTableName: customerTable.tableName, spaceSnapshotTableName: spaceSnapshotTable.tableName, spaceDiffTableName: spaceDiffTable.tableName, - usageTable: usageTable.tableName + usageTable: usageTable.tableName, + egressTrafficTableName: egressTrafficTable.tableName }) const stripeSecretKey = new Config.Secret(stack, 'STRIPE_SECRET_KEY') - return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, stripeSecretKey } + return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, egressTrafficTable, stripeSecretKey } } diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index fab37ba9..8ae4c2d5 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -17,6 +17,7 @@ export function BillingStack ({ stack, app }) { spaceSnapshotTable, spaceDiffTable, usageTable, + egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { subscriptionTable, consumerTable } = use(UploadDbStack) @@ -187,6 +188,7 @@ export function BillingStack ({ stack, app }) { bind: [stripeSecretKey], environment: { CUSTOMER_TABLE_NAME: customerTable.tableName, + EGRESS_TRAFFIC_TABLE_NAME: egressTrafficTable.tableName, // Billing Meter Event Name for Stripe Test and Production APIs STRIPE_BILLING_METER_EVENT_NAME: 'gateway-egress-traffic' } From 68f0a499a9e2e71da12a5471d7fc8f51641ba3af Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Thu, 24 Oct 2024 15:15:02 -0300 Subject: [PATCH 13/15] update packages and fix egress handler permissions --- billing/package.json | 2 +- package-lock.json | 154 +++++++++++++--------------------------- package.json | 6 +- stacks/billing-stack.js | 2 +- upload-api/package.json | 4 +- 5 files changed, 57 insertions(+), 111 deletions(-) diff --git a/billing/package.json b/billing/package.json index 48177251..6bef96da 100644 --- a/billing/package.json +++ b/billing/package.json @@ -14,7 +14,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", diff --git a/package-lock.json b/package-lock.json index 281a02e1..5ef5964b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,12 +47,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", @@ -79,7 +79,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", @@ -7947,7 +7947,6 @@ "version": "9.2.1", "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-9.2.1.tgz", "integrity": "sha512-nyY48yE7r3dnJVlxrdaimrbloh4RokQaNRdI//btfTkcTEZbpmSrbYcBQ4VKTf8ZxXAOUJy4VsRpkJo+y9RTnA==", - "license": "Apache-2.0 OR MIT", "dependencies": { "cborg": "^4.0.0", "multiformats": "^13.1.0" @@ -11427,8 +11426,7 @@ "node_modules/@storacha/one-webcrypto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@storacha/one-webcrypto/-/one-webcrypto-1.0.1.tgz", - "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==", - "license": "MIT" + "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==" }, "node_modules/@trpc/server": { "version": "9.16.0", @@ -12204,7 +12202,6 @@ "version": "20.1.0", "resolved": "https://registry.npmjs.org/@web3-storage/access/-/access-20.1.0.tgz", "integrity": "sha512-IY6ICPRWE8++2jxvy+LzAiFvwAOIHR8cu9eNt+VT5sAFE796o4ma7GSU0eXRCiShmV2n6iSWAwWRT6XD5zIqPA==", - "license": "(Apache-2.0 OR MIT)", "dependencies": { "@ipld/car": "^5.1.1", "@ipld/dag-ucan": "^3.4.0", @@ -12236,14 +12233,15 @@ } }, "node_modules/@web3-storage/blob-index": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.3.tgz", - "integrity": "sha512-VjGLhf6Gf4ZmzjJXS6wU4aRvnM+HLcuRCJHegjQ36ka52sR2WWOcqDNNVvabtlpnYjGtVFQCPUzaCcs18wpqHQ==", + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.4.tgz", + "integrity": "sha512-04+PrmVHFT+xzRhyIPdcvGc8Y2NDffUe8R1gJOyErVzEVz5N1I9Q/BrlFHYt/A4HrjM5JBsxqSrZgTIkjfPmLA==", "dependencies": { "@ipld/dag-cbor": "^9.0.6", + "@storacha/one-webcrypto": "^1.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.2.0", "carstream": "^2.1.0", "multiformats": "^13.0.1", "uint8arrays": "^5.0.3" @@ -12261,10 +12259,9 @@ } }, "node_modules/@web3-storage/capabilities": { - "version": "17.3.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.3.0.tgz", - "integrity": "sha512-9415OPNVYO5gXDVf1vzZywkjndKTVA9IPnU04lQXxUaYfYZ5S5kzV2PI1SvySMOsCNE7u7uSCTiclblx5gPYAg==", - "license": "(Apache-2.0 OR MIT)", + "version": "17.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.4.0.tgz", + "integrity": "sha512-2VNLTTvv9qVewtXiek2Fb6W7WTQgOonq+FcNV9PyXAEgcXsQWsm8dOmbeB83W9bAuiwe9uIOzC1rftDHY3+uYA==", "dependencies": { "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", @@ -12279,7 +12276,6 @@ "version": "5.3.0", "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", - "license": "(Apache-2.0 AND MIT)", "dependencies": { "@ipld/dag-cbor": "^9.2.1", "multiformats": "^13.3.0", @@ -12357,9 +12353,9 @@ } }, "node_modules/@web3-storage/filecoin-api": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.2.0.tgz", - "integrity": "sha512-0tr+vlLXQn4vbHZR2Sxxr62fKW60TejQyH3ZG1CNCFLhLkBg4pXTYSu5rxijYg3ob8DHkejKp7hXMgPQhFzOHw==", + "version": "7.3.2", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.3.2.tgz", + "integrity": "sha512-DIhi6uheibt+JluLdfrCqH0xn/yZEIt+Nupf4OqhX9LBkaT30ySDraHz8yIolyFV7/8hxkSsglH1QQdM7M1F4A==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", @@ -12367,9 +12363,9 @@ "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.3.0", "@web3-storage/content-claims": "^5.0.0", - "@web3-storage/data-segment": "^4.0.0", + "@web3-storage/data-segment": "^5.2.0", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0", "p-map": "^6.0.0" }, @@ -12378,76 +12374,26 @@ } }, "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-4.0.0.tgz", - "integrity": "sha512-AnNyJp3wHMa7LBzguQzm4rmXSi8vQBz4uFs+jiXnSNtLR5dAqHfhMvi9XdWonWPYvxNvT5ZhYCSF0mpDjymqKg==", + "version": "5.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", + "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", + "@ipld/dag-cbor": "^9.2.1", + "multiformats": "^13.3.0", "sync-multihash-sha2": "^1.0.0" } }, - "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, "node_modules/@web3-storage/filecoin-client": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.3.tgz", - "integrity": "sha512-xFL8odr5PpTjQvpfw/4jphcm7ZvcBRMSKHn3ReEaVcFjxQL45Rojjleuq/QEdMwrNfsLCqqAxC54jk55o5/ERQ==", + "version": "3.3.4", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.4.tgz", + "integrity": "sha512-T2xur1NPvuH09yajyjCWEl7MBH712nqHERj51w4nDp6f8libMCKY6lca0frCrm4OC5s8oy0ZtoRFhsRYxgTzSg==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^16.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/capabilities": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-16.0.0.tgz", - "integrity": "sha512-wCjLpYc6t8tFRZrF2k2vBteJDWzHkmQjoJG0Yy/fjA04IjNN48iVZaCMQIANHXZxDGlYRGxhwzDwl4dovAdSTQ==", - "dependencies": { - "@ucanto/core": "^10.0.1", - "@ucanto/interface": "^10.0.1", - "@ucanto/principal": "^9.0.1", - "@ucanto/transport": "^9.1.1", - "@ucanto/validator": "^9.0.2", - "@web3-storage/data-segment": "^3.2.0", - "uint8arrays": "^5.0.3" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment": { - "version": "3.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.2.0.tgz", - "integrity": "sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==", - "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" + "@web3-storage/capabilities": "^17.3.0" } }, "node_modules/@web3-storage/multipart-parser": { @@ -12466,9 +12412,9 @@ } }, "node_modules/@web3-storage/upload-api": { - "version": "18.0.2", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.0.2.tgz", - "integrity": "sha512-IxV95h8/kb2OpwPSv8/Rew1xBfSnW9s8DL4y1r3cjN57n35XXLnsP0to4YU+FuFo6MKFNx9Yu0UrW+7GowC2vw==", + "version": "18.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.1.0.tgz", + "integrity": "sha512-+28NInlExKJ7ltOrN6o6jeHT4p7fX4TJFRkgCWIcImvVTfIQQPE8z8YKOshokAXPNZebQFRo7a1UrtAIbEg4mA==", "dependencies": { "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", @@ -12476,12 +12422,12 @@ "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-api": "^7.1.1", + "@web3-storage/filecoin-api": "^7.3.2", "multiformats": "^12.1.2", "uint8arrays": "^5.0.3" }, @@ -12512,9 +12458,9 @@ "integrity": "sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==" }, "node_modules/@web3-storage/upload-client": { - "version": "16.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-16.1.1.tgz", - "integrity": "sha512-aVGpcqnLxRk4u3uZAum1jwC5BbEbjuqAZZBrGNZ3UZVFnnJJXPm3DE+r1WC8FV2mbgC/yKKqDMu1FP1uB9wJkA==", + "version": "17.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-17.1.0.tgz", + "integrity": "sha512-0tUMe4Ez9gmUZjgn1Nrl6HYdGEsYyeLa6JrpoXcCGTQDBW2FehALc+GZZeoIjYQexRpw+qt9JstuJNN9dUNETw==", "dev": true, "dependencies": { "@ipld/car": "^5.2.2", @@ -12525,10 +12471,10 @@ "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/data-segment": "^5.1.0", - "@web3-storage/filecoin-client": "^3.3.3", + "@web3-storage/filecoin-client": "^3.3.4", "ipfs-utils": "^9.0.14", "multiformats": "^12.1.2", "p-retry": "^5.1.2", @@ -12600,9 +12546,9 @@ "link": true }, "node_modules/@web3-storage/w3up-client": { - "version": "14.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-14.1.1.tgz", - "integrity": "sha512-brBmN1aCJpjtNLwWpz0Jg1OkfsYs7r27m5VMOET9x+j4jy3DCtSqlUkHFIumLYrNhUn5oZrqh0kbWokcaUvrLg==", + "version": "16.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-16.4.0.tgz", + "integrity": "sha512-ndHXVufBt6bVyZHHxpe+bgS9bnOtpREsYFtL/C2h/AL8O5LZ1QBM91wFHVvLhFOJcKVcbW2WagyivCr+NOp9WA==", "dev": true, "dependencies": { "@ipld/dag-ucan": "^3.4.0", @@ -12611,12 +12557,12 @@ "@ucanto/interface": "^10.0.1", "@ucanto/principal": "^9.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.1" + "@web3-storage/filecoin-client": "^3.3.4", + "@web3-storage/upload-client": "^17.1.0" }, "engines": { "node": ">=18" @@ -30353,9 +30299,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", diff --git a/package.json b/package.json index 3395c5a5..77d06ac8 100644 --- a/package.json +++ b/package.json @@ -40,12 +40,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 8ae4c2d5..870cb368 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -182,7 +182,7 @@ export function BillingStack ({ stack, app }) { // Lambda that handles egress traffic tracking const egressTrafficQueueHandler = new Function(stack, 'egress-traffic-queue-handler', { - permissions: [customerTable], + permissions: [customerTable, egressTrafficTable], handler: 'billing/functions/egress-traffic-queue.handler', timeout: '15 minutes', bind: [stripeSecretKey], diff --git a/upload-api/package.json b/upload-api/package.json index fd2f523e..4d4058dd 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -23,9 +23,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", From 5fd8afbaca2712259aea42c049f53c46a1ebff0c Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Thu, 24 Oct 2024 15:54:11 -0300 Subject: [PATCH 14/15] fix type/compilation issues after package updates --- billing/data/egress.js | 6 +++--- billing/lib/api.ts | 2 +- billing/test/helpers/egress.js | 2 +- billing/test/lib/api.ts | 2 +- upload-api/stores/usage.js | 16 ++++++++-------- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/billing/data/egress.js b/billing/data/egress.js index 66d68ade..3e7b630f 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -10,7 +10,7 @@ export const egressSchema = Schema.struct({ space: Schema.did({ method: 'key' }), customer: Schema.did({ method: 'mailto' }), resource: Schema.link(), - bytes: Schema.bigint(), + bytes: Schema.number(), servedAt: Schema.date(), cause: Schema.link(), }) @@ -26,7 +26,7 @@ export const encode = input => { space: input.space.toString(), customer: input.customer.toString(), resource: input.resource.toString(), - bytes: input.bytes.toString(), + bytes: Number(input.bytes), servedAt: input.servedAt.toISOString(), cause: input.cause.toString(), } @@ -59,7 +59,7 @@ export const decode = input => { space: Schema.did({ method: 'key' }).from(input.space), customer: Schema.did({ method: 'mailto' }).from(input.customer), resource: Link.parse(/** @type {string} */(input.resource)), - bytes: BigInt(input.bytes), + bytes: Number(input.bytes), servedAt: new Date(input.servedAt), cause: Link.parse(/** @type {string} */(input.cause)), } diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 3aeb5e38..1eafd920 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -174,7 +174,7 @@ export interface EgressTrafficData { /** Resource that was served. */ resource: UnknownLink /** Number of bytes that were served. */ - bytes: bigint + bytes: number /** Time the egress traffic was served at. */ servedAt: Date /** UCAN invocation IDthat caused the egress traffic. */ diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index 9baa94fd..f6f5cc9d 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -9,7 +9,7 @@ export const randomEgressEvent = async (customer) => ({ space: await randomDID(), customer: customer.customer, resource: randomLink(), - bytes: BigInt(Math.floor(Math.random() * 1000000)), + bytes: Math.floor(Math.random() * 1000000), // Random timestamp within the last 1 hour servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), cause: randomLink() diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index e89ea108..317a8834 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -56,7 +56,7 @@ export interface UCANStreamTestContext { export interface EgressTrafficTestContext extends Context { egressTrafficQueue: EgressTrafficQueue & QueueRemover egressTrafficQueueUrl: string - egressTrafficHandler: Handler + egressTrafficHandler: Handler accountId: string region: string customerTable: string diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index 6617f0df..89c01a0f 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -63,13 +63,13 @@ export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTraffi /** * Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API. * - * @param {import('@web3-storage/upload-api').SpaceDID} space - * @param {import('@web3-storage/upload-api').AccountDID} customer - * @param {import('@web3-storage/upload-api').UnknownLink} resource - * @param {bigint} bytes - * @param {Date} servedAt - * @param {import('@web3-storage/upload-api').UnknownLink} cause - * @returns {Promise>} + * @param {import('@web3-storage/upload-api').SpaceDID} space - The space that the egress traffic is associated with. + * @param {import('@web3-storage/upload-api').AccountDID} customer - The customer that will be billed for the egress traffic. + * @param {import('@web3-storage/upload-api').UnknownLink} resource - The resource that was served. + * @param {number} bytes - The number of bytes that were served. + * @param {Date} servedAt - The date and time when the egress traffic was served. + * @param {import('@web3-storage/upload-api').UnknownLink} cause - The UCAN invocation ID that caused the egress traffic. + * @returns {Promise>} */ async record(space, customer, resource, bytes, servedAt, cause) { const record = { @@ -84,7 +84,7 @@ export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTraffi const result = await egressTrafficQueue.add(record) if (result.error) return result - return { ok: record } + return { ok: { ...record, servedAt: servedAt.toISOString() } } } } } From a6fb7a5d55ef68bc10bcdf6a58c24a67e6c221f1 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Fri, 25 Oct 2024 09:58:27 -0300 Subject: [PATCH 15/15] fix(seedrun): missing env vars --- stacks/billing-stack.js | 2 +- stacks/upload-api-stack.js | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 870cb368..f8b158f3 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -211,5 +211,5 @@ export function BillingStack ({ stack, app }) { EgressTrafficQueueURL: egressTrafficQueue.queueUrl }) - return { billingCron } + return { billingCron, egressTrafficQueue } } diff --git a/stacks/upload-api-stack.js b/stacks/upload-api-stack.js index de4f9cd5..74d6e789 100644 --- a/stacks/upload-api-stack.js +++ b/stacks/upload-api-stack.js @@ -9,6 +9,7 @@ import { import { StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda' import { UploadDbStack } from './upload-db-stack.js' import { BillingDbStack } from './billing-db-stack.js' +import { BillingStack } from './billing-stack.js' import { CarparkStack } from './carpark-stack.js' import { FilecoinStack } from './filecoin-stack.js' import { UcanInvocationStack } from './ucan-invocation-stack.js' @@ -45,9 +46,10 @@ export function UploadApiStack({ stack, app }) { const { carparkBucket } = use(CarparkStack) const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey, contentClaimsPrivateKey } = use(UploadDbStack) const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack) - const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack) + const { customerTable, spaceDiffTable, spaceSnapshotTable, egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack) const { blockAdvertPublisherQueue, blockIndexWriterQueue } = use(IndexerStack) + const { egressTrafficQueue } = use(BillingStack) // Setup API const customDomains = process.env.HOSTED_ZONES?.split(',').map(zone => getCustomDomain(stack.stage, zone)) @@ -82,6 +84,7 @@ export function UploadApiStack({ stack, app }) { pieceTable, spaceDiffTable, spaceSnapshotTable, + egressTrafficTable, carparkBucket, invocationBucket, taskBucket, @@ -91,6 +94,7 @@ export function UploadApiStack({ stack, app }) { filecoinSubmitQueue, blockAdvertPublisherQueue, blockIndexWriterQueue, + egressTrafficQueue, ], environment: { DID: process.env.UPLOAD_API_DID ?? '', @@ -119,6 +123,7 @@ export function UploadApiStack({ stack, app }) { FILECOIN_SUBMIT_QUEUE_URL: filecoinSubmitQueue.queueUrl, BLOCK_ADVERT_PUBLISHER_QUEUE_URL: blockAdvertPublisherQueue.queueUrl, BLOCK_INDEX_WRITER_QUEUE_URL: blockIndexWriterQueue.queueUrl, + EGRESS_TRAFFIC_QUEUE_URL: egressTrafficQueue.queueUrl, NAME: pkg.name, VERSION: pkg.version, COMMIT: git.commmit,