-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathforEach.js
128 lines (102 loc) · 2.92 KB
/
forEach.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import { promisify } from 'node:util'
import { context } from '@opentelemetry/api'
import stream from 'readable-stream'
import ReadableToReadable from 'readable-to-readable'
const { finished, Duplex } = stream
/**
* @typedef {Pick<import('barnard59-core').Context, 'createPipeline' | 'variables'> & {
* pipeline: PipelineStream
* variable: string
* }} ForEachOptions
*
* @typedef {import('stream').Duplex & {
* pipeline: import('barnard59-core').Pipeline
* }} PipelineStream
*/
async function nextLoop() {
return new Promise(resolve => setTimeout(resolve, 0))
}
class ForEach extends Duplex {
/**
* @param {ForEachOptions} context
*/
constructor({ createPipeline, pipeline, variable, variables }) {
super({ objectMode: true })
// Bind the read and write function to the current context so the trace ID
// gets properly propagated
this._read = context.bind(context.active(), this._read)
this._write = context.bind(context.active(), this._write)
this.createPipeline = createPipeline
// we only need the ptr of the pipeline to create new copies...
this.ptr = pipeline.pipeline.ptr
// ...so let's destroy it immediately
pipeline.destroy()
this.variable = variable
this.variables = variables
this.pull = null
}
/**
* @type import('barnard59-core').Pipeline
*/
get subPipeline() {
// @ts-ignore
return this.step.children[0]
}
set subPipeline(subPipeline) {
// @ts-ignore
this.step.children[0] = subPipeline
}
/**
* @param {*} chunk
* @param {string} encoding
* @param {(error?: (Error | null)) => void} callback
*/
async _write(chunk, encoding, callback) {
// @ts-ignore
try {
const variables = new Map(this.variables)
if (this.variable) {
variables.set(this.variable, chunk)
}
this.subPipeline = this.createPipeline(this.ptr, { variables })
// @ts-ignore
this.pull = ReadableToReadable.readFrom(this.subPipeline.stream, { end: false })
if (this.subPipeline.writable) {
// @ts-ignore
this.subPipeline.stream.end(chunk)
}
await promisify(finished)(this.subPipeline.stream)
this.pull = null
return callback()
} catch (/** @type {any} */ cause) {
const err = new Error(`error in forEach sub-pipeline ${this.ptr.value}`)
err.stack += `\nCaused by: ${cause.stack}`
return callback(err)
}
}
async _read() {
if (this._writableState.finished) {
return this.push(null)
}
if (this.pull && !await this.pull()) {
return
}
await nextLoop()
this._read()
}
}
/**
* @this {import('barnard59-core').Context}
* @param {PipelineStream} pipeline
* @param {string} variable
* @return {ForEach}
*/
function factory(pipeline, variable) {
return new ForEach({
pipeline,
createPipeline: this.createPipeline,
variable,
variables: this.variables,
})
}
export default factory