Skip to content

Commit

Permalink
feat (TS): RequestTasks use getBulkRequestStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Aug 14, 2024
1 parent 7e33023 commit d9c89d9
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions src/DIRAC/TransformationSystem/Client/RequestTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,26 +317,49 @@ def getSubmittedTaskStatus(self, taskDicts):
Check if tasks changed status, and return a list of tasks per new status
"""
updateDict = {}
badRequestID = 0
externalIDs = [
int(taskDict["ExternalID"])
for taskDict in taskDicts
if taskDict["ExternalID"] and int(taskDict["ExternalID"])
]

# Count how many tasks don't have an valid external ID
badRequestID = len(taskDicts) - len(externalIDs)

res = self.requestClient.getBulkRequestStatus(externalIDs)
if not res["OK"]:
# We need a transformationID for the log, and although we expect a single one,
# do things ~ properly
tids = list({taskDict["TransformationID"] for taskDict in taskDicts})
try:
tid = tids[0]
except IndexError:
tid = 0

self._logWarn(
"getSubmittedTaskStatus: Failed to get bulk requestIDs",
res["Message"],
transID=tid,
)
return S_OK({})
new_statuses = res["Value"]

for taskDict in taskDicts:
oldStatus = taskDict["ExternalStatus"]
# ExternalID is normally a string
if taskDict["ExternalID"] and int(taskDict["ExternalID"]):
newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"])
if not newStatus["OK"]:
log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn
log(
"getSubmittedTaskStatus: Failed to get requestID for request",
newStatus["Message"],
transID=taskDict["TransformationID"],
)
else:
newStatus = newStatus["Value"]
# We don't care updating the tasks to Assigned while the request is being processed
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

newStatus = new_statuses.get(taskDict["ExternalID"])
if not newStatus:
self._logVerbose(
"getSubmittedTaskStatus: Failed to get requestID for request",
f"No such RequestID {taskDict['ExternalID']}",
transID=taskDict["TransformationID"],
)
else:
badRequestID += 1
# We don't care updating the tasks to Assigned while the request is being processed
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

if badRequestID:
self._logWarn("%d requests have identifier 0" % badRequestID)
return S_OK(updateDict)
Expand Down Expand Up @@ -368,21 +391,30 @@ def getSubmittedFileStatus(self, fileDicts):
if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID):
requestFiles[externalID] = taskFiles[taskID]

res = self.requestClient.getBulkRequestStatus(list(requestFiles))
if not res["OK"]:
self._logWarn(
"Failed to get request status",
res["Message"],
transID=transID,
method="getSubmittedFileStatus",
)
return S_OK({})
reqStatuses = res["Value"]

updateDict = {}
for requestID, lfnList in requestFiles.items():
# We only take request in final state to avoid race conditions
# https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
reqStatus = self.requestClient.getRequestStatus(requestID)
if not reqStatus["OK"]:
log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn
log(
reqStatus = reqStatuses.get(requestID)
if not reqStatus:
self._logVerbose(
"Failed to get request status",
reqStatus["Message"],
f"Request {requestID} does not exist",
transID=transID,
method="getSubmittedFileStatus",
)
continue
reqStatus = reqStatus["Value"]
if reqStatus not in Request.FINAL_STATES:
continue

Expand Down

0 comments on commit d9c89d9

Please sign in to comment.