Range partition assigner #1631

Open: wants to merge 3 commits into base: master
2 changes: 2 additions & 0 deletions src/consumer/assigners/index.js
@@ -1,5 +1,7 @@
 const roundRobin = require('./roundRobinAssigner')
+const range = require('./rangeAssigner')

 module.exports = {
   roundRobin,
+  range,
 }
77 changes: 77 additions & 0 deletions src/consumer/assigners/rangeAssigner/index.js
@@ -0,0 +1,77 @@
const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol')

/**
 * RangeAssigner
 * @type {import('types').PartitionAssigner}
 */
module.exports = ({ cluster }) => ({
  name: 'RangeAssigner',
  version: 0,

  /**
   * Assign the topics to the provided members using the range strategy.
   *
   * The members array contains information about each member; `memberMetadata` is the result of
   * the `protocol` operation.
   *
   * @param {object} group
   * @param {import('types').GroupMember[]} group.members array of members, e.g.:
   *   [{ memberId: 'test-5f93f5a3', memberMetadata: Buffer }]
   * @param {string[]} group.topics
   * @returns {Promise<import('types').GroupMemberAssignment[]>} encoded partitions per topic for each member
   */
  async assign({ members, topics }) {
    // Logic adapted from the Kafka Java client:
    // https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
    // https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java

    // Members are sorted by id so that every topic is split in the same member order
    const sortedMembers = members.map(({ memberId }) => memberId).sort()
    const membersCount = sortedMembers.length
    const assignment = {}

    for (const topic of topics) {
      const partitionMetadata = cluster.findTopicPartitionMetadata(topic)

      const numPartitionsForTopic = partitionMetadata.length
      const numPartitionsPerConsumer = Math.floor(numPartitionsForTopic / membersCount)
      // The first `consumersWithExtraPartition` members each take one extra partition
      const consumersWithExtraPartition = numPartitionsForTopic % membersCount

      for (let i = 0; i < membersCount; i++) {
        const start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition)
        const length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1)

        const assignee = sortedMembers[i]

        for (let partition = start; partition < start + length; partition++) {
          if (!assignment[assignee]) {
Contributor @Saeger, Nov 28, 2023:

This has three nested for loops, which doesn't seem ideal. On the other hand, it only runs once, when partitions are assigned to the group members, right? So I guess it should be fine. Have you already thought this through?

Edit: I see the Java version behaves the same way :-)

PR author (Contributor):

Yes, it's the same flow as in the Java client. There isn't much data to loop through and, as you said, it only happens once, so I see no reason to optimise.

Contributor:

LGTM

            assignment[assignee] = Object.create(null)
          }

          if (!assignment[assignee][topic]) {
            assignment[assignee][topic] = []
          }

          assignment[assignee][topic].push(partition)
        }
      }
    }

    return Object.keys(assignment).map(memberId => ({
      memberId,
      memberAssignment: MemberAssignment.encode({
        version: this.version,
        assignment: assignment[memberId],
      }),
    }))
  },

  protocol({ topics }) {
    return {
      name: this.name,
      metadata: MemberMetadata.encode({
        version: this.version,
        topics,
      }),
    }
  },
})
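
For reviewers who want to check the `start`/`length` arithmetic in isolation, here is a minimal standalone sketch of the per-topic range split. `rangeFor` is a hypothetical helper written only for illustration; it is not part of this PR or of the KafkaJS API, and it assumes partitions are numbered `0..numPartitions - 1` and that members are already sorted.

```javascript
// Hypothetical helper (not in the PR): returns the partitions of one topic
// that the member at `memberIndex` would receive under the range strategy.
const rangeFor = (numPartitions, memberCount, memberIndex) => {
  const perMember = Math.floor(numPartitions / memberCount)
  // The first `withExtra` members each take one additional partition
  const withExtra = numPartitions % memberCount
  const start = perMember * memberIndex + Math.min(memberIndex, withExtra)
  const length = perMember + (memberIndex < withExtra ? 1 : 0)
  return Array.from({ length }, (_, offset) => start + offset)
}

// 3 partitions over 2 members: the first member takes the extra partition
console.log(rangeFor(3, 2, 0)) // [ 0, 1 ]
console.log(rangeFor(3, 2, 1)) // [ 2 ]

// 3 partitions over 4 members: the last member receives nothing
console.log(rangeFor(3, 4, 3)) // []
```

The condition `memberIndex < withExtra` is equivalent to the PR's `i + 1 > consumersWithExtraPartition ? 0 : 1`. The last case matches the second spec below, where `member-4` gets no partitions of `topic-C`.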
132 changes: 132 additions & 0 deletions src/consumer/assigners/rangeAssigner/index.spec.js
@@ -0,0 +1,132 @@
const RangeAssigner = require('./index')
const { MemberAssignment, MemberMetadata } = require('../../assignerProtocol')

describe('Consumer > assigners > RangeAssigner', () => {
  let cluster, topics, metadata, assigner

  beforeEach(() => {
    metadata = {}
    cluster = { findTopicPartitionMetadata: topic => metadata[topic] }
    assigner = RangeAssigner({ cluster })
  })

  describe('#assign', () => {
    test('java example topic-partitions assignment', async () => {
      topics = ['t0', 't1']
      metadata['t0'] = Array(3)
        .fill()
        .map((_, i) => ({ partitionId: i }))

      metadata['t1'] = Array(3)
        .fill()
        .map((_, i) => ({ partitionId: i }))

      const members = [{ memberId: 'C0' }, { memberId: 'C1' }]

      const assignment = await assigner.assign({ members, topics })

      expect(assignment).toEqual([
        {
          memberId: 'C0',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              t0: [0, 1],
              t1: [0, 1],
            },
          }),
        },
        {
          memberId: 'C1',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              t0: [2],
              t1: [2],
            },
          }),
        },
      ])
    })

    test('more complex topic-partitions assignment', async () => {
      topics = ['topic-A', 'topic-B', 'topic-C']

      metadata['topic-A'] = Array(12)
        .fill()
        .map((_, i) => ({ partitionId: i }))

      metadata['topic-B'] = Array(12)
        .fill()
        .map((_, i) => ({ partitionId: i }))

      metadata['topic-C'] = Array(3)
        .fill()
        .map((_, i) => ({ partitionId: i }))

      const members = [
        { memberId: 'member-3' },
        { memberId: 'member-1' },
        { memberId: 'member-4' },
        { memberId: 'member-2' },
      ]

      const assignment = await assigner.assign({ members, topics })

      expect(assignment).toEqual([
        {
          memberId: 'member-1',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              'topic-A': [0, 1, 2],
              'topic-B': [0, 1, 2],
              'topic-C': [0],
            },
          }),
        },
        {
          memberId: 'member-2',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              'topic-A': [3, 4, 5],
              'topic-B': [3, 4, 5],
              'topic-C': [1],
            },
          }),
        },
        {
          memberId: 'member-3',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              'topic-A': [6, 7, 8],
              'topic-B': [6, 7, 8],
              'topic-C': [2],
            },
          }),
        },
        {
          memberId: 'member-4',
          memberAssignment: MemberAssignment.encode({
            version: assigner.version,
            assignment: {
              'topic-A': [9, 10, 11],
              'topic-B': [9, 10, 11],
            },
          }),
        },
      ])
    })
  })

  describe('#protocol', () => {
    test('returns the assigner name and metadata', () => {
      expect(assigner.protocol({ topics })).toEqual({
        name: assigner.name,
        metadata: MemberMetadata.encode({ version: assigner.version, topics }),
      })
    })
  })
})
2 changes: 1 addition & 1 deletion types/index.d.ts
@@ -577,7 +577,7 @@ export type Admin = {
   readonly events: AdminEvents
 }

-export const PartitionAssigners: { roundRobin: PartitionAssigner }
+export const PartitionAssigners: { roundRobin: PartitionAssigner, range: PartitionAssigner }

 export interface ISerializer<T> {
   encode(value: T): Buffer