Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/package batch queue #324

Merged
merged 90 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
be29738
feat: add base class BatchQueue
OsirisAnubiz Oct 3, 2024
4c7bc1c
feat: add MemoryChecker
OsirisAnubiz Oct 3, 2024
d0281c3
fix: correct add method logic
OsirisAnubiz Oct 3, 2024
cec1aed
refactor: renamed error to be more descriptive
OsirisAnubiz Oct 3, 2024
da995bd
fix: trigger methods in BatchQueue
OsirisAnubiz Oct 3, 2024
6d06b2e
style: fix trigger lint
OsirisAnubiz Oct 3, 2024
8b2755c
test(BatchQueue): add unit tests for BatchQueue class
OsirisAnubiz Oct 3, 2024
1f20e6f
feat: add base BatchQueueModule
OsirisAnubiz Oct 3, 2024
5342f3e
feat: add createCheckOnAdd method to BatchStore
OsirisAnubiz Oct 4, 2024
0486bfc
test: add tests to createCheckOnAdd
OsirisAnubiz Oct 4, 2024
5d6881c
style: batch queue and test batch queue
OsirisAnubiz Oct 4, 2024
808ac7a
chore: remove useless BatchQueueI
OsirisAnubiz Oct 4, 2024
32fd440
fix: resolve issue BatchQueueModule not working
OsirisAnubiz Oct 4, 2024
cab8694
fix: MemoryChecker use BatchQueue instead BatchQueueI
OsirisAnubiz Oct 4, 2024
d4c80d9
feat: add consumer
OsirisAnubiz Oct 4, 2024
efea6d0
fix: add index.ts for module
OsirisAnubiz Oct 4, 2024
6341670
fix: export module by main index.ts
OsirisAnubiz Oct 4, 2024
98fe10d
feat: add producer
OsirisAnubiz Oct 4, 2024
94d3515
fix: export producer
OsirisAnubiz Oct 4, 2024
124eb11
test: init integration test
OsirisAnubiz Oct 4, 2024
51edc76
style: BatchQueueModue providers
OsirisAnubiz Oct 4, 2024
86639c2
feat: add BatchChecker
OsirisAnubiz Oct 6, 2024
c398b7f
feat: add StateHandler
OsirisAnubiz Oct 6, 2024
f8cd891
refactor: extract Inject tokens for BatchQueue into a separate file
OsirisAnubiz Oct 6, 2024
2c0177f
refactor: inject checker into MemoryChecker
OsirisAnubiz Oct 6, 2024
7d53aa3
fix(BatchQueue): add mutexes for each queue to prevent overflow
OsirisAnubiz Oct 7, 2024
15b770e
feat: export errors from package
OsirisAnubiz Oct 7, 2024
5406f61
style: lint class BatchQueue
OsirisAnubiz Oct 7, 2024
f8fa06c
fix: type QueueName only can be string
OsirisAnubiz Oct 7, 2024
cea522f
style: lint Mutex
OsirisAnubiz Oct 7, 2024
fe78474
feat: add BatchQueue integration tests
OsirisAnubiz Oct 7, 2024
cd42e16
fix: integration tests
OsirisAnubiz Oct 8, 2024
cdb6eb8
fix: delete queue after processing
OsirisAnubiz Oct 8, 2024
b73a311
feat: add method createCheckOnAdd to Checker
OsirisAnubiz Oct 8, 2024
73e3b9b
test: checkOnAdd integration tests
OsirisAnubiz Oct 8, 2024
c943897
fix: delete mutex after processing
OsirisAnubiz Oct 8, 2024
7d0942b
feat: add memory check on each defined number of additions
OsirisAnubiz Oct 8, 2024
187fd78
style: fix lint
OsirisAnubiz Oct 8, 2024
2766d93
fix: add MemoryCheckerOptions provider
OsirisAnubiz Oct 8, 2024
33e9d8e
chore: update yarn
OsirisAnubiz Oct 8, 2024
c737127
fix: batch-queue integration tests
OsirisAnubiz Oct 8, 2024
b20ffb7
style: add Inject.. before injection decorator
OsirisAnubiz Oct 8, 2024
ebe1b5d
style: batch queue provide constant to constants
OsirisAnubiz Oct 8, 2024
ba1459e
refactor: extracted provide constants into a separate file
OsirisAnubiz Oct 8, 2024
f61882e
refactor: batch module register method
OsirisAnubiz Oct 8, 2024
546ccc1
refactor: move providers creation to separate file
OsirisAnubiz Oct 8, 2024
7a94baa
style: snake case memory checker job name
OsirisAnubiz Oct 8, 2024
190c182
refactor: readonly for inject checker class
OsirisAnubiz Oct 8, 2024
93c0671
fix: one check name for on add and func check
OsirisAnubiz Oct 8, 2024
1230fae
fix: method checkOnAddChecks in batch queue
OsirisAnubiz Oct 8, 2024
bab6109
fix: batch queue unit test
OsirisAnubiz Oct 8, 2024
236c771
refactor: delete memory checker
OsirisAnubiz Oct 8, 2024
f91e6f6
test: delete setTimeout from integration tests
OsirisAnubiz Oct 8, 2024
3777ad2
refactor: move mutex file top level
OsirisAnubiz Oct 8, 2024
bf00459
refactor: moved errors from folder to a sigle file
OsirisAnubiz Oct 8, 2024
7c7add8
style: fix lint issue
OsirisAnubiz Oct 9, 2024
dc1a7ca
chore: delete not use dependencies
OsirisAnubiz Oct 9, 2024
d4623ce
refactor: dev dependencies fix versions
OsirisAnubiz Oct 9, 2024
578b98e
refactor: brought out helpers in integration tests
OsirisAnubiz Oct 9, 2024
8589e00
refactor: moved constants to root directory
OsirisAnubiz Oct 9, 2024
08533b9
refactor: moved errors to root directory
OsirisAnubiz Oct 9, 2024
ccb8f32
refactor: separate types
OsirisAnubiz Oct 9, 2024
cae5213
refactor: unlock mutex with finally
OsirisAnubiz Oct 10, 2024
ccc9e17
fix: start batch timer only if not started before
OsirisAnubiz Oct 10, 2024
928602f
docs: add JSDoc to BatchQueue
OsirisAnubiz Oct 10, 2024
7ab7f94
docs: add JSDoc to Checker
OsirisAnubiz Oct 10, 2024
790cde6
docs: add JSDoc to Consumer
OsirisAnubiz Oct 10, 2024
537ec28
docs: add JSDoc to Producer
OsirisAnubiz Oct 10, 2024
c7d4614
docs: add JSDoc to StateHandler
OsirisAnubiz Oct 10, 2024
b058659
fix: increase batch wait time in tests
OsirisAnubiz Oct 10, 2024
4d74008
feat: create CheckManager
OsirisAnubiz Oct 10, 2024
9c91543
refactor: use CheckManager in BatchQueue
OsirisAnubiz Oct 10, 2024
1f621e0
refactor: add CheckManager types
OsirisAnubiz Oct 10, 2024
f1af644
test: add unit tests to CheckManager
OsirisAnubiz Oct 10, 2024
34ff346
test: rewrite BatchQueue unit tests
OsirisAnubiz Oct 10, 2024
f290d68
refactor: checker depend on check manager
OsirisAnubiz Oct 10, 2024
0fec374
refactor: state handler depend on check manager
OsirisAnubiz Oct 10, 2024
3b15f8b
refactor: add check manager provider
OsirisAnubiz Oct 10, 2024
ccd5a83
refactor: check failed error delete property failedChecks
OsirisAnubiz Oct 10, 2024
690f247
refactor: separate checker manager
OsirisAnubiz Oct 10, 2024
86967d7
refactor: separate check manager types
OsirisAnubiz Oct 10, 2024
f468aef
refactor: extract proxy clases
OsirisAnubiz Oct 10, 2024
cf70bf0
refactor: export proxy clases from package
OsirisAnubiz Oct 10, 2024
9382264
refactor: delete handleChangeState method from Checker
OsirisAnubiz Oct 10, 2024
386fbc7
refactor: export check manager from package
OsirisAnubiz Oct 10, 2024
9727ef0
refactor: integration tests
OsirisAnubiz Oct 10, 2024
03dac98
Merge branch 'master' into feat/package-batch-queue
OsirisAnubiz Oct 10, 2024
0082642
docs: add JSDoc to CheckManager
OsirisAnubiz Oct 10, 2024
43be19b
docs: add JSDoc to Checker
OsirisAnubiz Oct 10, 2024
214eab0
docs: add JSDoc to StateHandler
OsirisAnubiz Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .config/husky/commit-msg
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit message lint
yarn commit message lint
5 changes: 1 addition & 4 deletions .config/husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit staged
yarn commit staged
5 changes: 1 addition & 4 deletions .config/husky/prepare-commit-msg
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

yarn commit message $@
yarn commit message $@
439 changes: 439 additions & 0 deletions .pnp.cjs

Large diffs are not rendered by default.

1,022 changes: 487 additions & 535 deletions .yarn/releases/yarn.cjs

Large diffs are not rendered by default.

298 changes: 298 additions & 0 deletions packages/nestjs-batch-queue/integration/test/batch-queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
import type { INestApplication } from '@nestjs/common'
import type { ChannelWrapper } from 'amqp-connection-manager'
import type { Channel } from 'amqplib'
import type { StartedTestContainer } from 'testcontainers'

import type { Producer } from '../../src/index.js'
import type { Consumer } from '../../src/index.js'
import type { Checker } from '../../src/index.js'
import type { StateHandler } from '../../src/index.js'
import type { ChangeStateCallback } from '../../src/index.js'

import { Test } from '@nestjs/testing'
import { describe } from '@jest/globals'
import { it } from '@jest/globals'
import { expect } from '@jest/globals'
import { beforeAll } from '@jest/globals'
import { afterAll } from '@jest/globals'
import { beforeEach } from '@jest/globals'
import { jest } from '@jest/globals'
import { GenericContainer } from 'testcontainers'
import { Wait } from 'testcontainers'
import amqp from 'amqp-connection-manager'

import { BatchQueueModule } from '../../src/index.js'
import { BATCH_QUEUE_CONSUMER } from '../../src/index.js'
import { BATCH_QUEUE_PRODUCER } from '../../src/index.js'
import { BATCH_QUEUE_CHECKER } from '../../src/index.js'
import { BATCH_QUEUE_STATE_HANDLER } from '../../src/index.js'
import { BaseQueueError } from '../../src/index.js'
import { waitForConsumeCount } from './helpers/index.js'

describe('external renderer', () => {
let app: INestApplication
let rabbitmq: StartedTestContainer
let channelWrapper: ChannelWrapper
let consumeBatchs: Array<[string, Array<string>]> = []
let consumeFn: (queueName: string, value: Array<string>) => Promise<void>
let succesProduceCount = 0

beforeAll(async () => {
rabbitmq = await new GenericContainer('rabbitmq:3-alpine')
.withWaitStrategy(Wait.forLogMessage('Starting broker'))
.withExposedPorts(5672)
.start()

const testingModule = await Test.createTestingModule({
imports: [
BatchQueueModule.registerAsync({
imports: [],
useFactory: () => ({
core: {
maxQueueLength: 10_000,
maxTotalQueueLength: 100_000,
maxQueues: 20,
timeoutDuration: 2_000,
},
}),
inject: [],
}),
],
}).compile()

const connection = amqp.connect([
`amqp://${rabbitmq.getHost()}:${rabbitmq.getMappedPort(5672)}`,
])
channelWrapper = connection.createChannel({
json: false,
confirm: false,
setup: async (channel: Channel) => {
await channel.assertQueue('test-queue', {
durable: true,
})
},
})

channelWrapper.consume('test-queue', (msg) => {
;(async (): Promise<void> => {
const producer: Producer<any> = app.get(BATCH_QUEUE_PRODUCER)
const parsed: { queueName: string; value: any } = JSON.parse(msg.content.toString())
try {
await producer.produce(parsed.queueName, parsed.value)
succesProduceCount += 1
channelWrapper.ack(msg)
} catch (e) {
if (e instanceof BaseQueueError) {
channelWrapper.nack(msg)
}
}
})()
})

await channelWrapper.waitForConnect()

app = testingModule.createNestApplication()
await app.init()

const batchConsumer: Consumer = app.get(BATCH_QUEUE_CONSUMER)
consumeFn = async (queueName: string, value: Array<string>): Promise<void> => {
consumeBatchs.push([queueName, value])
}
batchConsumer.consume(consumeFn)
})

afterAll(async () => {
await app.close()
await rabbitmq.stop()
})

beforeEach(async () => {
await channelWrapper.purgeQueue('test-queue')
consumeBatchs = []
succesProduceCount = 0
})

it('base test', async () => {
await channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: 'test-0-0' }))
)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
expect(result[1]).toEqual(['test-0-0'])
})

it('fill 90% queue', async () => {
const messages = []
for (let i = 0; i < 9_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-1-${i}` }))
)
)
}
await Promise.all(messages)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
const expectMessages = []
for (let i = 0; i < 9_000; i += 1) {
expectMessages.push(`test-1-${i}`)
}
expect(result[1]).toEqual(expectMessages)
})

it('fullfill queue', async () => {
const messages = []
for (let i = 0; i < 10_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-2-${i}` }))
)
)
}
await Promise.all(messages)
await waitForConsumeCount(1, consumeBatchs)
expect(consumeBatchs.length).toBe(1)
const result = consumeBatchs.pop()!
expect(result[0]).toBe('batch-queue')
const expectMessages = []
for (let i = 0; i < 10_000; i += 1) {
expectMessages.push(`test-2-${i}`)
}
expect(result[1]).toEqual(expectMessages)
})

it('handle multiple queues', async () => {
const messages: Array<Promise<any>> = []
const queues: Array<string> = ['queue-one', 'queue-two', 'queue-three']
const expectedResults: Record<string, Array<string>> = {
'queue-one': [],
'queue-two': [],
'queue-three': [],
}

for (let i = 0; i < 3_000; i += 1) {
// eslint-disable-next-line no-loop-func
queues.forEach((queue: string) => {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: queue, value: `test-3-${i}` }))
)
)
expectedResults[queue].push(`test-3-${i}`)
})
}

await Promise.all(messages)

await waitForConsumeCount(3, consumeBatchs)
expect(consumeBatchs.length).toBe(3)
consumeBatchs.forEach((result) => {
expect(result[1].length).toBe(3_000)
expect(result[1]).toEqual(expectedResults[result[0]])
})
})

it('fulfill a single queue with total of 12,000 messages, receiving batches of 10,000 and 2,000', async () => {
const messages = []

for (let i = 0; i < 12_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-4-${i}` }))
)
)
}
await Promise.all(messages)

await waitForConsumeCount(2, consumeBatchs)

expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(2_000)
})

it('two queues with 12,000 messages each', async () => {
const messages = []

for (let i = 0; i < 24_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(
JSON.stringify({
queueName: i % 2 === 0 ? 'queue-one' : 'queue-two',
value: `test-5-${i}`,
})
)
)
)
}
await Promise.all(messages)

await waitForConsumeCount(4, consumeBatchs)

expect(consumeBatchs.length).toBe(4)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(10_000)
expect(consumeBatchs[2][1].length).toBe(2_000)
expect(consumeBatchs[3][1].length).toBe(2_000)
})

it('should not consume batches when batch queue is unavailable', async () => {
const checker: Checker = app.get(BATCH_QUEUE_CHECKER)
checker.createCheck('mock-memory-1', false)
const stateHandler: StateHandler = app.get(BATCH_QUEUE_STATE_HANDLER)
const fnChangeState = jest.fn() as ChangeStateCallback
stateHandler.handleChangeState('mock-memory-1', fnChangeState)
const messages = []
for (let i = 0; i < 10_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-6-${i}` }))
)
)
}
await Promise.all(messages)
expect(succesProduceCount).toBe(0)
expect(fnChangeState).toBeCalledTimes(0)
await checker.changeState('mock-memory-1', true)
await waitForConsumeCount(1, consumeBatchs)
})

it('should not consume batches when batch queue is unavailable and then recover', async () => {
const checker: Checker = app.get(BATCH_QUEUE_CHECKER)
const stateHandler: StateHandler = app.get(BATCH_QUEUE_STATE_HANDLER)
const fnChangeState = jest.fn() as ChangeStateCallback
checker.createCheck('mock-memory-2', false)
stateHandler.handleChangeState('mock-memory-2', fnChangeState)
const messages = []
for (let i = 0; i < 12_000; i += 1) {
messages.push(
channelWrapper.sendToQueue(
'test-queue',
Buffer.from(JSON.stringify({ queueName: 'batch-queue', value: `test-${i}` }))
)
)
}
expect(succesProduceCount).toBe(0)
await Promise.all(messages)
expect(consumeBatchs.length).toBe(0)
await checker.changeState('mock-memory-2', true)
expect(fnChangeState).toBeCalledTimes(1)
expect(fnChangeState).toBeCalledWith(true)
await waitForConsumeCount(2, consumeBatchs)
expect(consumeBatchs.length).toBe(2)
expect(consumeBatchs[0][1].length).toBe(10_000)
expect(consumeBatchs[1][1].length).toBe(2_000)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './wait-for-consume-count.js'
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export const waitForConsumeCount = async (
expectedCount: number,
consumeBatchs: Array<any>,
timeout = 5000
): Promise<void> => {
const endTime = Date.now() + timeout
return new Promise((resolve, reject) => {
const interval = setInterval(() => {
if (consumeBatchs.length >= expectedCount) {
clearInterval(interval)
resolve()
} else if (Date.now() > endTime) {
clearInterval(interval)
reject(new Error('Timeout waiting for messages to be processed'))
}
}, 100)
})
}
49 changes: 49 additions & 0 deletions packages/nestjs-batch-queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"name": "@atls/nestjs-batch-queue",
"version": "0.0.1",
"license": "BSD-3-Clause",
"type": "module",
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts"
},
"main": "src/index.ts",
"files": [
"dist"
],
"scripts": {
"build": "yarn library build",
"prepack": "yarn run build",
"postpack": "rm -rf dist"
},
"devDependencies": {
"@jest/globals": "29.7.0",
Nelfimov marked this conversation as resolved.
Show resolved Hide resolved
"@nestjs/common": "10.0.5",
"@nestjs/core": "10.0.5",
"@nestjs/testing": "10.4.1",
"@types/amqplib": "0.10.1",
"amqp-connection-manager": "4.1.14",
"amqplib": "0.10.4",
"reflect-metadata": "0.1.13",
"rxjs": "7.8.1",
"testcontainers": "10.10.0"
},
"peerDependencies": {
"@nestjs/common": "^10",
"@nestjs/core": "^10",
"reflect-metadata": "^0.1",
"rxjs": "^7"
},
"publishConfig": {
"exports": {
"./package.json": "./package.json",
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"main": "dist/index.js",
"typings": "dist/index.d.ts"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface BatchQueueOptions {
maxQueueLength: number
maxTotalQueueLength: number
maxQueues: number
timeoutDuration: number
}
Loading
Loading