From d9c89d966dd460b68cef89b2d201777aaabc1339 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Mon, 12 Aug 2024 15:40:16 +0200 Subject: [PATCH] feat (TS): RequestTasks use getBulkRequestStatus --- .../Client/RequestTasks.py | 76 +++++++++++++------ 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py index 762b85d9d06..53f80a3e853 100644 --- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py +++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py @@ -317,26 +317,49 @@ def getSubmittedTaskStatus(self, taskDicts): Check if tasks changed status, and return a list of tasks per new status """ updateDict = {} - badRequestID = 0 + externalIDs = [ + int(taskDict["ExternalID"]) + for taskDict in taskDicts + if taskDict["ExternalID"] and int(taskDict["ExternalID"]) + ] + + # Count how many tasks don't have an valid external ID + badRequestID = len(taskDicts) - len(externalIDs) + + res = self.requestClient.getBulkRequestStatus(externalIDs) + if not res["OK"]: + # We need a transformationID for the log, and although we expect a single one, + # do things ~ properly + tids = list({taskDict["TransformationID"] for taskDict in taskDicts}) + try: + tid = tids[0] + except IndexError: + tid = 0 + + self._logWarn( + "getSubmittedTaskStatus: Failed to get bulk requestIDs", + res["Message"], + transID=tid, + ) + return S_OK({}) + new_statuses = res["Value"] + for taskDict in taskDicts: oldStatus = taskDict["ExternalStatus"] # ExternalID is normally a string - if taskDict["ExternalID"] and int(taskDict["ExternalID"]): - newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"]) - if not newStatus["OK"]: - log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn - log( - "getSubmittedTaskStatus: Failed to get requestID for request", - newStatus["Message"], - transID=taskDict["TransformationID"], - ) - else: - newStatus = newStatus["Value"] - # We don't care updating the tasks to Assigned while the request is being processed - if newStatus != oldStatus and newStatus != "Assigned": - updateDict.setdefault(newStatus, []).append(taskDict["TaskID"]) + + newStatus = new_statuses.get(taskDict["ExternalID"]) + if not newStatus: + self._logVerbose( + "getSubmittedTaskStatus: Failed to get requestID for request", + f"No such RequestID {taskDict['ExternalID']}", + transID=taskDict["TransformationID"], + ) else: - badRequestID += 1 + # We don't care updating the tasks to Assigned while the request is being processed + if newStatus != oldStatus and newStatus != "Assigned": + updateDict.setdefault(newStatus, []).append(taskDict["TaskID"]) + if badRequestID: self._logWarn("%d requests have identifier 0" % badRequestID) return S_OK(updateDict) @@ -368,21 +391,30 @@ def getSubmittedFileStatus(self, fileDicts): if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID): requestFiles[externalID] = taskFiles[taskID] + res = self.requestClient.getBulkRequestStatus(list(requestFiles)) + if not res["OK"]: + self._logWarn( + "Failed to get request status", + res["Message"], + transID=transID, + method="getSubmittedFileStatus", + ) + return S_OK({}) + reqStatuses = res["Value"] + updateDict = {} for requestID, lfnList in requestFiles.items(): # We only take request in final state to avoid race conditions # https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414 - reqStatus = self.requestClient.getRequestStatus(requestID) - if not reqStatus["OK"]: - log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn - log( + reqStatus = reqStatuses.get(requestID) + if not reqStatus: + self._logVerbose( "Failed to get request status", - reqStatus["Message"], + f"Request {requestID} does not exist", transID=transID, method="getSubmittedFileStatus", ) continue - reqStatus = reqStatus["Value"] if reqStatus not in Request.FINAL_STATES: continue