Skip to content

Commit

Permalink
Always buffer all publish framed together
Browse files Browse the repository at this point in the history
Even if the user doesn't await basicPublish all framed beloging to one
publish should be published together. So intead of waiting for a
potentially blocked socket to be drain enqueue all data and only await
for the last sent frame.

Fixes #49
  • Loading branch information
carlhoerberg committed Sep 16, 2024
1 parent 3dbd42d commit 55e9101
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/amqp-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,13 @@ export class AMQPChannel {
buffer.setUint8(j, 206); j += 1 // frame end byte
buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize

let lastFrame
// Send current frames if there's no body to send
if (body.byteLength === 0) {
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
} else if (j >= buffer.byteLength - 8) {
// Send current frames if a body frame can't fit in the rest of the frame buffer
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
j = 0
}

Expand All @@ -358,10 +359,11 @@ export class AMQPChannel {
const bodyView = new Uint8Array(buffer.buffer, j, frameSize)
bodyView.set(dataSlice); j += frameSize // body content
buffer.setUint8(j, 206); j += 1 // frame end byte
await this.connection.send(new Uint8Array(buffer.buffer, 0, j))
lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
bodyPos += frameSize
j = 0
}
await lastFrame // buffer all frames and only wait for the last as RabbitMQ requires all publish frames to be sent together
this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
// if publish confirm is enabled, put a promise on a queue if the sends were ok
// the promise on the queue will be fullfilled by the read loop when an ack/nack
Expand Down

0 comments on commit 55e9101

Please sign in to comment.