Skip to content

Commit

Permalink
Add ability to filter requested runner jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Chocobozzz committed Oct 31, 2024
1 parent e0f39d7 commit a91bd80
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 19 deletions.
9 changes: 8 additions & 1 deletion apps/peertube-runner/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,15 @@ export class RunnerServer {
private async requestJob (server: PeerTubeServer) {
logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)

const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
const { availableJobs } = await server.runnerJobs.request({
runnerToken: server.runnerToken,

jobTypes: this.enabledJobs.size !== getSupportedJobsList().length
? Array.from(this.enabledJobs)
: undefined
})

// FIXME: remove in PeerTube v8: jobTypes has been introduced in PeerTube v7, so do the filter here too
const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs))

if (filtered.length === 0) {
Expand Down
3 changes: 3 additions & 0 deletions packages/models/src/runners/request-runner-job-body.model.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { RunnerJobType } from './runner-job-type.type.js'

export interface RequestRunnerJobBody {
runnerToken: string
jobTypes?: RunnerJobType[]
}
22 changes: 11 additions & 11 deletions packages/server-commands/src/runners/runner-jobs-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,30 @@ export class RunnerJobsCommand extends AbstractCommand {
...options,

path,
fields: pick(options, [ 'runnerToken' ]),
fields: pick(options, [ 'runnerToken', 'jobTypes' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}

async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])
const { availableJobs } = await this.request({
...options,

const { availableJobs } = await this.request(options)
jobTypes: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]
})

return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobVODPayload>
return { availableJobs } as RequestRunnerJobResult<RunnerJobVODPayload>
}

async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])
const { availableJobs } = await this.request({
...options,

const { availableJobs } = await this.request(options)
jobTypes: [ 'live-rtmp-hls-transcoding' ]
})

return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
return { availableJobs } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
}

// ---------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions packages/tests/src/api/check-params/runners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,14 @@ describe('Test managing runners', function () {
it('Should fail with an unknown runner token', async function () {
await server.runnerJobs.request({ runnerToken: badUUID, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})

it('Should fail with a bad jobTypes token', async function () {
await server.runnerJobs.request({ runnerToken, jobTypes: 'toto' as any, expectedStatus: HttpStatusCode.BAD_REQUEST_400 })
})

it('Should succeed with the correct params', async function () {
await server.runnerJobs.request({ runnerToken, jobTypes: [] })
})
})

describe('Accept', function () {
Expand Down
12 changes: 12 additions & 0 deletions packages/tests/src/api/runners/runner-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,18 @@ describe('Test runner common actions', function () {
jobUUID = webVideoJobs[0].uuid
})

it('Should filter requested jobs', async function () {
{
const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-web-video-transcoding' ] })
expect(availableJobs).to.have.lengthOf(2)
}

{
const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-hls-transcoding' ] })
expect(availableJobs).to.have.lengthOf(0)
}
})

it('Should have sorted available jobs by priority', async function () {
const { availableJobs } = await server.runnerJobs.request({ runnerToken })

Expand Down
4 changes: 3 additions & 1 deletion server/core/controllers/api/runners/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
HttpStatusCode,
ListRunnerJobsQuery,
LiveRTMPHLSTranscodingUpdatePayload,
RequestRunnerJobBody,
RequestRunnerJobResult,
RunnerJobState,
RunnerJobSuccessBody,
Expand Down Expand Up @@ -158,7 +159,8 @@ export {

async function requestRunnerJob (req: express.Request, res: express.Response) {
const runner = res.locals.runner
const availableJobs = await RunnerJobModel.listAvailableJobs()
const body = req.body as RequestRunnerJobBody
const availableJobs = await RunnerJobModel.listAvailableJobs(body.jobTypes)

logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })

Expand Down
1 change: 1 addition & 0 deletions server/core/middlewares/validators/runners/runners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const deleteRunnerValidator = [

const getRunnerFromTokenValidator = [
body('runnerToken').custom(isRunnerTokenValid),
body('jobTypes').optional().isArray(),

async (req: express.Request, res: express.Response, next: express.NextFunction) => {
if (areValidationErrors(req, res, { tags })) return
Expand Down
20 changes: 14 additions & 6 deletions server/core/models/runner/runner-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,24 @@ export class RunnerJobModel extends SequelizeModel<RunnerJobModel> {
return RunnerJobModel.findOne<MRunnerJobRunner>(query)
}

static listAvailableJobs () {
const query = {
static listAvailableJobs (jobTypes?: string[]) {
const jobTypesWhere = jobTypes
? {
type: {
[Op.in]: jobTypes
}
}
: {}

return RunnerJobModel.findAll<MRunnerJob>({
limit: 10,
order: getSort('priority'),
where: {
state: RunnerJobState.PENDING
}
}
state: RunnerJobState.PENDING,

return RunnerJobModel.findAll<MRunnerJob>(query)
...jobTypesWhere
}
})
}

static listStalledJobs (options: {
Expand Down
5 changes: 5 additions & 0 deletions support/doc/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6422,6 +6422,11 @@ paths:
properties:
runnerToken:
type: string
jobTypes:
type: array
description: Filter jobs depending on their types
items:
type: string
required:
- runnerToken
responses:
Expand Down

0 comments on commit a91bd80

Please sign in to comment.