Skip to content

Commit

Permalink
Add fail rejected command fix 6767 (#6770)
Browse files Browse the repository at this point in the history
* HotFix for Publisher problems. Introduce list of users to skip and use 10-file blocks for 2018UL datasets (#6745)

* Py2only (#6765)

* Keep logging level at DEBUG in TapeRecallStatus. Fix #6762

* print proxy validity in MyProxyLogon. Fix #6762

* ignore commands in config.TaskWorker.rejectedCommands. Fix #6767

* add rejectedCommands to TW config
  • Loading branch information
belforte authored Sep 13, 2021
1 parent 6abb8fb commit 4695bde
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
49 changes: 45 additions & 4 deletions src/python/TaskWorker/MasterWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import urllib
import signal
import logging
from base64 import b64encode

from httplib import HTTPException
from MultiProcessingLog import MultiProcessingLog

Expand Down Expand Up @@ -339,18 +341,55 @@ def failBannedTask(self, task):
True : if the task was declared bad and was failed
False: for normal (good) tasks
"""
taskname = task['tm_taskname']
command = task['tm_task_command']
bannedUsernames = getattr(self.config.TaskWorker, 'bannedUsernames', [])
if task['tm_username'] in bannedUsernames:
self.logger.debug("Forcefully failing task %s", task['tm_taskname'])
self.logger.debug("Forcefully failing task %s", taskname)
if task['tm_task_command']:
dummyWorktype, failstatus = STATE_ACTIONS_MAP[task['tm_task_command']]
dummyWorktype, failstatus = STATE_ACTIONS_MAP[command]
else:
failstatus = 'FAILED'
self.updateWork(task['tm_taskname'], task['tm_task_command'], failstatus)
# TODO look into logging a message for the user
self.updateWork(taskname, command, failstatus)
warning = 'username %s banned in CRAB TaskWorker configuration' % task['tm_username']
configreq = {'subresource': 'addwarning', 'workflow': taskname, 'warning': warning}
try:
self.crabserver.post(api='task', data=urllib.urlencode(configreq))
except Exception as e:
self.logger.error("Error uploading warning: %s", str(e))
self.logger.warning("Cannot add a warning to REST interface. Warning message: %s", warning)
return True
return False

def skipRejectedCommand(self, task):
""" This method is used at the TW startup and is used to ignore requests for
a command field in the reject list. It allows to configure the TW so to process
some commands, not not all. E.g. to prevent users from doing more submit and/or resubmit
True : if the task was declared bad and was failed
False: for normal (good) tasks
"""
taskname = task['tm_taskname']
command = task['tm_task_command']
rejectedCommands = getattr(self.config.TaskWorker, 'rejectedCommands', [])
if command in rejectedCommands:
self.logger.debug("Rejecting command %s", command)
if command == 'SUBMIT': # refuse i.e. mark as submission failed
self.updateWork(taskname, command, 'SUBMITFAILED')
if command == 'RESUMIT': # ignore, i.e. leave in status 'SUBMITTED'
self.updateWork(taskname, command, 'SUBMITTED')
if command == 'KILL': # ignore, i.e. leave in status 'SUBMITTED'
self.updateWork(taskname, command, 'SUBMITTED')
warning = 'command %s disabled in CRAB TaskWorker configuration' % command
configreq = {'subresource': 'addwarning', 'workflow': taskname, 'warning': b64encode(warning)}
try:
self.crabserver.post(api='task', data=urllib.urlencode(configreq))
except Exception as e:
self.logger.error("Error uploading warning: %s", str(e))
self.logger.warning("Cannot add a warning to REST interface. Warning message: %s", warning)
return True
return False


def algorithm(self):
"""I'm the intelligent guy taking care of getting the work
and distributing it to the slave processes."""
Expand All @@ -376,6 +415,8 @@ def algorithm(self):
for task in pendingwork:
if self.failBannedTask(task):
continue
if self.skipRejectedCommand(task):
continue
if self.updateWork(task['tm_taskname'], task['tm_task_command'], 'QUEUED'):
worktype, failstatus = STATE_ACTIONS_MAP[task['tm_task_command']]
toInject.append((worktype, task, failstatus, None))
Expand Down
1 change: 1 addition & 0 deletions src/script/Deployment/TaskWorker/TaskWorkerConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@

config.TaskWorker.highPrioEgroups = ['cms-crab-HighPrioUsers']
config.TaskWorker.bannedUsernames = ['mickeymouse','donaldduck']
config.TaskWorker.rejectedCommands = ['NONE', 'NOPE'] # commands are upper case e.g. 'SUBMIT'

# Setting the list of users for the highprio accounting group
# not usually needed since the list if automatically populated from e-group
Expand Down

0 comments on commit 4695bde

Please sign in to comment.