Skip to content

Commit

Permalink
emit mqtt events + refactor the api slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Aug 10, 2024
1 parent e11a13f commit 9f913ac
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 47 deletions.
80 changes: 63 additions & 17 deletions packages/engine/paima-sm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ import type {
import { ConfigNetworkType } from '@paima/utils';
import assertNever from 'assert-never';
import { keccak_256 } from 'js-sha3';
import { toTopicHash } from '@paima/events';
import type { LogEventFields } from '@paima/events';
import type { TSchema } from '@sinclair/typebox';
import type { EventPathAndDef, generateAppEvents, ResolvedPath } from '@paima/events';
import { PaimaEventManager } from '@paima/events';
import { PaimaEventBroker } from '@paima/broker';

export * from './types.js';
export type * from './types.js';
Expand All @@ -72,6 +72,10 @@ const SM: GameStateMachineInitializer = {
const persistentReadonlyDBConn: Client = getPersistentConnection(databaseInfo);
const readonlyDBConn: Pool = getConnection(databaseInfo, true);

if (ENV.MQTT_BROKER) {
new PaimaEventBroker('Paima-Engine').getServer();
}

return {
latestProcessedBlockHeight: async (
dbTx: PoolClient | Pool = readonlyDBConn
Expand Down Expand Up @@ -109,8 +113,8 @@ const SM: GameStateMachineInitializer = {
dbTx,
Object.values(events).flatMap(eventsByName =>
eventsByName.flatMap(event =>
event.fields.map(f => ({
topic: toTopicHash(event),
event.definition.fields.map(f => ({
topic: event.topicHash,
fieldName: f.name,
indexed: f.indexed,
}))
Expand All @@ -125,8 +129,8 @@ const SM: GameStateMachineInitializer = {
dbTx,
Object.values(events).flatMap(eventByName =>
eventByName.flatMap(event => ({
name: event.name,
topic: toTopicHash(event),
name: event.definition.name,
topic: event.topicHash,
}))
)
)
Expand Down Expand Up @@ -293,7 +297,8 @@ const SM: GameStateMachineInitializer = {
gameStateTransition,
randomnessGenerator,
precompiles.precompiles,
indexForEventByTx
indexForEventByTx,
events
);

// Execute user submitted input data
Expand All @@ -303,7 +308,8 @@ const SM: GameStateMachineInitializer = {
gameStateTransition,
randomnessGenerator,
indexForEventByTx,
scheduledInputsLength
scheduledInputsLength,
events
);

const processedCount = cdeDataLength + userInputsLength + scheduledInputsLength;
Expand Down Expand Up @@ -422,13 +428,14 @@ async function processPaginatedCdeData(
// Process all of the scheduled data inputs by running each of them through the game STF,
// saving the results to the DB, and deleting the schedule data all together in one postgres tx.
// Function returns number of scheduled inputs that were processed.
async function processScheduledData(
async function processScheduledData<Event extends EventPathAndDef>(
latestChainData: ChainData,
DBConn: PoolClient,
gameStateTransition: GameStateTransitionFunction,
gameStateTransition: GameStateTransitionFunction<Event>,
randomnessGenerator: Prando,
precompiles: Precompiles['precompiles'],
indexForEvent: (txHash: string) => number
indexForEvent: (txHash: string) => number,
eventDefinitions: ReturnType<typeof generateAppEvents>
): Promise<number> {
const scheduledData = await getScheduledDataByBlockHeight.run(
{ block_height: latestChainData.blockNumber },
Expand Down Expand Up @@ -503,6 +510,7 @@ async function processScheduledData(

// Trigger STF
let sqlQueries: SQLUpdate[] = [];
let eventsToEmit: [any, ResolvedPath<Event['path']> & Event['type']][] = [];
try {
const { stateTransitions, events } = await gameStateTransition(
inputData,
Expand All @@ -526,18 +534,36 @@ async function processScheduledData(
block_height: latestChainData.blockNumber,
},
]);

const eventDefinition = eventDefinitions[event.data.name].find(
eventDefinition => eventDefinition.topicHash === event.data.topic
);

if (!eventDefinition) {
throw new Error('Event definition not found');
}

eventsToEmit.push([eventDefinition as any, event.data.fields]);
}
} catch (err) {
// skip scheduled data where the STF fails
doLog(`[paima-sm] Error on scheduled data STF call. Skipping`, err);
continue;
}
if (sqlQueries.length !== 0) {
await tryOrRollback(DBConn, async () => {
const success = await tryOrRollback(DBConn, async () => {
for (const [query, params] of sqlQueries) {
await query.run(params, DBConn);
}

return true;
});

if (ENV.MQTT_BROKER && success) {
for (const [eventDefinition, fields] of eventsToEmit) {
await PaimaEventManager.Instance.sendMessage(eventDefinition, fields);
}
}
}
} catch (e) {
logError(e);
Expand All @@ -554,13 +580,14 @@ async function processScheduledData(
// Process all of the user inputs data inputs by running each of them through the game STF,
// saving the results to the DB with the nonces, all together in one postgres tx.
// Function returns number of user inputs that were processed.
async function processUserInputs(
async function processUserInputs<Event extends EventPathAndDef>(
latestChainData: ChainData,
DBConn: PoolClient,
gameStateTransition: GameStateTransitionFunction,
gameStateTransition: GameStateTransitionFunction<Event>,
randomnessGenerator: Prando,
indexForEvent: (txHash: string) => number,
txIndexInBlock: number
txIndexInBlock: number,
eventDefinitions: ReturnType<typeof generateAppEvents>
): Promise<number> {
for (const submittedData of latestChainData.submittedData) {
// Check nonce is valid
Expand Down Expand Up @@ -608,6 +635,7 @@ async function processUserInputs(

// Trigger STF
let sqlQueries: SQLUpdate[] = [];
let eventsToEmit: [any, ResolvedPath<Event['path']> & Event['type']][] = [];
try {
const { stateTransitions, events } = await gameStateTransition(
inputData,
Expand All @@ -631,18 +659,36 @@ async function processUserInputs(
block_height: latestChainData.blockNumber,
},
]);

const eventDefinition = eventDefinitions[event.data.name].find(
eventDefinition => eventDefinition.topicHash === event.data.topic
);

if (!eventDefinition) {
throw new Error('Event definition not found');
}

eventsToEmit.push([eventDefinition as any, event.data.fields]);
}
} catch (err) {
// skip inputs where the STF fails
doLog(`[paima-sm] Error on user input STF call. Skipping`, err);
continue;
}
if (sqlQueries.length !== 0) {
await tryOrRollback(DBConn, async () => {
const success = await tryOrRollback(DBConn, async () => {
for (const [query, params] of sqlQueries) {
await query.run(params, DBConn);
}

return true;
});

if (ENV.MQTT_BROKER && success) {
for (const [eventDefinition, fields] of eventsToEmit) {
await PaimaEventManager.Instance.sendMessage(eventDefinition, fields);
}
}
}
} finally {
txIndexInBlock += 1;
Expand Down
31 changes: 22 additions & 9 deletions packages/engine/paima-sm/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ import type {
IERC1155Contract,
BlockHeader,
} from '@paima/utils';
import { Type } from '@sinclair/typebox';
import { Type, TSchema } from '@sinclair/typebox';
import type { Static } from '@sinclair/typebox';
import type { ProjectedNftStatus } from '@dcspark/carp-client';
import type { genEvent } from '@paima/events';
import type {
EventPathAndDef,
generateAppEvents,
genEvent,
LogEvent,
LogEventFields,
ResolvedPath,
} from '@paima/events';

export { SubmittedChainData, SubmittedData };

Expand Down Expand Up @@ -645,31 +652,37 @@ export type ChainDataExtension = (
| ChainDataExtensionDynamicEvmPrimitive
) & { network: string | undefined };

export type GameStateTransitionFunctionRouter = (
export type GameStateTransitionFunctionRouter<Event extends EventPathAndDef> = (
blockHeight: number
) => GameStateTransitionFunction;
) => GameStateTransitionFunction<Event>;

export type GameStateTransitionFunction = (
export type GameStateTransitionFunction<Event extends EventPathAndDef> = (
inputData: STFSubmittedData,
blockHeader: BlockHeader,
randomnessGenerator: any,
DBConn: PoolClient
) => Promise<{
stateTransitions: SQLUpdate[];
events: { address: string; data: { name: string; fields: any; topic: string } }[];
events: {
address: string;
data: {
name: string;
fields: ResolvedPath<Event['path']> & Event['type'];
topic: string;
};
}[];
}>;

export type Precompiles = { precompiles: { [name: string]: `0x${string}` } };
export type UserEvents = Record<string, ReturnType<typeof genEvent>[]>;

export interface GameStateMachineInitializer {
initialize: (
databaseInfo: PoolConfig,
randomnessProtocolEnum: number,
gameStateTransitionRouter: GameStateTransitionFunctionRouter,
gameStateTransitionRouter: GameStateTransitionFunctionRouter<any>,
startBlockHeight: number,
precompiles: Precompiles,
events: UserEvents
events: ReturnType<typeof generateAppEvents>
) => GameStateMachine;
}

Expand Down
15 changes: 10 additions & 5 deletions packages/engine/paima-standalone/src/sm.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import type { GameStateMachine } from '@paima/sm';
import PaimaSM from '@paima/sm';
import type { PreCompilesImport } from './utils/import.js';
import { importGameStateTransitionRouter, importEvents } from './utils/import.js';
import type { AppEventsImport, PreCompilesImport } from './utils/import.js';
import { importGameStateTransitionRouter } from './utils/import.js';
import { poolConfig } from './utils/index.js';
import { ENV } from '@paima/utils';

export const gameSM = (precompiles: PreCompilesImport): GameStateMachine => {
export const gameSM = (
precompiles: PreCompilesImport,
gameEvents: AppEventsImport
): GameStateMachine => {
const gameStateTransitionRouter = importGameStateTransitionRouter();
const events = importEvents();

return PaimaSM.initialize(
poolConfig,
4, // https://xkcd.com/221/
// there is no way of statically generating the event type here, since it's
// imported at runtime.
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
gameStateTransitionRouter,
ENV.START_BLOCKHEIGHT,
precompiles,
events.GameEvents
gameEvents.events
);
};
12 changes: 6 additions & 6 deletions packages/engine/paima-standalone/src/utils/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import fs from 'fs';
import type { GameStateTransitionFunctionRouter } from '@paima/sm';
import type { TsoaFunction } from '@paima/runtime';
import type { AchievementMetadata } from '@paima/utils-backend';
import type { genEvent } from '@paima/events';
import type { generateAppEvents } from '@paima/events';

/**
* Checks that the user packed their game code and it is available for Paima Engine to use to run
Expand All @@ -25,13 +25,13 @@ function importFile<T>(path: string): T {
}

export interface GameCodeImport {
default: GameStateTransitionFunctionRouter;
default: GameStateTransitionFunctionRouter<any>;
}
const ROUTER_FILENAME = 'packaged/gameCode.cjs';
/**
* Reads repackaged user's code placed next to the executable in `gameCode.cjs` file
*/
export function importGameStateTransitionRouter(): GameStateTransitionFunctionRouter {
export function importGameStateTransitionRouter(): GameStateTransitionFunctionRouter<any> {
return importFile<GameCodeImport>(ROUTER_FILENAME).default;
}

Expand Down Expand Up @@ -70,11 +70,11 @@ export function importPrecompiles(): PreCompilesImport {
}

const EVENTS_FILENAME = 'packaged/events.cjs';
export type UserEvents = { GameEvents: Record<string, ReturnType<typeof genEvent>[]> };
export type AppEventsImport = { events: ReturnType<typeof generateAppEvents> };

/**
* Reads repackaged user's code placed next to the executable in `events.cjs` file
*/
export function importEvents(): UserEvents {
return importFile<UserEvents>(EVENTS_FILENAME);
export function importEvents(): AppEventsImport {
return importFile<AppEventsImport>(EVENTS_FILENAME);
}
4 changes: 3 additions & 1 deletion packages/engine/paima-standalone/src/utils/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
importOpenApiJson,
importPrecompiles,
importEndpoints,
importEvents,
} from './import.js';
import type { Template } from './types.js';
import RegisterRoutes, { EngineService } from '@paima/rest';
Expand Down Expand Up @@ -141,7 +142,8 @@ export const runPaimaEngine = async (): Promise<void> => {

// Import & initialize state machine
const precompilesImport = importPrecompiles();
const stateMachine = gameSM(precompilesImport);
const eventsImport = importEvents();
const stateMachine = gameSM(precompilesImport, eventsImport);
console.log(`Connecting to database at ${poolConfig.host}:${poolConfig.port}`);
const dbConn = await stateMachine.getReadonlyDbConn().connect();
const funnelFactory = await FunnelFactory.initialize(dbConn);
Expand Down
41 changes: 41 additions & 0 deletions packages/paima-sdk/paima-events/src/app-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import type { LogEvent, LogEventFields } from './types.js';
import { toPath, TopicPrefix } from './types.js';
import type { TSchema } from '@sinclair/typebox';
import { keccak_256 } from 'js-sha3';

export function toSignature<T extends LogEvent<LogEventFields<TSchema>[]>>(event: T): string {
return event.name + '(' + event.fields.map(f => f.type.type).join(',') + ')';
}

export function toTopicHash<T extends LogEvent<LogEventFields<TSchema>[]>>(event: T): string {
return keccak_256(toSignature(event));
}

export const generateAppEvents = <
const T extends ReadonlyArray<LogEvent<LogEventFields<TSchema>[]>>,
>(
entries: T
): {
[K in T[number] as K['name']]: ReturnType<typeof toPath<K, typeof TopicPrefix.App>> &
{ definition: T[0]; topicHash: string }[];
} => {
let result: Record<string, any> = {}; // we can't know the type here
for (const event of entries) {
if (!result[event.name]) {
result[event.name] = [];
}

result[event.name].push({
...toPath(TopicPrefix.App, event),
// keep the original definition around since it's nicer to work with, it
// also has the advantage that it allows recovering the initial order in
// case the signature/topicHash needs to be computed again, which can't be
// done from the path (since you don't know which non indexed fields go in
// between each indexed field).
definition: event,
// we add this to avoid having to re-compute it all the time.
topicHash: toTopicHash(event),
});
}
return result as any;
};
1 change: 1 addition & 0 deletions packages/paima-sdk/paima-events/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './app-events.js';
export * from './event-manager.js';
export * from './builtin-events.js';
export * from './builtin-event-utils.js';
Expand Down
Loading

0 comments on commit 9f913ac

Please sign in to comment.