Skip to content

Commit

Permalink
fix streaming meta issue in case of random chunks order
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Aug 15, 2022
1 parent b8da3cc commit 308d00d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
7 changes: 6 additions & 1 deletion dev/issue-1100-receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ broker.createService({
const participants = [];
ctx.params.on("data", d => participants.push(d));
ctx.params.on("end", () =>
this.logger.info("received stream data", participants.length, ctx.meta)
this.logger.info(
"received stream data",
participants.length,
ctx.meta,
participants
)
);
return "OK";
} else {
Expand Down
16 changes: 11 additions & 5 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ class Transit {

// Create a new pass stream
pass = new Transform({
// TODO: It's incorrect because the chunks may receive in random order, so it processes an empty meta.
// Meta is filled correctly only in the 0. chunk.
objectMode: payload.meta && payload.meta["$streamObjectMode"],
transform: function (chunk, encoding, done) {
this.push(chunk);
Expand All @@ -562,7 +564,7 @@ class Transit {
// TODO: check length of pool.
// TODO: reset seq

return isNew ? pass : null;
return null;
}

// the next stream chunk received
Expand Down Expand Up @@ -612,7 +614,7 @@ class Transit {
}
}

return isNew ? pass : null;
return pass && payload.seq == 0 ? pass : null;
}

/**
Expand Down Expand Up @@ -686,6 +688,8 @@ class Transit {
);

pass = new Transform({
// TODO: It's incorrect because the chunks may receive in random order, so it processes an empty meta.
// Meta is filled correctly only in the 0. chunk.
objectMode: packet.meta && packet.meta["$streamObjectMode"],
transform: function (chunk, encoding, done) {
this.push(chunk);
Expand All @@ -697,8 +701,6 @@ class Transit {
pass.$pool = new Map();

this.pendingResStreams.set(packet.id, pass);

req.resolve(pass);
}

if (packet.seq > pass.$prevSeq + 1) {
Expand All @@ -719,6 +721,10 @@ class Transit {
// the next stream chunk received
pass.$prevSeq = packet.seq;

if (pass && packet.seq == 0) {
req.resolve(pass);
}

if (pass.$prevSeq > 0) {
if (!packet.stream) {
// Received error?
Expand Down Expand Up @@ -749,7 +755,7 @@ class Transit {

// Check newer chunks in the pool
if (pass.$pool.size > 0) {
this.logger.warn(`Has stored packets. Size: ${pass.$pool.size}`);
this.logger.debug(`Has stored packets. Size: ${pass.$pool.size}`);
const nextSeq = pass.$prevSeq + 1;
const nextPacket = pass.$pool.get(nextSeq);
if (nextPacket) {
Expand Down
34 changes: 18 additions & 16 deletions test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1098,21 +1098,22 @@ describe("Test Transit._handleIncomingRequestStream", () => {
const payload = { ver: "4", sender: "remote", action: "posts.import", id: "124" };

it("should create new stream", () => {
expect(
transit._handleIncomingRequestStream(
Object.assign({}, payload, { stream: true, seq: 1, params: "CHUNK-1" })
)
).toBeNull();
});

it("should reorder chunks", () => {
const pass = transit._handleIncomingRequestStream(
Object.assign({}, payload, { stream: true, seq: 1, params: "CHUNK-1" })
Object.assign({}, payload, { stream: true, seq: 0 })
);
expect(pass).toBeInstanceOf(Transform);
pass.on("data", data => STORE.push(data.toString()));
pass.on("error", () => STORE.push("-- ERROR --"));
pass.on("end", () => STORE.push("-- END --"));
});

it("should reorder chunks", () => {
expect(
transit._handleIncomingRequestStream(
Object.assign({}, payload, { stream: true, seq: 0 })
)
).toBeNull();
expect(
transit._handleIncomingRequestStream(
Object.assign({}, payload, { stream: true, seq: 4, params: "CHUNK-4" })
Expand Down Expand Up @@ -1144,7 +1145,7 @@ describe("Test Transit._handleIncomingRequestStream", () => {
)
).toBeNull();

return broker.Promise.delay(100).then(() => {
return broker.Promise.delay(500).then(() => {
expect(STORE).toEqual([
"CHUNK-1",
"CHUNK-2",
Expand Down Expand Up @@ -1705,12 +1706,6 @@ describe("Test Transit._handleIncomingResponseStream", () => {
req
)
).toBe(true);
expect(req.resolve).toHaveBeenCalledTimes(1);
const pass = req.resolve.mock.calls[0][0];
expect(pass).toBeInstanceOf(Transform);
pass.on("data", data => STORE.push(data.toString()));
pass.on("error", errorHandler);
pass.on("end", () => STORE.push("-- END --"));
});

it("should reorder chunks", () => {
Expand All @@ -1720,6 +1715,13 @@ describe("Test Transit._handleIncomingResponseStream", () => {
req
)
).toBe(true);
expect(req.resolve).toHaveBeenCalledTimes(1);
const pass = req.resolve.mock.calls[0][0];
expect(pass).toBeInstanceOf(Transform);
pass.on("data", data => STORE.push(data.toString()));
pass.on("error", errorHandler);
pass.on("end", () => STORE.push("-- END --"));

expect(
transit._handleIncomingResponseStream(
Object.assign({}, payload, { stream: true, seq: 4, data: "CHUNK-4" }),
Expand Down Expand Up @@ -1757,7 +1759,7 @@ describe("Test Transit._handleIncomingResponseStream", () => {
)
).toBe(true);

return broker.Promise.delay(100).then(() => {
return broker.Promise.delay(500).then(() => {
expect(STORE).toEqual([
"CHUNK-1",
"CHUNK-2",
Expand Down

0 comments on commit 308d00d

Please sign in to comment.