diff --git a/src/python/TaskWorker/MasterWorker.py b/src/python/TaskWorker/MasterWorker.py index 6602a8bbdd..a03766bedb 100644 --- a/src/python/TaskWorker/MasterWorker.py +++ b/src/python/TaskWorker/MasterWorker.py @@ -10,6 +10,8 @@ import urllib import signal import logging +from base64 import b64encode + from httplib import HTTPException from MultiProcessingLog import MultiProcessingLog @@ -339,18 +341,55 @@ def failBannedTask(self, task): True : if the task was declared bad and was failed False: for normal (good) tasks """ + taskname = task['tm_taskname'] + command = task['tm_task_command'] bannedUsernames = getattr(self.config.TaskWorker, 'bannedUsernames', []) if task['tm_username'] in bannedUsernames: - self.logger.debug("Forcefully failing task %s", task['tm_taskname']) + self.logger.debug("Forcefully failing task %s", taskname) if task['tm_task_command']: - dummyWorktype, failstatus = STATE_ACTIONS_MAP[task['tm_task_command']] + dummyWorktype, failstatus = STATE_ACTIONS_MAP[command] else: failstatus = 'FAILED' - self.updateWork(task['tm_taskname'], task['tm_task_command'], failstatus) - # TODO look into logging a message for the user + self.updateWork(taskname, command, failstatus) + warning = 'username %s banned in CRAB TaskWorker configuration' % task['tm_username'] + configreq = {'subresource': 'addwarning', 'workflow': taskname, 'warning': warning} + try: + self.crabserver.post(api='task', data=urllib.urlencode(configreq)) + except Exception as e: + self.logger.error("Error uploading warning: %s", str(e)) + self.logger.warning("Cannot add a warning to REST interface. Warning message: %s", warning) return True return False + def skipRejectedCommand(self, task): + """ This method is used at the TW startup and is used to ignore requests for + a command field in the reject list. It allows to configure the TW so to process + some commands, not not all. E.g. to prevent users from doing more submit and/or resubmit + True : if the task was declared bad and was failed + False: for normal (good) tasks + """ + taskname = task['tm_taskname'] + command = task['tm_task_command'] + rejectedCommands = getattr(self.config.TaskWorker, 'rejectedCommands', []) + if command in rejectedCommands: + self.logger.debug("Rejecting command %s", command) + if command == 'SUBMIT': # refuse i.e. mark as submission failed + self.updateWork(taskname, command, 'SUBMITFAILED') + if command == 'RESUMIT': # ignore, i.e. leave in status 'SUBMITTED' + self.updateWork(taskname, command, 'SUBMITTED') + if command == 'KILL': # ignore, i.e. leave in status 'SUBMITTED' + self.updateWork(taskname, command, 'SUBMITTED') + warning = 'command %s disabled in CRAB TaskWorker configuration' % command + configreq = {'subresource': 'addwarning', 'workflow': taskname, 'warning': b64encode(warning)} + try: + self.crabserver.post(api='task', data=urllib.urlencode(configreq)) + except Exception as e: + self.logger.error("Error uploading warning: %s", str(e)) + self.logger.warning("Cannot add a warning to REST interface. Warning message: %s", warning) + return True + return False + + def algorithm(self): """I'm the intelligent guy taking care of getting the work and distributing it to the slave processes.""" @@ -376,6 +415,8 @@ def algorithm(self): for task in pendingwork: if self.failBannedTask(task): continue + if self.skipRejectedCommand(task): + continue if self.updateWork(task['tm_taskname'], task['tm_task_command'], 'QUEUED'): worktype, failstatus = STATE_ACTIONS_MAP[task['tm_task_command']] toInject.append((worktype, task, failstatus, None)) diff --git a/src/script/Deployment/TaskWorker/TaskWorkerConfig.py b/src/script/Deployment/TaskWorker/TaskWorkerConfig.py index 06a80e34b0..baea225a31 100644 --- a/src/script/Deployment/TaskWorker/TaskWorkerConfig.py +++ b/src/script/Deployment/TaskWorker/TaskWorkerConfig.py @@ -106,6 +106,7 @@ config.TaskWorker.highPrioEgroups = ['cms-crab-HighPrioUsers'] config.TaskWorker.bannedUsernames = ['mickeymouse','donaldduck'] +config.TaskWorker.rejectedCommands = ['NONE', 'NOPE'] # commands are upper case e.g. 'SUBMIT' # Setting the list of users for the highprio accounting group # not usually needed since the list if automatically populated from e-group