// index.js — SPARQL proxy plugin for Trifid
// @ts-check
import { Readable } from 'node:stream'
import { ReadableStream } from 'node:stream/web'
import { performance } from 'node:perf_hooks'
import { Worker } from 'node:worker_threads'
import { sparqlGetRewriteConfiguration } from 'trifid-core'
import rdf from '@zazuko/env-node'
import ReplaceStream from './lib/ReplaceStream.js'
import { authBasicHeader, objectLength, isValidUrl } from './lib/utils.js'
/**
 * Baseline plugin options. Values supplied in the plugin `config` are spread
 * over these, so every key listed here can be overridden.
 */
const defaultConfiguration = {
  // --- Upstream endpoint(s) ---
  endpointUrl: '',
  username: '',
  password: '',
  endpoints: {},

  // --- IRI rewriting ---
  datasetBaseUrl: '',
  // Let clients switch rewriting on or off with the `rewrite` query parameter.
  allowRewriteToggle: true,
  // Whether rewriting is applied when the client does not ask for anything (off by default).
  rewrite: false,
  // When rewriting is active, apply it to the incoming query text.
  rewriteQuery: true,
  // When rewriting is active, apply it to the response returned by the endpoint.
  rewriteResults: true,

  // --- Content negotiation ---
  // Lookup table used to resolve the `format` query parameter into an Accept header value.
  formats: {},

  // --- Logging ---
  // Logger method name used to log the received queries.
  queryLogLevel: 'debug',

  // --- Service Description ---
  // Script run in a worker thread to fetch the Service Description.
  serviceDescriptionWorkerUrl: new URL('./lib/serviceDescriptionWorker.js', import.meta.url),
  // Maximum time to wait for the Service Description.
  serviceDescriptionTimeout: 5000,
  // Overrides the Accept header of the Service Description request. When left
  // undefined, content negotiation uses the formats `@zazuko/env-node` can parse.
  serviceDescriptionFormat: undefined,
}
// Thirty days, expressed in milliseconds (days * hours * minutes * seconds * ms).
const oneMonthMilliseconds = 30 * 24 * 60 * 60 * 1000

// Name of the endpoint that is used when the client does not select another one.
const DEFAULT_ENDPOINT_NAME = 'default'
/**
 * SPARQL proxy plugin factory.
 *
 * Merges the plugin `config` over `defaultConfiguration`, builds the table of
 * named endpoints (the `default` one plus any extra entries of
 * `options.endpoints`), starts a worker that fetches the Service Description
 * of the default endpoint, and returns the plugin definition.
 *
 * Throws when the endpoint configuration is incomplete (missing default
 * endpoint or missing URL) or when `queryLogLevel` is not a logger method.
 *
 * @type {import('trifid-core/types').TrifidPlugin}
 */
const factory = async (trifid) => {
  const { logger, config, trifidEvents } = trifid

  const endpoints = new Map()
  const options = { ...defaultConfiguration, ...config }

  let dynamicEndpoints = false
  if (objectLength(options.endpoints) > 0) {
    // Check if the default endpoint is defined
    if (!Object.hasOwnProperty.call(options.endpoints, DEFAULT_ENDPOINT_NAME)) {
      throw new Error('Missing default endpoint in the endpoints configuration')
    }

    // Override default values with the default endpoint values (in case it's a valid URL ; else it might be the default /query).
    // The check is done on the `url` field: passing the whole endpoint object would not test the URL itself.
    const defaultEndpointConfig = options.endpoints.default
    if (isValidUrl(defaultEndpointConfig?.url)) {
      options.endpointUrl = defaultEndpointConfig.url || ''
      options.username = defaultEndpointConfig.username || ''
      options.password = defaultEndpointConfig.password || ''
    }

    // Support for multiple endpoints
    dynamicEndpoints = true
  }

  if (!options.endpointUrl) {
    throw new Error(
      dynamicEndpoints
        ? `Missing endpoints.${DEFAULT_ENDPOINT_NAME}.url parameter`
        : 'Missing endpointUrl parameter',
    )
  }

  let authorizationHeader
  if (options.username && options.password) {
    authorizationHeader = authBasicHeader(options.username, options.password)
  }

  const datasetBaseUrl = options.datasetBaseUrl
  const allowRewriteToggle = options.allowRewriteToggle
  const rewriteConfigValue = options.rewrite
  const rewriteConfig = sparqlGetRewriteConfiguration(rewriteConfigValue, datasetBaseUrl)

  endpoints.set(DEFAULT_ENDPOINT_NAME, {
    endpointUrl: options.endpointUrl,
    username: options.username,
    password: options.password,
    authorizationHeader,
    datasetBaseUrl,
    allowRewriteToggle,
    rewriteConfigValue,
    rewriteConfig,
  })

  if (dynamicEndpoints) {
    for (const [endpointName, endpointConfig] of Object.entries(options.endpoints)) {
      // The default endpoint has already been registered above
      if (endpointName === DEFAULT_ENDPOINT_NAME) {
        continue
      }

      if (!endpointConfig.url) {
        throw new Error(`Missing endpoints.${endpointName}.url parameter`)
      }

      let endpointAuthorizationHeader
      if (endpointConfig.username && endpointConfig.password) {
        endpointAuthorizationHeader = authBasicHeader(endpointConfig.username, endpointConfig.password)
      }

      // Settings that are not defined on the endpoint fall back to the top-level ones
      const endpointDatasetBaseUrl = endpointConfig.datasetBaseUrl || datasetBaseUrl
      const endpointRewriteConfigValue = endpointConfig.rewrite ?? rewriteConfigValue

      endpoints.set(endpointName, {
        endpointUrl: endpointConfig.url || '',
        username: endpointConfig.username || '',
        password: endpointConfig.password || '',
        authorizationHeader: endpointAuthorizationHeader,
        datasetBaseUrl: endpointDatasetBaseUrl,
        allowRewriteToggle: endpointConfig.allowRewriteToggle ?? allowRewriteToggle,
        rewriteConfigValue: endpointRewriteConfigValue,
        rewriteConfig: sparqlGetRewriteConfiguration(endpointRewriteConfigValue, endpointDatasetBaseUrl),
      })
    }
  }

  const queryLogLevel = options.queryLogLevel
  if (!logger[queryLogLevel]) {
    throw new Error(`Invalid queryLogLevel: ${queryLogLevel}`)
  }

  /**
   * Log a query, depending on the `queryLogLevel`.
   * @param {string} msg Message to log
   * @returns {void}
   */
  const queryLogger = (msg) => logger[queryLogLevel](msg)

  // The Service Description of the default endpoint is fetched in a worker thread
  const worker = new Worker(options.serviceDescriptionWorkerUrl)
  worker.postMessage({
    type: 'config',
    data: {
      endpointUrl: options.endpointUrl,
      serviceDescriptionTimeout: options.serviceDescriptionTimeout,
      serviceDescriptionFormat: options.serviceDescriptionFormat,
      authorizationHeader,
    },
  })

  // Resolves with the dataset describing the service. Every code path below
  // resolves the promise, so a Service Description request can never hang.
  const serviceDescription = new Promise((resolve) => {
    // Fallback document: a single blank node typed as sd:Service
    const minimalSD = rdf.clownface().blankNode().addOut(rdf.ns.rdf.type, rdf.ns.sd.Service)

    worker.once('message', async (message) => {
      const { type, data } = message ?? {}
      switch (type) {
        case 'serviceDescription':
          try {
            resolve(await rdf.dataset().import(
              rdf.formats.parsers.import('application/n-triples', Readable.from(data)),
            ))
          } catch (error) {
            // A parsing failure must not leave the promise pending
            logger.error('Error while parsing the Service Description. Will return a minimal document')
            logger.error(error)
            resolve(minimalSD.dataset)
          }
          break
        case 'serviceDescriptionTimeOut':
          logger.warn('The proxied SPARQL endpoint did not return a Service Description in a timely fashion. Will return a minimal document')
          logger.info('You can increase the timeout using the \'serviceDescriptionTimeout\' configuration')
          resolve(minimalSD.dataset)
          break
        case 'serviceDescriptionError':
          logger.error('Error while fetching the Service Description. Will return a minimal document')
          logger.error(data)
          resolve(minimalSD.dataset)
          break
        default:
          // Only one message is listened for (`once`): an unknown type would otherwise leave the promise pending
          logger.warn(`Unexpected message type from the Service Description worker: ${type}. Will return a minimal document`)
          resolve(minimalSD.dataset)
      }
    })
  })

  trifidEvents.on('close', async () => {
    logger.debug('Got "close" event from Trifid ; closing worker…')
    await worker.terminate().catch(logger.error.bind(logger))
    logger.debug('Worker terminated')
  })

  return {
    defaultConfiguration: async () => {
      return {
        methods: ['GET', 'POST'],
        paths: [
          '/query',
          '/query/',
        ],
      }
    },
    routeHandler: async () => {
      /**
       * Query string type.
       *
       * @typedef {Object} QueryString
       * @property {string} [query] The SPARQL query.
       * @property {string} [rewrite] Should the query and the results be rewritten?
       * @property {string} [format] The format of the results.
       * @property {string} [endpoint] The name of the endpoint to use (default: DEFAULT_ENDPOINT_NAME).
       */

      /**
       * Request body type.
       * @typedef {Object} RequestBody
       * @property {string} [query] The SPARQL query.
       */

      /**
       * Route handler.
       *
       * Serves the Service Description for a GET request without query
       * parameters; otherwise forwards the SPARQL query to the selected
       * endpoint using a POST request and streams the response back,
       * optionally rewriting IRIs in the query and in the results.
       *
       * @param {import('fastify').FastifyRequest<{ Querystring: QueryString, Body: RequestBody | string }> & { cookies: { endpointName?: string }, accepts: () => { type: (types: string[]) => string[] | string | false }}} request Request.
       * @param {import('fastify').FastifyReply & { setCookie: (name: string, value: string, opts?: any) => {}, clearCookie: (name: string, opts?: any) => {}}} reply Reply.
       */
      const handler = async (request, reply) => {
        const savedEndpointName = request.cookies.endpointName || DEFAULT_ENDPOINT_NAME
        // `String(...)` guards against a repeated `endpoint` parameter, which is parsed as an array
        const requestedEndpointName = String(request.query.endpoint || savedEndpointName)
        const endpointName = requestedEndpointName.replace(/[^a-z0-9-]/gi, '')

        // Look the endpoint up before touching the cookie, so that an unknown
        // name is never persisted (it would make all following requests fail)
        const endpoint = endpoints.get(endpointName)
        if (!endpoint) {
          return reply.callNotFound()
        }

        // Only set the cookie if the endpoint name has changed and if it's not the default endpoint
        if (request.cookies.endpointName !== endpointName && endpointName !== DEFAULT_ENDPOINT_NAME) {
          // Cookie `maxAge` is expressed in seconds, not in milliseconds
          reply.setCookie('endpointName', endpointName, { maxAge: oneMonthMilliseconds / 1000, path: '/' })
        // Clear the cookie if the endpoint name is the default one
        } else if (endpointName === DEFAULT_ENDPOINT_NAME && request.cookies.endpointName !== undefined) {
          reply.clearCookie('endpointName', { path: '/' })
        }
        logger.debug(`Using endpoint: ${endpointName}`)

        const requestPort = request.port ? `:${request.port}` : ''
        const fullUrl = `${request.protocol}://${request.hostname}${requestPort}${request.url}`
        const fullUrlObject = new URL(fullUrl)
        const fullUrlPathname = fullUrlObject.pathname
        // Kept so that the trailing-slash redirect below does not lose the query string
        const fullUrlSearch = fullUrlObject.search

        // Generate the IRI we expect (the request URL without its query string)
        fullUrlObject.search = ''
        const iriUrlString = fullUrlObject.toString()

        // Handle Service Description request
        if (Object.keys(request.query).length === 0 && request.method === 'GET') {
          // Work on a copy, so that the shared dataset is not modified by each request
          const dataset = rdf.dataset(await serviceDescription)
          rdf.clownface({ dataset })
            .has(rdf.ns.rdf.type, rdf.ns.sd.Service)
            .addOut(rdf.ns.sd.endpoint, rdf.namedNode(fullUrl))

          const accept = request.accepts()
          const negotiatedTypes = accept.type([...rdf.formats.serializers.keys()])
          const negotiatedType = Array.isArray(negotiatedTypes) ? negotiatedTypes[0] : negotiatedTypes
          if (!negotiatedType) {
            reply.code(406).send()
            return reply
          }

          reply
            .header('content-type', negotiatedType)
            // @ts-ignore (cause: broken type definitions)
            .send(await dataset.serialize({ format: negotiatedType }))
          return reply
        }

        // Enforce non-trailing slash (a bare `/` is left alone: there is nothing to strip)
        if (fullUrlPathname.length > 1 && fullUrlPathname.endsWith('/')) {
          reply.redirect(`${fullUrlPathname.slice(0, -1)}${fullUrlSearch}`)
          return reply
        }

        let currentRewriteConfig = endpoint.rewriteConfig
        if (endpoint.allowRewriteToggle) {
          // The `rewrite` query parameter overrides the configured value only for `true` / `false`
          let rewriteConfigValueFromQuery = endpoint.rewriteConfigValue
          if (`${request.query.rewrite}` === 'false') {
            rewriteConfigValueFromQuery = false
          } else if (`${request.query.rewrite}` === 'true') {
            rewriteConfigValueFromQuery = true
          }
          currentRewriteConfig = sparqlGetRewriteConfiguration(rewriteConfigValueFromQuery, endpoint.datasetBaseUrl)
        }
        const { rewrite: rewriteValue, iriOrigin } = currentRewriteConfig

        const rewriteResponse = rewriteValue
          ? {
              origin: endpoint.datasetBaseUrl,
              replacement: iriOrigin(iriUrlString),
            }
          : false

        let query = ''
        const method = request.method
        switch (method) {
          case 'GET':
            query = request.query.query || ''
            break
          case 'POST': {
            const { body } = request
            if (typeof body === 'string') {
              query = body
            } else if (body?.query) {
              // `?.`: a POST request without any body must not throw
              query = body.query
            }
            if (typeof query !== 'string') {
              query = JSON.stringify(query)
            }
            break
          }
          default:
            reply.code(405).send('Method Not Allowed')
            return reply
        }

        if (rewriteResponse && options.rewriteQuery) {
          // Map the public IRIs used by the client back to the IRIs stored in the dataset
          query = query.replaceAll(rewriteResponse.replacement, rewriteResponse.origin)
        }

        logger.debug('Got a request to the sparql proxy')
        queryLogger(`Received query${rewriteValue ? ' (rewritten)' : ''} via ${method}:\n${query}`)

        try {
          let acceptHeader = request.headers.accept || 'application/sparql-results+json'
          const requestedFormat = request.query.format
          // Own-property check: inherited keys (e.g. `constructor`) are not configured formats
          if (requestedFormat && Object.hasOwnProperty.call(options.formats, requestedFormat)) {
            acceptHeader = options.formats[requestedFormat] || acceptHeader
          }

          const headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            Accept: acceptHeader,
          }
          if (endpoint.authorizationHeader) {
            headers.Authorization = endpoint.authorizationHeader
          }

          const start = performance.now()
          let response = await fetch(endpoint.endpointUrl, {
            method: 'POST',
            headers,
            body: new URLSearchParams({ query }),
          })
          const end = performance.now()
          const duration = end - start

          if (!response) {
            logger.warn('No response from the endpoint, make sure that the endpoint is reachable')
            response = new Response(JSON.stringify({
              success: false,
              message: 'No response from the endpoint',
            }), { status: 502, headers: { 'content-type': 'application/json' } })
          }

          const contentType = response.headers.get('content-type')

          /** @type {any} */
          let responseStream = response.body
          // `response.body` is null for body-less responses; `Readable.from(null)` would throw
          if (responseStream && rewriteResponse && options.rewriteResults) {
            const replaceStream = new ReplaceStream(rewriteResponse.origin, rewriteResponse.replacement)
            responseStream = Readable
              .from(responseStream)
              .pipe(replaceStream)
            responseStream = Readable
              .from(responseStream)
          }

          // Convert a web stream into a Node.js stream before sending it
          if (responseStream instanceof ReadableStream) {
            responseStream = Readable.fromWeb(responseStream)
          }

          let proxyReply = reply
            .status(response.status)
            .header('Server-Timing', `sparql-proxy;dur=${duration};desc="Querying the endpoint"`)
          if (contentType) {
            proxyReply = proxyReply.header('content-type', contentType)
          }
          proxyReply.send(responseStream)
          return proxyReply
        } catch (error) {
          logger.error('Error while querying the endpoint')
          logger.error(error)
          reply
            .code(500)
            .send('Error while querying the endpoint')
          return reply
        }
      }
      return handler
    },
  }
}

export default factory