Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume raw/generator unmerged dump data in MSUnmerged #12059

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
457 changes: 205 additions & 252 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py

Large diffs are not rendered by default.

54 changes: 25 additions & 29 deletions src/python/WMCore/Services/RucioConMon/RucioConMon.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ def __init__(self, url, logger=None, configDict=None):
super(RucioConMon, self).__init__(configDict)
self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])

def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
def _getResult(self, uri, callname="", clearCache=False, args=None):
"""
Either fetch data from the cache file or query the data-service
:param uri: The endpoint uri
:param callname: alias for caller function
:param clearCache: parameter to control the cache behavior
:param args: additional parameters to HTTP request call
:param binary: specifies request for binary object from HTTP requests (e.g. zipped content)
:return: A dictionary
"""

Expand All @@ -68,31 +67,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False
if clearCache:
self.clearCache(cachedApi, args)
results = '{}' # explicitly define results which will be loaded by json.loads below
if binary:
with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream:
results = gzip.decompress(istream.read())
return results
else:
with self.refreshCache(cachedApi, apiUrl) as istream:
results = istream.read()

results = json.loads(results)
return results
with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream:
results = istream.read()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: I don't know how much data istream reads, but here you have two potential memory spikes, one is reading data from istream and another to parse the json. Instead, I suggest using

return json.load(istream)

which performs both steps as one, i.e. JSON can read from file object directly.

return json.loads(results)

def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
def _getResultZipped(self, uri, callname="", clearCache=True):
"""
This method is retrieving a zipped file from the uri provided, instead
of the normal json
This method retrieves gzipped content, instead of the standard json format.
:param uri: The endpoint uri
:param callname: alias for caller function
:param clearCache: parameter to control the cache behavior
:param args: additional parameters to HTTP request call
:return: a list of LFNs
:return: yields a single record from the data retrieved
"""
data = self._getResult(uri, callname, clearCache, args, binary=True)
# convert bytes which we received upstream to string
data = decodeBytesToUnicode(data)
return [f for f in data.split('\n') if f]
cachedApi = callname
if clearCache:
self.clearCache(cachedApi)

with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream:
for line in istream:
line = decodeBytesToUnicode(line).replace("\n", "")
yield line

def getRSEStats(self):
"""
Expand All @@ -109,20 +103,22 @@ def getRSEUnmerged(self, rseName, zipped=False):
Gets the list of all unmerged files in an RSE
:param rseName: The RSE whose list of unmerged files to be retrieved
:param zipped: If True the interface providing the zipped lists will be called
:return: A list of unmerged files for the RSE in question
:return: a generator of unmerged files for the RSE in question
"""
# NOTE: The default API provided by Rucio Consistency Monitor is in a form of a
# zipped file/stream. Currently we are using the newly provided json API
# But in case we figure out the data is too big we may need to
# implement the method with the zipped API and use disc cache for
# reading/streaming from file. This will prevent any set arithmetic
# in the future.
if not zipped:
uri = "files?rse=%s&format=json" % rseName
rseUnmerged = self._getResult(uri, callname=rseName)
return rseUnmerged
else:
if zipped:
uri = "files?rse=%s&format=raw" % rseName
callname = '{}.zipped'.format(rseName)
rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
return rseUnmerged
rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
else:
uri = "files?rse=%s&format=json" % rseName
callname = '{}.json'.format(rseName)
rseUnmerged = self._getResult(uri, callname=callname)
# now lazily return items
for item in rseUnmerged:
yield item
4 changes: 2 additions & 2 deletions src/python/WMCore/Services/WMStatsServer/WMStatsServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def getChildParentDatasetMap(self, requestType="StepChain", parentageResolved=Fa
def getProtectedLFNs(self):
"""
A method to be used for fetching a list of all protected lfns from WMStatsServer
:returns: A list of lfns
:returns: a unique list of protected LFNs
"""
callname = 'protectedlfns'
return self._getResult(callname, verb="GET")
return list(set(self._getResult(callname, verb="GET")))
73 changes: 32 additions & 41 deletions test/python/WMCore_t/MicroService_t/MSUnmerged_t/MSUnmerged_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os
import unittest

from future.utils import viewkeys
from mock import mock

from Utils.PythonVersion import PY3
Expand Down Expand Up @@ -70,8 +69,8 @@ def getBasicRSEData():
rse = {"name": "T2_TestRSE",
"counters": {"dirsToDeleteAll": 0},
"dirs": {"allUnmerged": set(),
"toDelete": {},
"protected": []},
"toDelete": set(),
"protected": set()},
"files": {"allUnmerged": set(lfns),
"toDelete": {},
"protected": []}
Expand Down Expand Up @@ -168,13 +167,13 @@ def testPlineUnmerged(self):
rse = self.msUnmerged.updateRSETimestamps(rse, start=True, end=False)
rse = self.msUnmerged.consRecordAge(rse)
rse = self.msUnmerged.getUnmergedFiles(rse)
rse = self.msUnmerged.filterUnmergedFiles(rse)
rse = self.msUnmerged.getPfn(rse)
rse = self.msUnmerged.cleanRSE(rse)
rse = self.msUnmerged.updateServiceCounters(rse)
rse = self.msUnmerged.updateRSETimestamps(rse, start=False, end=True)
# self.msUnmerged.plineUnmerged.run(rse)
expectedRSE = {'name': 'T2_US_Wisconsin',
'pfnPrefix': None,
'pfnPrefix': mock.ANY,
'isClean': False,
'rucioConMonStatus': None,
'timestamps': {'endTime': mock.ANY,
Expand All @@ -185,23 +184,18 @@ def testPlineUnmerged(self):
"counters": {"totalNumFiles": 11938,
"totalNumDirs": 11,
"dirsToDelete": 6,
"filesToDelete": 0,
"filesToDelete": 10934,
"filesDeletedSuccess": 0,
"filesDeletedFail": 0,
"dirsDeletedSuccess": 0,
"dirsDeletedFail": 0,
"gfalErrors": {}},
'files': {'allUnmerged': mock.ANY,
'files': {'allUnmerged': [],
'deletedFail': set(),
'deletedSuccess': set(),
'protected': {},
'toDelete': {'/store/unmerged/Phase2HLTTDRSummer20ReRECOMiniAOD/DYToLL_M-50_TuneCP5_14TeV-pythia8/FEVT/FlatPU0To200_pilot_111X_mcRun4_realistic_T15_v1-v2': mock.ANY,
'/store/unmerged/Run2016G/DoubleEG/MINIAOD/UL2016_MiniAODv2-v1': mock.ANY,
'/store/unmerged/SAM/testSRM/SAM-cms-lvs-gridftp.hep.wisc.edu': mock.ANY,
'/store/unmerged/SAM/testSRM/SAM-cms-lvs-gridftp.hep.wisc.edu/lcg-util': mock.ANY,
'/store/unmerged/SAM/testSRM/SAM-cmssrm.hep.wisc.edu': mock.ANY,
'/store/unmerged/SAM/testSRM/SAM-cmssrm.hep.wisc.edu/lcg-util': mock.ANY}},
'dirs': {'allUnmerged': set(),
'toDelete': {}},
'dirs': {'allUnmerged': [],
"deletedSuccess": set(),
"deletedFail": set(),
'protected': {'/store/unmerged/RunIIAutumn18FSPremix/PMSSM_set_1_prompt_1_TuneCP2_13TeV-pythia8/AODSIM/GridpackScan_102X_upgrade2018_realistic_v15-v1',
Expand Down Expand Up @@ -233,54 +227,51 @@ def testCutPath(self):

def testFilterInclDirectories(self):
"Test MSUnmerged with including directories filter"
toDeleteDict = {"/store/unmerged/data/prod/2018/1/12": ["/store/unmerged/data/prod/2018/1/12/log6.tar"],
"/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
"/store/unmerged/express/prod/2020/1/12/log9.tar"]}
toDeleteDict = {"/store/unmerged/data/prod/2018/1/12", "/store/unmerged/express/prod/2020/1/12"}
rseData = getBasicRSEData()

self.msUnmerged.msConfig['dirFilterIncl'] = ["/store/unmerged/data/prod/2018/",
"/store/unmerged/express"]
self.msUnmerged.protectedLFNs = set()
filterData = self.msUnmerged.filterUnmergedFiles(rseData)
self.assertEqual(filterData['counters']['dirsToDelete'], 2)
self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/data/prod/2018/1/12']),
toDeleteDict['/store/unmerged/data/prod/2018/1/12'])
self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
filterData = set()
for dirPath in rseData['dirs']['allUnmerged']:
if self.msUnmerged._isDeletable(dirPath):
filterData.add(dirPath)

self.assertEqual(len(filterData), 2)
self.assertItemsEqual(filterData, toDeleteDict)

def testFilterExclDirectories(self):
"Test MSUnmerged with excluding directories filter"
toDeleteDict = {"/store/unmerged/data/prod/2018/1/12": ["/store/unmerged/data/prod/2018/1/12/log6.tar"],
"/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
"/store/unmerged/express/prod/2020/1/12/log9.tar"]}
toDeleteDict = {"/store/unmerged/data/prod/2018/1/12", "/store/unmerged/express/prod/2020/1/12"}
rseData = getBasicRSEData()

self.msUnmerged.msConfig['dirFilterExcl'] = ["/store/unmerged/logs",
"/store/unmerged/data/prod/2019",
"/store/unmerged/alan/prod"]
self.msUnmerged.protectedLFNs = set()
filterData = self.msUnmerged.filterUnmergedFiles(rseData)
self.assertEqual(filterData['counters']['dirsToDelete'], 2)
self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/data/prod/2018/1/12']),
toDeleteDict['/store/unmerged/data/prod/2018/1/12'])
self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
filterData = set()
for dirPath in rseData['dirs']['allUnmerged']:
if self.msUnmerged._isDeletable(dirPath):
filterData.add(dirPath)

self.assertEqual(len(filterData), 2)
self.assertItemsEqual(filterData, toDeleteDict)

def testFilterInclExclDirectories(self):
"Test MSUnmerged with including and excluding directories filter"
toDeleteDict = {"/store/unmerged/express/prod/2020/1/12": ["/store/unmerged/express/prod/2020/1/12/log8.tar",
"/store/unmerged/express/prod/2020/1/12/log9.tar"]}
toDeleteDict = {"/store/unmerged/express/prod/2020/1/12"}
rseData = getBasicRSEData()
self.msUnmerged.msConfig['dirFilterIncl'] = ["/store/unmerged/data/prod/2018/",
"/store/unmerged/express"]
self.msUnmerged.msConfig['dirFilterExcl'] = ["/store/unmerged/logs",
"/store/unmerged/data/prod",
"/store/unmerged/alan/prod"]
self.msUnmerged.protectedLFNs = set()
filterData = self.msUnmerged.filterUnmergedFiles(rseData)
self.assertEqual(filterData['counters']['dirsToDelete'], 1)
self.assertItemsEqual(viewkeys(filterData['files']['toDelete']), viewkeys(toDeleteDict))
self.assertItemsEqual(list(filterData['files']['toDelete']['/store/unmerged/express/prod/2020/1/12']),
toDeleteDict['/store/unmerged/express/prod/2020/1/12'])
filterData = set()
for dirPath in rseData['dirs']['allUnmerged']:
if self.msUnmerged._isDeletable(dirPath):
filterData.add(dirPath)

self.assertEqual(len(filterData), 1)
self.assertItemsEqual(filterData, toDeleteDict)
42 changes: 42 additions & 0 deletions test/python/WMCore_t/MicroService_t/MSUnmerged_t/test_gfal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python
import logging
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SELF-REMINDER: This script will be removed before merging this PR.


try:
import gfal2
except ImportError:
# in case we do not have gfal2 installed
print("FAILED to import gfal2. Use it only in emulateGfal2=True mode!!!")
gfal2 = None

def createGfal2Context(logLevel="normal", emulate=False):
    """
    Create a gfal2 context object
    :param logLevel: string with the gfal2 log level (must be a valid
        gfal2 verbose_level name, e.g. "normal", "verbose", "debug")
    :param emulate: boolean to be used by unit tests; when True no real
        context is created and None is returned
    :return: the gfal2 context object, or None when emulate=True
    :raises RuntimeError: if the gfal2 library could not be imported and
        emulate is False
    """
    if emulate:
        return None
    if gfal2 is None:
        # The module-level "import gfal2" failed. Fail loudly with a clear
        # message instead of an opaque AttributeError on NoneType below.
        raise RuntimeError("gfal2 library is not available. Use it only in emulateGfal2=True mode!!!")
    ctx = gfal2.creat_context()
    gfal2.set_verbose(gfal2.verbose_level.names[logLevel])
    return ctx

def testGFAL(ctx):
    """
    Try to remove a fixed set of unmerged directories through gfal2 and log
    the outcome of each removal attempt.
    :param ctx: an initialized gfal2 context object (from createGfal2Context)
    :return: None; results are only logged
    """
    logger = logging.getLogger()
    rseDirs = ["/store/unmerged/Run3Summer22EENanoAODv11/Wto2Q-3Jets_HT-200to400_TuneCP5_13p6TeV_madgraphMLM-pythia8/NANOAODSIM/126X_mcRun3_2022_realistic_postEE_v1-v3",
               "/store/unmerged/RunIISummer20UL18NanoAODv9/GluGluHoffshell_HToWWToENuTauNu_TuneCP5_13TeV_MCFM701-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2"]

    for dirPfn in rseDirs:
        try:
            # NOTE: For gfal2 rmdir() exit status of 0 is success
            rmdirSuccess = ctx.rmdir(dirPfn) == 0
            # previously the result was computed but never reported
            logger.info("Directory: %s, removal success: %s", dirPfn, rmdirSuccess)
        except gfal2.GError as gfalExc:
            logger.warning("MISSING directory: %s, gfal code=%s", dirPfn, gfalExc.code)

def main():
    """Entry point: build a gfal2 context, run the rmdir probe, report success."""
    context = createGfal2Context()
    testGFAL(context)
    print("succeeded")

if __name__ == '__main__':
main()
11 changes: 5 additions & 6 deletions test/python/WMCore_t/Services_t/Rucio_t/RucioConMon_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import unittest

from nose.plugins.attrib import attr

from WMCore.Services.RucioConMon.RucioConMon import RucioConMon


Expand All @@ -17,17 +16,17 @@ class RucioConMonTest(unittest.TestCase):
Unit tests for RucioConMon Service module
"""

@attr("integration")
def testGetRSEUnmerged(self):
    """
    Test getRSEUnmerged method using both zipped and unzipped requests.
    This test uses a specific RSE name which can be changed to any other RSE.
    NOTE: this talks to the production Rucio ConMon service, hence it is
    marked as an integration test and must not run in the unit-test CI.
    """
    mgr = RucioConMon("https://cmsweb.cern.ch/rucioconmon/unmerged")
    rseName = "T2_RU_ITEP"
    # getRSEUnmerged returns a generator; materialize both for comparison
    dataUnzipped = list(mgr.getRSEUnmerged(rseName, zipped=False))
    dataZipped = list(mgr.getRSEUnmerged(rseName, zipped=True))
    # assertEqual reports the actual difference on failure, unlike assertTrue(a == b)
    self.assertEqual(dataUnzipped, dataZipped)


Expand Down