-
-
Notifications
You must be signed in to change notification settings - Fork 588
/
Copy pathstream-receiver.js
110 lines (96 loc) · 2.58 KB
/
stream-receiver.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"use strict";
const ServiceBroker = require("../src/service-broker");
const fs = require("fs");
const path = require("path");
const crypto = require("crypto");
// Broker for the receiving node. The node ID embeds the process ID, and the
// broker is configured with the "TCP" transporter and the "ProtoBuf" serializer.
const brokerOptions = {
    nodeID: `streaming-receiver-${process.pid}`,
    transporter: "TCP",
    serializer: "ProtoBuf",
    logger: console,
    logLevel: "info"
};

const broker = new ServiceBroker(brokerOptions);
// Service "file2": its `save` action writes an incoming stream to disk.
broker.createService({
    name: "file2",

    actions: {
        /**
         * Write the stream passed as `ctx.params` to "d:/received-src.zip".
         * Progress is logged for every chunk; once the file has been closed
         * its size, SHA-1 digest and the elapsed time are logged.
         *
         * The action does not return anything, so the caller is not told
         * when (or whether) the file was completely written.
         *
         * @param {Object} ctx - Call context; `ctx.params` is used as a readable stream.
         */
        save(ctx) {
            const stream = ctx.params;
            const fileName = "d:/received-src.zip";

            // Counted per call: a service-wide counter keeps growing across
            // calls and pushes the reported percentage past 100%.
            let receivedSize = 0;
            // Set once either stream fails, so "close" does not report success.
            let failed = false;

            broker.logger.info("Open file");
            const s = fs.createWriteStream(fileName);
            stream.pipe(s);
            const startTime = Date.now();

            const fail = err => {
                if (failed) {
                    return;
                }
                failed = true;
                broker.logger.error("Stream error!", err);
                // pipe() does not close the destination when the source fails,
                // so release the file descriptor explicitly.
                s.destroy();
            };

            stream.on("data", chunk => {
                receivedSize += chunk.length;
                // Keep the service-level field in sync with the transfer in progress.
                this.uploadedSize = receivedSize;
                const percent = Number((receivedSize / this.stat.size) * 100).toFixed(0);
                broker.logger.info("RECV: ", `${percent}% (${chunk.length})`);
            });
            stream.on("error", fail);

            s.on("close", () => {
                if (failed) {
                    return;
                }
                getSHA(fileName)
                    .then(hash => {
                        broker.logger.info("File received.");
                        broker.logger.info("Size:", receivedSize);
                        broker.logger.info("SHA:", hash);
                        broker.logger.info("Time:", Date.now() - startTime + "ms");
                    })
                    .catch(err => {
                        broker.logger.error("Unable to hash the received file!", err);
                    });
            });
            s.on("error", fail);
        }
    },

    /**
     * Lifecycle hook: reads the size of "d:/src.zip", which `save` uses as the
     * 100% mark for its progress log. `fs.statSync` throws if that file is missing.
     */
    created() {
        this.fileName = "d:/src.zip";
        this.stat = fs.statSync(this.fileName);
        this.uploadedSize = 0;
    }
});
// Start the broker, wait for the "file" service, then call "file.get" and
// store the returned stream in "d:/received-src.zip".
broker
    .start()
    .then(() => {
        broker.repl();
        return broker.waitForServices("file");
    })
    // NOTE(review): `.delay()` is not a native Promise method; this relies on the
    // promise returned by the broker providing it — confirm.
    .delay(1000)
    .then(() => {
        // Size of the local source file is the 100% mark for the progress log.
        const sourceFileName = "d:/src.zip";
        const targetFileName = "d:/received-src.zip";
        const stat = fs.statSync(sourceFileName);
        let uploadedSize = 0;
        // Set once either stream fails, so "close" does not report success.
        let failed = false;

        // Returned so a rejected call reaches the catch handler below
        // instead of becoming an unhandled rejection.
        return broker.call("file.get").then(stream => {
            broker.logger.info("Open file");
            const s = fs.createWriteStream(targetFileName);
            stream.pipe(s);
            const startTime = Date.now();

            const fail = err => {
                if (failed) {
                    return;
                }
                failed = true;
                broker.logger.error("Stream error!", err);
                // pipe() does not close the destination when the source fails,
                // so release the file descriptor explicitly.
                s.destroy();
            };

            stream.on("data", chunk => {
                uploadedSize += chunk.length;
                const percent = Number((uploadedSize / stat.size) * 100).toFixed(0);
                broker.logger.info("RECV: ", `${percent}% (${chunk.length})`);
            });
            stream.on("error", fail);

            s.on("close", () => {
                if (failed) {
                    return;
                }
                getSHA(targetFileName)
                    .then(hash => {
                        broker.logger.info("File received.");
                        broker.logger.info("Size:", uploadedSize);
                        broker.logger.info("SHA:", hash);
                        broker.logger.info("Time:", Date.now() - startTime + "ms");
                    })
                    .catch(err => {
                        broker.logger.error("Unable to hash the received file!", err);
                    });
            });
            s.on("error", fail);
        });
    })
    .catch(err => {
        broker.logger.error("Unable to receive the file!", err);
    });
/**
 * Compute the SHA-1 digest of a file by streaming its content.
 *
 * @param {string} fileName - Path of the file to hash.
 * @returns {Promise<string>} Resolves with the hex-encoded digest; rejects
 *   with the read stream's error if the file cannot be read.
 */
function getSHA(fileName) {
    return new Promise((resolve, reject) => {
        const sha1 = crypto.createHash("sha1");

        const onChunk = chunk => {
            sha1.update(chunk);
        };
        const onEnd = () => {
            resolve(sha1.digest("hex"));
        };

        // Created inside the executor so that a synchronous throw still
        // surfaces as a rejection.
        fs.createReadStream(fileName)
            .on("error", reject)
            .on("data", onChunk)
            .on("end", onEnd);
    });
}