From 57bd709cde69d415e246cbf87be346e612be74f6 Mon Sep 17 00:00:00 2001 From: Sai Ranjit Tummalapalli Date: Fri, 13 Sep 2024 12:13:09 +0530 Subject: [PATCH] refactor: inbound transport to use AgentMessageReceivedEvent Signed-off-by: Sai Ranjit Tummalapalli --- packages/core/src/agent/Agent.ts | 1 + packages/core/src/agent/Events.ts | 2 ++ .../src/transport/HttpInboundTransport.ts | 20 ++++++++++++---- .../node/src/transport/WsInboundTransport.ts | 24 ++++++++++++++----- 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 521528a00d..6445c592ff 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -158,6 +158,7 @@ export class Agent extends BaseAge .receiveMessage(e.payload.message, { connection: e.payload.connection, contextCorrelationId: e.payload.contextCorrelationId, + session: e.payload.session, }) .catch((error) => { this.logger.error('Failed to process message', { error }) diff --git a/packages/core/src/agent/Events.ts b/packages/core/src/agent/Events.ts index 8a889a237c..e10fbfe09a 100644 --- a/packages/core/src/agent/Events.ts +++ b/packages/core/src/agent/Events.ts @@ -1,4 +1,5 @@ import type { AgentMessage } from './AgentMessage' +import type { TransportSession } from './TransportService' import type { OutboundMessageContext, OutboundMessageSendStatus } from './models' import type { ConnectionRecord } from '../modules/connections' import type { Observable } from 'rxjs' @@ -34,6 +35,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent { connection?: ConnectionRecord contextCorrelationId?: string receivedAt?: Date + session?: TransportSession } } diff --git a/packages/node/src/transport/HttpInboundTransport.ts b/packages/node/src/transport/HttpInboundTransport.ts index ac86d80cec..6cddf89609 100644 --- a/packages/node/src/transport/HttpInboundTransport.ts +++ b/packages/node/src/transport/HttpInboundTransport.ts @@ -1,8 +1,15 @@ -import type { InboundTransport, Agent, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core' +import type { + InboundTransport, + Agent, + TransportSession, + EncryptedMessage, + AgentContext, + AgentMessageReceivedEvent, +} from '@credo-ts/core' import type { Express, Request, Response } from 'express' import type { Server } from 'http' -import { DidCommMimeType, CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core' +import { DidCommMimeType, CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' import express, { text } from 'express' const supportedContentTypes: string[] = [DidCommMimeType.V0, DidCommMimeType.V1] @@ -29,7 +36,6 @@ export class HttpInboundTransport implements InboundTransport { public async start(agent: Agent) { const transportService = agent.dependencyManager.resolve(TransportService) - const messageReceiver = agent.dependencyManager.resolve(MessageReceiver) agent.config.logger.debug(`Starting HTTP inbound transport`, { port: this.port, @@ -52,8 +58,12 @@ export class HttpInboundTransport implements InboundTransport { try { const message = req.body const encryptedMessage = JSON.parse(message) - await messageReceiver.receiveMessage(encryptedMessage, { - session, + agent.events.emit(agent.context, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: encryptedMessage, + session: session, + }, }) // If agent did not use session when processing message we need to send response here. diff --git a/packages/node/src/transport/WsInboundTransport.ts b/packages/node/src/transport/WsInboundTransport.ts index ff23807fed..16d6a8cc4f 100644 --- a/packages/node/src/transport/WsInboundTransport.ts +++ b/packages/node/src/transport/WsInboundTransport.ts @@ -1,6 +1,14 @@ -import type { Agent, InboundTransport, Logger, TransportSession, EncryptedMessage, AgentContext } from '@credo-ts/core' - -import { CredoError, TransportService, utils, MessageReceiver } from '@credo-ts/core' +import type { + Agent, + InboundTransport, + Logger, + TransportSession, + EncryptedMessage, + AgentContext, + AgentMessageReceivedEvent, +} from '@credo-ts/core' + +import { CredoError, TransportService, utils, AgentEventTypes } from '@credo-ts/core' // eslint-disable-next-line import/no-named-as-default import WebSocket, { Server } from 'ws' @@ -58,13 +66,17 @@ export class WsInboundTransport implements InboundTransport { } private listenOnWebSocketMessages(agent: Agent, socket: WebSocket, session: TransportSession) { - const messageReceiver = agent.dependencyManager.resolve(MessageReceiver) - // eslint-disable-next-line @typescript-eslint/no-explicit-any socket.addEventListener('message', async (event: any) => { this.logger.debug('WebSocket message event received.', { url: event.target.url }) try { - await messageReceiver.receiveMessage(JSON.parse(event.data), { session }) + agent.events.emit(agent.context, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: JSON.parse(event.data), + session: session, + }, + }) } catch (error) { this.logger.error(`Error processing message: ${error}`) }