TransformStream implementation stopped working with Bun 1.1.30 #14718

Open
mohas opened this issue Oct 21, 2024 · 4 comments
@mohas

mohas commented Oct 21, 2024

What version of Bun is running?

1.1.31

What platform is your computer?

Microsoft Windows NT 10.0.19045.0 x64

What steps can reproduce the bug?

I have a Brotli transform stream:

import { createBrotliCompress, createBrotliDecompress } from 'node:zlib'
export class BrotliTransformer extends TransformStream {
  constructor() {
    const brotli = createBrotliCompress()
    const _transformer: Transformer = {
      start(controller) {
        brotli.on('data', chunk => {
          controller.enqueue(chunk)
        })
        brotli.on('end', () => {
          controller.terminate()
        })
      },
      transform(chunk) {
        brotli.write(chunk)
      },
      flush() {
        brotli.end()
      },
    }
    super(_transformer)
  }
}
export class BrotliDecompressTransformer extends TransformStream {
  constructor() {
    const brotli = createBrotliDecompress()
    const _transformer: Transformer = {
      start(controller) {
        brotli.on('data', chunk => {
          controller.enqueue(chunk)
        })
        brotli.on('end', () => {
          controller.terminate()
        })
      },
      transform(chunk, controller) {
        brotli.write(chunk)
      },
      flush(controller) {
        brotli.end()
      },
    }
    super(_transformer)
  }
}

This is a unit test for these transformers:

import { describe, expect, it } from 'bun:test'
import { BrotliDecompressTransformer, BrotliTransformer } from './index'

class collectionStream extends WritableStream<Uint8Array> {
  constructor(outputBuffer: number[]) {
    super({
      write(chunk) {
        // Append the chunk's bytes (the original splice inserted them before the last element)
        outputBuffer.push(...chunk)
      },
    })
  }
}

describe('BrotliTransformers', () => {
  it('should compress and decompress data correctly', async () => {
    const inputString =
      'psam quia adipisci et sequi fuga rerum maiores. Fugit corrupti tempore odio ut fugiat sint id quia. Nostrum perferendis alias iste quia et consequatur. Dolor consequatur perspiciatis voluptates veritatis ipsam at et.'
    const inputBuffer = new TextEncoder().encode(inputString)
    let outputBuffer: number[] = []

    const compressedStream = new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(inputBuffer)
        controller.close()
      },
    })

    await compressedStream
      .pipeThrough(new BrotliTransformer())
      .pipeTo(new collectionStream(outputBuffer))

    // ensure that compression was effective
    expect(outputBuffer.length).not.toBe(0)
    expect(outputBuffer.length).toBeLessThan(inputBuffer.byteLength)

    // now decompress
    const compressedDataStream = new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(Uint8Array.from(outputBuffer))
        controller.close()
      },
    })

    outputBuffer = []

    await compressedDataStream
      .pipeThrough(new BrotliDecompressTransformer())
      .pipeTo(new collectionStream(outputBuffer))

    const result = new TextDecoder('utf-8').decode(Buffer.from(outputBuffer))

    expect(result).toBe(inputString)
  })
})

This test passes with Bun 1.1.29 but fails starting with Bun 1.1.30.

What is the expected behavior?

the test should pass with 1.1.30 and 1.1.3

What do you see instead?

No response

Additional information

# Unhandled error between tests
-------------------------------
4 |   constructor() {
5 |     const brotli = createBrotliCompress()
6 |     const _transformer: Transformer = {
7 |       start(controller) {
8 |         brotli.on('data', chunk => {
9 |           controller.enqueue(chunk)
              ^
TypeError: TransformStream.readable cannot close or enqueue
      at E:\src\my-project\packages\lib\compression\index.ts:9:22
      at emit (node:events:183:48)
      at addChunk (node:stream:1914:22)
      at readableAddChunk (node:stream:1868:30)
      at processCallback (node:zlib:113:25)
-------------------------------
@guest271314
Contributor

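The issue is that the brotli data handler callback is dispatched after flush() has been invoked in BrotliTransformer. Because flush() returns immediately, the readable side is closed before the compressed chunk arrives, and it's not possible to enqueue to a closed stream. Hence the error.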

{
  transform: Uint8Array(216) [
    112, 115,  97, 109,  32, 113, 117, 105,  97,  32,  97, 100,
    105, 112, 105, 115,  99, 105,  32, 101, 116,  32, 115, 101,
    113, 117, 105,  32, 102, 117, 103,  97,  32, 114, 101, 114,
    117, 109,  32, 109,  97, 105, 111, 114, 101, 115,  46,  32,
     70, 117, 103, 105, 116,  32,  99, 111, 114, 114, 117, 112,
    116, 105,  32, 116, 101, 109, 112, 111, 114, 101,  32, 111,
    100, 105, 111,  32, 117, 116,  32, 102, 117, 103, 105,  97,
    116,  32, 115, 105, 110, 116,  32, 105, 100,  32, 113, 117,
    105,  97,  46,  32,
    ... 116 more items
  ]
}
Flush
{
  data: <Buffer 1b d7 00 50 8c d4 58 cd cc d1 54 dd 94 ad 50 97 7e 81 42 9a 70 7e b6 8e 32 72 72 83 13 c0 3c d0 c0 a2 44 33 5e 74 d9 6d a7 dc 3c f0 30 4a 52 2b 39 fd ... >
}

I would suggest using an identity TransformStream and getting rid of the WritableStream subclass in collectionStream. Just use Promises and stream chaining instead of creating a WritableStream only to pass an Array to it.
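If the TransformStream subclass is to be kept, one possible approach is to have flush() return a Promise that settles only when the brotli stream emits end. Per the Streams standard, the readable side is not closed until the Promise returned by flush() resolves, so late data events can still be enqueued. The following is a minimal sketch under that assumption; FlushAwaitingBrotliTransformer and roundTripWithFlushPromise are hypothetical names, not part of the original report, and the sketch has not been confirmed against Bun 1.1.30.

```typescript
import { brotliDecompressSync, createBrotliCompress } from "node:zlib";

// Hypothetical variant of BrotliTransformer: flush() waits for the "end" event.
class FlushAwaitingBrotliTransformer extends TransformStream<Uint8Array, Uint8Array> {
  constructor() {
    const brotli = createBrotliCompress();
    super({
      start(controller) {
        // Forward every compressed chunk to the readable side.
        brotli.on("data", (chunk: Buffer) => {
          controller.enqueue(new Uint8Array(chunk));
        });
      },
      transform(chunk) {
        brotli.write(chunk);
      },
      flush() {
        // Keep the stream open until zlib has emitted all of its output.
        return new Promise<void>((resolve, reject) => {
          brotli.once("end", () => resolve());
          brotli.once("error", reject);
          brotli.end();
        });
      },
    });
  }
}

// Usage: compress through the web stream, then verify with a one-shot decompress.
async function roundTripWithFlushPromise(text: string): Promise<string> {
  const compressed = new Blob([text])
    .stream()
    .pipeThrough(new FlushAwaitingBrotliTransformer());
  const bytes = new Uint8Array(await new Response(compressed).arrayBuffer());
  return brotliDecompressSync(bytes).toString("utf8");
}
```

No explicit controller.terminate() call is needed here, because the stream closes the readable side by itself once the flush Promise resolves.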

@guest271314
Contributor

Here's your code, working, using identity TransformStreams:

import { createBrotliCompress, createBrotliDecompress } from "node:zlib";

const inputString =
  "psam quia adipisci et sequi fuga rerum maiores. Fugit corrupti tempore odio ut fugiat sint id quia. Nostrum perferendis alias iste quia et consequatur. Dolor consequatur perspiciatis voluptates veritatis ipsam at et.";
const inputBuffer = new TextEncoder().encode(inputString);

const {
  readable: brotliCompressorReadable,
  writable: brotliCompressorWritable,
} = new TransformStream();

const brotliCompressorWriter = brotliCompressorWritable.getWriter();

const brotliCompressor = createBrotliCompress();

brotliCompressor.on("data", async (chunk) => {
  await brotliCompressorWriter.write(new Uint8Array(chunk));
});

brotliCompressor.on("end", async () => {
  await brotliCompressorWriter.close();
});

brotliCompressor.write(inputBuffer);
brotliCompressor.end();

const {
  readable: brotliDecompressorReadable,
  writable: brotliDecompressorWritable,
} = new TransformStream();

const brotliDecompressorWriter = brotliDecompressorWritable.getWriter();

const brotliDecompress = createBrotliDecompress();

brotliDecompress.on("data", async (chunk) => {
  await brotliDecompressorWriter.write(new Uint8Array(chunk));
});

brotliDecompress.on("end", async () => {
  await brotliDecompressorWriter.close();
});

const input = new Uint8Array(
  await new Response(brotliCompressorReadable).arrayBuffer(),
);

brotliDecompress.write(input);

brotliDecompress.end();

const output = new Uint8Array(
  await new Response(brotliDecompressorReadable).arrayBuffer(),
);

const result = new TextDecoder("utf-8").decode(output);
console.log(result);

@guest271314
Contributor

It looks like the end event is the problem, not the TransformStream. It can take a long time to dispatch.

Using Bun and Node.js, the end event never dispatches.

Using Deno, we can see that the end event doesn't dispatch until long after the rest of the process has completed.

class BrotliTransformer extends TransformStream {
  constructor() {
    const brotli = createBrotliCompress();
    const _transformer = {
      start(controller) {
        brotli.on("data", (chunk) => {
          console.log({ chunk });
          controller.enqueue(new Uint8Array(chunk));
        });
        brotli.on("end", () => {
          console.log("end");
          controller.terminate();
        });
      },
      transform(chunk, controller) {
        console.log("transform", chunk);

        brotli.write(chunk);
        brotli.end();
      },
      flush() {
        console.log("BrotliTransformer flush");
      },
    };
    super(_transformer);
  }
}

class BrotliDecompressTransformer extends TransformStream {
  constructor() {
    const brotli = createBrotliDecompress();
    const _transformer = {
      start(controller) {
        brotli.on("data", (chunk) => {
          controller.enqueue(new Uint8Array(chunk));
          brotli.end();
        });
        brotli.on("end", () => {
          controller.terminate();
        });
      },
      transform(chunk, controller) {
        brotli.write(chunk);
        brotli.end();
      },
      flush() {
        console.log("BrotliDecompressTransformer flush");
      },
    };
    super(_transformer);
  }
}

const compressedStream = new Blob([inputBuffer]).stream();

const compressedOutputBuffer = await new Response(compressedStream
  .pipeThrough(new BrotliTransformer())).bytes();

// ensure that compression was effective
console.log(compressedOutputBuffer.length);
console.log(compressedOutputBuffer.length, inputBuffer.byteLength);

// now decompress
const compressedDataStream = new Blob([compressedOutputBuffer]).stream();

const decompressedOutputBuffer = await new Response(compressedDataStream
  .pipeThrough(new BrotliDecompressTransformer())).bytes();
//  .pipeTo(new collectionStream(outputBuffer));

const result = new TextDecoder("utf-8").decode(decompressedOutputBuffer);
console.log(result);

Notice how we never see "end" printed to the output when we use Bun or Node.js.
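The event timing can also be checked in isolation, with no TransformStream involved. The following is a minimal sketch; observeEventOrder is a hypothetical helper name, not something from this thread. It records when end() returns relative to when the data and end events of the brotli stream fire.

```typescript
import { once } from "node:events";
import { createBrotliCompress } from "node:zlib";

// Hypothetical helper: returns the order in which things happened.
async function observeEventOrder(input: Uint8Array): Promise<string[]> {
  const order: string[] = [];
  const brotli = createBrotliCompress();

  // Attaching a "data" listener switches the stream into flowing mode.
  brotli.on("data", () => order.push("data"));

  brotli.write(input);
  brotli.end();
  // end() only requests completion; the compressed output arrives later.
  order.push("end() returned");

  // Wait for the stream to report that all output has been emitted.
  await once(brotli, "end");
  order.push("end event");
  return order;
}
```

Since zlib does its work asynchronously, the expectation is that "end() returned" is recorded before any "data" entry, which matches the logs here where flush runs before the data handler.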

Bun

bun run index.js
transform Uint8Array(216) [ 112, 115, 97, 109, 32, 113, 117, 105, 97, 32, 97, 100, 105, 112, 105, 115, 99, 105, 32, 101, 116, 32, 115, 101, 113, 117, 105, 32, 102, 117, 103, 97, 32, 114, 101, 114, 117, 109, 32, 109, 97, 105, 111, 114, 101, 115, 46, 32, 70, 117, 103, 105, 116, 32, 99, 111, 114, 114, 117, 112, 116, 105, 32, 116, 101, 109, 112, 111, 114, 101, 32, 111, 100, 105, 111, 32, 117, 116, 32, 102, 117, 103, 105, 97, 116, 32, 115, 105, 110, 116, 32, 105, 100, 32, 113, 117, 105, 97, 46, 32, 78, 111, 115, 116, 114, 117, 109, 32, 112, 101, 114, 102, 101, 114, 101, 110, 100, 105, 115, 32, 97, 108, 105, 97, 115, 32, 105, 115, 116, 101, 32, 113, 117, 105, 97, 32, 101, 116, 32, 99, 111, 110, 115, 101, 113, 117, 97, 116, 117, 114, 46, 32, 68, 111, 108, 111, 114, 32, 99, 111, 110, 115, 101, 113, 117, 97, 116, 117, 114, 32, 112, 101, 114, 115, 112, 105, 99, 105, 97, 116, 105, 115, 32, 118, 111, 108, 117, 112, 116, 97, 116, 101, 115, 32, 118, 101, 114, 105, 116, 97, 116, 105, 115, 32, 105, 112, 115, 97, 109, 32, 97, 116, 32, 101, 116, 46 ]
BrotliTransformer flush
0
0 216
BrotliDecompressTransformer flush

{
  chunk: Buffer(144) [ 27, 215, 0, 80, 140, 212, 88, 205, 204, 209, 84, 221, 148, 173, 80, 151, 126, 129, 66, 154, 112, 126, 182, 142, 50, 114, 114, 131, 19, 192, 60, 208, 192, 162, 68, 51, 94, 116, 217, 109, 167, 220, 60, 240, 48, 74, 82, 43, 57, 253, 201, 174, 152, 248, 90, 129, 56, 181, 85, 135, 64, 163, 248, 181, 112, 245, 29, 72, 102, 79, 204, 208, 74, 214, 128, 190, 101, 28, 203, 128, 201, 221, 137, 117, 106, 161, 45, 39, 133, 81, 122, 13, 157, 198, 122, 224, 42, 103, 79, 108, 230, 197, 228, 123, 170, 16, 143, 162, 160, 50, 19, 69, 227, 153, 94, 225, 206, 209, 15, 36, 221, 97, 51, 107, 235, 80, 88, 133, 127, 61, 189, 29, 102, 225, 103, 202, 97, 21, 180, 43, 38, 64, 122, 0 ],
}
62 |     const brotli = createBrotliCompress();
63 |     const _transformer = {
64 |       start(controller) {
65 |         brotli.on("data", (chunk) => {
66 |           console.log({ chunk });
67 |           controller.enqueue(new Uint8Array(chunk));
               ^
TypeError: TransformStream.readable cannot close or enqueue
      at /home/user/bin/bun-tests/index.js:67:22
      at emit (node:events:183:48)
      at addChunk (node:stream:1914:22)
      at readableAddChunk (node:stream:1868:30)
      at processCallback (node:zlib:113:25)

Bun v1.1.31 (Linux x64 baseline)

Node.js

node index.js
transform Uint8Array(216) [
  112, 115,  97, 109,  32, 113, 117, 105,  97,  32,  97, 100,
  105, 112, 105, 115,  99, 105,  32, 101, 116,  32, 115, 101,
  113, 117, 105,  32, 102, 117, 103,  97,  32, 114, 101, 114,
  117, 109,  32, 109,  97, 105, 111, 114, 101, 115,  46,  32,
   70, 117, 103, 105, 116,  32,  99, 111, 114, 114, 117, 112,
  116, 105,  32, 116, 101, 109, 112, 111, 114, 101,  32, 111,
  100, 105, 111,  32, 117, 116,  32, 102, 117, 103, 105,  97,
  116,  32, 115, 105, 110, 116,  32, 105, 100,  32, 113, 117,
  105,  97,  46,  32,
  ... 116 more items
]
BrotliTransformer flush
0
0 216
BrotliDecompressTransformer flush

{
  chunk: <Buffer 1b d7 00 50 8c d4 58 cd cc d1 54 dd 94 ad 50 97 7e 81 42 9a 70 7e b6 8e 32 72 72 83 13 c0 3c d0 c0 a2 44 33 5e 74 d9 6d a7 dc 3c f0 30 4a 52 2b 39 fd ... 94 more bytes>
}
node:internal/webstreams/transformstream:506
    throw new ERR_INVALID_STATE.TypeError('Unable to enqueue');
    ^

TypeError [ERR_INVALID_STATE]: Invalid state: Unable to enqueue
    at transformStreamDefaultControllerEnqueue (node:internal/webstreams/transformstream:506:11)
    at TransformStreamDefaultController.enqueue (node:internal/webstreams/transformstream:324:5)
    at BrotliCompress.<anonymous> (file:///home/user/bin/bun-tests/index.js:67:22)
    at BrotliCompress.emit (node:events:507:28)
    at addChunk (node:internal/streams/readable:559:12)
    at readableAddChunkPushByteMode (node:internal/streams/readable:510:3)
    at Readable.push (node:internal/streams/readable:390:5)
    at BrotliEncoder.processCallback (node:zlib:504:32) {
  code: 'ERR_INVALID_STATE'
}

Node.js v24.0.0-nightly2024102078b72ca7ba

Deno

Now we see the "end" printed - after we have already decompressed the ReadableStream!

deno -A index.js
transform Uint8Array(216) [
  112, 115,  97, 109,  32, 113, 117, 105,  97,  32,  97, 100,
  105, 112, 105, 115,  99, 105,  32, 101, 116,  32, 115, 101,
  113, 117, 105,  32, 102, 117, 103,  97,  32, 114, 101, 114,
  117, 109,  32, 109,  97, 105, 111, 114, 101, 115,  46,  32,
   70, 117, 103, 105, 116,  32,  99, 111, 114, 114, 117, 112,
  116, 105,  32, 116, 101, 109, 112, 111, 114, 101,  32, 111,
  100, 105, 111,  32, 117, 116,  32, 102, 117, 103, 105,  97,
  116,  32, 115, 105, 110, 116,  32, 105, 100,  32, 113, 117,
  105,  97,  46,  32,
  ... 116 more items
]
{
  chunk: <Buffer 1b d7 00 50 8c d4 58 cd cc d1 54 dd 94 ad 50 97 7e 81 42 9a 70 7e b6 8e 32 72 72 83 13 c0 3c d0 c0 a2 44 33 5e 74 d9 6d a7 dc 3c f0 30 4a 52 2b 39 fd ... >
}
BrotliTransformer flush
144
144 216
BrotliDecompressTransformer flush
psam quia adipisci et sequi fuga rerum maiores. Fugit corrupti tempore odio ut fugiat sint id quia. Nostrum perferendis alias iste quia et conse
end
error: Uncaught TypeError: Readable stream is unavailable.
          controller.enqueue(new Uint8Array(chunk));
                     ^
    at transformStreamDefaultControllerEnqueue (ext:deno_web/06_streams.js:4023:11)
    at TransformStreamDefaultController.enqueue (ext:deno_web/06_streams.js:6296:5)
    at BrotliDecompress.<anonymous> (file:///home/user/bin/bun-tests/index.js:94:22)
    at BrotliDecompress.emit (ext:deno_node/_events.mjs:393:28)
    at addChunk (ext:deno_node/_stream.mjs:2438:16)
    at readableAddChunk (ext:deno_node/_stream.mjs:2417:13)
    at BrotliDecompress.Readable.push (ext:deno_node/_stream.mjs:2356:14)
    at BrotliDecompress.flush [as _flush] (ext:deno_node/_brotli.js:79:16)
    at BrotliDecompress.final [as _final] (ext:deno_node/_stream.mjs:4526:14)
    at callFinal (ext:deno_node/_stream.mjs:3778:16)


@guest271314
Contributor

I don't think this is an issue with the TransformStream implementation. It comes down to how and when the data and end events of the objects created by createBrotliCompress() and createBrotliDecompress() are dispatched.

Using an identity TransformStream we get the expected result.
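Another way to avoid hand-wiring the data and end events is to let the runtime adapt the Node.js stream to a web stream pair. The following is a minimal sketch using stream.Duplex.toWeb() from node:stream, which Node.js documents as experimental. Whether Bun 1.1.30 supports it is an assumption that has not been checked here, and roundTripViaToWeb is a hypothetical name.

```typescript
import { Duplex } from "node:stream";
import { createBrotliCompress, createBrotliDecompress } from "node:zlib";

// Hypothetical helper: compress and decompress through adapted zlib streams.
async function roundTripViaToWeb(text: string): Promise<string> {
  // Each call returns a { readable, writable } pair usable with pipeThrough().
  const compress = Duplex.toWeb(createBrotliCompress());
  const decompress = Duplex.toWeb(createBrotliDecompress());

  const output = new Blob([text])
    .stream()
    .pipeThrough(compress)
    .pipeThrough(decompress);

  // Collect the decompressed bytes and decode them as UTF-8.
  return await new Response(output).text();
}
```

With this shape, closing the writable side ends the underlying zlib stream and the adapter closes the readable side once the stream has ended, so there is no manual enqueue() call to mistime.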
