Skip to content

Commit

Permalink
fixup! Put all publish frames in a single huge buffer and send it once
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Sep 18, 2024
1 parent 64fccd0 commit ed11ed7
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/amqp-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,15 @@ export class AMQPChannel {
buffer.setUint8(j, 206); j += 1 // frame end byte
buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize

if (buffer.byteLength - j < 8 + body.byteLength) { // the body doesn't fit in the buffer, expand it
const bodyFrameCount = Math.ceil(body.byteLength / (this.connection.frameMax - 8))
const newBuffer = new ArrayBuffer(j + body.byteLength + 8 * bodyFrameCount)
new Uint8Array(newBuffer).set(new Uint8Array(buffer.buffer, 0, j)) // copy the old buffer to the new buffer
let bufferView = new Uint8Array(buffer.buffer)
const bodyFrameCount = Math.ceil(body.byteLength / (this.connection.frameMax - 8))
const bufferSize = j + body.byteLength + 8 * bodyFrameCount
if (buffer.byteLength < bufferSize) { // the body doesn't fit in the buffer, expand it
const newBuffer = new ArrayBuffer(bufferSize)
const newBufferView = new Uint8Array(newBuffer)
newBufferView.set(bufferView.subarray(0, j))
buffer = new AMQPView(newBuffer)
bufferView = newBufferView
}

// split body into multiple frames if body > frameMax
Expand All @@ -353,12 +357,11 @@ export class AMQPChannel {
buffer.setUint8(j, 3); j += 1 // type: body
buffer.setUint16(j, this.id); j += 2 // channel
buffer.setUint32(j, frameSize); j += 4 // frameSize
const bodyView = new Uint8Array(buffer.buffer, j, frameSize)
bodyView.set(dataSlice); j += frameSize // body content
bufferView.set(dataSlice, j); j += frameSize // body content
buffer.setUint8(j, 206); j += 1 // frame end byte
bodyPos += frameSize
}
const sendFrames = this.connection.send(new Uint8Array(buffer.buffer, 0, j))
const sendFrames = this.connection.send(bufferView.subarray(0, j))

this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
// if publish confirm is enabled, put a promise on a queue if the sends were ok
Expand Down

0 comments on commit ed11ed7

Please sign in to comment.