From 9b220f0f845f54b0dfd1af31d96d95c56adab74c Mon Sep 17 00:00:00 2001 From: Starkrights Date: Fri, 3 Mar 2023 13:19:12 -0600 Subject: [PATCH 1/2] Add worker-mixed.js --- lib/worker-mixed.js | 67 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 lib/worker-mixed.js diff --git a/lib/worker-mixed.js b/lib/worker-mixed.js new file mode 100644 index 000000000..93158db24 --- /dev/null +++ b/lib/worker-mixed.js @@ -0,0 +1,67 @@ +'use strict' + +const pino = require('../pino.js') +const build = require('pino-abstract-transport') +const buildPipelineStream = require('./worker-pipeline.js') +const loadTransportStreamBuilder = require('./transport-stream') + + +// This file is not checked by the code coverage tool, +// as it is not reliable. + +/* istanbul ignore file */ + +module.exports = async function ({ targets, levels, dedupe }) { + // build target streams + targets = await Promise.all(targets.map(async (t) => { + if(t.pipeline){ + return { + level: t.level, + stream: await buildPipelineStream(t) + } + } + else { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return { + level: t.level, + stream + } + } + + })); + + return build(process, { + parse: 'lines', + metadata: true, + close (err, cb) { + let expected = 0 + for (const transport of targets) { + expected++ + transport.stream.on('close', closeCb) + transport.stream.end() + } + + function closeCb () { + if (--expected === 0) { + cb(err) + } + } + } + }) + + function process (stream) { + const multi = pino.multistream(targets, { levels, dedupe }) + // TODO manage backpressure + stream.on('data', function (chunk) { + const { lastTime, lastMsg, lastObj, lastLevel } = this + multi.lastLevel = lastLevel + multi.lastTime = lastTime + multi.lastMsg = lastMsg + multi.lastObj = lastObj + + // TODO handle backpressure + multi.write(chunk + '\n') + }) + } +} From 214d9ee7199970165d77fb1517cb27618059025d Mon Sep 17 00:00:00 2001 From: Starkrights Date: Fri, 3 Mar 2023 13:50:54 -0600 Subject: [PATCH 2/2] Add initial worker-mixed.js integration --- lib/transport.js | 14 +++++++++++++- lib/worker-mixed.js | 12 ++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/lib/transport.js b/lib/transport.js index 39de490d8..ed2fd8662 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -86,8 +86,20 @@ function transport (fullOptions) { } if (targets) { - target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js') + target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker-mixed.js') + options.targets = targets.map((dest) => { + if (dest.pipeline) { + dest.targets = dest.targets.map((pTarget) => { + // parse pipeline target paths + return { + ...pTarget, + target: fixTarget(pTarget.target) + } + }) + return { ...dest } + } + return { ...dest, target: fixTarget(dest.target) diff --git a/lib/worker-mixed.js b/lib/worker-mixed.js index 93158db24..6ac34c339 100644 --- a/lib/worker-mixed.js +++ b/lib/worker-mixed.js @@ -2,10 +2,8 @@ const pino = require('../pino.js') const build = require('pino-abstract-transport') -const buildPipelineStream = require('./worker-pipeline.js') +const buildPipelineStream = require('./worker-pipeline.js') const loadTransportStreamBuilder = require('./transport-stream') - - // This file is not checked by the code coverage tool, // as it is not reliable. @@ -14,13 +12,12 @@ const loadTransportStreamBuilder = require('./transport-stream') module.exports = async function ({ targets, levels, dedupe }) { // build target streams targets = await Promise.all(targets.map(async (t) => { - if(t.pipeline){ + if (t.pipeline) { return { level: t.level, stream: await buildPipelineStream(t) } - } - else { + } else { const fn = await loadTransportStreamBuilder(t.target) const stream = await fn(t.options) return { @@ -28,8 +25,7 @@ module.exports = async function ({ targets, levels, dedupe }) { stream } } - - })); + })) return build(process, { parse: 'lines',