Skip to content

Commit

Permalink
fix: add processMessagesConcurrently config for processing message co…
Browse files Browse the repository at this point in the history
…ncurrently

Signed-off-by: Pritam Singh <[email protected]>
  • Loading branch information
Zzocker authored and sairanjit committed Sep 9, 2024
1 parent 5ddd37a commit bf29c01
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
23 changes: 12 additions & 11 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { InitConfig } from '../types'
import type { Subscription } from 'rxjs'

import { Subject } from 'rxjs'
import { concatMap, takeUntil } from 'rxjs/operators'
import { mergeMap, takeUntil } from 'rxjs/operators'

import { InjectionSymbols } from '../constants'
import { SigningProviderToken } from '../crypto'
Expand Down Expand Up @@ -152,16 +152,17 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
receivedAt: e.payload.receivedAt,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
mergeMap(
(e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
}),
this.agentConfig.processMessagesConcurrently ? undefined : 1
)
)
.subscribe()
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ export class AgentConfig {
return this.initConfig.backupBeforeStorageUpdate ?? true
}

public get processMessagesConcurrently() {
return this.initConfig.processMessagesConcurrently ?? false
}

public extend(config: Partial<InitConfig>): AgentConfig {
return new AgentConfig(
{ ...this.initConfig, logger: this.logger, label: this.label, ...config },
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export interface InitConfig {
connectionImageUrl?: string
autoUpdateStorageOnStartup?: boolean
backupBeforeStorageUpdate?: boolean
processMessagesConcurrently?: boolean
}

export type ProtocolVersion = `${number}.${number}`
Expand Down

0 comments on commit bf29c01

Please sign in to comment.