Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MSUnmerged initStandalone && Read AllUnmerged from file #11916

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import random
import re
import os
import os, shutil
import errno
import stat
try:
Expand Down Expand Up @@ -95,7 +95,8 @@ def __init__(self, msConfig, logger=None):

self.msConfig.setdefault("verbose", True)
self.msConfig.setdefault("interval", 60)
self.msConfig.setdefault("limitFilesPerRSE", 200)
self.msConfig.setdefault("limitFilesPerRSE", 2000)
self.msConfig.setdefault("limitDirsPerRSE", 200)
self.msConfig.setdefault("skipRSEs", [])
self.msConfig.setdefault("rseExpr", "*")
self.msConfig.setdefault("enableRealMode", False)
Expand Down Expand Up @@ -302,6 +303,70 @@ def _execute(self, rseList):
self.plineCounters[pline.name]['rsesCleaned'], \
self.plineCounters[pline.name]['filesDeletedSuccess']



# @profile
def cleanRSEOs(self, rse):
    """
    Delete the unmerged directories of an RSE through the local filesystem
    (via `self._rmDirOs`) instead of a remote storage protocol.

    Every lfn in rse['dirs']['toDelete'] is prefixed with rse['pfnPrefix'] and,
    only when msConfig['enableRealMode'] is set, removed as a whole tree.
    At most msConfig['limitDirsPerRSE'] directories are attempted per call;
    lfns already present in rse['dirs']['deletedSuccess'] are skipped and do
    not count against that limit.

    :param rse: MSUnmergedRSE object to be cleaned
    :return:    The same MSUnmergedRSE object, with its 'dirs' sets and
                'counters' updated in place
    """

    self.logger.info("RSE: %s, Using os library for cleaning %s/store/unmerged/ locally!", rse['name'], rse['pfnPrefix'])

    if not rse['pfnPrefix']:
        # Without a prefix no pfn can be built, so nothing may be deleted.
        self.logger.error("RSE: %s, Missing rse['pfnPrefix']. Cannot continue!", rse['name'])
        return rse

    # Start cleaning one directory at a time:
    dirCounter = 0
    for dirLfn in rse['dirs']['toDelete']:
        if dirCounter == self.msConfig['limitDirsPerRSE']:
            self.logger.info("Reached directory counter limit: %s! Exit now!", self.msConfig['limitDirsPerRSE'])
            break
        if dirLfn in rse['dirs']['deletedSuccess']:
            self.logger.info("RSE: %s Already deleted lfn: %s.", rse['name'], dirLfn)
            continue
        dirCounter += 1

        dirPfn = rse['pfnPrefix'] + dirLfn

        if self.msConfig['enableRealMode']:
            # The following bool flag tracks the outcome of the removal attempt;
            # it stays False when _rmDirOs raises.
            rmdirSuccess = False

            self.logger.info("RSE: %s Trying to delete lfn: %s", rse['name'], dirLfn)
            try:
                rmdirSuccess = self._rmDirOs(dirPfn)
            except OSError as osExc:
                # Errors are accumulated per message under the existing
                # 'gfalErrors' counter. strerror may be None for an OSError
                # built without an errno, hence the fallback to str().
                errMessage = osExc.strerror or str(osExc)
                rse['counters']['gfalErrors'].setdefault(errMessage, 0)
                rse['counters']['gfalErrors'][errMessage] += 1

            # Book the directory as deleted or failed. The lfn is intentionally
            # left in rse['dirs']['toDelete'].
            if rmdirSuccess:
                rse['dirs']['deletedSuccess'].add(dirLfn)
                # A success supersedes any failure recorded earlier for this lfn.
                rse['dirs']['deletedFail'].discard(dirLfn)
                msg = "RSE: %s SUCCESS deleting lfn: %s"
                self.logger.info(msg, rse['name'], dirLfn)
            else:
                rse['dirs']['deletedFail'].add(dirLfn)
                msg = "RSE: %s FAILED to delete lfn: %s"
                self.logger.error(msg, rse['name'], dirLfn)

    # NOTE(review): the indentation of this block was lost in the reviewed view;
    # the counters are assumed to be refreshed once, after the loop - confirm.
    rse['counters']['dirsDeletedSuccess'] = len(rse['dirs']['deletedSuccess'])
    rse['counters']['dirsDeletedFail'] = len(rse['dirs']['deletedFail'])

    return rse

# @profile
def cleanRSE(self, rse):
"""
Expand Down Expand Up @@ -456,6 +521,25 @@ def _rmDir(self, ctx, dirPfn):
rmdirSuccess = False
return rmdirSuccess

def _rmDirOs(self, dirPfn):
"""
Auxiliary method to be used for removing a single directory entry with os libraries
and handling eventual errors raised.
:param dirPfn: The Pfn of the directory to be removed
:return: Bool: True if the removal was successful, False otherwise
NOTE: An attempt to delete an already missing directory is considered a success
"""
try:
shutil.rmtree(dirPfn)
rmdirSuccess = True
except OSError as osExc:
if osExc.errno == errno.ENOENT:
self.logger.warning("MISSING directory: %s", dirPfn)
rmdirSuccess = True
else:
self.logger.error("FAILED to remove directory: %s: gfalException: %s, gfalErrorCode: %s", dirPfn, str(osExc), osExc.errno)
raise osExc
return rmdirSuccess

def _purgeTree(self, ctx, baseDirPfn, isDirEntry=False):
"""
Expand Down
Loading