Skip to content

Commit

Permalink
sweep: DIRACGrid#7741 Change the updating logic of the TS FileStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Oct 7, 2024
1 parent b924d6d commit bf61420
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ Databases

Every now and then, it is interesting to look at the fragmentation status of your database. This is done by using the ``analyze table`` statement (https://dev.mysql.com/doc/refman/8.4/en/analyze-table.html) possibly followed by the ``optimize table`` statement (https://dev.mysql.com/doc/refman/8.4/en/optimize-table.html).

To know whether your tables are fragmented::

select table_schema,table_name, sys.format_bytes(data_length) table_size, sys.format_bytes(data_free) empty_space from information_schema.tables where data_length >= (1024*1024*1024) order by data_length desc;


The fragmented space should be very small with respect to the overall table size.




Duplications
============
Expand Down
6 changes: 4 additions & 2 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def __readConf(self):
# lifetime of the proxy we download to delegate to FTS
self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)

self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE)

return S_OK()

def initialize(self):
Expand Down Expand Up @@ -316,7 +318,7 @@ def monitorJobsLoop(self):
log.info("Getting next batch of jobs to monitor", f"{loopId}/{nbOfLoops}")
# get jobs from DB
res = self.fts3db.getActiveJobs(
limit=JOB_MONITORING_BATCH_SIZE, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
limit=self.jobMonitoringBatchSize, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
)

if not res["OK"]:
Expand Down Expand Up @@ -351,7 +353,7 @@ def monitorJobsLoop(self):

# If we got less to monitor than what we asked,
# stop looping
if len(activeJobs) < JOB_MONITORING_BATCH_SIZE:
if len(activeJobs) < self.jobMonitoringBatchSize:
break
# Commit records after each loop
self.dataOpSender.concludeSending()
Expand Down
5 changes: 5 additions & 0 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
filesStatus[file_id]["ftsGUID"] = None
# TODO: update status to defunct if not recoverable here ?

# If the file is failed, check if it is recoverable
if file_state in FTS3File.FTS_FAILED_STATES:
if not fileDict.get("Recoverable", True):
filesStatus[file_id]["status"] = "Defunct"

# If the file is not in a final state, but the job is, we return an error
# FTS can have inconsistencies where the FTS Job is in a final state
# but not all the files.
Expand Down
6 changes: 6 additions & 0 deletions src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ Agents
OperationBulkSize = 20
# How many Job we will monitor in one loop
JobBulkSize = 20
# Split JobBulkSize into several chunks
# Bigger numbers (like 100) are efficient when there is a single agent
# When there are multiple agents, it may slow down the overall processing
# because of locks and race conditions
# (This number should of course be smaller than or equal to JobBulkSize)
JobMonitoringBatchSize = 20
# Max number of files to go in a single job
MaxFilesPerJob = 100
# Max number of attempt per file
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/DataManagementSystem/DB/FTS3DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
session.query(FTS3Job)
.join(FTS3Operation)
.filter(FTS3Job.status.in_(FTS3Job.NON_FINAL_STATES))
.filter(FTS3Operation.status == "Active")
.filter(FTS3Job.assignment.is_(None))
.filter(FTS3Operation.assignment.is_(None))
)
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" This module allows to resolve output SEs for Job based
on SE and site/country association
"""

from random import shuffle

from DIRAC import gLogger, gConfig
Expand Down Expand Up @@ -70,7 +71,6 @@ def getDestinationSEList(outputSE, site, outputmode="Any"):
raise RuntimeError(localSEs["Message"])
localSEs = localSEs["Value"]
sLog.verbose("Local SE list is:", ", ".join(localSEs))

# There is an alias defined for this Site
associatedSEs = gConfig.getValue(f"/Resources/Sites/{prefix}/{site}/AssociatedSEs/{outputSE}", [])
if associatedSEs:
Expand Down
20 changes: 19 additions & 1 deletion src/DIRAC/RequestManagementSystem/Client/ReqClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
:synopsis: implementation of client for RequestDB using DISET framework
"""

import os
import time
import random
Expand Down Expand Up @@ -258,7 +259,7 @@ def getRequestStatus(self, requestID):
self.log.debug("getRequestStatus: attempting to get status for '%d' request." % requestID)
requestStatus = self._getRPC().getRequestStatus(requestID)
if not requestStatus["OK"]:
self.log.error(
self.log.verbose(
"getRequestStatus: unable to get status for request",
": '%d' %s" % (requestID, requestStatus["Message"]),
)
Expand Down Expand Up @@ -470,6 +471,23 @@ def resetFailedRequest(self, requestID, allR=False):
return self.putRequest(req)
return S_OK("Not reset")

@ignoreEncodeWarning
def getBulkRequestStatus(self, requestIDs: list[int]):
    """Get the status of several requests with a single RPC call.

    :param self: self reference
    :param list requestIDs: list of request IDs (integers)
    :return: S_ERROR or S_OK( { reqID1:status, requID2:status2, ... })
    """
    rpcResult = self._getRPC().getBulkRequestStatus(requestIDs)
    if rpcResult["OK"]:
        # Cast the requestIDs back to int before handing the mapping to the caller
        return S_OK(strToIntDict(rpcResult["Value"]))

    # Propagate the failed RPC result untouched
    return rpcResult


# ============= Some useful functions to be shared ===========

Expand Down
15 changes: 15 additions & 0 deletions src/DIRAC/RequestManagementSystem/DB/RequestDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,21 @@ def getRequestStatus(self, requestID):
session.close()
return S_OK(status[0])

def getBulkRequestStatus(self, requestIDs):
    """Get the statuses of several requests with a single query.

    :param requestIDs: collection of request IDs to look up
    :return: S_OK({requestID: status, ...}) with one entry per row returned by the query,
             or S_ERROR if the query raised an exception
    """
    # Nothing to look up: avoid opening a session and issuing a query with an empty IN clause
    if not requestIDs:
        return S_OK({})

    session = self.DBSession()
    try:
        statuses = (
            session.query(Request.RequestID, Request._Status).filter(Request.RequestID.in_(requestIDs)).all()
        )  # pylint: disable=no-member
        # Each row is a (RequestID, Status) pair
        status_dict = {req_id: req_status for req_id, req_status in statuses}
    except Exception as e:
        # log as well?
        return S_ERROR(f"Failed to getBulkRequestStatus {e!r}")
    finally:
        # Always release the session, whether the query succeeded or not
        session.close()
    return S_OK(status_dict)

def getRequestFileStatus(self, requestID, lfnList):
"""get status for files in request given its id
Expand Down
10 changes: 10 additions & 0 deletions src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,16 @@ def export_getRequestStatus(cls, requestID):
gLogger.error("getRequestStatus", status["Message"])
return status

types_getBulkRequestStatus = [list]

@classmethod
def export_getBulkRequestStatus(cls, requestIDs):
    """Return the statuses of the requests identified by the given IDs.

    :param requestIDs: request IDs forwarded to the request DB
    :return: the result structure returned by the DB call, unchanged
    """
    result = cls.__requestDB.getBulkRequestStatus(requestIDs)
    if result["OK"]:
        return result

    # The DB call failed: log the message, then hand the error back to the caller
    gLogger.error("getBulkRequestStatus", result["Message"])
    return result

types_getRequestFileStatus = [int, [str, list]]

@classmethod
Expand Down
9 changes: 6 additions & 3 deletions src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
:caption: TransformationAgent options
"""
from importlib import import_module

import time
import os
import datetime
Expand Down Expand Up @@ -242,7 +243,7 @@ def processTransformation(self, transDict, clients):
if transID not in self.replicaCache:
self.__readCache(transID)
transFiles = transFiles["Value"]
unusedLfns = [f["LFN"] for f in transFiles]
unusedLfns = {f["LFN"] for f in transFiles}
unusedFiles = len(unusedLfns)

plugin = transDict.get("Plugin", "Standard")
Expand All @@ -251,7 +252,7 @@ def processTransformation(self, transDict, clients):
maxFiles = Operations().getValue(f"TransformationPlugins/{plugin}/MaxFilesToProcess", 0)
# Get plugin-specific limit in number of files (0 means no limit)
totLfns = len(unusedLfns)
lfnsToProcess = self.__applyReduction(unusedLfns, maxFiles=maxFiles)
lfnsToProcess = set(self.__applyReduction(unusedLfns, maxFiles=maxFiles))
if len(lfnsToProcess) != totLfns:
self._logInfo(
"Reduced number of files from %d to %d" % (totLfns, len(lfnsToProcess)),
Expand Down Expand Up @@ -534,8 +535,10 @@ def _getDataReplicasDM(self, transID, lfns, clients, forJobs=True, ignoreMissing
method=method,
transID=transID,
)
successful_set = set(replicas["Successful"])
failed_set = set(replicas["Failed"])
# If files are neither Successful nor Failed, they are set problematic in the FC
problematicLfns = [lfn for lfn in lfns if lfn not in replicas["Successful"] and lfn not in replicas["Failed"]]
problematicLfns = [lfn for lfn in lfns if lfn not in successful_set and lfn not in failed_set]
if problematicLfns:
self._logInfo(f"{len(problematicLfns)} files found problematic in the catalog, set ProbInFC")
res = clients["TransformationClient"].setFileStatusForTransformation(
Expand Down
102 changes: 79 additions & 23 deletions src/DIRAC/TransformationSystem/Client/RequestTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,26 +309,48 @@ def getSubmittedTaskStatus(self, taskDicts):
Check if tasks changed status, and return a list of tasks per new status
"""
updateDict = {}
badRequestID = 0
externalIDs = [
int(taskDict["ExternalID"])
for taskDict in taskDicts
if taskDict["ExternalID"] and int(taskDict["ExternalID"])
]
# Count how many tasks don't have a valid external ID
badRequestID = len(taskDicts) - len(externalIDs)

res = self.requestClient.getBulkRequestStatus(externalIDs)
if not res["OK"]:
# We need a transformationID for the log, and although we expect a single one,
# do things ~ properly
tids = list({taskDict["TransformationID"] for taskDict in taskDicts})
try:
tid = tids[0]
except IndexError:
tid = 0

self._logWarn(
"getSubmittedTaskStatus: Failed to get bulk requestIDs",
res["Message"],
transID=tid,
)
return S_OK({})
new_statuses = res["Value"]

for taskDict in taskDicts:
oldStatus = taskDict["ExternalStatus"]
# ExternalID is normally a string
if taskDict["ExternalID"] and int(taskDict["ExternalID"]):
newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"])
if not newStatus["OK"]:
log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn
log(
"getSubmittedTaskStatus: Failed to get requestID for request",
newStatus["Message"],
transID=taskDict["TransformationID"],
)
else:
newStatus = newStatus["Value"]
# We don't care updating the tasks to Assigned while the request is being processed
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

newStatus = new_statuses.get(int(taskDict["ExternalID"]))
if not newStatus:
self._logVerbose(
"getSubmittedTaskStatus: Failed to get requestID for request",
f"No such RequestID {taskDict['ExternalID']}",
transID=taskDict["TransformationID"],
)
else:
badRequestID += 1
# We do not update the tasks status if the Request is Assigned, as it is a very temporary status
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

if badRequestID:
self._logWarn("%d requests have identifier 0" % badRequestID)
return S_OK(updateDict)
Expand All @@ -355,13 +377,38 @@ def getSubmittedFileStatus(self, fileDicts):
requestFiles = {}
for taskDict in res["Value"]:
taskID = taskDict["TaskID"]
externalID = taskDict["ExternalID"]
externalID = int(taskDict["ExternalID"])
# Only consider tasks that are submitted, ExternalID is a string
if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID):
requestFiles[externalID] = taskFiles[taskID]

res = self.requestClient.getBulkRequestStatus(list(requestFiles))
if not res["OK"]:
self._logWarn(
"Failed to get request status",
res["Message"],
transID=transID,
method="getSubmittedFileStatus",
)
return S_OK({})
reqStatuses = res["Value"]

updateDict = {}
for requestID, lfnList in requestFiles.items():
# We only take request in final state to avoid race conditions
# https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
reqStatus = reqStatuses.get(requestID)
if not reqStatus:
self._logVerbose(
"Failed to get request status",
f"Request {requestID} does not exist",
transID=transID,
method="getSubmittedFileStatus",
)
continue
if reqStatus not in Request.FINAL_STATES:
continue

statusDict = self.requestClient.getRequestFileStatus(requestID, lfnList)
if not statusDict["OK"]:
log = self._logVerbose if "not exist" in statusDict["Message"] else self._logWarn
Expand All @@ -371,10 +418,19 @@ def getSubmittedFileStatus(self, fileDicts):
transID=transID,
method="getSubmittedFileStatus",
)
else:
for lfn, newStatus in statusDict["Value"].items():
if newStatus == "Done":
updateDict[lfn] = TransformationFilesStatus.PROCESSED
elif newStatus == "Failed":
updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
continue

# If we are here, it means the Request is in a final state.
# In principle, you could expect every file to also be in a final state,
# but this is only true for simple Requests.
# Hence, a file is marked as PROCESSED only if its status is Done.
# In any other case, we mark it PROBLEMATIC.
# This is dangerous though, as complex Requests may not be re-entrant.
# We would need a way to make sure it is safe to do so.
# See https://github.com/DIRACGrid/DIRAC/issues/7116 for more details
for lfn, newStatus in statusDict["Value"].items():
if newStatus == "Done":
updateDict[lfn] = TransformationFilesStatus.PROCESSED
else:
updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
return S_OK(updateDict)

0 comments on commit bf61420

Please sign in to comment.