Skip to content

Commit

Permalink
Added raw sql projections to support raw SQL handling
Browse files Browse the repository at this point in the history
Generalised also projection definition
  • Loading branch information
oskardudycz committed Aug 7, 2024
1 parent b33be40 commit c76df48
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import {
} from '@event-driven-io/emmett';
import pg from 'pg';
import {
defaultProjectionOptions,
defaultPostgreSQLProjectionOptions,
handleProjections,
type ProjectionDefintion,
type PostgreSQLProjectionDefintion,
} from './projections';
import { appendToStream, createEventStoreSchema, readStream } from './schema';

Expand Down Expand Up @@ -99,12 +99,12 @@ export type PostgresEventStoreConnectionOptions =
| PostgresEventStoreNotPooledOptions;

export type PostgresEventStoreOptions = {
projections: ProjectionDefintion[];
projections: PostgreSQLProjectionDefintion[];
connectionOptions?: PostgresEventStoreConnectionOptions;
};
export const getPostgreSQLEventStore = (
connectionString: string,
options: PostgresEventStoreOptions = defaultProjectionOptions,
options: PostgresEventStoreOptions = defaultPostgreSQLProjectionOptions,
): PostgresEventStore => {
const pool = dumbo({
connectionString,
Expand Down
84 changes: 64 additions & 20 deletions src/packages/emmett-postgresql/src/eventStore/projections/index.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,41 @@
import {
type NodePostgresClient,
type NodePostgresTransaction,
type SQL,
type SQLExecutor,
} from '@event-driven-io/dumbo';
import {
type Event,
type EventTypeOf,
type ProjectionDefintion,
type ProjectionHandler,
type ReadEvent,
} from '@event-driven-io/emmett';
import type { PostgresEventStoreOptions } from '../postgreSQLEventStore';

export type ProjectionHandlerContext = {
export type PostgreSQLProjectionHandlerContext = {
connectionString: string;
client: NodePostgresClient;
execute: SQLExecutor;
transaction: NodePostgresTransaction;
};

export type PostgresProjectionHandler<EventType extends Event = Event> = (
events: ReadEvent<EventType>[],
context: ProjectionHandlerContext,
) => Promise<void> | void;

export type ProjectionDefintion<EventType extends Event = Event> = {
type: 'inline';
name?: string;
canHandle: EventTypeOf<EventType>[];
handle: PostgresProjectionHandler<EventType>;
};
export type PostgreSQLProjectionHandler<EventType extends Event = Event> =
ProjectionHandler<EventType, PostgreSQLProjectionHandlerContext>;

export interface PostgreSQLProjectionDefintion<EventType extends Event = Event>
extends ProjectionDefintion<
'inline',
EventType,
PostgreSQLProjectionHandlerContext
> {}

export const defaultProjectionOptions: PostgresEventStoreOptions = {
export const defaultPostgreSQLProjectionOptions: PostgresEventStoreOptions = {
projections: [],
};

export const handleProjections = async <EventType extends Event = Event>(
allProjections: ProjectionDefintion<EventType>[],
allProjections: PostgreSQLProjectionDefintion<EventType>[],
connectionString: string,
transaction: NodePostgresTransaction,
events: ReadEvent<EventType>[],
Expand All @@ -57,12 +58,55 @@ export const handleProjections = async <EventType extends Event = Event>(
}
};

export const projection = <EventType extends Event>(
definition: ProjectionDefintion<EventType>,
): ProjectionDefintion => definition as unknown as ProjectionDefintion;
export const postgreSQLProjection = <EventType extends Event>(
definition: PostgreSQLProjectionDefintion<EventType>,
): PostgreSQLProjectionDefintion =>
definition as unknown as PostgreSQLProjectionDefintion;

/** @deprecated use postgreSQLProjection instead */
export const projection = postgreSQLProjection;

export const postgreSQLInlineProjection = <EventType extends Event>(
definition: Omit<PostgreSQLProjectionDefintion<EventType>, 'type'>,
): PostgreSQLProjectionDefintion =>
postgreSQLProjection({ type: 'inline', ...definition });

export const inlineProjection = <EventType extends Event>(
definition: Omit<ProjectionDefintion<EventType>, 'type'>,
): ProjectionDefintion => projection({ type: 'inline', ...definition });
/** @deprecated use postgreSQLSingleProjection instead */
export const inlineProjection = postgreSQLInlineProjection;

export const postgreSQLRawBatchSQLProjection = <EventType extends Event>(
handle: (
events: EventType[],
context: PostgreSQLProjectionHandlerContext,
) => Promise<SQL[]> | SQL[],
...canHandle: EventTypeOf<EventType>[]
): PostgreSQLProjectionDefintion =>
postgreSQLInlineProjection<EventType>({
canHandle,
handle: async (events, context) => {
const sqls: SQL[] = await handle(events, context);

await context.execute.batchCommand(sqls);
},
});

export const postgreSQLRawSQLProjection = <EventType extends Event>(
handle: (
event: EventType,
context: PostgreSQLProjectionHandlerContext,
) => Promise<SQL> | SQL,
...canHandle: EventTypeOf<EventType>[]
): PostgreSQLProjectionDefintion =>
postgreSQLRawBatchSQLProjection<EventType>(
async (events, context) => {
const sqls: SQL[] = [];

for (const event of events) {
sqls.push(await handle(event, context));
}
return sqls;
},
...canHandle,
);

export * from './pongo';
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
type PongoDocument,
} from '@event-driven-io/pongo';
import pg from 'pg';
import { inlineProjection, type ProjectionDefintion } from './';
import {
postgreSQLInlineProjection,
type PostgreSQLProjectionDefintion,
} from './';

export type PongoProjectionOptions<EventType extends Event> = {
documentId: (event: ReadEvent<EventType>) => string;
Expand Down Expand Up @@ -40,8 +43,8 @@ export type PongoDocumentEvolve<
export const pongoProjection = <EventType extends Event>(
handle: (pongo: PongoClient, events: ReadEvent<EventType>[]) => Promise<void>,
...canHandle: EventTypeOf<EventType>[]
): ProjectionDefintion =>
inlineProjection<EventType>({
): PostgreSQLProjectionDefintion =>
postgreSQLInlineProjection<EventType>({
canHandle,
handle: async (events, context) => {
const { connectionString, client } = context;
Expand All @@ -58,7 +61,7 @@ export const pongoMultiStreamProjection = <
getDocumentId: (event: ReadEvent<EventType>) => string,
evolve: PongoDocumentEvolve<Document, EventType>,
...canHandle: EventTypeOf<EventType>[]
): ProjectionDefintion =>
): PostgreSQLProjectionDefintion =>
pongoProjection(
async (pongo, events) => {
const collection = pongo.db().collection<Document>(collectionName);
Expand All @@ -79,7 +82,7 @@ export const pongoSingleProjection = <
collectionName: string,
evolve: PongoDocumentEvolve<Document, EventType>,
...canHandle: EventTypeOf<EventType>[]
): ProjectionDefintion =>
): PostgreSQLProjectionDefintion =>
pongoMultiStreamProjection(
collectionName,
(event) => event.metadata.streamName,
Expand Down
1 change: 1 addition & 0 deletions src/packages/emmett/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './commandHandling';
export * from './errors';
export * from './eventStore';
export * from './messageBus';
export * from './projections';
export * from './serialization';
export * from './streaming';
export * from './subscriptions';
Expand Down
20 changes: 20 additions & 0 deletions src/packages/emmett/src/projections/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import type { DefaultRecord, Event, EventTypeOf, ReadEvent } from '../typing';

export type ProjectionHandler<
EventType extends Event = Event,
ProjectionHandlerContext extends DefaultRecord = DefaultRecord,
> = (
events: ReadEvent<EventType>[],
context: ProjectionHandlerContext,
) => Promise<void> | void;

export interface ProjectionDefintion<
ProjectionType extends 'inline' | 'async',
EventType extends Event = Event,
ProjectionHandlerContext extends DefaultRecord = DefaultRecord,
> {
type: ProjectionType;
name?: string;
canHandle: EventTypeOf<EventType>[];
handle: ProjectionHandler<EventType, ProjectionHandlerContext>;
}

0 comments on commit c76df48

Please sign in to comment.