-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
107 lines (83 loc) · 2.86 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'use strict';
var util = require('util');
var TransformStream = require('stream').Transform;
var async = require('async');
/**
 * Transform stream constructor that hands the new instance to `modify`.
 *
 * @constructor
 * @param {Object} [options] Forwarded unchanged to the base Transform
 *     constructor. Its `concurrency` property, when `options` is truthy,
 *     is passed on to `modify`.
 */
function SwiftTransformStream(options) {
    var concurrency;

    // Base-class setup first, so `modify` sees a fully constructed stream.
    TransformStream.call(this, options);

    concurrency = options && options.concurrency;
    modify(this, concurrency);
}

util.inherits(SwiftTransformStream, TransformStream);
// -------------------------------------------
/**
 * Replacement for the stream's `_transform` hook (installed by `modify`).
 *
 * Queues the chunk for processing, with `transformDone` bound to this
 * stream as the per-task completion callback, then decides what to do with
 * the stream's own callback:
 *  - while running + queued tasks are below the queue's concurrency, the
 *    callback is scheduled via `setImmediate`;
 *  - otherwise it is parked on `_swiftTransform.callback`, which
 *    `transformDone` invokes later.
 *
 * Once the stream has been marked as killed, the call returns without
 * queuing anything and without invoking `callback`.
 *
 * @param {*} data Chunk handed over by the stream.
 * @param {string} encoding Encoding handed over by the stream.
 * @param {Function} callback Stream callback for this chunk.
 */
function transform(data, encoding, callback) {
    /* jshint validthis:true */
    var state = this._swiftTransform;
    var queue;
    var hasCapacity;

    // Nothing to do after a failure killed the queue
    /* istanbul ignore if */
    if (state.killed) {
        return;
    }

    queue = state.queue;
    queue.push({ data: data, encoding: encoding }, transformDone.bind(this));

    // The task just pushed is already counted here
    hasCapacity = queue.running() + queue.length() < queue.concurrency;

    if (!hasCapacity) {
        // Saturated: keep the callback until a task finishes
        state.callback = callback;
        return;
    }

    // Still room: let the stream deliver the next chunk right away
    setImmediate(callback);
}
/**
 * Replacement for the stream's `_flush` hook (installed by `modify`).
 *
 * Runs the user's original flush (or, when there is none, a stand-in that
 * just invokes the callback) once the queue has no work left: immediately
 * if the queue reports idle, otherwise by installing it as the queue's
 * `drain` handler.
 *
 * Once the stream has been marked as killed, the call returns without
 * flushing and without invoking `callback`.
 *
 * @param {Function} callback Stream callback signalling flush completion.
 * @returns {*} Whatever the flush function returns when it runs
 *     immediately; `undefined` otherwise.
 */
function flush(callback) {
    /* jshint validthis:true */
    var state = this._swiftTransform;
    var userFlush = state.flush;

    // Nothing to do after a failure killed the queue
    /* istanbul ignore if */
    if (state.killed) {
        return;
    }

    if (!userFlush) {
        // No user flush: completing the flush is just calling back
        userFlush = function (done) { done(); };
    }

    if (!state.queue.idle()) {
        // Work still pending: defer until the queue drains.
        // NOTE(review): plain assignment matches the property-style `drain`
        // hook of older `async` releases - confirm against the installed version.
        state.queue.drain = userFlush.bind(this, callback);
        return;
    }

    return userFlush.call(this, callback);
}
/**
 * Queue worker: runs the user's original transform for one queued task.
 *
 * Expected to be invoked with the stream as `this` (`modify` binds it).
 *
 * @param {{data: *, encoding: *}} task Chunk and encoding to process.
 * @param {Function} callback Completion callback handed to the user's
 *     transform.
 */
function doTransform(task, callback) {
    /*jshint validthis:true*/
    var userTransform = this._swiftTransform.transform;

    userTransform.call(this, task.data, task.encoding, callback);
}
/**
 * Completion callback for one queued task; expects the stream as `this`.
 *
 * On error: marks the stream as killed, drops any parked stream callback,
 * kills the queue and emits `'error'` with the error.
 * On success: pushes `data` downstream unless it is `null`/`undefined`,
 * then releases the parked stream callback (if any) so more input can flow.
 * Does nothing at all once the stream has been marked as killed.
 *
 * @param {*} err Truthy when the user's transform failed.
 * @param {*} [data] Result of the user's transform.
 */
function transformDone(err, data) {
    /*jshint validthis:true*/
    var state = this._swiftTransform;
    var pending;

    // A previous failure already shut everything down
    if (state.killed) {
        return;
    }

    // Failure: shut down and report
    if (err) {
        state.killed = true;
        state.callback = null;
        state.queue.kill();
        this.emit('error', err);
        return;
    }

    // Success: forward the result, skipping only null/undefined
    if (data != null) {
        this.push(data);
    }

    // Release the callback parked while the queue was saturated
    pending = state.callback;
    if (pending) {
        state.callback = null;
        pending();
    }
}
/**
 * Patches a transform stream in place so its chunks are processed through
 * an `async.queue`.
 *
 * The stream's current `_transform` and `_flush` are saved on
 * `stream._swiftTransform` and replaced with this module's `transform` and
 * `flush`. A stream that already carries `_swiftTransform` is returned
 * untouched.
 *
 * @param {Object} stream Stream to patch.
 * @param {number} [concurrency] Queue concurrency; any falsy value
 *     falls back to 1.
 * @returns {Object} The same `stream` instance.
 */
function modify(stream, concurrency) {
    var state;

    // Already patched: leave it alone
    if (stream._swiftTransform) {
        return stream;
    }

    // Remember the user's hooks before they get overwritten below
    state = {
        transform: stream._transform,
        flush: stream._flush
    };
    stream._swiftTransform = state;

    // Install our hooks
    stream._transform = transform;
    stream._flush = flush;

    // The queue worker runs with the stream as `this`
    state.queue = async.queue(doTransform.bind(stream), concurrency || 1);

    return stream;
}
// Public API: the module itself is `modify`, with the ready-made stream
// constructor exposed as its `Transform` property.
modify.Transform = SwiftTransformStream;
module.exports = modify;