Skip to content

Commit

Permalink
Merge pull request #1362 from tulios/handle-unassigned-members-1361
Browse files Browse the repository at this point in the history
Remain in the consumer group even when not assigned partitions
  • Loading branch information
Nevon authored May 17, 2022
2 parents 34fac96 + 7985ae8 commit 068067e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/consumer/__tests__/emptyAssignment.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
createTopic,
newLogger,
waitForConsumerToJoinGroup,
waitForNextEvent,
} = require('testHelpers')

describe('Consumer', () => {
Expand All @@ -25,7 +26,7 @@ describe('Consumer', () => {
consumer2 && (await consumer2.disconnect())
})

test('can join the group without receiving any assignment', async () => {
test('remains in the group without receiving any assignment', async () => {
// Assigns all topic-partitions to the first member.
const UnbalancedAssigner = ({ cluster }) => ({
name: 'UnbalancedAssigner',
Expand Down Expand Up @@ -89,9 +90,23 @@ describe('Consumer', () => {
consumer2.run({ eachMessage: () => {} })

// Ensure that both consumers manage to join
await Promise.all([
const groupJoinEvents = await Promise.all([
waitForConsumerToJoinGroup(consumer1),
waitForConsumerToJoinGroup(consumer2),
])

const emptyAssignments = groupJoinEvents.filter(
({ payload }) => Object.entries(payload.memberAssignment).length === 0
)
expect(emptyAssignments).toHaveLength(1)

await Promise.all(
[consumer1, consumer2].map(consumer => waitForNextEvent(consumer, consumer.events.FETCH))
)

// Both consumers should continue to heartbeat even without receiving any assignments
await Promise.all(
[consumer1, consumer2].map(consumer => waitForNextEvent(consumer, consumer.events.HEARTBEAT))
)
})
})
4 changes: 4 additions & 0 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ module.exports = class Runner extends EventEmitter {
nodeId,
})

if (batches.length === 0) {
await this.heartbeat()
}

return batches
}

Expand Down

0 comments on commit 068067e

Please sign in to comment.