-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.coffee
84 lines (72 loc) · 3.01 KB
/
monitor.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# ---------------------------------------------------------------------------
# Worker-node state and identity.
# ---------------------------------------------------------------------------

os = Npm.require 'os'

# Number of jobs this node is currently processing.
numOfProcessorsRunning = 0

# Port used as part of this node's identity. Prefer an explicit ":<port>" in
# ROOT_URL; otherwise fall back to the PORT environment variable. Only the
# port component is extracted, so digits elsewhere in the URL (an IP address,
# "app2.example.com", ...) can no longer leak into the port number.
rootUrl = process.env['ROOT_URL'] or ''
explicitPort = /:(\d+)(?:\/|$)/.exec rootUrl
port = parseInt (if explicitPort then explicitPort[1] else process.env['PORT']), 10

# Maximum number of jobs processed concurrently: one per CPU core.
affinity = os.cpus().length
#If this is a web node, reserve a core for serving web clients? Maybe drop this to 1 core?
if Meteor.settings.jobQueue?.serverType == "web"
  affinity--

# Value written into a job's `hostname` field when this node claims it.
# HOSTNAME is frequently not exported to child processes, so fall back to the
# OS-reported host name instead of producing "undefined:<port>".
myHostName = (process.env['HOSTNAME'] or os.hostname()) + ':' + port
global = this
Fiber = Npm.require 'fibers'
console.log 'Port: ' + port
console.log 'Affinity: ' + affinity
# Look up the unclaimed job (hostname == '') with the earliest submitTime and
# hand it to initiateClaim. Does nothing when this node has no processing
# capacity at all (affinity <= 0).
findOldestJob = () ->
  return unless affinity > 0
  # Object-form sort specifier: ascending by submitTime. The previous flat
  # array ['submitTime', 'ascending'] is read by Meteor as two field names.
  possibleJob = JobQueue.findOne {hostname: ''}, {sort: {submitTime: 1}}
  if possibleJob
    console.log "Found a possible job."
    initiateClaim possibleJob._id
  else
    console.log "Just finished a job, but it doesn't look like theres any others for me."
# Try to claim the job with the given id, provided this node still has a free
# processing slot (numOfProcessorsRunning < affinity).
#
# id - _id of the JobQueue document to claim.
#
# The job document itself is not needed to make this decision, so it is no
# longer fetched here; claim performs the atomic update and loads the job.
initiateClaim = (id) ->
  if numOfProcessorsRunning < affinity
    console.log "Job looks acceptable. Trying to claim job with ID: " + id
    claim id
  else
    console.log "Job with ID: " + id + " was not acceptable."
# Claim the job with the given id for this node and run its processor.
#
# id - _id of the JobQueue document to claim.
#
# The claim is a conditional update: the hostname is only set when it is still
# '', so a job already claimed elsewhere changes zero documents. On success
# the job's processor runs inside a Fiber; its output is validated against the
# processor class's outputSchema and stored on the job. Whatever the outcome,
# the processing slot is released and the next unclaimed job is looked up.
claim = (id) ->
  console.log 'Attempting to claim job...'
  numChanged = JobQueue.update {_id: id, hostname: ''}, {$set: {hostname: myHostName}}
  if numChanged > 0
    job = JobQueue.findOne {_id: id}
    unless job?
      # The document disappeared between the update and the read.
      console.log 'Claimed job with ID: ' + id + ' but it no longer exists. Looking for new job.'
      findOldestJob()
      return
    numOfProcessorsRunning++
    console.log 'Claimed job with ID: ' + job._id
    fiber = Fiber ->
      # Everything that can throw lives inside the try so that the finally
      # clause always releases the slot taken above.
      try
        processorClass = Processors[job.processor]
        unless processorClass?
          console.log "Couldn't find the processor " + job.processor + ". Did you make a typo?"
          JobQueue.update {_id: id}, {$set: {status: 'error'}}
          return # finally still runs
        processor = new processorClass(id, job.settings)
        output = processor.process()
        context = processorClass.outputSchema.namedContext('processorOutput')
        if not context.validate output
          console.log 'Processor output failed schema validation for job ' + id
          console.log context.invalidKeys()
          JobQueue.update {_id: id}, {$set: {status: 'failed validation'}}
        else
          JobQueue.update {_id: id}, {$set: {output: output}}
      catch error
        console.log 'ERROR: ' + error
        JobQueue.update {_id: id}, {$set: {status: 'error'}}
      finally
        numOfProcessorsRunning--
        console.log 'Job Completed! Looking for new jobs.'
        findOldestJob()
    fiber.run() #Warning: non-blocking, gets yielded out of
  else
    console.log 'Could not accept job with ID: ' + id + '. Looking for new job.'
    findOldestJob() #will keep looking for jobs as long as it can find one without a host
# Entry point for the job monitor; nothing runs until this is called.
# Logs this node's identity and concurrency limit, then registers two
# observers on JobQueue that feed job ids to initiateClaim:
#   * documents matching {status: 'pending'} that change, and
#   * documents matching {hostname: ''} that are added.
JobQueue.startupWorker = ->
  console.log 'myHostName: ' + myHostName
  console.log 'concurrent process limit: ' + affinity

  # Pending jobs whose fields change.
  JobQueue.find({status: 'pending'}).observeChanges
    changed: (jobId) -> initiateClaim jobId

  # Jobs that appear without a host assigned.
  JobQueue.find({hostname: ''}).observeChanges
    added: (jobId) ->
      console.log 'Added: ' + jobId
      initiateClaim jobId