forked from cryptpad/cryptpad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
expire-channels.js
114 lines (98 loc) · 2.85 KB
/
expire-channels.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
var Fs = require("fs");
var Path = require("path");
var nThen = require("nthen");
var config;
try {
config = require('./config');
} catch (e) {
config = require('./config.example');
}
var FileStorage = require(config.storage || './storage/file');
var root = Path.resolve(config.taskPath || './tasks');
var dirs;
var nt;
var store;
var queue = function (f) {
nt = nt.nThen(f);
};
var tryParse = function (s) {
try { return JSON.parse(s); }
catch (e) { return null; }
};
var CURRENT = +new Date();
var handleTask = function (str, path, cb) {
var task = tryParse(str);
if (!Array.isArray(task)) {
console.error('invalid task: not array');
return cb();
}
if (task.length < 2) {
console.error('invalid task: too small');
return cb();
}
var time = task[0];
var command = task[1];
var args = task.slice(2);
if (time > CURRENT) {
// not time for this task yet
console.log('not yet time');
return cb();
}
nThen(function (waitFor) {
switch (command) {
case 'EXPIRE':
console.log("expiring: %s", args[0]);
store.removeChannel(args[0], waitFor());
break;
default:
console.log("unknown command", command);
}
}).nThen(function () {
// remove the task file...
Fs.unlink(path, function (err) {
if (err) { console.error(err); }
cb();
});
});
};
nt = nThen(function (w) {
Fs.readdir(root, w(function (e, list) {
if (e) { throw e; }
dirs = list;
if (dirs.length === 0) {
w.abort();
return;
}
}));
}).nThen(function (waitFor) {
FileStorage.create(config, waitFor(function (_store) {
store = _store;
}));
}).nThen(function () {
dirs.forEach(function (dir, dIdx) {
queue(function (w) {
console.log('recursing into %s', dir);
Fs.readdir(Path.join(root, dir), w(function (e, list) {
list.forEach(function (fn) {
queue(function (w) {
var filePath = Path.join(root, dir, fn);
var cb = w();
console.log("processing file at %s", filePath);
Fs.readFile(filePath, 'utf8', function (e, str) {
if (e) {
console.error(e);
return void cb();
}
handleTask(str, filePath, cb);
});
});
});
if (dIdx === (dirs.length - 1)) {
queue(function () {
store.shutdown();
});
}
}));
});
});
});