diff --git a/src/consumer/assigners/index.js b/src/consumer/assigners/index.js index 170910678..b565d6516 100644 --- a/src/consumer/assigners/index.js +++ b/src/consumer/assigners/index.js @@ -1,5 +1,7 @@ const roundRobin = require('./roundRobinAssigner') +const range = require('./rangeAssigner') module.exports = { roundRobin, + range, } diff --git a/src/consumer/assigners/rangeAssigner/index.js b/src/consumer/assigners/rangeAssigner/index.js new file mode 100644 index 000000000..3e8e32291 --- /dev/null +++ b/src/consumer/assigners/rangeAssigner/index.js @@ -0,0 +1,77 @@ +const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol') + +/** + * RangeAssigner + * @type {import('types').PartitionAssigner} + */ +module.exports = ({ cluster }) => ({ + name: 'RangeAssigner', + version: 0, + + /** + * Assign the topics to the provided members using range strategy. + * + * The members array contains information about each member, `memberMetadata` is the result of the + * `protocol` operation. + * + * @param {object} group + * @param {import('types').GroupMember[]} group.members array of members, e.g: + [{ memberId: 'test-5f93f5a3', memberMetadata: Buffer }] + * @param {string[]} group.topics + * @returns {Promise} object partitions per topic per member + */ + async assign({ members, topics }) { + // logic inspired from kafka java client: + // https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html + // https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java + + const sortedMembers = members.map(({ memberId }) => memberId).sort() + const membersCount = sortedMembers.length + const assignment = {} + + for (const topic of topics) { + const partitionMetadata = cluster.findTopicPartitionMetadata(topic) + + const numPartitionsForTopic = partitionMetadata.length + const numPartitionsPerConsumer = Math.floor(numPartitionsForTopic / membersCount) + const consumersWithExtraPartition = numPartitionsForTopic % membersCount + + for (var i = 0; i < membersCount; i++) { + const start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition) + const length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1) + + const assignee = sortedMembers[i] + + for (let partition = start; partition < start + length; partition++) { + if (!assignment[assignee]) { + assignment[assignee] = Object.create(null) + } + + if (!assignment[assignee][topic]) { + assignment[assignee][topic] = [] + } + + assignment[assignee][topic].push(partition) + } + } + } + + return Object.keys(assignment).map(memberId => ({ + memberId, + memberAssignment: MemberAssignment.encode({ + version: this.version, + assignment: assignment[memberId], + }), + })) + }, + + protocol({ topics }) { + return { + name: this.name, + metadata: MemberMetadata.encode({ + version: this.version, + topics, + }), + } + }, +}) diff --git a/src/consumer/assigners/rangeAssigner/index.spec.js b/src/consumer/assigners/rangeAssigner/index.spec.js new file mode 100644 index 000000000..b76fb68ae --- /dev/null +++ b/src/consumer/assigners/rangeAssigner/index.spec.js @@ -0,0 +1,132 @@ +const RoundRobinAssigner = require('./index') +const { MemberAssignment, MemberMetadata } = require('../../assignerProtocol') + +describe('Consumer > assigners > RoundRobinAssigner', () => { + let cluster, topics, metadata, assigner + + beforeEach(() => { + metadata = {} + cluster = { findTopicPartitionMetadata: topic => metadata[topic] } + assigner = RoundRobinAssigner({ cluster }) + }) + + describe('#assign', () => { + test('java example topic-partitions assignment', async () => { + topics = ['t0', 't1'] + metadata['t0'] = Array(3) + .fill() + .map((_, i) => ({ partitionId: i })) + + metadata['t1'] = Array(3) + .fill() + .map((_, i) => ({ partitionId: i })) + + const members = [{ memberId: 'C0' }, { memberId: 'C1' }] + + const assignment = await assigner.assign({ members, topics }) + + expect(assignment).toEqual([ + { + memberId: 'C0', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + t0: [0, 1], + t1: [0, 1], + }, + }), + }, + { + memberId: 'C1', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + t0: [2], + t1: [2], + }, + }), + }, + ]) + }) + + test('more complex topic-partitions assignment', async () => { + topics = ['topic-A', 'topic-B', 'topic-C'] + + metadata['topic-A'] = Array(12) + .fill() + .map((_, i) => ({ partitionId: i })) + + metadata['topic-B'] = Array(12) + .fill() + .map((_, i) => ({ partitionId: i })) + + metadata['topic-C'] = Array(3) + .fill() + .map((_, i) => ({ partitionId: i })) + + const members = [ + { memberId: 'member-3' }, + { memberId: 'member-1' }, + { memberId: 'member-4' }, + { memberId: 'member-2' }, + ] + + const assignment = await assigner.assign({ members, topics }) + + expect(assignment).toEqual([ + { + memberId: 'member-1', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + 'topic-A': [0, 1, 2], + 'topic-B': [0, 1, 2], + 'topic-C': [0], + }, + }), + }, + { + memberId: 'member-2', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + 'topic-A': [3, 4, 5], + 'topic-B': [3, 4, 5], + 'topic-C': [1], + }, + }), + }, + { + memberId: 'member-3', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + 'topic-A': [6, 7, 8], + 'topic-B': [6, 7, 8], + 'topic-C': [2], + }, + }), + }, + { + memberId: 'member-4', + memberAssignment: MemberAssignment.encode({ + version: assigner.version, + assignment: { + 'topic-A': [9, 10, 11], + 'topic-B': [9, 10, 11], + }, + }), + }, + ]) + }) + }) + + describe('#protocol', () => { + test('returns the assigner name and metadata', () => { + expect(assigner.protocol({ topics })).toEqual({ + name: assigner.name, + metadata: MemberMetadata.encode({ version: assigner.version, topics }), + }) + }) + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 26bcbfea3..e6d31c58f 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -577,7 +577,7 @@ export type Admin = { readonly events: AdminEvents } -export const PartitionAssigners: { roundRobin: PartitionAssigner } +export const PartitionAssigners: { roundRobin: PartitionAssigner, range: PartitionAssigner } export interface ISerializer { encode(value: T): Buffer