diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b968740b..773a4ec4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -25,3 +25,5 @@ jobs: AWS_ACCESS_KEY_ID: 'NOSUCH' AWS_SECRET_ACCESS_KEY: 'NOSUCH' STRIPE_TEST_SECRET_KEY: ${{ secrets.STRIPE_TEST_SECRET_KEY }} + STRIPE_BILLING_METER_ID: ${{ vars.STRIPE_BILLING_METER_ID }} + STRIPE_BILLING_METER_EVENT_NAME: ${{ vars.STRIPE_BILLING_METER_EVENT_NAME }} diff --git a/billing/data/egress.js b/billing/data/egress.js new file mode 100644 index 00000000..3e7b630f --- /dev/null +++ b/billing/data/egress.js @@ -0,0 +1,110 @@ +import { Link } from '@ucanto/server' +import { DecodeFailure, EncodeFailure, Schema } from './lib.js' + +/** + * @typedef { import('../types').InferStoreRecord } EgressTrafficStoreRecord + * @typedef { import('../types').InferStoreRecord } EgressTrafficKeyStoreRecord + */ + +export const egressSchema = Schema.struct({ + space: Schema.did({ method: 'key' }), + customer: Schema.did({ method: 'mailto' }), + resource: Schema.link(), + bytes: Schema.number(), + servedAt: Schema.date(), + cause: Schema.link(), +}) + +/** @type {import('../lib/api').Validator} */ +export const validate = input => egressSchema.read(input) + +/** @type {import('../lib/api').Encoder} */ +export const encode = input => { + try { + return { + ok: { + space: input.space.toString(), + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: Number(input.bytes), + servedAt: input.servedAt.toISOString(), + cause: input.cause.toString(), + } + } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Encoder} */ +export const encodeStr = input => { + try { + const data = encode(input) + if (data.error) throw data.error + return { ok: JSON.stringify(data.ok) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decode = input => { + try { + return { + ok: { + space: Schema.did({ method: 'key' }).from(input.space), + customer: Schema.did({ method: 'mailto' }).from(input.customer), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: Number(input.bytes), + servedAt: new Date(input.servedAt), + cause: Link.parse(/** @type {string} */(input.cause)), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decodeStr = input => { + try { + return decode(JSON.parse(input)) + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) + } + } +} + +export const lister = { + /** @type {import('../lib/api').Encoder} */ + encodeKey: input => ({ + ok: { + space: input.space.toString(), + customer: input.customer.toString(), + from: input.from.toISOString() + } + }), + /** @type {import('../lib/api').Decoder} */ + decodeKey: input => { + try { + return { + ok: { + space: Schema.did({ method: 'key' }).from(input.space), + customer: Schema.did({ method: 'mailto' }).from(input.customer), + from: new Date(input.from) + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress traffic event list key: ${err.message}`, { cause: err }) + } + } + } +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-queue.js b/billing/functions/egress-traffic-queue.js new file mode 100644 index 00000000..34a565f0 --- /dev/null +++ b/billing/functions/egress-traffic-queue.js @@ -0,0 +1,95 @@ +import * as Sentry from '@sentry/serverless' +import { expect } from './lib.js' +import { decodeStr } from '../data/egress.js' +import { mustGetEnv } from '../../lib/env.js' +import { createCustomerStore } from '../tables/customer.js' +import Stripe from 'stripe' +import { Config } from 'sst/node/config' +import { recordBillingMeterEvent } from '../utils/stripe.js' +import { createEgressTrafficEventStore } from '../tables/egress-traffic.js' + + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 +}) + +/** + * @typedef {{ + * region?: 'us-west-2'|'us-east-2' + * egressTrafficQueueUrl?: string + * customerTable?: string + * billingMeterName?: string + * stripeSecretKey?: string + * customerStore?: import('../lib/api.js').CustomerStore + * egressTrafficTable?: string + * egressTrafficEventStore?: import('../lib/api.js').EgressTrafficEventStore + * }} CustomHandlerContext + */ + +/** + * AWS Lambda handler to process egress events from the egress traffic queue. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`. + * The message is then deleted from the queue when successful. + */ +export const handler = Sentry.AWSLambda.wrapHandler( + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) + const egressTrafficTable = customContext?.egressTrafficTable ?? mustGetEnv('EGRESS_TRAFFIC_TABLE_NAME') + const egressTrafficEventStore = customContext?.egressTrafficEventStore ?? createEgressTrafficEventStore({ region }, { tableName: egressTrafficTable }) + + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const batchItemFailures = [] + for (const record of event.Records) { + try { + const decoded = decodeStr(record.body) + const egressData = expect(decoded, 'Failed to decode egress event') + + const putResult = await egressTrafficEventStore.put(egressData) + if (putResult.error) throw putResult.error + + const response = await customerStore.get({ customer: egressData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressData.customer}`, + cause: response.error + } + } + } + const customerAccount = response.ok.account + + expect( + await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount), + `Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}` + ) + } catch (error) { + console.error('Error processing egress event:', error) + batchItemFailures.push({ itemIdentifier: record.messageId }) + } + } + + return { + statusCode: 200, + body: 'Egress events processed successfully', + // Return the failed records so they can be retried + batchItemFailures + } + }, +) \ No newline at end of file diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 22df39de..1eafd920 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -1,4 +1,4 @@ -import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface' +import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface' // Billing stores ///////////////////////////////////////////////////////////// @@ -133,6 +133,11 @@ export interface UsageListKey { customer: CustomerDID, from: Date } export type UsageStore = StorePutter +/** + * Store for egress traffic data. + */ +export type EgressTrafficEventStore = StorePutter & StoreLister + // Billing queues ///////////////////////////////////////////////////////////// /** @@ -158,6 +163,34 @@ export interface CustomerBillingInstruction { export type CustomerBillingQueue = QueueAdder +/** + * Captures details about egress traffic that should be billed for a given period + */ +export interface EgressTrafficData { + /** Space DID (did:key:...). */ + space: ConsumerDID + /** Customer DID (did:mailto:...). */ + customer: CustomerDID + /** Resource that was served. */ + resource: UnknownLink + /** Number of bytes that were served. */ + bytes: number + /** Time the egress traffic was served at. */ + servedAt: Date + /** UCAN invocation IDthat caused the egress traffic. */ + cause: UnknownLink +} + +/** + * Queue for egress traffic data. + */ +export type EgressTrafficQueue = QueueAdder + +/** + * List key for egress traffic data. + */ +export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date } + /** * Captures details about a space that should be billed for a given customer in * the given period of usage. diff --git a/billing/package.json b/billing/package.json index b0a96027..6bef96da 100644 --- a/billing/package.json +++ b/billing/package.json @@ -4,6 +4,7 @@ "type": "module", "scripts": { "test": "entail '**/*.spec.js'", + "test-only": "entail", "coverage": "c8 -r text -r html npm test" }, "dependencies": { @@ -13,7 +14,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js new file mode 100644 index 00000000..173b79d2 --- /dev/null +++ b/billing/queues/egress-traffic.js @@ -0,0 +1,9 @@ +import { createQueueAdderClient } from './client.js' +import { encodeStr, validate } from '../data/egress.js' + +/** + * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf + * @param {{ url: URL }} context + */ +export const createEgressTrafficQueue = (conf, { url }) => + createQueueAdderClient(conf, { url, encode: encodeStr, validate }) diff --git a/billing/tables/egress-traffic.js b/billing/tables/egress-traffic.js new file mode 100644 index 00000000..fadd160c --- /dev/null +++ b/billing/tables/egress-traffic.js @@ -0,0 +1,42 @@ +import { createStorePutterClient, createStoreListerClient } from './client.js' +import { validate, encode, lister, decode } from '../data/egress.js' + +/** + * Source of truth for egress traffic data. + * + * @type {import('sst/constructs').TableProps} + */ +export const egressTrafficTableProps = { + fields: { + /** Space DID (did:key:...). */ + space: 'string', + /** Customer DID (did:mailto:...). */ + customer: 'string', + /** Resource CID. */ + resource: 'string', + /** ISO timestamp of the event. */ + servedAt: 'string', + /** Bytes served. */ + bytes: 'number', + /** UCAN invocation ID that caused the egress traffic. */ + cause: 'string', + }, + primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' }, + globalIndexes: { + customer: { + partitionKey: 'customer', + sortKey: 'servedAt', + projection: ['space', 'resource', 'bytes', 'cause', 'servedAt'] + } + } +} + +/** + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {{ tableName: string }} context + * @returns {import('../lib/api.js').EgressTrafficEventStore} + */ +export const createEgressTrafficEventStore = (conf, { tableName }) => ({ + ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) +}) \ No newline at end of file diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 478c5d1f..ea321a73 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -1,3 +1,5 @@ +import dotenv from 'dotenv' +import path from 'node:path' import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js' import { createCustomerStore, customerTableProps } from '../../tables/customer.js' import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js' @@ -6,6 +8,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' +import { decodeStr as decodeEgressTrafficEvent, validate as validateEgressTrafficEvent, encode as encodeEgressTrafficEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -16,6 +19,12 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js' import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' +import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-queue.js' +import Stripe from 'stripe' +import { createEgressTrafficEventStore, egressTrafficTableProps } from '../../tables/egress-traffic.js' + +dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) /** * @typedef {{ @@ -33,6 +42,26 @@ const createAWSServices = async () => { } } +/** + * @returns {{ stripe: Stripe, stripeSecretKey: string, billingMeterEventName: string, billingMeterId: string }} + */ +const createStripeService = () => { + const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY + if (!stripeSecretKey) { + throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') + } + const billingMeterEventName = process.env.STRIPE_BILLING_METER_EVENT_NAME + if (!billingMeterEventName) { + throw new Error('STRIPE_BILLING_METER_EVENT_NAME environment variable is not set') + } + const billingMeterId = process.env.STRIPE_BILLING_METER_ID + if (!billingMeterId) { + throw new Error('STRIPE_BILLING_METER_ID environment variable is not set') + } + const stripe = new Stripe(stripeSecretKey, { apiVersion: "2023-10-16" }) + return { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } +} + export const createBillingCronTestContext = async () => { await createAWSServices() @@ -137,6 +166,61 @@ export const createUCANStreamTestContext = async () => { return { consumerStore, spaceDiffStore } } +/** + * @returns {Promise} + */ +export const createEgressTrafficTestContext = async () => { + await createAWSServices() + + const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-')) + const egressTrafficQueue = { + add: createEgressTrafficQueue(awsServices.sqs.client, { url: egressQueueURL }).add, + remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressTrafficEvent }).remove, + } + + const accountId = (await awsServices.sqs.client.config.credentials()).accountId + const region = 'us-west-2' + + const customerTable = await createTable(awsServices.dynamo.client, customerTableProps, 'customer-') + const customerStore = { + ...createCustomerStore(awsServices.dynamo.client, { tableName: customerTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: customerTable, + validate: validateCustomer, // assume test data is valid + encode: encodeCustomer + }) + } + + const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-') + const egressTrafficEventStore = { + ...createEgressTrafficEventStore(awsServices.dynamo.client, { tableName: egressTrafficTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: egressTrafficTable, + validate: validateEgressTrafficEvent, // assume test data is valid + encode: encodeEgressTrafficEvent + }) + } + + const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService() + + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + return { + egressTrafficQueue, + egressTrafficQueueUrl: egressQueueURL.toString(), + egressTrafficHandler: createEgressTrafficHandler, + accountId: accountId ?? '', + region: region ?? '', + customerTable, + customerStore, + egressTrafficTable, + egressTrafficEventStore, + billingMeterEventName, + billingMeterId, + stripeSecretKey, + stripe, + } +} + /** * @template C * @param {import('../lib/api').TestSuite} suite diff --git a/billing/test/helpers/did.js b/billing/test/helpers/did.js index c58fb9e7..3f0bc759 100644 --- a/billing/test/helpers/did.js +++ b/billing/test/helpers/did.js @@ -8,7 +8,7 @@ const randomDomain = () => `${randomAlphas(randomInteger(1, 32))}.${tlds[randomInteger(0, tlds.length)]}` /** @returns {import("@ucanto/interface").DID<'mailto'>} */ -export const randomDIDMailto = () => +export const randomDIDMailto = () => `did:mailto:${randomDomain()}:${randomAlphas(randomInteger(1, 16))}` /** @returns {Promise} */ diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js new file mode 100644 index 00000000..f6f5cc9d --- /dev/null +++ b/billing/test/helpers/egress.js @@ -0,0 +1,16 @@ +import { randomLink } from './dag.js' +import { randomDID } from './did.js' + +/** + * @param {import('../../lib/api').Customer} customer + * @returns {Promise} + */ +export const randomEgressEvent = async (customer) => ({ + space: await randomDID(), + customer: customer.customer, + resource: randomLink(), + bytes: Math.floor(Math.random() * 1000000), + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), + cause: randomLink() +}) diff --git a/billing/test/lib.egress-traffic.spec.js b/billing/test/lib.egress-traffic.spec.js new file mode 100644 index 00000000..306b57a5 --- /dev/null +++ b/billing/test/lib.egress-traffic.spec.js @@ -0,0 +1,4 @@ +import * as EgressTrafficSuite from './lib/egress-traffic.js' +import { bindTestContext, createEgressTrafficTestContext } from './helpers/context.js' + +export const test = bindTestContext(EgressTrafficSuite.test, createEgressTrafficTestContext) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 68814ca9..317a8834 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -18,8 +18,13 @@ import { SpaceSnapshotStore, UsageStore, UsageListKey, - Usage + Usage, + EgressTrafficQueue, + EgressTrafficData, + EgressTrafficEventStore } from '../../lib/api.js' +import { Context, Handler, SQSEvent } from 'aws-lambda' +import Stripe from 'stripe' export interface BillingCronTestContext { customerStore: CustomerStore & StorePutter @@ -47,12 +52,30 @@ export interface UCANStreamTestContext { consumerStore: ConsumerStore & StorePutter } + +export interface EgressTrafficTestContext extends Context { + egressTrafficQueue: EgressTrafficQueue & QueueRemover + egressTrafficQueueUrl: string + egressTrafficHandler: Handler + accountId: string + region: string + customerTable: string + customerStore: CustomerStore + egressTrafficTable: string + egressTrafficEventStore: EgressTrafficEventStore + billingMeterEventName: string + billingMeterId: string + stripeSecretKey: string + stripe: Stripe +} + export type TestContext = & BillingCronTestContext & CustomerBillingQueueTestContext & SpaceBillingQueueTestContext & StripeTestContext & UCANStreamTestContext + & EgressTrafficTestContext /** QueueRemover can remove items from the head of the queue. */ export interface QueueRemover { diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js new file mode 100644 index 00000000..a5e27009 --- /dev/null +++ b/billing/test/lib/egress-traffic.js @@ -0,0 +1,109 @@ +import { encodeStr } from '../../data/egress.js' +import { randomCustomer } from '../helpers/customer.js' +import { randomDIDMailto } from '../helpers/did.js' +import { randomEgressEvent } from '../helpers/egress.js' +import * as DidMailto from '@web3-storage/did-mailto' + +/** @type {import('./api').TestSuite} */ +export const test = { + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id + + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') + + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + async () => await randomEgressEvent(customer) + ) + ) + + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } + + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encodeStr(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) + } + + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) + + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), + } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } + } + } +} \ No newline at end of file diff --git a/billing/test/utils/stripe.js b/billing/test/utils/stripe.js index 72828296..2b7c6206 100644 --- a/billing/test/utils/stripe.js +++ b/billing/test/utils/stripe.js @@ -1,4 +1,7 @@ -import { handleCustomerSubscriptionCreated } from '../../utils/stripe.js' +import { + accountIDToStripeCustomerID, + stripeIDToAccountID, + handleCustomerSubscriptionCreated } from '../../utils/stripe.js' import * as DidMailto from '@web3-storage/did-mailto' @@ -44,5 +47,17 @@ export const test = { assert.ok(result.ok) const customerRecord = await ctx.customerStore.get({ customer }) assert.equal(customerRecord.ok?.product, product) + }, + + 'should convert an account ID to a stripe customer ID': (/** @type {import('entail').assert} */ assert) => { + const accountID = 'stripe:cus_1234567890' + const stripeCustomerId = accountIDToStripeCustomerID(accountID) + assert.equal(stripeCustomerId, 'cus_1234567890') + }, + + 'should convert a stripe customer ID to an account ID': (/** @type {import('entail').assert} */ assert) => { + const stripeCustomerId = 'cus_1234567890' + const accountID = stripeIDToAccountID(stripeCustomerId) + assert.equal(accountID, 'stripe:cus_1234567890') } } diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index b47b3ded..5f05934d 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -7,6 +7,9 @@ import * as DidMailto from '@web3-storage/did-mailto' */ /** + * Converts a Stripe customer ID to an Account ID. + * e.g: + * cus_1234567890 -> stripe:cus_1234567890 * * @param {string} stripeID * @returns {AccountID} @@ -15,6 +18,17 @@ export function stripeIDToAccountID(stripeID) { return /** @type {AccountID} */(`stripe:${stripeID}`) } +/** + * Converts an Account ID to a Stripe customer ID. + * e.g: + * stripe:cus_1234567890 -> cus_1234567890 + * + * @param {AccountID} accountID + * @returns {string} + */ +export const accountIDToStripeCustomerID = (accountID) => accountID.slice('stripe:'.length) + + /** * * @param {Stripe} stripe @@ -50,3 +64,50 @@ export async function handleCustomerSubscriptionCreated(stripe, event, customerS }) } } + +/** + * Records an egress traffic event in the Stripe Billing Meter API for the given customer account. + * + * @param {import('stripe').Stripe} stripe + * @param {string} billingMeterEventName + * @param {import('../lib/api.js').EgressTrafficData} egressData + * @param {AccountID} customerAccount + */ +export async function recordBillingMeterEvent(stripe, billingMeterEventName, egressData, customerAccount) { + const stripeCustomerId = accountIDToStripeCustomerID(customerAccount) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } + } + } + + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterEventName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressData.bytes.toString(), + }, + timestamp: Math.floor(egressData.servedAt.getTime() / 1000), + }, + { + idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}` + } + ) + + // Identifier is only set if the event was successfully created + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressData.customer} @ ${egressData.servedAt.toISOString()}`, + } + } +} \ No newline at end of file diff --git a/filecoin/test/filecoin-events.test.js b/filecoin/test/filecoin-events.test.js index dfd5663a..5added75 100644 --- a/filecoin/test/filecoin-events.test.js +++ b/filecoin/test/filecoin-events.test.js @@ -90,12 +90,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) diff --git a/filecoin/test/filecoin-service.test.js b/filecoin/test/filecoin-service.test.js index 7fc853e0..54d60ea1 100644 --- a/filecoin/test/filecoin-service.test.js +++ b/filecoin/test/filecoin-service.test.js @@ -91,12 +91,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.service.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) diff --git a/package-lock.json b/package-lock.json index 281a02e1..5ef5964b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,12 +47,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", @@ -79,7 +79,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", @@ -7947,7 +7947,6 @@ "version": "9.2.1", "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-9.2.1.tgz", "integrity": "sha512-nyY48yE7r3dnJVlxrdaimrbloh4RokQaNRdI//btfTkcTEZbpmSrbYcBQ4VKTf8ZxXAOUJy4VsRpkJo+y9RTnA==", - "license": "Apache-2.0 OR MIT", "dependencies": { "cborg": "^4.0.0", "multiformats": "^13.1.0" @@ -11427,8 +11426,7 @@ "node_modules/@storacha/one-webcrypto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@storacha/one-webcrypto/-/one-webcrypto-1.0.1.tgz", - "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==", - "license": "MIT" + "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==" }, "node_modules/@trpc/server": { "version": "9.16.0", @@ -12204,7 +12202,6 @@ "version": "20.1.0", "resolved": "https://registry.npmjs.org/@web3-storage/access/-/access-20.1.0.tgz", "integrity": "sha512-IY6ICPRWE8++2jxvy+LzAiFvwAOIHR8cu9eNt+VT5sAFE796o4ma7GSU0eXRCiShmV2n6iSWAwWRT6XD5zIqPA==", - "license": "(Apache-2.0 OR MIT)", "dependencies": { "@ipld/car": "^5.1.1", "@ipld/dag-ucan": "^3.4.0", @@ -12236,14 +12233,15 @@ } }, "node_modules/@web3-storage/blob-index": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.3.tgz", - "integrity": "sha512-VjGLhf6Gf4ZmzjJXS6wU4aRvnM+HLcuRCJHegjQ36ka52sR2WWOcqDNNVvabtlpnYjGtVFQCPUzaCcs18wpqHQ==", + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.4.tgz", + "integrity": "sha512-04+PrmVHFT+xzRhyIPdcvGc8Y2NDffUe8R1gJOyErVzEVz5N1I9Q/BrlFHYt/A4HrjM5JBsxqSrZgTIkjfPmLA==", "dependencies": { "@ipld/dag-cbor": "^9.0.6", + "@storacha/one-webcrypto": "^1.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.2.0", "carstream": "^2.1.0", "multiformats": "^13.0.1", "uint8arrays": "^5.0.3" @@ -12261,10 +12259,9 @@ } }, "node_modules/@web3-storage/capabilities": { - "version": "17.3.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.3.0.tgz", - "integrity": "sha512-9415OPNVYO5gXDVf1vzZywkjndKTVA9IPnU04lQXxUaYfYZ5S5kzV2PI1SvySMOsCNE7u7uSCTiclblx5gPYAg==", - "license": "(Apache-2.0 OR MIT)", + "version": "17.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.4.0.tgz", + "integrity": "sha512-2VNLTTvv9qVewtXiek2Fb6W7WTQgOonq+FcNV9PyXAEgcXsQWsm8dOmbeB83W9bAuiwe9uIOzC1rftDHY3+uYA==", "dependencies": { "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", @@ -12279,7 +12276,6 @@ "version": "5.3.0", "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", - "license": "(Apache-2.0 AND MIT)", "dependencies": { "@ipld/dag-cbor": "^9.2.1", "multiformats": "^13.3.0", @@ -12357,9 +12353,9 @@ } }, "node_modules/@web3-storage/filecoin-api": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.2.0.tgz", - "integrity": "sha512-0tr+vlLXQn4vbHZR2Sxxr62fKW60TejQyH3ZG1CNCFLhLkBg4pXTYSu5rxijYg3ob8DHkejKp7hXMgPQhFzOHw==", + "version": "7.3.2", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.3.2.tgz", + "integrity": "sha512-DIhi6uheibt+JluLdfrCqH0xn/yZEIt+Nupf4OqhX9LBkaT30ySDraHz8yIolyFV7/8hxkSsglH1QQdM7M1F4A==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", @@ -12367,9 +12363,9 @@ "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.3.0", "@web3-storage/content-claims": "^5.0.0", - "@web3-storage/data-segment": "^4.0.0", + "@web3-storage/data-segment": "^5.2.0", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0", "p-map": "^6.0.0" }, @@ -12378,76 +12374,26 @@ } }, "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-4.0.0.tgz", - "integrity": "sha512-AnNyJp3wHMa7LBzguQzm4rmXSi8vQBz4uFs+jiXnSNtLR5dAqHfhMvi9XdWonWPYvxNvT5ZhYCSF0mpDjymqKg==", + "version": "5.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", + "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", + "@ipld/dag-cbor": "^9.2.1", + "multiformats": "^13.3.0", "sync-multihash-sha2": "^1.0.0" } }, - "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, "node_modules/@web3-storage/filecoin-client": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.3.tgz", - "integrity": "sha512-xFL8odr5PpTjQvpfw/4jphcm7ZvcBRMSKHn3ReEaVcFjxQL45Rojjleuq/QEdMwrNfsLCqqAxC54jk55o5/ERQ==", + "version": "3.3.4", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.4.tgz", + "integrity": "sha512-T2xur1NPvuH09yajyjCWEl7MBH712nqHERj51w4nDp6f8libMCKY6lca0frCrm4OC5s8oy0ZtoRFhsRYxgTzSg==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^16.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/capabilities": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-16.0.0.tgz", - "integrity": "sha512-wCjLpYc6t8tFRZrF2k2vBteJDWzHkmQjoJG0Yy/fjA04IjNN48iVZaCMQIANHXZxDGlYRGxhwzDwl4dovAdSTQ==", - "dependencies": { - "@ucanto/core": "^10.0.1", - "@ucanto/interface": "^10.0.1", - "@ucanto/principal": "^9.0.1", - "@ucanto/transport": "^9.1.1", - "@ucanto/validator": "^9.0.2", - "@web3-storage/data-segment": "^3.2.0", - "uint8arrays": "^5.0.3" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment": { - "version": "3.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.2.0.tgz", - "integrity": "sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==", - "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" + "@web3-storage/capabilities": "^17.3.0" } }, "node_modules/@web3-storage/multipart-parser": { @@ -12466,9 +12412,9 @@ } }, "node_modules/@web3-storage/upload-api": { - "version": "18.0.2", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.0.2.tgz", - "integrity": "sha512-IxV95h8/kb2OpwPSv8/Rew1xBfSnW9s8DL4y1r3cjN57n35XXLnsP0to4YU+FuFo6MKFNx9Yu0UrW+7GowC2vw==", + "version": "18.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.1.0.tgz", + "integrity": "sha512-+28NInlExKJ7ltOrN6o6jeHT4p7fX4TJFRkgCWIcImvVTfIQQPE8z8YKOshokAXPNZebQFRo7a1UrtAIbEg4mA==", "dependencies": { "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", @@ -12476,12 +12422,12 @@ "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-api": "^7.1.1", + "@web3-storage/filecoin-api": "^7.3.2", "multiformats": "^12.1.2", "uint8arrays": "^5.0.3" }, @@ -12512,9 +12458,9 @@ "integrity": "sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==" }, "node_modules/@web3-storage/upload-client": { - "version": "16.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-16.1.1.tgz", - "integrity": "sha512-aVGpcqnLxRk4u3uZAum1jwC5BbEbjuqAZZBrGNZ3UZVFnnJJXPm3DE+r1WC8FV2mbgC/yKKqDMu1FP1uB9wJkA==", + "version": "17.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-17.1.0.tgz", + "integrity": "sha512-0tUMe4Ez9gmUZjgn1Nrl6HYdGEsYyeLa6JrpoXcCGTQDBW2FehALc+GZZeoIjYQexRpw+qt9JstuJNN9dUNETw==", "dev": true, "dependencies": { "@ipld/car": "^5.2.2", @@ -12525,10 +12471,10 @@ "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/data-segment": "^5.1.0", - "@web3-storage/filecoin-client": "^3.3.3", + "@web3-storage/filecoin-client": "^3.3.4", "ipfs-utils": "^9.0.14", "multiformats": "^12.1.2", "p-retry": "^5.1.2", @@ -12600,9 +12546,9 @@ "link": true }, "node_modules/@web3-storage/w3up-client": { - "version": "14.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-14.1.1.tgz", - "integrity": "sha512-brBmN1aCJpjtNLwWpz0Jg1OkfsYs7r27m5VMOET9x+j4jy3DCtSqlUkHFIumLYrNhUn5oZrqh0kbWokcaUvrLg==", + "version": "16.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-16.4.0.tgz", + "integrity": "sha512-ndHXVufBt6bVyZHHxpe+bgS9bnOtpREsYFtL/C2h/AL8O5LZ1QBM91wFHVvLhFOJcKVcbW2WagyivCr+NOp9WA==", "dev": true, "dependencies": { "@ipld/dag-ucan": "^3.4.0", @@ -12611,12 +12557,12 @@ "@ucanto/interface": "^10.0.1", "@ucanto/principal": "^9.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.1" + "@web3-storage/filecoin-client": "^3.3.4", + "@web3-storage/upload-client": "^17.1.0" }, "engines": { "node": ">=18" @@ -30353,9 +30299,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", diff --git a/package.json b/package.json index 3395c5a5..77d06ac8 100644 --- a/package.json +++ b/package.json @@ -40,12 +40,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", diff --git a/stacks/billing-db-stack.js b/stacks/billing-db-stack.js index f731a423..42712a2b 100644 --- a/stacks/billing-db-stack.js +++ b/stacks/billing-db-stack.js @@ -3,6 +3,7 @@ import { customerTableProps } from '../billing/tables/customer.js' import { spaceDiffTableProps } from '../billing/tables/space-diff.js' import { spaceSnapshotTableProps } from '../billing/tables/space-snapshot.js' import { usageTableProps } from '../billing/tables/usage.js' +import { egressTrafficTableProps } from '../billing/tables/egress-traffic.js' /** * @param {import('sst/constructs').StackContext} properties @@ -15,15 +16,17 @@ export const BillingDbStack = ({ stack }) => { ...usageTableProps, stream: 'new_image' }) + const egressTrafficTable = new Table(stack, 'egress-traffic', egressTrafficTableProps) stack.addOutputs({ customerTableName: customerTable.tableName, spaceSnapshotTableName: spaceSnapshotTable.tableName, spaceDiffTableName: spaceDiffTable.tableName, - usageTable: usageTable.tableName + usageTable: usageTable.tableName, + egressTrafficTableName: egressTrafficTable.tableName }) const stripeSecretKey = new Config.Secret(stack, 'STRIPE_SECRET_KEY') - return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, stripeSecretKey } + return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, egressTrafficTable, stripeSecretKey } } diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 13011b88..f8b158f3 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -17,6 +17,7 @@ export function BillingStack ({ stack, app }) { spaceSnapshotTable, spaceDiffTable, usageTable, + egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { subscriptionTable, consumerTable } = use(UploadDbStack) @@ -179,5 +180,36 @@ export function BillingStack ({ stack, app }) { CustomDomain: customDomain ? `https://${customDomain.domainName}` : 'Set BILLING_HOSTED_ZONE in env to deploy to a custom domain' }) - return { billingCron } + // Lambda that handles egress traffic tracking + const egressTrafficQueueHandler = new Function(stack, 'egress-traffic-queue-handler', { + permissions: [customerTable, egressTrafficTable], + handler: 'billing/functions/egress-traffic-queue.handler', + timeout: '15 minutes', + bind: [stripeSecretKey], + environment: { + CUSTOMER_TABLE_NAME: customerTable.tableName, + EGRESS_TRAFFIC_TABLE_NAME: egressTrafficTable.tableName, + // Billing Meter Event Name for Stripe Test and Production APIs + STRIPE_BILLING_METER_EVENT_NAME: 'gateway-egress-traffic' + } + }) + + // Queue for egress traffic tracking + const egressTrafficDLQ = new Queue(stack, 'egress-traffic-dlq', { + cdk: { queue: { retentionPeriod: Duration.days(14) } } + }) + const egressTrafficQueue = new Queue(stack, 'egress-traffic-queue', { + consumer: { + function: egressTrafficQueueHandler, + deadLetterQueue: egressTrafficDLQ.cdk.queue, + cdk: { eventSource: { batchSize: 1 } } + }, + cdk: { queue: { visibilityTimeout: Duration.minutes(15) } } + }) + + stack.addOutputs({ + EgressTrafficQueueURL: egressTrafficQueue.queueUrl + }) + + return { billingCron, egressTrafficQueue } } diff --git a/stacks/upload-api-stack.js b/stacks/upload-api-stack.js index de4f9cd5..74d6e789 100644 --- a/stacks/upload-api-stack.js +++ b/stacks/upload-api-stack.js @@ -9,6 +9,7 @@ import { import { StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda' import { UploadDbStack } from './upload-db-stack.js' import { BillingDbStack } from './billing-db-stack.js' +import { BillingStack } from './billing-stack.js' import { CarparkStack } from './carpark-stack.js' import { FilecoinStack } from './filecoin-stack.js' import { UcanInvocationStack } from './ucan-invocation-stack.js' @@ -45,9 +46,10 @@ export function UploadApiStack({ stack, app }) { const { carparkBucket } = use(CarparkStack) const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey, contentClaimsPrivateKey } = use(UploadDbStack) const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack) - const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack) + const { customerTable, spaceDiffTable, spaceSnapshotTable, egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack) const { blockAdvertPublisherQueue, blockIndexWriterQueue } = use(IndexerStack) + const { egressTrafficQueue } = use(BillingStack) // Setup API const customDomains = process.env.HOSTED_ZONES?.split(',').map(zone => getCustomDomain(stack.stage, zone)) @@ -82,6 +84,7 @@ export function UploadApiStack({ stack, app }) { pieceTable, spaceDiffTable, spaceSnapshotTable, + egressTrafficTable, carparkBucket, invocationBucket, taskBucket, @@ -91,6 +94,7 @@ export function UploadApiStack({ stack, app }) { filecoinSubmitQueue, blockAdvertPublisherQueue, blockIndexWriterQueue, + egressTrafficQueue, ], environment: { DID: process.env.UPLOAD_API_DID ?? '', @@ -119,6 +123,7 @@ export function UploadApiStack({ stack, app }) { FILECOIN_SUBMIT_QUEUE_URL: filecoinSubmitQueue.queueUrl, BLOCK_ADVERT_PUBLISHER_QUEUE_URL: blockAdvertPublisherQueue.queueUrl, BLOCK_INDEX_WRITER_QUEUE_URL: blockIndexWriterQueue.queueUrl, + EGRESS_TRAFFIC_QUEUE_URL: egressTrafficQueue.queueUrl, NAME: pkg.name, VERSION: pkg.version, COMMIT: git.commmit, diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index 16a3a6f6..0d0735a1 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -41,6 +41,7 @@ import { createStripeBillingProvider } from '../billing.js' import { createIPNIService } from '../external-services/ipni-service.js' import * as UploadAPI from '@web3-storage/upload-api' import { mustGetEnv } from '../../lib/env.js' +import { createEgressTrafficQueue } from '@web3-storage/w3infra-billing/queues/egress-traffic.js' Sentry.AWSLambda.init({ environment: process.env.SST_STAGE, @@ -120,6 +121,7 @@ export async function ucanInvocationRouter(request) { dealTrackerUrl, pieceOfferQueueUrl, filecoinSubmitQueueUrl, + egressTrafficQueueUrl, requirePaymentPlan, // set for testing dbEndpoint, @@ -201,7 +203,8 @@ export async function ucanInvocationRouter(request) { const revocationsStorage = createRevocationsTable(AWS_REGION, revocationTableName) const spaceDiffStore = createSpaceDiffStore({ region: AWS_REGION }, { tableName: spaceDiffTableName }) const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName }) - const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore }) + const egressTrafficQueue = createEgressTrafficQueue({ region: AWS_REGION }, { url: new URL(egressTrafficQueueUrl) }) + const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore, egressTrafficQueue }) const dealTrackerConnection = getServiceConnection({ did: dealTrackerDid, @@ -346,6 +349,7 @@ function getLambdaEnv () { spaceSnapshotTableName: mustGetEnv('SPACE_SNAPSHOT_TABLE_NAME'), pieceOfferQueueUrl: mustGetEnv('PIECE_OFFER_QUEUE_URL'), filecoinSubmitQueueUrl: mustGetEnv('FILECOIN_SUBMIT_QUEUE_URL'), + egressTrafficQueueUrl: mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL'), r2DelegationBucketEndpoint: mustGetEnv('R2_ENDPOINT'), r2DelegationBucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'), r2DelegationBucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'), diff --git a/upload-api/package.json b/upload-api/package.json index fd2f523e..4d4058dd 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -23,9 +23,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", diff --git a/upload-api/stores/provisions.js b/upload-api/stores/provisions.js index bff7987d..19630e4e 100644 --- a/upload-api/stores/provisions.js +++ b/upload-api/stores/provisions.js @@ -109,7 +109,8 @@ export function useProvisionStore (subscriptionTable, consumerTable, spaceMetric did: consumer, allocated, limit: 1_000_000_000, // set to an arbitrarily high number because we currently don't enforce any limits - subscription: consumerRecord.subscription + subscription: consumerRecord.subscription, + customer: consumerRecord.customer } }) : ( { error: { name: 'ConsumerNotFound', message: `could not find ${consumer}` } } diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index 45720254..89c01a0f 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -4,15 +4,16 @@ import { iterateSpaceDiffs } from '@web3-storage/w3infra-billing/lib/space-billi * @param {object} conf * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceSnapshotStore} conf.spaceSnapshotStore * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceDiffStore} conf.spaceDiffStore + * @param {import('@web3-storage/w3infra-billing/lib/api').EgressTrafficQueue} conf.egressTrafficQueue */ -export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { +export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTrafficQueue }) { return { /** * @param {import('@web3-storage/upload-api').ProviderDID} provider * @param {import('@web3-storage/upload-api').SpaceDID} space * @param {{ from: Date, to: Date }} period */ - async report (provider, space, period) { + async report(provider, space, period) { const snapResult = await spaceSnapshotStore.get({ provider, space, @@ -57,6 +58,33 @@ export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { events, } return { ok: report } + }, + + /** + * Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API. + * + * @param {import('@web3-storage/upload-api').SpaceDID} space - The space that the egress traffic is associated with. + * @param {import('@web3-storage/upload-api').AccountDID} customer - The customer that will be billed for the egress traffic. + * @param {import('@web3-storage/upload-api').UnknownLink} resource - The resource that was served. + * @param {number} bytes - The number of bytes that were served. + * @param {Date} servedAt - The date and time when the egress traffic was served. + * @param {import('@web3-storage/upload-api').UnknownLink} cause - The UCAN invocation ID that caused the egress traffic. + * @returns {Promise>} + */ + async record(space, customer, resource, bytes, servedAt, cause) { + const record = { + space, + customer, + resource, + bytes, + servedAt, + cause + } + + const result = await egressTrafficQueue.add(record) + if (result.error) return result + + return { ok: { ...record, servedAt: servedAt.toISOString() } } } } } diff --git a/upload-api/tables/consumer.js b/upload-api/tables/consumer.js index 0cf48b8d..b2036c5d 100644 --- a/upload-api/tables/consumer.js +++ b/upload-api/tables/consumer.js @@ -98,7 +98,8 @@ export function useConsumerTable (dynamoDb, tableName) { // provider/consumer pair, but I suspect we'll never get there const record = response.Items?.map(i => unmarshall(i)).find(i => i.provider === provider) return record ? { - subscription: record.subscription + subscription: record.subscription, + customer: record.customer } : null }, diff --git a/upload-api/types.ts b/upload-api/types.ts index 20eab517..f7a5e5b7 100644 --- a/upload-api/types.ts +++ b/upload-api/types.ts @@ -184,7 +184,7 @@ export interface ConsumerListRecord { export interface ConsumerTable { /** get a consumer record for a given provider */ - get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string } | null> + get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string, customer: AccountDID } | null> /** get a consumer record for a given subscription */ getBySubscription: (provider: ProviderDID, subscription: string) => Promise<{ consumer: DID } | null> /** add a consumer - a relationship between a provider, subscription and consumer */