Skip to content

Commit

Permalink
handler-fetch: improve the way to work with the worker
Browse files Browse the repository at this point in the history
  • Loading branch information
ludovicm67 committed Feb 8, 2024
1 parent 563c445 commit 9b404f2
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 77 deletions.
82 changes: 30 additions & 52 deletions packages/entity-renderer/test/entity-renderer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,51 @@
/* eslint-disable no-useless-catch */

import { strictEqual } from 'assert'
import { describe, it } from 'mocha'
import { describe, it, beforeEach, afterEach } from 'mocha'

import { createTrifidInstance } from '../examples/instance.js'
import { getListenerURL } from './support/utils.js'

const trifidConfigUrl = './examples/config/trifid.yaml'

describe('@zazuko/trifid-entity-renderer', () => {
describe('basic tests', () => {
it('should create a middleware with factory and default options', async () => {
const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn')
const trifidListener = await trifidInstance.start()
trifidListener.close()
})
let trifidListener

it('should be able to load a rendered entity', async () => {
const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn')
const trifidListener = await trifidInstance.start()
beforeEach(async () => {
const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn')
trifidListener = await trifidInstance.start()
})

afterEach(() => {
trifidListener.close()
})

try {
const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler`
const res = await fetch(entityUrl)
strictEqual(res.status, 200)
const resText = await res.text()
strictEqual(resText.toLocaleLowerCase().includes('amy'), true)
} catch (e) {
throw e
} finally {
trifidListener.close()
}
describe('basic tests', () => {
it('should be able to load a rendered entity', async () => {
const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler`
const res = await fetch(entityUrl)
strictEqual(res.status, 200)
const resText = await res.text()
strictEqual(resText.toLocaleLowerCase().includes('amy'), true)
})

it('should be able to load a rendered entity using HTML', async () => {
const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn')
const trifidListener = await trifidInstance.start()

try {
const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler`
const res = await fetch(entityUrl, {
headers: {
accept: 'text/html',
},
})
strictEqual(res.status, 200)
const resText = await res.text()
strictEqual(resText.toLocaleLowerCase().includes('<html'), true)
strictEqual(resText.toLocaleLowerCase().includes('amy'), true)
} catch (e) {
throw e
} finally {
trifidListener.close()
}
const entityUrl = `${getListenerURL(trifidListener)}/person/amy-farrah-fowler`
const res = await fetch(entityUrl, {
headers: {
accept: 'text/html',
},
})
strictEqual(res.status, 200)
const resText = await res.text()
strictEqual(resText.toLocaleLowerCase().includes('<html'), true)
strictEqual(resText.toLocaleLowerCase().includes('amy'), true)
})

it('should not render non-existant entity', async () => {
const trifidInstance = await createTrifidInstance(trifidConfigUrl, 'warn')
const trifidListener = await trifidInstance.start()

try {
const entityUrl = `${getListenerURL(trifidListener)}/person/someone-that-does-not-exist`
const res = await fetch(entityUrl)
strictEqual(res.status, 404)
} catch (e) {
throw e
} finally {
trifidListener.close()
}
const entityUrl = `${getListenerURL(trifidListener)}/person/someone-that-does-not-exist`
const res = await fetch(entityUrl)
strictEqual(res.status, 404)
})
})
})
39 changes: 31 additions & 8 deletions packages/handler-fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import { Worker } from 'node:worker_threads'
import { v4 as uuidv4 } from 'uuid'
import { waitForVariableToBeTrue } from './lib/utils.js'

/** @type {import('trifid-core/dist/types/index.d.ts').TrifidMiddleware} */
export const factory = async (trifid) => {
Expand All @@ -11,11 +12,16 @@ export const factory = async (trifid) => {
const workerUrl = new URL('./lib/worker.js', import.meta.url)
const worker = new Worker(workerUrl)

let ready = false

worker.on('message', async (message) => {
const { type, data } = JSON.parse(`${message}`)
const { type, data } = message
if (type === 'log') {
logger.debug(data)
}
if (type === 'ready') {
ready = true
}
})

worker.on('error', (error) => {
Expand All @@ -26,34 +32,51 @@ export const factory = async (trifid) => {
logger.info(`Worker exited with code ${code}`)
})

worker.postMessage(JSON.stringify({
worker.postMessage({
type: 'config',
data: {
contentType, url, baseIri, graphName, unionDefaultGraph,
},
}))
})

/**
* Send the query to the worker and wait for the response.
*
* @param {string} query The SPARQL query
* @returns {Promise<{ response: string, contentType: string }>} The response and its content type
*/
const handleQuery = async (query) => {
return new Promise((resolve, _reject) => {
const queryId = uuidv4()

worker.postMessage(JSON.stringify({
worker.postMessage({
type: 'query',
data: {
queryId,
query,
},
}))
})

worker.on('message', (message) => {
const { type, data } = JSON.parse(`${message}`)
const messageHandler = (message) => {
const { type, data } = message
if (type === 'query' && data.queryId === queryId) {
worker.off('message', messageHandler)
resolve(data)
}
})
}

worker.on('message', messageHandler)
})
}

// Wait for the worker to become ready, so we can be sure it can handle queries
await waitForVariableToBeTrue(
() => ready,
30000,
20,
'Worker did not become ready within 30 seconds',
)

return async (req, res, _next) => {
let query
if (req.method === 'GET') {
Expand Down
28 changes: 28 additions & 0 deletions packages/handler-fetch/lib/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// @ts-check

/**
* Wait for a variable to be truthy, with a timeout.
*
* @param {Function} getValueFunction A function that needs to return a truthy value to resolve the promise
* @param {number} [timeoutMs] The maximum time to wait for the variable to be truthy, in milliseconds
* @param {number} [checkIntervalMs] The interval at which to check the variable's value, in milliseconds
* @param {string} [errorMessage] The error message to use if the promise is rejected
* @returns {Promise<void>}
*/
export const waitForVariableToBeTrue = async (getValueFunction, timeoutMs = 30000, checkIntervalMs = 20, errorMessage = 'Reached Timeout') => {
return new Promise((resolve, reject) => {
// Check the variable's value periodically
const interval = setInterval(() => {
if (getValueFunction()) {
clearInterval(interval)
resolve()
}
}, checkIntervalMs)

// Set a timeout to reject the promise if the time exceeds the specified duration
setTimeout(() => {
clearInterval(interval)
reject(new Error(errorMessage))
}, timeoutMs)
})
}
35 changes: 18 additions & 17 deletions packages/handler-fetch/lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const getContent = async (url) => {

// Create a store
const store = new oxigraph.Store()
parentPort.postMessage(JSON.stringify({
parentPort.postMessage({
type: 'log',
data: 'Created store',
}))
})

// Handle configuration
const handleConfig = async (config) => {
Expand All @@ -45,49 +45,50 @@ const handleConfig = async (config) => {

// Read data from file or URL
const data = await getContent(url)
parentPort.postMessage(JSON.stringify({
parentPort.postMessage({
type: 'log',
data: `Loaded ${data.length} bytes of data from ${url}`,
}))
})

// Load the data into the store
store.load(data, contentType, baseIri, graphNameIri)
parentPort.postMessage(JSON.stringify({
parentPort.postMessage({
type: 'log',
data: 'Loaded data into store',
}))
})

// Tell the parent that the worker is ready to handle queries
parentPort.postMessage({
type: 'ready',
data: true,
})
}

// Handle query
const handleQuery = async (data) => {
const { query, queryId } = data
const { response, contentType } = await performOxigraphQuery(store, query)
parentPort.postMessage(JSON.stringify({
parentPort.postMessage({
type: 'query',
data: {
queryId,
response,
contentType,
},
}))
})
}

parentPort.on('message', async (event) => {
if (!event) {
return
}

const parsedData = JSON.parse(`${event}`)
if (!parsedData || !parsedData.type) {
if (!event || !event.type) {
return
}

switch (parsedData.type) {
switch (event.type) {
case 'config':
await handleConfig(parsedData.data)
await handleConfig(event.data)
break
case 'query':
await handleQuery(parsedData.data)
await handleQuery(event.data)
break
}
})

0 comments on commit 9b404f2

Please sign in to comment.