From e5e93229d413c4e6c86130225f3eec1646303c81 Mon Sep 17 00:00:00 2001 From: Todor Ivanov Date: Thu, 29 Feb 2024 12:03:40 +0100 Subject: [PATCH 1/2] Add MSUnmerged initStandalone --- .../MicroService/MSUnmerged/initStandalone.py | 510 ++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 src/python/WMCore/MicroService/MSUnmerged/initStandalone.py diff --git a/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py b/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py new file mode 100644 index 0000000000..9df4f18acb --- /dev/null +++ b/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py @@ -0,0 +1,510 @@ +import sys +import os +import time +import logging +import resource +import stat +import errno +import json +import random +import re +import queue +import threading + + +from pprint import pformat, pprint +# from itertools import izip + +from WMCore.Services.Rucio.Rucio import Rucio +from rucio.client import Client + +import gfal2 + +from WMCore.Configuration import loadConfigurationFile +from WMCore.MicroService.MSCore.MSCore import MSCore +from WMCore.MicroService.MSCore.MSManager import MSManager +from WMCore.MicroService.MSUnmerged.MSUnmerged import MSUnmerged, createGfal2Context +from WMCore.MicroService.MSUnmerged.MSUnmergedRSE import MSUnmergedRSE +from WMCore.Services.RucioConMon.RucioConMon import RucioConMon +from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer +from WMCore.Database.MongoDB import MongoDB +from WMCore.WMException import WMException +from Utils.Pipeline import Pipeline, Functor +from Utils.TwPrint import twFormat +from Utils.IteratorTools import grouper + + + +def resCons(mark, logger=None): + """ + A simple function for measuring resources consumption at a given marker + point in the script + :param mark: A string identifying the marker point + :param logger: A logger to use for the output + :return: The message as constructed for the logger + """ + usage = resource.getrusage(resource.RUSAGE_SELF) + msg = "%s: \nusertime=%s \nsystime=%s \nmem=%s mb" + msg = msg % (mark, usage[0], usage[1], usage[2]/1024.0) + logger.debug(msg) + return msg + + +def reset_logging(): + manager = logging.root.manager + manager.disabled = logging.NOTSET + for logger in manager.loggerDict.values(): + if isinstance(logger, logging.Logger): + logger.setLevel(logging.NOTSET) + logger.propagate = True + logger.disabled = False + logger.filters.clear() + handlers = logger.handlers.copy() + for handler in handlers: + # Copied from `logging.shutdown`. + try: + handler.acquire() + handler.flush() + handler.close() + except (OSError, ValueError): + pass + finally: + handler.release() + logger.removeHandler(handler) + +def _lsTree(ctx, baseDirPfn, haltAtBottom=False, halt=False): + """ + Rrecursively traverse the tree under baseDirPfn and return the resulted list of directories and files + param ctx: Gfal Context manager object + param baseDirPfn: The Pfn of the baseDir starting point + param haltAtBottom: Flag, if True stop recursion at the moment the first the first fileEntry is found + param halt: Flag to signal immediate recursion halt + return: Tuple: (List of all directories and files found && The halt flag from the current run) + """ + dirList = [] + if halt: + return dirList, halt + + # First test if baseDirPfn is actually a directory entry: + try: + logger.info("Stat baseDirPfn: %s" % baseDirPfn) + entryStat = ctx.stat(baseDirPfn) + if not stat.S_ISDIR(entryStat.st_mode): + dirList.append(baseDirPfn) + logger.info("_lsTree called with a fileEntry: %s" % baseDirPfn) + return dirList, halt + except gfal2.GError as gfalExc: + if gfalExc.code == errno.ENOENT: + logger.warning("MISSING baseDir: %s", baseDirPfn) + return dirList, halt + else: + logger.error("FAILED to open baseDir: %s: gfalException: %s", baseDirPfn, str(gfalExc)) + return dirList, halt + + if baseDirPfn[-1] != '/': + baseDirPfn += '/' + + # Second recursively iterate down the tree: + try: + logger.info("Listing baseDirPfn: %s" % baseDirPfn) + dirEntryList = ctx.listdir(baseDirPfn) + except gfal2.GError as gfalExc: + logger.error("gfal Exception raised while listing %s. GError: %s" % (baseDirPfn, str(gfalExc))) + raise gfalExc + + for dirEntry in dirEntryList: + if halt: + break + if dirEntry in ['.', '..']: + continue + dirEntryPfn = baseDirPfn + dirEntry + # logger.info(dirEntryPfn) + try: + logger.info("Stat dirEntryPfn: %s" % dirEntryPfn) + entryStat = ctx.stat(dirEntryPfn) + except gfal2.GError as gfalExc: + if gfalExc.code == errno.ENOENT: + logger.warning("MISSING dirEntry: %s", dirEntryPfn) + continue + else: + logger.error("FAILED to open dirEntry: %s: gfalException: %s", dirEntryPfn, str(gfalExc)) + continue + + if not stat.S_ISDIR(entryStat.st_mode): + dirList.append(dirEntryPfn) + logger.info("Found file: %s" % dirEntry) + if haltAtBottom: + halt=True + return dirList, halt + else: + dirList.append(dirEntryPfn) + dirListExtension, halt = _lsTree(ctx, dirEntryPfn, haltAtBottom=haltAtBottom, halt=halt) + dirList.extend(dirListExtension) + + return dirList, halt + + +def lsTree(ctx, baseDirPfn, haltAtBottom=False): + """ + A _lsTree wrapper + return: Just a list with the directory contents + """ + dirContent, _ = _lsTree(ctx, baseDirPfn, haltAtBottom=haltAtBottom) + return dirContent + + + +def measureTime(ctx, rse, baseDirLfn='/store/unmerged/'): + startTime = {} + endTime = {} + for proto in rse['pfnPrefixes']: + baseDirPfn = rse['pfnPrefixes'][proto] + baseDirLfn + print("Start lsTree with protocol: %s" % proto) + print("Base dir pfn: %s" % baseDirPfn) + startTime[proto] = time.time() + dirContent = lsTree(ctx, baseDirPfn) + endTime[proto] = time.time() + print("Elapsed Time Seconds = %s" % (endTime[proto] - startTime[proto])) + print("") + return dirContent + + +def findPfnPrefix(rseName, proto): + logger.info("searching for Pfn Prefix for protocol: %s" % proto) + pfnPrefix = None + storageConfigPath = '/cvmfs/cms.cern.ch/SITECONF/' + rseName + '/storage.json' + try: + with open(storageConfigPath, 'r') as storageConfigFile: + storageConfig = json.load(storageConfigFile) + for protoConfig in storageConfig[0]['protocols']: + if protoConfig['protocol'] == proto: + # pprint(protoConfig) + if 'prefix' in protoConfig: + pfnPrefix = protoConfig['prefix'] + storageConfigFile.close() + except Exception as ex: + logger.error('Could not open Storage Config File for site: %s' % rseName) + return pfnPrefix + +def findUnprotectdLfn(ctx, msUnmerged, rse): + """ + A simple function to find a random unprotected file suitable for deletion + """ + unprotectedLfn = None + # find the proper pfnPrefix for the site: + if rse['pfnPrefixes']['SRMv2']: + pfnPrefix = rse['pfnPrefixes']['SRMv2'] + else: + pfnPrefix = rse['pfnPrefixes']['WebDAV'] + logger.info("Using PfnPrefix: %s" % pfnPrefix) + + if not msUnmerged.protectedLFNs: + logger.error( "The current MSUnmerged instance has an EMPTY protectedLFNs list. Please update it from the Production WMStatServer. ") + return None + + try: + # dirEntryPfn = rse['pfnPrefixes']['WebDAV'] + '/store/unmerged/' + dirEntryPfn = pfnPrefix + '/store/unmerged/' + logger.info("Stat /store/unmerged/ area at: %s" % dirEntryPfn) + unmergedCont = ctx.listdir(dirEntryPfn) + except gfal2.GError as gfalExc: + logger.error("FAILED to open dirEntry: %s: gfalException: %s", dirEntryPfn, str(gfalExc)) + return unprotectedLfn + + if not unmergedCont: + logger.error("Empty unmerged content") + return None + + + # First try to seek for a file through RucioConMon: + logger.info("First: Trying to find a random Lfn from RucioConMon:") + rseAllUnmerged = [] + try: + rseAllUnmerged = msUnmerged.rucioConMon.getRSEUnmerged(rse['name']) # Intentionally not saving this in the RSE object + except exception as ex: + logger.error("Failed to fetch Unmerged files lists from RucioConMon for site: %s" % rse['name']) + + if rseAllUnmerged: + for fileLfn in rseAllUnmerged: + # Check if what we start with is under /store/unmerged/* and is currently under one of the branches present at the site + if msUnmerged.regStoreUnmergedLfn.match(fileLfn): + # Cut the path to the deepest level known to WMStats protected LFNs + fileBaseLfn = msUnmerged._cutPath(fileLfn) + if not fileBaseLfn in msUnmerged.protectedLFNs: + filePfn = pfnPrefix + fileLfn + try: + logger.info("Stat fileEtryPfn: %s" % filePfn) + entryStat = ctx.stat(filePfn) + except gfal2.GError as gfalExc: + if gfalExc.code == errno.ENOENT: + logger.warning("MISSING fileEntry: %s", filePfn) + continue + else: + logger.error("FAILED to open fileEntry: %s: gfalException: %s", dirEntryPfn, str(gfalExc)) + continue + logger.info("Found an unprotected fileLfn %s with fileBaseLfn: %s" % (fileLfn, fileBaseLfn)) + unprotectedLfn = fileLfn + return unprotectedLfn + + logger.info("Second: Start recursive search for an unprotected Lfn at: %s " % rse['name']) + while not unprotectedLfn: + dirEntry = random.choice(unmergedCont) + skipDirEntry = False + for dirFilter in msConfig['dirFilterExcl']: + if dirFilter.startswith('/store/unmerged/' + dirEntry): + skipDirEntry = True + break + if skipDirEntry: + continue + + logger.info("Start recursive search for an unprotected Lfn at: %s in: /store/unmerged/%s " % (rse['name'], dirEntry)) + dirEntryPfn = pfnPrefix + '/store/unmerged/' + dirEntry + try: + dirTreePfn = lsTree(ctx, dirEntryPfn, haltAtBottom=True) + except gfal2.GError as gfalExc: + logger.error("FAILED to recursively traverse through dirEntry: %s: gfalException: %s", dirEntryPfn, str(gfalExc)) + break + + filePfn = None + for dirEntry in dirTreePfn: + if dirEntry.endswith(".root"): + filePfn = dirEntry + if not filePfn: + continue + logger.info("filePfn: %s" % filePfn) + fileLfn = filePfn.split(pfnPrefix)[1] + if not fileLfn.startswith('/store/unmerged/'): + logger.warning("Badly constructed fileLfn: %s" % fileLfn) + continue + fileBaseLfn = msUnmerged._cutPath(fileLfn) + if not fileBaseLfn in msUnmerged.protectedLFNs: + logger.info("Found an unprotected fileLfn %s with fileBaseLfn: %s" % (fileLfn, fileBaseLfn)) + unprotectedLfn = fileLfn + + return unprotectedLfn + + +# class gfal.... + + +def getUnmergedfromFile(msUnmerged, rse, filePath): + """ + Fetches all the records of unmerged files per RSE from Rucio Consistency Monitor + and cuts everything to a certain level in the path and puts the list in the rse obj. + + Path example: + /store/unmerged/Run2016B/JetHT/MINIAOD/ver2_HIPM_UL2016_MiniAODv2-v2/140000/388E3DEF-9F15-D04C-B582-7DD036D9DD33.root + + Where: + /store/unmerged/ - root unmerged area + /Run2016B - acquisition era + /JetHT - primary dataset + /MINIAOD - data tier + /ver2_HIPM_UL2016_MiniAODv2-v2 - processing string + processing version + /140000/388E3DEF-...-7DD036D9DD33.root - to be cut off + + :param rse: The RSE to work on + :param filePath: Path to file from which to read the list of unmerged files. + :return: rse + """ + msUnmerged.logger.info("We are here") + with open(filePath, 'r') as fdUnmerged: + for line in fdUnmerged: + # rse['files']['allUnmerged'].append(line) + lfnPath = line + # Check if what we start with is under /store/unmerged/* + if msUnmerged.regStoreUnmergedLfn.match(lfnPath): + # Cut the path to the deepest level known to WMStats protected LFNs + dirPath = msUnmerged._cutPath(lfnPath) + # Check if what is left is still under /store/unmerged/* + if msUnmerged.regStoreUnmergedLfn.match(dirPath): + # Add it to the set of allUnmerged + rse['dirs']['allUnmerged'].add(dirPath) + rse['counters']['totalNumFiles'] = len(rse['files']['allUnmerged']) + rse['counters']['totalNumDirs'] = len(rse['dirs']['allUnmerged']) + return rse + +def filterUnmergedFromFile(msUnmerged, rse, filePath): + """ + This method is applying set compliment operation to the set of unmerged + files per RSE in order to exclude the protected LFNs. It uses a file with the + list of allNnmerged lfns for creating the proper generators for file names + instead of uploading everything in memory. + :param filePath: Path to the file with the full list of allUnmergred lfns + :param rse: The RSE to work on + :return: rse + """ + rse['dirs']['toDelete'] = rse['dirs']['allUnmerged'] - msUnmerged.protectedLFNs + rse['dirs']['protected'] = rse['dirs']['allUnmerged'] & msUnmerged.protectedLFNs + + # The following check may seem redundant, but better stay safe than sorry + if not (rse['dirs']['toDelete'] | rse['dirs']['protected']) == rse['dirs']['allUnmerged']: + rse['counters']['dirsToDelete'] = -1 + msg = "Incorrect set check while trying to estimate the final set for deletion." + raise MSUnmergedPlineExit(msg) + + # Get rid of 'allUnmerged' directories + # rse['dirs']['allUnmerged'].clear() + + # NOTE: Here we may want to filter out all protected files from allUnmerged and leave just those + # eligible for deletion. This will minimize the iteration time of the filters + # from toDelete later on. + # while rse['files']['allUnmerged' + + # Now create the filters for rse['files']['toDelete'] - those should be pure generators + # A simple generator: + def genFunc(pattern, filePath): + with open(filePath, 'r') as fd: + for line in fd: + if line.startswith(pattern): + yield line + + # NOTE: If the 'dirFilterIncl' is non empty then the cleaning process will + # be enclosed only in this part of the tree and will ignore anything + # from /store/unmerged/ which does not belong to the included filter + # NOTE: 'dirFilterExcl' is always applied. + + # Merge the additional filters into a final set to be applied: + dirFilterIncl = set(msConfig['dirFilterIncl']) + dirFilterExcl = set(msConfig['dirFilterExcl']) + + # Update directory/files with no service filters + if not dirFilterIncl and not dirFilterExcl: + for dirName in rse['dirs']['toDelete']: + rse['files']['toDelete'][dirName] = genFunc(dirName, filePath) + rse['counters']['dirsToDelete'] = len(rse['files']['toDelete']) + msUnmergedDB.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8)) + return rse + + # If we are here, then there are service filters... + for dirName in rse['dirs']['toDelete']: + # apply exclusion filter + dirFilterExclMatch = [] + for pathExcl in dirFilterExcl: + dirFilterExclMatch.append(dirName.startswith(pathExcl)) + if any(dirFilterExclMatch): + # then it matched one of the exclusion paths + continue + if not dirFilterIncl: + # there is no inclusion filter, simply add this directory/files + rse['files']['toDelete'][dirName] = genFunc(dirName, filePath) + continue + + # apply inclusion filter + for pathIncl in dirFilterIncl: + if dirName.startswith(pathIncl): + rse['files']['toDelete'][dirName] = genFunc(dirName, filePath) + break + + # Now apply the filters back to the set in rse['dirs']['toDelete'] + rse['dirs']['toDelete'] = set(rse['files']['toDelete'].keys()) + + # Update the counters: + rse['counters']['dirsToDelete'] = len(rse['files']['toDelete']) + msUnmerged.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8)) + return rse + + + + +if __name__ == '__main__': + + FORMAT = "%(asctime)s:%(levelname)s:%(module)s:%(funcName)s(): %(message)s" + # logging.basicConfig(stream=sys.stdout, format=FORMAT, level=logging.INFO) + logging.basicConfig(stream=sys.stdout, format=FORMAT, level=logging.DEBUG) + logger = logging.getLogger(__name__) + # reset_logging() + + logger.info("########### MSUnmerged Standalone run ###########") + preConfigMarker = resCons("PreConfig", logger=logger) + + msConfigPath=os.getenv('WMCORE_SERVICE_CONFIG') + '/reqmgr2ms-unmerged-standalone/config-unmerged.py' + msConfigObj = loadConfigurationFile(msConfigPath) + msConfig = msConfigObj.section_('views').section_('data').dictionary_() + + preInstanceMarker = resCons("PreInstance", logger=logger) + + # # setup Rucio client + # rucio = Rucio(msConfig['rucioAccount'], configDict={"logger": logger}) + # rcl = Client(account=msConfig['rucioAccount']) + + # logger.info("########### MSManager startup ###########") + # msManager = MSManager(msConfig, logger) + + random.seed(time.time()) + msConfig['enableRealMode'] = False + msUnmerged = MSUnmerged(msConfig) + msUnmerged.resetServiceCounters() + # ctx = createGfal2Context(msConfig['gfalLogLevel'], msConfig['emulateGfal2']) + msUnmerged.protectedLFNs = set(msUnmerged.wmstatsSvc.getProtectedLFNs()) + msUnmerged.rseConsStats = msUnmerged.rucioConMon.getRSEStats() + + preExecMarker = resCons("PreExec", logger=logger) + + mongoDBConfig = { + 'database': msConfig['mongoDB'], + 'server': msConfig['mongoDBServer'], + 'replicaSet': msConfig['mongoDBReplicaSet'], + 'port': msConfig['mongoDBPort'], + 'username': msConfig['mongoDBUser'], + 'password': msConfig['mongoDBPassword'], + 'connect': True, + 'directConnection': False, + 'logger': logger, + 'create': False, + 'mockMongoDB': msConfig['mockMongoDB']} + + # NOTE: We need to blur `username' and `password' keys before printing the configuration: + msg = "Connecting to MongoDB using the following mongoDBConfig:\n%s" + logger.info(msg, pformat({**mongoDBConfig, **{'username': '****', 'password': '****'}})) + + mongoDBObj = MongoDB(**mongoDBConfig) + mongoDB = getattr(mongoDBObj, msConfig['mongoDB']) + mongoClt = mongoDBObj.client + # mongoColl = currDB[msConfig['collection']] if msConfig['collection'] else None + + # result = msUnmerged.execute() + # logger.info('Execute result: %s', pformat(result)) + # postExecMarker = resCons("PostExec", logger=logger) + # # reset_logging() + + # rseNames = msUnmerged.getRSEList() + # rseList = {} + # protoList = ['SRMv2', 'XRootD', 'WebDAV'] + # for rseName in rseNames: + # rse = MSUnmergedRSE(rseName) + # rse = msUnmerged.getRSEFromMongoDB(rse) + # # rse = msUnmerged.getUnmergedFiles(rse) + # rse = msUnmerged.getPfn(rse) + # rse['pfnPrefixes'] = {} + # for proto in protoList: + # rse['pfnPrefixes'][proto] = findPfnPrefix(rse['name'], proto) + # rseList[rse['name']] = rse + + # for rseName in rseList: + # logger.info("Searching for an unprotected Lfn at: %s" % rseName) + # unprotectedLfn = findUnprotectdLfn(ctx, msUnmerged, rseList[rseName]) + # unprotectedBaseLfn = msUnmerged._cutPath(unprotectedLfn) + # rseList[rseName]['files']['toDelete'][unprotectedBaseLfn] = [unprotectedLfn] + + # msUnmerged.execute() + # msUnmerged.protectedLFNs + # msUnmerged.rseConsStats + # rse['pfnPrefixSrm'] = 'srm://srm.ciemat.es:8443/srm/managerv2?SFN=/pnfs/ciemat.es/data/cms/prod' + # rse['pfnPrefixDavs'] = rse['pfnPrefix'] + # lfn = '/store/unmerged/GenericNoSmearGEN/InclusiveDileptonMinBias_TuneCP5Plus_13p6TeV_pythia8/GEN/124X_mcRun3_2022_realistic_v12-v2' + # dirCont = _lsTree(ctx, rse['pfnPrefixDavs'] + lfn) + + protoList = ['SRMv2', 'XRootD', 'WebDAV'] + rseName = 'T2_CH_CERN' + rse = MSUnmergedRSE(rseName) + rse = msUnmerged.getRSEFromMongoDB(rse) + rse['pfnPrefixes'] = {} + for proto in protoList: + rse['pfnPrefixes'][proto] = findPfnPrefix(rse['name'], proto) + rse['pfnPrefixes']['eos'] = '/eos/cms' + rse['pfnPrefix'] = rse['pfnPrefixes']['eos'] + fileUnmerged = '/data/WMCore.MSUnmergedStandalone/debug/T2_CH_CERN_wm_file_list.2024-02-27' + rse = getUnmergedfromFile(msUnmerged, rse, fileUnmerged) + # rse = msUnmerged.filterUnmergedFiles(rse) + rse = filterUnmergedFromFile(msUnmerged, rse, fileUnmerged) From 9bf4f9a95997ef4a42cf49ae9578ac47ffaa8054 Mon Sep 17 00:00:00 2001 From: Todor Ivanov Date: Thu, 29 Feb 2024 18:27:12 +0100 Subject: [PATCH 2/2] Add cleanRSEOs method to MSUnmerged --- .../MicroService/MSUnmerged/MSUnmerged.py | 88 ++++++++++++++++++- .../MicroService/MSUnmerged/initStandalone.py | 13 ++- 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py index 5e797c08eb..db841bd597 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py @@ -18,7 +18,7 @@ import random import re -import os +import os, shutil import errno import stat try: @@ -95,7 +95,8 @@ def __init__(self, msConfig, logger=None): self.msConfig.setdefault("verbose", True) self.msConfig.setdefault("interval", 60) - self.msConfig.setdefault("limitFilesPerRSE", 200) + self.msConfig.setdefault("limitFilesPerRSE", 2000) + self.msConfig.setdefault("limitDirsPerRSE", 200) self.msConfig.setdefault("skipRSEs", []) self.msConfig.setdefault("rseExpr", "*") self.msConfig.setdefault("enableRealMode", False) @@ -302,6 +303,70 @@ def _execute(self, rseList): self.plineCounters[pline.name]['rsesCleaned'], \ self.plineCounters[pline.name]['filesDeletedSuccess'] + + + # @profile + def cleanRSEOs(self, rse): + """ + The method to implement the actual deletion of files for an RSE. + :param rse: MSUnmergedRSE object to be cleaned + :return: The MSUnmergedRSE object + """ + + self.logger.info("RSE: %s, Using os library for cleaning %s/store/unmerged/ locally!", rse['name'], rse['pfnPrefix']) + + if not rse['pfnPrefix']: + self.logger.error("RSE: %s, Missing rse['pfnPrefix']. Cannot continue!") + return rse + + # Start cleaning one directory at a time: + dirCounter = 0 + for dirLfn in rse['dirs']['toDelete']: + if dirCounter == self.msConfig['limitDirsPerRSE']: + self.logger.info("Reached directory counter limit: %s! Exit now!", self.msConfig['limitDirsPerRSE']) + break + if dirLfn in rse['dirs']['deletedSuccess']: + self.logger.info("RSE: %s Already deleted lfn: %s.", rse['name'], dirLfn) + continue + dirCounter += 1 + + dirPfn = rse['pfnPrefix'] + dirLfn + # self.logger.info("RSE: %s Trying to delete pfn: %s", rse['name'], dirPfn) + + if self.msConfig['enableRealMode']: + # The following bool flag is to track the success for directory removal + # during all consecutive attempts/steps of cleaning the current branch. + rmdirSuccess = False + + # Initially try to delete the whole directory even before emptying its content: + self.logger.info("RSE: %s Trying to delete lfn: %s", rse['name'], dirLfn) + try: + rmdirSuccess = self._rmDirOs(dirPfn) + except OSError as osExc: + errMessage = osExc.strerror + rse['counters']['gfalErrors'].setdefault(errMessage, 0) + rse['counters']['gfalErrors'][errMessage] += 1 + + # If the directory was considered successfully removed, update the file counters with the length of the directory contents + # If the above operation fails try to execute the directory contents deletion in bulk - full list of files per directory + if rmdirSuccess: + rse['dirs']['deletedSuccess'].add(dirLfn) + # if dirLfn in rse['dirs']['toDelete']: + # rse['dirs']['toDelete'].remove(dirLfn) + if dirLfn in rse['dirs']['deletedFail']: + rse['dirs']['deletedFail'].remove(dirLfn) + msg = "RSE: %s SUCCESS deleting lfn: %s" + self.logger.info(msg, rse['name'], dirLfn) + else: + rse['dirs']['deletedFail'].add(dirLfn) + msg = "RSE: %s FAILED to delete lfn: %s" + self.logger.error(msg, rse['name'], dirLfn) + + rse['counters']['dirsDeletedSuccess'] = len(rse['dirs']['deletedSuccess']) + rse['counters']['dirsDeletedFail'] = len(rse['dirs']['deletedFail']) + + return rse + # @profile def cleanRSE(self, rse): """ @@ -456,6 +521,25 @@ def _rmDir(self, ctx, dirPfn): rmdirSuccess = False return rmdirSuccess + def _rmDirOs(self, dirPfn): + """ + Auxiliary method to be used for removing a single directory entry with os libraries + and handling eventual errors raised. + :param dirPfn: The Pfn of the directory to be removed + :return: Bool: True if the removal was successful, False otherwise + NOTE: An attempt to delete an already missing directory is considered a success + """ + try: + shutil.rmtree(dirPfn) + rmdirSuccess = True + except OSError as osExc: + if osExc.errno == errno.ENOENT: + self.logger.warning("MISSING directory: %s", dirPfn) + rmdirSuccess = True + else: + self.logger.error("FAILED to remove directory: %s: gfalException: %s, gfalErrorCode: %s", dirPfn, str(osExc), osExc.errno) + raise osExc + return rmdirSuccess def _purgeTree(self, ctx, baseDirPfn, isDirEntry=False): """ diff --git a/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py b/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py index 9df4f18acb..d8f837880c 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py +++ b/src/python/WMCore/MicroService/MSUnmerged/initStandalone.py @@ -1,5 +1,5 @@ import sys -import os +import os, shutil import time import logging import resource @@ -307,7 +307,6 @@ def getUnmergedfromFile(msUnmerged, rse, filePath): :param filePath: Path to file from which to read the list of unmerged files. :return: rse """ - msUnmerged.logger.info("We are here") with open(filePath, 'r') as fdUnmerged: for line in fdUnmerged: # rse['files']['allUnmerged'].append(line) @@ -357,7 +356,7 @@ def genFunc(pattern, filePath): with open(filePath, 'r') as fd: for line in fd: if line.startswith(pattern): - yield line + yield line.rstrip() # NOTE: If the 'dirFilterIncl' is non empty then the cleaning process will # be enclosed only in this part of the tree and will ignore anything @@ -433,9 +432,11 @@ def genFunc(pattern, filePath): random.seed(time.time()) msConfig['enableRealMode'] = False + msConfig['limitDirsPerRSE'] = -1 + msUnmerged = MSUnmerged(msConfig) msUnmerged.resetServiceCounters() - # ctx = createGfal2Context(msConfig['gfalLogLevel'], msConfig['emulateGfal2']) + ctx = createGfal2Context(msConfig['gfalLogLevel'], msConfig['emulateGfal2']) msUnmerged.protectedLFNs = set(msUnmerged.wmstatsSvc.getProtectedLFNs()) msUnmerged.rseConsStats = msUnmerged.rucioConMon.getRSEStats() @@ -499,6 +500,7 @@ def genFunc(pattern, filePath): rseName = 'T2_CH_CERN' rse = MSUnmergedRSE(rseName) rse = msUnmerged.getRSEFromMongoDB(rse) + rse = msUnmerged.updateRSETimestamps(rse, start=True, end=False) rse['pfnPrefixes'] = {} for proto in protoList: rse['pfnPrefixes'][proto] = findPfnPrefix(rse['name'], proto) @@ -508,3 +510,6 @@ def genFunc(pattern, filePath): rse = getUnmergedfromFile(msUnmerged, rse, fileUnmerged) # rse = msUnmerged.filterUnmergedFiles(rse) rse = filterUnmergedFromFile(msUnmerged, rse, fileUnmerged) + rse = msUnmerged.cleanRSEOs(rse) + rse = msUnmerged.updateRSETimestamps(rse, start=False, end=True) + rse = msUnmerged.uploadRSEToMongoDB(rse)