Skip to content
This repository has been archived by the owner on Jun 22, 2023. It is now read-only.

Monitoring step #28

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions test/throughput.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { strictEqual } from 'assert'
import { array } from 'get-stream'
import { isDuplex } from 'isstream'
import { describe, it } from 'mocha'
import { Readable } from 'readable-stream'
import throughput from '../throughput.js'

describe('throughput', () => {
it('should be a function', () => {
strictEqual(typeof throughput, 'function')
})

it('should return a duplex', () => {
const s = throughput({})
strictEqual(isDuplex(s), true)
})

it('should pipe elements on objectMode false', async () => {
const s = throughput({
interval: 2,
out: {
write: str => {}
}
})
const stringsArray = [...Array(10).keys()].map(toString)
const stream = Readable.from(stringsArray, {
objectMode: false
})
stream.pipe(s)
strictEqual((await array(s)).length, 10)
})

it('should pipe elements on objectMode true', async () => {
const s = throughput({
interval: 2,
out: {
write: str => {}
}
})
const objectsArray = [...Array(10).keys()].map(number => {
return { number: number }
})
const stream = Readable.from(objectsArray, {
objectMode: true
})
stream.pipe(s)
strictEqual((await array(s)).length, 10)
})

it('should write in the console', async () => {
const seen = []
const spy = {
write: str => {
seen.push(str)
}
}
const s = throughput({
interval: 2,
out: spy
})
const stream = Readable.from([...Array(10).keys()].map(toString), {
objectMode: true
})
stream.pipe(s)
await array(s)
strictEqual(seen.length > 0, true)
})
})
61 changes: 61 additions & 0 deletions throughput.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { cursorTo, clearLine } from 'readline'
import { PassThrough } from 'stream'

function throughput ({
interval,
label,
out
}) {
let count = 0
let lastCount = 0
let started = false

const timeUnit = interval === 1000 ? ' per second' : `/${interval} ms`

function update () {
if (count) {
if (lastCount === count) {
return
}
const str = `[${label}] count: ${count} (${(count - lastCount)}${timeUnit})`
lastCount = count
cursorTo(out, 0)
out.write(str)
clearLine(out, 1)
}
setTimeout(update, interval)
}

return new PassThrough({
objectMode: true,
write (chunk, encoding, callback) {
if (!started) {
out.write(`[${label}][start] ${new Date()}\n`)
started = true
update()
}
this.push(chunk)
count++
callback()
},
flush (callback) {
out.write(`[${label}][total] ${count} chunks\n`)
out.write(`[${label}][end] ${new Date()}\n`)
callback()
}
})
}

function factory ({
interval = 1000,
label = 'throughput',
out = process.stdout
}) {
return throughput({
interval,
label,
out
})
}

export default factory