-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
77 lines (57 loc) · 3.02 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* @typedef {Object} Config configuration for the batch processor
* @property {Number} threads - The number of worker threads to create for processing
* @property {Number} parallelProcesses - The number of parallel iterations to run on each thread
* @property {Boolean} [retryOnFail] - Whether the BatchProcessor should attempt processing failed tasks a second time
* @property {Boolean} [autoStart] - Whether the BatchProcessor should start without the user calling main.startWorking()
* @property {String} [logLocation] - The path for saving success, failure, and error logs
* @property {String} [iterableName] - The singular term of the iterable items being processed
* @property {Number} [timeout] - The number of milliseconds a task can execute before being considered hung.
*/
const workerThreads = require('worker_threads');
const MainThreadManager = require('./src/mainThread.js');
const WorkerThreadManager = require('./src/workerThread.js');
module.exports = class BatchProcessor {
/**
* Attaches the BatchProcessor functionality to the sub class
* @param {Config} config - The configuration for the Batch Processor
* @param {String} filePath - The path to the subclass, used to create new instances of worker threads
*/
async initialize(config, filePath) {
try {
// Attach the configuration, main thread, and worker thread
this.config = config;
this.config.filePath = config.filePath || filePath;
this.main = MainThreadManager.isMain(this, workerThreads);
this.worker = WorkerThreadManager.isWorker(this, workerThreads);
// If this is the main thread instance, initialize and start sending tasks
if (this.main) {
await this.prepareMainThread();
this.main.setCompletionCallback(this.onBatchComplete);
if (config.autoStart) this.main.startWorking();
}
// If this is a worker thead instance, initialize and start processing tasks
if (this.worker) {
await this.prepareWorkerThread();
this.worker.startWorking();
}
} catch(err) {
console.log('Error initalizing batch processor - ' + err.stack);
throw err;
}
}
/** You can override this method to complete work before the main thread starts */
async prepareMainThread() { }
/** You can override this method to complete work before each worker thread starts */
async prepareWorkerThread() { }
/** you can override this method to perform cleanup after all tasks have been processed */
onBatchComplete() { }
/**
* The meat of the module. Inherit this class in another file and add an execute function
* Your list of work will be automatically sent to this function to be processed on different threads
* @param {*} item
*/
async execute(item) {
throw new Error('You must override the execute method in ' + this.config.filePath);
}
}