-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapp.js
339 lines (301 loc) · 12.5 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Load variables from a local .env file into process.env. This must run before any
// process.env value below is read.
require("dotenv").config();
// ================== Part #1. Most variables and constants are declared here
// Port the HTTP/socket server listens on (defaults to 3000).
const PORT = process.env.PORT || 3000;
// Value sent in the Access-Control-Allow-Origin header of every /api response.
const ORIGIN=process.env.ORIGIN || `http://localhost:${PORT}`;
// Sent as the "AccountKey" header on every request made to API_ENDPOINT.
const LTA_API_KEY_BACKUP=process.env.LTA_API_KEY_BACKUP;
// const LTA_API_KEY=process.env.LTA_API_KEY;
// Base URL of the upstream API; dataset names are appended to it as a path segment.
// NOTE(review): this is plain http, so the AccountKey header travels unencrypted —
// confirm whether an https endpoint is available.
const API_ENDPOINT = "http://datamall2.mytransport.sg/ltaodataservice";
// Example request URLs:
// http://datamall2.mytransport.sg/ltaodataservice/BusArrivalv2?BusStopCode=83139
// http://datamall2.mytransport.sg/ltaodataservice/BusServices
// http://datamall2.mytransport.sg/ltaodataservice/BusStops
const PAGE_SIZE = 500; // How many records the API returns in a page.
// Width, in records, of the offset window one call to the
// /ltaodataservice/:transportation/:client_offset route may fetch.
const LIMIT_PER_CALL=4500;
const compression = require("compression");
const request = require("request");
const path = require("path");
const favicon = require("serve-favicon");
const engine = require("consolidate");
const express = require("express");
// Router that carries every API endpoint of this app.
const router = express.Router();

// Accept request bodies sent either URL-encoded or as JSON.
router.use(express.urlencoded({extended: true}));
router.use(express.json());

// Router-level middleware: stamp the allowed CORS origin on each response,
// then hand over to the next handler.
router.use(function allowConfiguredOrigin(req, res, next) {
  const allowedOrigin = ORIGIN || "*";
  res.header("Access-Control-Allow-Origin", allowedOrigin);
  next();
});
// ========== Part #2. A Redis client is instantiated here to fetch all of its non-expired cached information.
// ========== This includes Bus services, Bus Routes, Bus Stops & Bus ETAs for pre-load rendering,
// ========== to mitigate excessive load time for the web app on start.
const redis = require("redis");
const url = require("url");
// Connection settings, all read from the environment.
const redis_username=process.env.REDIS_USERNAME;
const redis_password=process.env.REDIS_PASSWORD;
const redis_endpoint_uri=process.env.REDIS_ENDPOINT_URI;
const redis_db=process.env.REDIS_DB;
// The settings are assembled into a redis:// URL and parsed straight back so that the
// port, hostname and auth components can be read off the parsed object.
// NOTE(review): the values are interpolated without escaping — a password containing
// characters such as "@", "/" or ":" would presumably be mis-parsed; confirm.
const redisStr=`redis://${redis_username}:${redis_password}@${redis_endpoint_uri}/${redis_db}`;
// Only redisURL.port, redisURL.hostname and redisURL.auth are read later in this file;
// the database segment of the URL is not passed on to the client explicitly.
const redisURL = url.parse(redisStr);
/**
 * Wraps a client's connection events in a promise.
 *
 * @param {object} redisClient - Event emitter that fires "connect" and "error".
 * @returns {Promise<object>} Resolves with the same client once "connect" fires;
 *   rejects with the emitted error if "error" fires first.
 */
const connectedRedisClient = (redisClient) => {
  return new Promise((resolve, reject) => {
    const handleConnect = () => {
      resolve(redisClient);
    };
    const handleError = (connectionError) => {
      reject(connectionError);
    };
    redisClient.on("connect", handleConnect);
    redisClient.on("error", handleError);
  });
};
// Shared Redis client handle. It is only assigned when the connection attempt below
// succeeds; otherwise it stays undefined.
var redisClient;
// The rest of the file runs inside one async function so that the Redis connection
// attempt can be awaited before the routes and sockets are set up.
(async()=> {
// Create the client from the port and hostname parsed out of the connection URL.
const redisClientInstance = redis.createClient(redisURL.port, redisURL.hostname, {no_ready_check: true});
// redisURL.auth has the form "username:password"; authenticate with the password part.
redisClientInstance.auth(redisURL.auth.split(":")[1]);
try {
// Wait for the "connect" event; an "error" event before that rejects instead.
redisClient = await connectedRedisClient(redisClientInstance);
console.log("Successfully connected to Redis instance.");
} catch(err) {
// A failed connection is only logged. redisClient is left undefined and startup continues.
console.log(err);
console.log("Failed to connect to Redis instance.");
}
// ====================================================================
// ================== Part #3. All server-side API calls are made via the functions below, and the Redis client updates its data
// ================== storage with the API output in the event that its cached copy has expired (so that only up-to-date data is served)
/**
 * Performs one HTTP request and resolves with the `value` property of the
 * JSON response body.
 *
 * @param {object} reqOptions - Options object handed to `request` unchanged.
 * @returns {Promise<*>} Resolves with `body.value`. Rejects when the request
 *   fails or when the response has no object body.
 */
function resolveAsyncCall(reqOptions) {
  return new Promise((resolve, reject) => {
    request(reqOptions, (err, res, body) => {
      // Transport-level failure: `body` is not usable, so surface the error
      // to the caller instead of dereferencing it.
      if (err) {
        reject(err);
        return;
      }
      // A missing or non-object body (e.g. an error page that is not JSON) used to
      // throw inside this callback, where no caller could catch it.
      if (body === null || typeof body !== "object") {
        const statusCode = (res !== null && typeof res === "object") ? res.statusCode : undefined;
        reject(new Error(`Unexpected response (status ${statusCode}) from ${reqOptions.url}`));
        return;
      }
      resolve(body.value);
    });
  });
}
/**
 * Downloads a complete dataset from the API by requesting consecutive pages
 * (`$skip=0`, `$skip=PAGE_SIZE`, ...) until a page shorter than PAGE_SIZE arrives.
 *
 * @param {string} transportation - Dataset name appended to API_ENDPOINT.
 * @returns {Promise<Array>} All records, in the order the pages were returned.
 *   Rejects if a page request fails or a page is not an array.
 */
async function asyncCall(transportation) {
  const headers = {
    "AccountKey" : LTA_API_KEY_BACKUP,
    "accept" : "application/json"
  };
  const records = [];
  let offset = 0;
  let isLastPage = false;
  while (!isLastPage) {
    const page = await resolveAsyncCall({
      url: `${API_ENDPOINT}/${transportation}?$skip=${offset}`,
      method: "GET",
      json: true,
      headers: headers
    });
    if (!Array.isArray(page)) {
      throw new Error(`Unexpected response for ${transportation} at offset ${offset}.`);
    }
    // Each page is appended exactly once. The terminating iteration used to append
    // the final page a second time.
    records.push(...page);
    // A short (or empty) page means there is nothing left to fetch.
    isLastPage = page.length < PAGE_SIZE;
    offset += PAGE_SIZE;
  }
  return records;
}
/**
 * POST /ltaodataservice/all/:transportation
 *
 * Responds with the complete dataset named by `:transportation`.
 * - No Redis client available: responds 200 with an empty array.
 * - Cache hit: responds 200 with the cached JSON.
 * - Cache miss: downloads the dataset via asyncCall, caches it for 60 days,
 *   and responds 200 with it.
 * - Any failure (Redis lookup, corrupt cache entry, API download): responds 500
 *   with `{type: "error", message}`. Nothing is written to the cache in that case.
 */
router.post("/ltaodataservice/all/:transportation", (req, res) => {
  // Declared at handler scope so that the error message below can reference it.
  const transportation = req.params["transportation"];

  const sendError = (err) => {
    // Never try to answer twice if part of a response already went out.
    if (res.headersSent) {
      return undefined;
    }
    return res.status(500).json({
      type: "error",
      message: (err !== null && typeof err !== "undefined" && typeof err.message !== "undefined") ? err.message : `Error. Unable to retrieve data from datamall.lta.gov.sg ${transportation} Routing API.`
    });
  };

  if (typeof redisClient === "undefined") {
    console.log('empty array');
    return res.status(200).json([]);
  }

  const cacheKey = `${transportation}_hash`;
  try {
    redisClient.get(cacheKey, async (err, data) => {
      // This callback runs after the handler has returned, so errors must be
      // handled here; an outer try/catch cannot see them.
      try {
        if (err) {
          throw err;
        }
        if (data) {
          console.log(`${cacheKey} is retrieved from Redis`);
          return res.status(200).json(JSON.parse(data));
        }
        const entireListing = await asyncCall(transportation);
        const cacheExpirySeconds = 60*60*24*60;
        redisClient.setex(cacheKey, cacheExpirySeconds, JSON.stringify(entireListing));
        console.log(`${cacheKey} retrieved from the API`);
        return res.status(200).json(entireListing);
      } catch (e) {
        console.error(e);
        return sendError(e);
      }
    });
  } catch (err) {
    // Synchronous failure while issuing the Redis command.
    console.error(err);
    return sendError(err);
  }
});
/**
 * POST /ltaodataservice/:transportation/:client_offset
 *
 * Responds with one window of the dataset named by `:transportation`: the records
 * from offset `:client_offset` up to (but excluding) `client_offset + LIMIT_PER_CALL`,
 * fetched page by page and cut short when a page smaller than PAGE_SIZE arrives.
 * - No Redis client available: responds 200 with an empty array.
 * - `:client_offset` is not a non-negative integer: responds 400.
 * - Cache hit: responds 200 with the cached JSON.
 * - Cache miss: downloads the window, caches it for 60 days, responds 200 with it.
 * - Any failure (Redis lookup, corrupt cache entry, API download): responds 500
 *   with `{type: "error", message}`. Nothing is written to the cache in that case.
 */
router.post("/ltaodataservice/:transportation/:client_offset", (req, res) => {
  // Declared at handler scope so that the error message below can reference it.
  const transportation = req.params["transportation"];
  const client_offset = Number.parseInt(req.params["client_offset"], 10);

  const sendError = (err) => {
    // Never try to answer twice if part of a response already went out.
    if (res.headersSent) {
      return undefined;
    }
    return res.status(500).json({
      type: "error",
      message: (err !== null && typeof err !== "undefined" && typeof err.message !== "undefined") ? err.message : `Error. Unable to retrieve data from datamall.lta.gov.sg ${transportation} API.`
    });
  };

  // Fetches a single page; rejects instead of throwing inside the request callback.
  const fetchPage = (reqOptions) => new Promise((resolve, reject) => {
    request(reqOptions, (err, response, body) => {
      if (err) {
        reject(err);
        return;
      }
      if (body === null || typeof body !== "object" || !Array.isArray(body.value)) {
        reject(new Error(`Unexpected response from ${reqOptions.url}`));
        return;
      }
      resolve(body.value);
    });
  });

  // Collects the pages of the requested window, appending each page exactly once.
  const fetchWindow = async () => {
    const records = [];
    const windowEnd = client_offset + LIMIT_PER_CALL;
    for (let offset = client_offset; offset < windowEnd; offset += PAGE_SIZE) {
      const page = await fetchPage({
        url: `${API_ENDPOINT}/${transportation}?$skip=${offset}`,
        method: "GET",
        json: true,
        headers: {
          "AccountKey" : LTA_API_KEY_BACKUP,
          "accept" : "application/json"
        }
      });
      records.push(...page);
      // A short (or empty) page means the dataset has ended.
      if (page.length < PAGE_SIZE) {
        break;
      }
    }
    return records;
  };

  if (typeof redisClient === "undefined") {
    console.log('empty array');
    return res.status(200).json([]);
  }

  // A non-numeric offset used to produce NaN, which matched none of the loop's
  // conditions and kept the paging loop spinning forever.
  if (!Number.isInteger(client_offset) || client_offset < 0) {
    return res.status(400).json({
      type: "error",
      message: `Error. Invalid offset "${req.params["client_offset"]}": expected a non-negative integer.`
    });
  }

  const cacheKey = `${transportation}_hash_${client_offset}`;
  try {
    redisClient.get(cacheKey, async (err, data) => {
      // This callback runs after the handler has returned, so errors must be
      // handled here; an outer try/catch cannot see them.
      try {
        if (err) {
          throw err;
        }
        if (data) {
          console.log(`${cacheKey} is retrieved from Redis`);
          return res.status(200).json(JSON.parse(data));
        }
        const entireSubListing = await fetchWindow();
        const cacheExpirySeconds = 60*60*24*60;
        redisClient.setex(cacheKey, cacheExpirySeconds, JSON.stringify(entireSubListing));
        console.log(`${cacheKey} retrieved from the API`);
        return res.status(200).json(entireSubListing);
      } catch (e) {
        console.error(e);
        return sendError(e);
      }
    });
  } catch (err2) {
    // Synchronous failure while issuing the Redis command.
    console.error(err2);
    return sendError(err2);
  }
});
// Express application; every response goes through the compression middleware.
const app = express();
app.use(compression());

// ================== Part #4. The server-side socket is set up via socket.io on top of a plain http server.
// ================== A connection with the client side must be established before messages can be exchanged
// ================== in either direction.
const http = require("http");
const socketio = require("socket.io");
const server = http.createServer(app);
const io = socketio(server);

// Important: listen on `server`, not on `app`, otherwise socket.io won't function correctly.
server.listen(PORT, function announceListening() {
  console.log(`SG Transportation App [using Forward Proxy] is listening on port ${PORT}!`);
});

// REGISTER ALL ROUTES -------------------------------
// Every route of `router` is served under the /api prefix.
app.use("/api", router);

// Static assets, view lookup and template engine, favicon, and the landing page —
// registered in the same order as before, one statement per step.
const publicDir = path.join(__dirname, "public");
app.use(express.static(publicDir));
app.set("views", path.join(__dirname, "views"));
app.engine("html", engine.mustache);
app.use(favicon(path.join(publicDir, "img/favicon.ico")));
app.set("view engine", "html");
app.get("/", (req, res) => res.render("index.html"));
const onlineClients = new Set(); // Ids of the client sockets that have confirmed their connection; its size is the number of online clients
const previousBusCode = new Map(); // socket id -> the latest bus stop code whose ETAs that client requested
const updateInterval = new Map(); // socket id -> ID of that client's latest polling interval, kept so polling can be stopped when no longer needed
/**
 * Wires up one newly connected client socket. Called for every new socket
 * connection, i.e. whenever a user opens the web app.
 *
 * Events handled:
 * - "back_to_server": the client echoes its socket id; on a match the socket is
 *   registered in onlineClients / previousBusCode / updateInterval.
 * - "bus_arrivals": the client sends a bus stop code. A new code (re)starts a
 *   10-second polling timer that fetches that stop's arrivals and emits them as
 *   "get_bus_arrivals_info". `undefined` or `null` stops the polling.
 * - "disconnect": stops any running polling timer and forgets the socket.
 *
 * @param {object} socket - The connected socket; must expose `id`, `on` and `emit`.
 */
function onNewWebsocketConnection(socket) {
  const POLL_INTERVAL_MS = 10000;
  console.info(`Server side socket[${socket.id}] connection established.`);

  // Stops this socket's polling timer, if one is running.
  const stopPolling = () => {
    const intervalID = updateInterval.get(socket.id);
    if (typeof intervalID !== "undefined") {
      clearInterval(intervalID);
      updateInterval.set(socket.id, undefined);
    }
  };

  // Fetches the arrivals of one bus stop and pushes them to the client.
  const fetchArrivals = (bus_stop_code) => {
    request({
      // The code comes from the client, so it is escaped before going into the URL.
      url: `${API_ENDPOINT}/BusArrivalv2?BusStopCode=${encodeURIComponent(bus_stop_code)}`,
      method: "GET",
      json: true,
      headers: {
        "AccountKey" : LTA_API_KEY_BACKUP,
        "accept" : "application/json"
      }
    }, (err, res, body) => {
      // A failed request has no usable body; skip this tick rather than throw
      // inside the callback. The next tick will try again.
      if (err || body === null || typeof body !== "object") {
        console.error(`Unable to fetch bus arrivals for bus stop ${bus_stop_code}:`, err || "unexpected response body");
        return;
      }
      socket.emit("get_bus_arrivals_info", JSON.stringify(body["Services"]));
    });
  };

  // Waits for the client side to call back and confirm the connection.
  // Echoes on the terminal every "back_to_server" message this socket sends.
  socket.on("back_to_server", msg => {
    console.info(`Client side socket id: ${msg}`);
    if (msg === socket.id) {
      onlineClients.add(socket.id);
      previousBusCode.set(socket.id, undefined);
      updateInterval.set(socket.id, undefined);
    }
  });

  // The server side receives a bus stop code from the client-side socket.
  socket.on("bus_arrivals", bus_stop_code => {
    // No code means the client has nothing selected, so polling should stop.
    // null is treated like undefined — presumably an undefined argument arrives as
    // null after JSON serialisation; verify against the client.
    if (bus_stop_code === undefined || bus_stop_code === null) {
      if (typeof updateInterval.get(socket.id) !== "undefined") {
        stopPolling();
        // The ETAs of the previous selection are no longer wanted.
        previousBusCode.set(socket.id, undefined);
      }
      return;
    }

    // Same stop as before: the running timer already covers it.
    if (previousBusCode.get(socket.id) === bus_stop_code) {
      return;
    }

    // The user selected another bus stop: stop fetching the previous one first.
    previousBusCode.set(socket.id, bus_stop_code);
    stopPolling();
    const intervalID = setInterval(() => fetchArrivals(bus_stop_code), POLL_INTERVAL_MS);
    updateInterval.set(socket.id, intervalID); // remember the interval ID so it can be cleared later
  });

  socket.on("disconnect", () => {
    // Without this the timer would keep calling the API for a client that is gone.
    stopPolling();
    onlineClients.delete(socket.id);
    previousBusCode.delete(socket.id);
    updateInterval.delete(socket.id);
    console.info(`Server side socket[${socket.id}] has disconnected.`);
  });
}
// Fires for every new socket connection, i.e. each time a user opens the web app.
io.on("connection", onNewWebsocketConnection);
// Disabled: periodic broadcast of the number of online clients to every socket.
/*
setInterval(() => {
io.emit("online_clients_tracker", onlineClients.size);
}, 10000);*/
})();