From 9f913acfe9fd5e3e32fbd65cb275309e988c1d11 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini <48031343+ecioppettini@users.noreply.github.com> Date: Sat, 10 Aug 2024 02:44:21 -0300 Subject: [PATCH] emit mqtt events + refactor the api slightly --- packages/engine/paima-sm/src/index.ts | 80 +++++++++++++++---- packages/engine/paima-sm/src/types.ts | 31 ++++--- packages/engine/paima-standalone/src/sm.ts | 15 ++-- .../paima-standalone/src/utils/import.ts | 12 +-- .../paima-standalone/src/utils/input.ts | 4 +- .../paima-sdk/paima-events/src/app-events.ts | 41 ++++++++++ packages/paima-sdk/paima-events/src/index.ts | 1 + packages/paima-sdk/paima-events/src/types.ts | 9 --- .../src/helpers/paima-node-rest-schema.d.ts | 58 ++++++++++++++ 9 files changed, 204 insertions(+), 47 deletions(-) create mode 100644 packages/paima-sdk/paima-events/src/app-events.ts diff --git a/packages/engine/paima-sm/src/index.ts b/packages/engine/paima-sm/src/index.ts index e94d4877..32e64ba3 100644 --- a/packages/engine/paima-sm/src/index.ts +++ b/packages/engine/paima-sm/src/index.ts @@ -52,9 +52,9 @@ import type { import { ConfigNetworkType } from '@paima/utils'; import assertNever from 'assert-never'; import { keccak_256 } from 'js-sha3'; -import { toTopicHash } from '@paima/events'; -import type { LogEventFields } from '@paima/events'; -import type { TSchema } from '@sinclair/typebox'; +import type { EventPathAndDef, generateAppEvents, ResolvedPath } from '@paima/events'; +import { PaimaEventManager } from '@paima/events'; +import { PaimaEventBroker } from '@paima/broker'; export * from './types.js'; export type * from './types.js'; @@ -72,6 +72,10 @@ const SM: GameStateMachineInitializer = { const persistentReadonlyDBConn: Client = getPersistentConnection(databaseInfo); const readonlyDBConn: Pool = getConnection(databaseInfo, true); + if (ENV.MQTT_BROKER) { + new PaimaEventBroker('Paima-Engine').getServer(); + } + return { latestProcessedBlockHeight: async ( dbTx: PoolClient | Pool = readonlyDBConn @@ -109,8 +113,8 @@ const SM: GameStateMachineInitializer = { dbTx, Object.values(events).flatMap(eventsByName => eventsByName.flatMap(event => - event.fields.map(f => ({ - topic: toTopicHash(event), + event.definition.fields.map(f => ({ + topic: event.topicHash, fieldName: f.name, indexed: f.indexed, })) @@ -125,8 +129,8 @@ const SM: GameStateMachineInitializer = { dbTx, Object.values(events).flatMap(eventByName => eventByName.flatMap(event => ({ - name: event.name, - topic: toTopicHash(event), + name: event.definition.name, + topic: event.topicHash, })) ) ) @@ -293,7 +297,8 @@ const SM: GameStateMachineInitializer = { gameStateTransition, randomnessGenerator, precompiles.precompiles, - indexForEventByTx + indexForEventByTx, + events ); // Execute user submitted input data @@ -303,7 +308,8 @@ const SM: GameStateMachineInitializer = { gameStateTransition, randomnessGenerator, indexForEventByTx, - scheduledInputsLength + scheduledInputsLength, + events ); const processedCount = cdeDataLength + userInputsLength + scheduledInputsLength; @@ -422,13 +428,14 @@ async function processPaginatedCdeData( // Process all of the scheduled data inputs by running each of them through the game STF, // saving the results to the DB, and deleting the schedule data all together in one postgres tx. // Function returns number of scheduled inputs that were processed. -async function processScheduledData( +async function processScheduledData( latestChainData: ChainData, DBConn: PoolClient, - gameStateTransition: GameStateTransitionFunction, + gameStateTransition: GameStateTransitionFunction, randomnessGenerator: Prando, precompiles: Precompiles['precompiles'], - indexForEvent: (txHash: string) => number + indexForEvent: (txHash: string) => number, + eventDefinitions: ReturnType ): Promise { const scheduledData = await getScheduledDataByBlockHeight.run( { block_height: latestChainData.blockNumber }, @@ -503,6 +510,7 @@ async function processScheduledData( // Trigger STF let sqlQueries: SQLUpdate[] = []; + let eventsToEmit: [any, ResolvedPath & Event['type']][] = []; try { const { stateTransitions, events } = await gameStateTransition( inputData, @@ -526,6 +534,16 @@ async function processScheduledData( block_height: latestChainData.blockNumber, }, ]); + + const eventDefinition = eventDefinitions[event.data.name].find( + eventDefinition => eventDefinition.topicHash === event.data.topic + ); + + if (!eventDefinition) { + throw new Error('Event definition not found'); + } + + eventsToEmit.push([eventDefinition as any, event.data.fields]); } } catch (err) { // skip scheduled data where the STF fails @@ -533,11 +551,19 @@ async function processScheduledData( continue; } if (sqlQueries.length !== 0) { - await tryOrRollback(DBConn, async () => { + const success = await tryOrRollback(DBConn, async () => { for (const [query, params] of sqlQueries) { await query.run(params, DBConn); } + + return true; }); + + if (ENV.MQTT_BROKER && success) { + for (const [eventDefinition, fields] of eventsToEmit) { + await PaimaEventManager.Instance.sendMessage(eventDefinition, fields); + } + } } } catch (e) { logError(e); @@ -554,13 +580,14 @@ async function processScheduledData( // Process all of the user inputs data inputs by running each of them through the game STF, // saving the results to the DB with the nonces, all together in one postgres tx. // Function returns number of user inputs that were processed. -async function processUserInputs( +async function processUserInputs( latestChainData: ChainData, DBConn: PoolClient, - gameStateTransition: GameStateTransitionFunction, + gameStateTransition: GameStateTransitionFunction, randomnessGenerator: Prando, indexForEvent: (txHash: string) => number, - txIndexInBlock: number + txIndexInBlock: number, + eventDefinitions: ReturnType ): Promise { for (const submittedData of latestChainData.submittedData) { // Check nonce is valid @@ -608,6 +635,7 @@ async function processUserInputs( // Trigger STF let sqlQueries: SQLUpdate[] = []; + let eventsToEmit: [any, ResolvedPath & Event['type']][] = []; try { const { stateTransitions, events } = await gameStateTransition( inputData, @@ -631,6 +659,16 @@ async function processUserInputs( block_height: latestChainData.blockNumber, }, ]); + + const eventDefinition = eventDefinitions[event.data.name].find( + eventDefinition => eventDefinition.topicHash === event.data.topic + ); + + if (!eventDefinition) { + throw new Error('Event definition not found'); + } + + eventsToEmit.push([eventDefinition as any, event.data.fields]); } } catch (err) { // skip inputs where the STF fails @@ -638,11 +676,19 @@ async function processUserInputs( continue; } if (sqlQueries.length !== 0) { - await tryOrRollback(DBConn, async () => { + const success = await tryOrRollback(DBConn, async () => { for (const [query, params] of sqlQueries) { await query.run(params, DBConn); } + + return true; }); + + if (ENV.MQTT_BROKER && success) { + for (const [eventDefinition, fields] of eventsToEmit) { + await PaimaEventManager.Instance.sendMessage(eventDefinition, fields); + } + } } } finally { txIndexInBlock += 1; diff --git a/packages/engine/paima-sm/src/types.ts b/packages/engine/paima-sm/src/types.ts index 49330381..aff63c44 100644 --- a/packages/engine/paima-sm/src/types.ts +++ b/packages/engine/paima-sm/src/types.ts @@ -18,10 +18,17 @@ import type { IERC1155Contract, BlockHeader, } from '@paima/utils'; -import { Type } from '@sinclair/typebox'; +import { Type, TSchema } from '@sinclair/typebox'; import type { Static } from '@sinclair/typebox'; import type { ProjectedNftStatus } from '@dcspark/carp-client'; -import type { genEvent } from '@paima/events'; +import type { + EventPathAndDef, + generateAppEvents, + genEvent, + LogEvent, + LogEventFields, + ResolvedPath, +} from '@paima/events'; export { SubmittedChainData, SubmittedData }; @@ -645,31 +652,37 @@ export type ChainDataExtension = ( | ChainDataExtensionDynamicEvmPrimitive ) & { network: string | undefined }; -export type GameStateTransitionFunctionRouter = ( +export type GameStateTransitionFunctionRouter = ( blockHeight: number -) => GameStateTransitionFunction; +) => GameStateTransitionFunction; -export type GameStateTransitionFunction = ( +export type GameStateTransitionFunction = ( inputData: STFSubmittedData, blockHeader: BlockHeader, randomnessGenerator: any, DBConn: PoolClient ) => Promise<{ stateTransitions: SQLUpdate[]; - events: { address: string; data: { name: string; fields: any; topic: string } }[]; + events: { + address: string; + data: { + name: string; + fields: ResolvedPath & Event['type']; + topic: string; + }; + }[]; }>; export type Precompiles = { precompiles: { [name: string]: `0x${string}` } }; -export type UserEvents = Record[]>; export interface GameStateMachineInitializer { initialize: ( databaseInfo: PoolConfig, randomnessProtocolEnum: number, - gameStateTransitionRouter: GameStateTransitionFunctionRouter, + gameStateTransitionRouter: GameStateTransitionFunctionRouter, startBlockHeight: number, precompiles: Precompiles, - events: UserEvents + events: ReturnType ) => GameStateMachine; } diff --git a/packages/engine/paima-standalone/src/sm.ts b/packages/engine/paima-standalone/src/sm.ts index 225c5625..a5ee001a 100644 --- a/packages/engine/paima-standalone/src/sm.ts +++ b/packages/engine/paima-standalone/src/sm.ts @@ -1,20 +1,25 @@ import type { GameStateMachine } from '@paima/sm'; import PaimaSM from '@paima/sm'; -import type { PreCompilesImport } from './utils/import.js'; -import { importGameStateTransitionRouter, importEvents } from './utils/import.js'; +import type { AppEventsImport, PreCompilesImport } from './utils/import.js'; +import { importGameStateTransitionRouter } from './utils/import.js'; import { poolConfig } from './utils/index.js'; import { ENV } from '@paima/utils'; -export const gameSM = (precompiles: PreCompilesImport): GameStateMachine => { +export const gameSM = ( + precompiles: PreCompilesImport, + gameEvents: AppEventsImport +): GameStateMachine => { const gameStateTransitionRouter = importGameStateTransitionRouter(); - const events = importEvents(); return PaimaSM.initialize( poolConfig, 4, // https://xkcd.com/221/ + // there is no way of statically generating the event type here, since it's + // imported at runtime. + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument gameStateTransitionRouter, ENV.START_BLOCKHEIGHT, precompiles, - events.GameEvents + gameEvents.events ); }; diff --git a/packages/engine/paima-standalone/src/utils/import.ts b/packages/engine/paima-standalone/src/utils/import.ts index 2d973870..7729f884 100644 --- a/packages/engine/paima-standalone/src/utils/import.ts +++ b/packages/engine/paima-standalone/src/utils/import.ts @@ -7,7 +7,7 @@ import fs from 'fs'; import type { GameStateTransitionFunctionRouter } from '@paima/sm'; import type { TsoaFunction } from '@paima/runtime'; import type { AchievementMetadata } from '@paima/utils-backend'; -import type { genEvent } from '@paima/events'; +import type { generateAppEvents } from '@paima/events'; /** * Checks that the user packed their game code and it is available for Paima Engine to use to run @@ -25,13 +25,13 @@ function importFile(path: string): T { } export interface GameCodeImport { - default: GameStateTransitionFunctionRouter; + default: GameStateTransitionFunctionRouter; } const ROUTER_FILENAME = 'packaged/gameCode.cjs'; /** * Reads repackaged user's code placed next to the executable in `gameCode.cjs` file */ -export function importGameStateTransitionRouter(): GameStateTransitionFunctionRouter { +export function importGameStateTransitionRouter(): GameStateTransitionFunctionRouter { return importFile(ROUTER_FILENAME).default; } @@ -70,11 +70,11 @@ export function importPrecompiles(): PreCompilesImport { } const EVENTS_FILENAME = 'packaged/events.cjs'; -export type UserEvents = { GameEvents: Record[]> }; +export type AppEventsImport = { events: ReturnType }; /** * Reads repackaged user's code placed next to the executable in `events.cjs` file */ -export function importEvents(): UserEvents { - return importFile(EVENTS_FILENAME); +export function importEvents(): AppEventsImport { + return importFile(EVENTS_FILENAME); } diff --git a/packages/engine/paima-standalone/src/utils/input.ts b/packages/engine/paima-standalone/src/utils/input.ts index 855723af..75c86472 100644 --- a/packages/engine/paima-standalone/src/utils/input.ts +++ b/packages/engine/paima-standalone/src/utils/input.ts @@ -28,6 +28,7 @@ import { importOpenApiJson, importPrecompiles, importEndpoints, + importEvents, } from './import.js'; import type { Template } from './types.js'; import RegisterRoutes, { EngineService } from '@paima/rest'; @@ -141,7 +142,8 @@ export const runPaimaEngine = async (): Promise => { // Import & initialize state machine const precompilesImport = importPrecompiles(); - const stateMachine = gameSM(precompilesImport); + const eventsImport = importEvents(); + const stateMachine = gameSM(precompilesImport, eventsImport); console.log(`Connecting to database at ${poolConfig.host}:${poolConfig.port}`); const dbConn = await stateMachine.getReadonlyDbConn().connect(); const funnelFactory = await FunnelFactory.initialize(dbConn); diff --git a/packages/paima-sdk/paima-events/src/app-events.ts b/packages/paima-sdk/paima-events/src/app-events.ts new file mode 100644 index 00000000..4f962c7f --- /dev/null +++ b/packages/paima-sdk/paima-events/src/app-events.ts @@ -0,0 +1,41 @@ +import type { LogEvent, LogEventFields } from './types.js'; +import { toPath, TopicPrefix } from './types.js'; +import type { TSchema } from '@sinclair/typebox'; +import { keccak_256 } from 'js-sha3'; + +export function toSignature[]>>(event: T): string { + return event.name + '(' + event.fields.map(f => f.type.type).join(',') + ')'; +} + +export function toTopicHash[]>>(event: T): string { + return keccak_256(toSignature(event)); +} + +export const generateAppEvents = < + const T extends ReadonlyArray[]>>, +>( + entries: T +): { + [K in T[number] as K['name']]: ReturnType> & + { definition: T[0]; topicHash: string }[]; +} => { + let result: Record = {}; // we can't know the type here + for (const event of entries) { + if (!result[event.name]) { + result[event.name] = []; + } + + result[event.name].push({ + ...toPath(TopicPrefix.App, event), + // keep the original definition around since it's nicer to work with, it + // also has the advantage that it allows recovering the initial order in + // case the signature/topicHash needs to be computed again, which can't be + // done from the path (since you don't know which non indexed fields go in + // between each indexed field). + definition: event, + // we add this to avoid having to re-compute it all the time. + topicHash: toTopicHash(event), + }); + } + return result as any; +}; diff --git a/packages/paima-sdk/paima-events/src/index.ts b/packages/paima-sdk/paima-events/src/index.ts index 4606674a..752f2d1d 100644 --- a/packages/paima-sdk/paima-events/src/index.ts +++ b/packages/paima-sdk/paima-events/src/index.ts @@ -1,3 +1,4 @@ +export * from './app-events.js'; export * from './event-manager.js'; export * from './builtin-events.js'; export * from './builtin-event-utils.js'; diff --git a/packages/paima-sdk/paima-events/src/types.ts b/packages/paima-sdk/paima-events/src/types.ts index 3ce491d5..3b8d2168 100644 --- a/packages/paima-sdk/paima-events/src/types.ts +++ b/packages/paima-sdk/paima-events/src/types.ts @@ -11,7 +11,6 @@ import type { TSchema, } from '@sinclair/typebox'; import assertNever from 'assert-never'; -import { keccak_256 } from 'js-sha3'; export enum PaimaEventBrokerNames { PaimaEngine = 'paima-engine', @@ -414,11 +413,3 @@ type DisallowComplexFields[]> = { type DisallowComplexEventFields[] }> = { fields: DisallowComplexFields; }; - -export function toSignature[]>>(event: T): string { - return event.name + '(' + event.fields.map(f => f.type.type).join(',') + ')'; -} - -export function toTopicHash[]>>(event: T): string { - return keccak_256(toSignature(event)); -} diff --git a/packages/paima-sdk/paima-mw-core/src/helpers/paima-node-rest-schema.d.ts b/packages/paima-sdk/paima-mw-core/src/helpers/paima-node-rest-schema.d.ts index 7f58371e..a1d16c79 100644 --- a/packages/paima-sdk/paima-mw-core/src/helpers/paima-node-rest-schema.d.ts +++ b/packages/paima-sdk/paima-mw-core/src/helpers/paima-node-rest-schema.d.ts @@ -148,6 +148,22 @@ export interface paths { patch?: never; trace?: never; }; + "/get_logs": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + post: operations["GetLogsPost"]; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/achievements/public/list": { parameters: { query?: never; @@ -281,6 +297,24 @@ export interface components { result: components["schemas"]["TransactionContentResponse"]; }; Result_TransactionContentResponse_: components["schemas"]["SuccessfulResult_TransactionContentResponse_"] | components["schemas"]["FailedResult"]; + "SuccessfulResult_any-Array_": { + /** @enum {boolean} */ + success: true; + result: unknown[]; + }; + "Result_any-Array_": components["schemas"]["SuccessfulResult_any-Array_"] | components["schemas"]["FailedResult"]; + GetLogsResponse: components["schemas"]["Result_any-Array_"]; + GetLogsParams: { + topic: string; + filters?: { + [key: string]: string; + }; + address?: string; + /** Format: double */ + toBlock?: number; + /** Format: double */ + fromBlock?: number; + }; Achievement: { /** @description Unique Achievement String */ name: string; @@ -769,6 +803,30 @@ export interface operations { }; }; }; + GetLogsPost: { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["GetLogsParams"]; + }; + }; + responses: { + /** @description Ok */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["GetLogsResponse"]; + }; + }; + }; + }; AchievementsPublic_list: { parameters: { query?: {