diff --git a/lib/transport.js b/lib/transport.js index 39de490d8..ed2fd8662 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -86,8 +86,20 @@ function transport (fullOptions) { } if (targets) { - target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js') + target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker-mixed.js') + options.targets = targets.map((dest) => { + if (dest.pipeline) { + dest.targets = dest.targets.map((pTarget) => { + // parse pipeline target paths + return { + ...pTarget, + target: fixTarget(pTarget.target) + } + }) + return { ...dest } + } + return { ...dest, target: fixTarget(dest.target) diff --git a/lib/worker-mixed.js b/lib/worker-mixed.js new file mode 100644 index 000000000..6ac34c339 --- /dev/null +++ b/lib/worker-mixed.js @@ -0,0 +1,63 @@ +'use strict' + +const pino = require('../pino.js') +const build = require('pino-abstract-transport') +const buildPipelineStream = require('./worker-pipeline.js') +const loadTransportStreamBuilder = require('./transport-stream') +// This file is not checked by the code coverage tool, +// as it is not reliable. + +/* istanbul ignore file */ + +module.exports = async function ({ targets, levels, dedupe }) { + // build target streams + targets = await Promise.all(targets.map(async (t) => { + if (t.pipeline) { + return { + level: t.level, + stream: await buildPipelineStream(t) + } + } else { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return { + level: t.level, + stream + } + } + })) + + return build(process, { + parse: 'lines', + metadata: true, + close (err, cb) { + let expected = 0 + for (const transport of targets) { + expected++ + transport.stream.on('close', closeCb) + transport.stream.end() + } + + function closeCb () { + if (--expected === 0) { + cb(err) + } + } + } + }) + + function process (stream) { + const multi = pino.multistream(targets, { levels, dedupe }) + // TODO manage backpressure + stream.on('data', function (chunk) { + const { lastTime, lastMsg, lastObj, lastLevel } = this + multi.lastLevel = lastLevel + multi.lastTime = lastTime + multi.lastMsg = lastMsg + multi.lastObj = lastObj + + // TODO handle backpressure + multi.write(chunk + '\n') + }) + } +}