Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Egress Traffic Tracking + Stripe Billing Meters #430

Merged
merged 15 commits into from
Oct 30, 2024
Merged
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ jobs:
AWS_ACCESS_KEY_ID: 'NOSUCH'
AWS_SECRET_ACCESS_KEY: 'NOSUCH'
STRIPE_TEST_SECRET_KEY: ${{ secrets.STRIPE_TEST_SECRET_KEY }}
STRIPE_BILLING_METER_ID: ${{ vars.STRIPE_BILLING_METER_ID }}
STRIPE_BILLING_METER_EVENT_NAME: ${{ vars.STRIPE_BILLING_METER_EVENT_NAME }}
110 changes: 110 additions & 0 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { Link } from '@ucanto/server'
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

/**
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficData> } EgressTrafficStoreRecord
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficEventListKey> } EgressTrafficKeyStoreRecord
*/

export const egressSchema = Schema.struct({
fforbeck marked this conversation as resolved.
Show resolved Hide resolved
space: Schema.did({ method: 'key' }),
customer: Schema.did({ method: 'mailto' }),
resource: Schema.link(),
bytes: Schema.number(),
servedAt: Schema.date(),
cause: Schema.link(),
})

/** @type {import('../lib/api').Validator<import('../lib/api').EgressTrafficData>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, EgressTrafficStoreRecord>} */
export const encode = input => {
try {
return {
ok: {
space: input.space.toString(),
customer: input.customer.toString(),
resource: input.resource.toString(),
bytes: Number(input.bytes),
servedAt: input.servedAt.toISOString(),
cause: input.cause.toString(),
}
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, string>} */
export const encodeStr = input => {
try {
const data = encode(input)
if (data.error) throw data.error
return { ok: JSON.stringify(data.ok) }
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<import('../types.js').StoreRecord, import('../lib/api').EgressTrafficData>} */
export const decode = input => {
try {
return {
ok: {
space: Schema.did({ method: 'key' }).from(input.space),
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Link.parse(/** @type {string} */(input.resource)),
bytes: Number(input.bytes),
servedAt: new Date(input.servedAt),
cause: Link.parse(/** @type {string} */(input.cause)),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<string, import('../lib/api').EgressTrafficData>} */
export const decodeStr = input => {
try {
return decode(JSON.parse(input))
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err })
}
}
}

export const lister = {
/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficEventListKey, EgressTrafficKeyStoreRecord>} */
encodeKey: input => ({
ok: {
space: input.space.toString(),
customer: input.customer.toString(),
from: input.from.toISOString()
}
}),
/** @type {import('../lib/api').Decoder<EgressTrafficKeyStoreRecord, import('../lib/api').EgressTrafficEventListKey>} */
decodeKey: input => {
try {
return {
ok: {
space: Schema.did({ method: 'key' }).from(input.space),
customer: Schema.did({ method: 'mailto' }).from(input.customer),
from: new Date(input.from)
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress traffic event list key: ${err.message}`, { cause: err })
}
}
}
}
95 changes: 95 additions & 0 deletions billing/functions/egress-traffic-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import * as Sentry from '@sentry/serverless'
import { expect } from './lib.js'
import { decodeStr } from '../data/egress.js'
import { mustGetEnv } from '../../lib/env.js'
import { createCustomerStore } from '../tables/customer.js'
import Stripe from 'stripe'
import { Config } from 'sst/node/config'
import { recordBillingMeterEvent } from '../utils/stripe.js'
import { createEgressTrafficEventStore } from '../tables/egress-traffic.js'


Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
})

/**
* @typedef {{
* region?: 'us-west-2'|'us-east-2'
* egressTrafficQueueUrl?: string
* customerTable?: string
* billingMeterName?: string
* stripeSecretKey?: string
* customerStore?: import('../lib/api.js').CustomerStore
* egressTrafficTable?: string
* egressTrafficEventStore?: import('../lib/api.js').EgressTrafficEventStore
* }} CustomHandlerContext
*/

/**
* AWS Lambda handler to process egress events from the egress traffic queue.
* Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`.
* The message is then deleted from the queue when successful.
*/
export const handler = Sentry.AWSLambda.wrapHandler(
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/
async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable })
const egressTrafficTable = customContext?.egressTrafficTable ?? mustGetEnv('EGRESS_TRAFFIC_TABLE_NAME')
const egressTrafficEventStore = customContext?.egressTrafficEventStore ?? createEgressTrafficEventStore({ region }, { tableName: egressTrafficTable })

const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')

const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME')
if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME')

const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })
const batchItemFailures = []
for (const record of event.Records) {
try {
const decoded = decodeStr(record.body)
const egressData = expect(decoded, 'Failed to decode egress event')

const putResult = await egressTrafficEventStore.put(egressData)
if (putResult.error) throw putResult.error

const response = await customerStore.get({ customer: egressData.customer })
if (response.error) {
return {
error: {
name: 'CustomerNotFound',
message: `Error getting customer ${egressData.customer}`,
cause: response.error
}
}
}
const customerAccount = response.ok.account

expect(
await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount),
`Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}`
)
} catch (error) {
console.error('Error processing egress event:', error)
batchItemFailures.push({ itemIdentifier: record.messageId })
}
}

return {
statusCode: 200,
body: 'Egress events processed successfully',
// Return the failed records so they can be retried
batchItemFailures
}
},
)
35 changes: 34 additions & 1 deletion billing/lib/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface'
import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface'

// Billing stores /////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -133,6 +133,11 @@ export interface UsageListKey { customer: CustomerDID, from: Date }

export type UsageStore = StorePutter<Usage>

/**
* Store for egress traffic data.
*/
export type EgressTrafficEventStore = StorePutter<EgressTrafficData> & StoreLister<EgressTrafficEventListKey, EgressTrafficData>

// Billing queues /////////////////////////////////////////////////////////////

/**
Expand All @@ -158,6 +163,34 @@ export interface CustomerBillingInstruction {

export type CustomerBillingQueue = QueueAdder<CustomerBillingInstruction>

/**
* Captures details about egress traffic that should be billed for a given period
*/
export interface EgressTrafficData {
/** Space DID (did:key:...). */
space: ConsumerDID
/** Customer DID (did:mailto:...). */
customer: CustomerDID
/** Resource that was served. */
resource: UnknownLink
/** Number of bytes that were served. */
bytes: number
/** Time the egress traffic was served at. */
servedAt: Date
/** UCAN invocation IDthat caused the egress traffic. */
cause: UnknownLink
}

/**
* Queue for egress traffic data.
*/
export type EgressTrafficQueue = QueueAdder<EgressTrafficData>

/**
* List key for egress traffic data.
*/
export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date }

/**
* Captures details about a space that should be billed for a given customer in
* the given period of usage.
Expand Down
3 changes: 2 additions & 1 deletion billing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"type": "module",
"scripts": {
"test": "entail '**/*.spec.js'",
"test-only": "entail",
"coverage": "c8 -r text -r html npm test"
},
"dependencies": {
Expand All @@ -13,7 +14,7 @@
"@sentry/serverless": "^7.74.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/capabilities": "^17.1.1",
"@web3-storage/capabilities": "^17.4.0",
"big.js": "^6.2.1",
"lru-cache": "^11.0.0",
"multiformats": "^13.1.0",
Expand Down
9 changes: 9 additions & 0 deletions billing/queues/egress-traffic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { createQueueAdderClient } from './client.js'
import { encodeStr, validate } from '../data/egress.js'

/**
* @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf
* @param {{ url: URL }} context
*/
export const createEgressTrafficQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode: encodeStr, validate })
42 changes: 42 additions & 0 deletions billing/tables/egress-traffic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { createStorePutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/egress.js'

/**
* Source of truth for egress traffic data.
*
* @type {import('sst/constructs').TableProps}
*/
export const egressTrafficTableProps = {
fields: {
/** Space DID (did:key:...). */
space: 'string',
/** Customer DID (did:mailto:...). */
customer: 'string',
/** Resource CID. */
resource: 'string',
/** ISO timestamp of the event. */
servedAt: 'string',
/** Bytes served. */
bytes: 'number',
/** UCAN invocation ID that caused the egress traffic. */
cause: 'string',
},
primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' },
globalIndexes: {
customer: {
partitionKey: 'customer',
sortKey: 'servedAt',
projection: ['space', 'resource', 'bytes', 'cause', 'servedAt']
}
}
}

/**
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {{ tableName: string }} context
* @returns {import('../lib/api.js').EgressTrafficEventStore}
*/
export const createEgressTrafficEventStore = (conf, { tableName }) => ({
...createStorePutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
})
Loading
Loading