From 9b404f24c29ea3c0129c1848e07075c9782dfcd4 Mon Sep 17 00:00:00 2001 From: Ludovic Muller Date: Thu, 8 Feb 2024 09:53:34 +0100 Subject: [PATCH] handler-fetch: improve the way to work with the worker --- .../test/entity-renderer.test.js | 82 +++++++------------ packages/handler-fetch/index.js | 39 +++++++-- packages/handler-fetch/lib/utils.js | 28 +++++++ packages/handler-fetch/lib/worker.js | 35 ++++---- 4 files changed, 107 insertions(+), 77 deletions(-) create mode 100644 packages/handler-fetch/lib/utils.js diff --git a/packages/entity-renderer/test/entity-renderer.test.js b/packages/entity-renderer/test/entity-renderer.test.js index 9e4e703f..3d373acf 100644 --- a/packages/entity-renderer/test/entity-renderer.test.js +++ b/packages/entity-renderer/test/entity-renderer.test.js @@ -3,7 +3,7 @@ /* eslint-disable no-useless-catch */ import { strictEqual } from 'assert' -import { describe, it } from 'mocha' +import { describe, it, beforeEach, afterEach } from 'mocha' import { createTrifidInstance } from '../examples/instance.js' import { getListenerURL } from './support/utils.js' @@ -11,65 +11,43 @@ import { getListenerURL } from './support/utils.js' const trifidConfigUrl = './examples/config/trifid.yaml' describe('@zazuko/trifid-entity-renderer', () => { - describe('basic tests', () => { - it('should create a middleware with factory and default options', async () => { - const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn') - const trifidListener = await trifidInstance.start() - trifidListener.close() - }) + let trifidListener - it('should be able to load a rendered entity', async () => { - const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn') - const trifidListener = await trifidInstance.start() + beforeEach(async () => { + const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn') + trifidListener = await trifidInstance.start() + }) + + afterEach(() => { + trifidListener.close() + }) - try { - const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler` - const res = await fetch(entityUrl) - strictEqual(res.status, 200) - const resText = await res.text() - strictEqual(resText.toLocaleLowerCase().includes('amy'), true) - } catch (e) { - throw e - } finally { - trifidListener.close() - } + describe('basic tests', () => { + it('should be able to load a rendered entity', async () => { + const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler` + const res = await fetch(entityUrl) + strictEqual(res.status, 200) + const resText = await res.text() + strictEqual(resText.toLocaleLowerCase().includes('amy'), true) }) it('should be able to load a rendered entity using HTML', async () => { - const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn') - const trifidListener = await trifidInstance.start() - - try { - const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler` - const res = await fetch(entityUrl, { - headers: { - accept: 'text/html', - }, - }) - strictEqual(res.status, 200) - const resText = await res.text() - strictEqual(resText.toLocaleLowerCase().includes(' { - const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn') - const trifidListener = await trifidInstance.start() - - try { - const entityUrl = `${getListenerURL(trifidListener)}/person/someone-that-does-not-exist` - const res = await fetch(entityUrl) - strictEqual(res.status, 404) - } catch (e) { - throw e - } finally { - trifidListener.close() - } + const entityUrl = `${getListenerURL(trifidListener)}/person/someone-that-does-not-exist` + const res = await fetch(entityUrl) + strictEqual(res.status, 404) }) }) }) diff --git a/packages/handler-fetch/index.js b/packages/handler-fetch/index.js index 0cbea5f6..855fed5b 100644 --- a/packages/handler-fetch/index.js +++ b/packages/handler-fetch/index.js @@ -2,6 +2,7 @@ import { Worker } from 'node:worker_threads' import { v4 as uuidv4 } from 'uuid' +import { waitForVariableToBeTrue } from './lib/utils.js' /** @type {import('trifid-core/dist/types/index.d.ts').TrifidMiddleware} */ export const factory = async (trifid) => { @@ -11,11 +12,16 @@ export const factory = async (trifid) => { const workerUrl = new URL('./lib/worker.js', import.meta.url) const worker = new Worker(workerUrl) + let ready = false + worker.on('message', async (message) => { - const { type, data } = JSON.parse(`${message}`) + const { type, data } = message if (type === 'log') { logger.debug(data) } + if (type === 'ready') { + ready = true + } }) worker.on('error', (error) => { @@ -26,34 +32,51 @@ export const factory = async (trifid) => { logger.info(`Worker exited with code ${code}`) }) - worker.postMessage(JSON.stringify({ + worker.postMessage({ type: 'config', data: { contentType, url, baseIri, graphName, unionDefaultGraph, }, - })) + }) + /** + * Send the query to the worker and wait for the response. + * + * @param {string} query The SPARQL query + * @returns {Promise<{ response: string, contentType: string }>} The response and its content type + */ const handleQuery = async (query) => { return new Promise((resolve, _reject) => { const queryId = uuidv4() - worker.postMessage(JSON.stringify({ + worker.postMessage({ type: 'query', data: { queryId, query, }, - })) + }) - worker.on('message', (message) => { - const { type, data } = JSON.parse(`${message}`) + const messageHandler = (message) => { + const { type, data } = message if (type === 'query' && data.queryId === queryId) { + worker.off('message', messageHandler) resolve(data) } - }) + } + + worker.on('message', messageHandler) }) } + // Wait for the worker to become ready, so we can be sure it can handle queries + await waitForVariableToBeTrue( + () => ready, + 30000, + 20, + 'Worker did not become ready within 30 seconds', + ) + return async (req, res, _next) => { let query if (req.method === 'GET') { diff --git a/packages/handler-fetch/lib/utils.js b/packages/handler-fetch/lib/utils.js new file mode 100644 index 00000000..0efd77e0 --- /dev/null +++ b/packages/handler-fetch/lib/utils.js @@ -0,0 +1,28 @@ +// @ts-check + +/** + * Wait for a variable to be truthy, with a timeout. + * + * @param {Function} getValueFunction A function that needs to return a truthy value to resolve the promise + * @param {number} [timeoutMs] The maximum time to wait for the variable to be truthy, in milliseconds + * @param {number} [checkIntervalMs] The interval at which to check the variable's value, in milliseconds + * @param {string} [errorMessage] The error message to use if the promise is rejected + * @returns {Promise} + */ +export const waitForVariableToBeTrue = async (getValueFunction, timeoutMs = 30000, checkIntervalMs = 20, errorMessage = 'Reached Timeout') => { + return new Promise((resolve, reject) => { + // Check the variable's value periodically + const interval = setInterval(() => { + if (getValueFunction()) { + clearInterval(interval) + resolve() + } + }, checkIntervalMs) + + // Set a timeout to reject the promise if the time exceeds the specified duration + setTimeout(() => { + clearInterval(interval) + reject(new Error(errorMessage)) + }, timeoutMs) + }) +} diff --git a/packages/handler-fetch/lib/worker.js b/packages/handler-fetch/lib/worker.js index 24999fc8..c4b38465 100644 --- a/packages/handler-fetch/lib/worker.js +++ b/packages/handler-fetch/lib/worker.js @@ -30,10 +30,10 @@ const getContent = async (url) => { // Create a store const store = new oxigraph.Store() -parentPort.postMessage(JSON.stringify({ +parentPort.postMessage({ type: 'log', data: 'Created store', -})) +}) // Handle configuration const handleConfig = async (config) => { @@ -45,49 +45,50 @@ const handleConfig = async (config) => { // Read data from file or URL const data = await getContent(url) - parentPort.postMessage(JSON.stringify({ + parentPort.postMessage({ type: 'log', data: `Loaded ${data.length} bytes of data from ${url}`, - })) + }) // Load the data into the store store.load(data, contentType, baseIri, graphNameIri) - parentPort.postMessage(JSON.stringify({ + parentPort.postMessage({ type: 'log', data: 'Loaded data into store', - })) + }) + + // Tell the parent that the worker is ready to handle queries + parentPort.postMessage({ + type: 'ready', + data: true, + }) } // Handle query const handleQuery = async (data) => { const { query, queryId } = data const { response, contentType } = await performOxigraphQuery(store, query) - parentPort.postMessage(JSON.stringify({ + parentPort.postMessage({ type: 'query', data: { queryId, response, contentType, }, - })) + }) } parentPort.on('message', async (event) => { - if (!event) { - return - } - - const parsedData = JSON.parse(`${event}`) - if (!parsedData || !parsedData.type) { + if (!event || !event.type) { return } - switch (parsedData.type) { + switch (event.type) { case 'config': - await handleConfig(parsedData.data) + await handleConfig(event.data) break case 'query': - await handleQuery(parsedData.data) + await handleQuery(event.data) break } })