Skip to content

Commit

Permalink
fix: base event storage initialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Oct 10, 2024
1 parent c8f93e5 commit 2b7f91d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ jobs:
with:
output: swagger-ui
spec-file: static/api.json
github_token: ${{ secrets.GITHUB_TOKEN }}

- name: Deploy to GitHub Pages
uses: peaceiris/actions-gh-pages@v3
Expand Down
24 changes: 8 additions & 16 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,23 @@ export const db = fastifyPlugin(

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
try {
await request.db.dispose()
} catch (e) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
}
})
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
try {
await request.db.dispose()
} catch (e) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
}
})
}
})
},
Expand Down Expand Up @@ -133,27 +129,23 @@ export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
try {
await request.db.dispose()
} catch (e) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
}
})
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
try {
await request.db.dispose()
} catch (e) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
}
})
}
})
},
Expand Down
6 changes: 5 additions & 1 deletion src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export abstract class Queue {
})
return Queue.stop()
.then(async () => {
await Queue.callClose()
logSchema.info(logger, '[Queue] Exited', {
type: 'queue',
})
Expand All @@ -109,6 +108,11 @@ export abstract class Queue {
type: 'queue',
})
})
.finally(async () => {
await Queue.callClose().catch(() => {
// no-op
})
})
},
{ once: true }
)
Expand Down
35 changes: 25 additions & 10 deletions src/storage/events/base-event.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
import { Event as QueueBaseEvent, BasePayload, StaticThis, Event } from '@internal/queue'
import { getPostgresConnection, getServiceKeyUser } from '@internal/database'
import { StorageKnexDB } from '../database'
import { createStorageBackend } from '../backend'
import { createStorageBackend, StorageBackendAdapter } from '../backend'
import { Storage } from '../storage'
import { getConfig } from '../../config'
import { logger } from '@internal/monitoring'
import { createAgent } from '@internal/http'

const { storageS3MaxSockets, storageBackendType, region } = getConfig()

const httpAgent = createAgent('s3_worker', {
maxSockets: storageS3MaxSockets,
})
const storageBackend = createStorageBackend(storageBackendType, {
httpAgent: httpAgent,
})
let storageBackend: StorageBackendAdapter | undefined = undefined

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends QueueBaseEvent<T> {
static onStart() {
httpAgent.monitor()
this.getOrCreateStorageBackend()
}

static onClose() {
storageBackend.close()
storageBackend?.close()
}

/**
Expand Down Expand Up @@ -81,6 +76,26 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
host: payload.tenant.host,
})

return new Storage(storageBackend, db)
return new Storage(this.getOrCreateStorageBackend(), db)
}

protected static getOrCreateStorageBackend(monitor = false) {
if (storageBackend) {
return storageBackend
}

const httpAgent = createAgent('s3_worker', {
maxSockets: storageS3MaxSockets,
})

storageBackend = createStorageBackend(storageBackendType, {
httpAgent: httpAgent,
})

if (monitor) {
httpAgent.monitor()
}

return storageBackend
}
}

0 comments on commit 2b7f91d

Please sign in to comment.