-
Notifications
You must be signed in to change notification settings - Fork 6
/
baconBatch.py
executable file
·368 lines (314 loc) · 14.9 KB
/
baconBatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
#!/usr/bin/env python
# baconBatch.py #############################################################################
# Python driver for Bacon Analyzer executable
# Original Author N.Wardle (CERN)
# TODO : Provide output support to EOS
# For now assume output is small enough to store locally.
# ------------------------------------------------------------------------------------
import ROOT as r
import sys, commands, os, fnmatch
from optparse import OptionParser
from optparse import OptionGroup
from numpy import arange
from itertools import product
#from BaconAna.Utils.makeFilelist import *
from makeFilelist import *
# I want to make jobs out of ANY executable and separate based on groups of files or [list] input parameters. Also need the input swap to be as generic as possible
# Ok this is dangerous since we pretty much have to assume some arguments for the exec
# Take WAnalysis as the standard command line style
# 'maxevents, input, isGen
#default_args = ['10000000','nothing.root','1'] #,output.root -> could add to analyzer
default_args = []
# -- Command line options ------------------------------------------------------
# FIX: the original built a throwaway OptionParser() and immediately replaced
# it with the usage-carrying one; only the real parser is created here.
parser = OptionParser(usage="usage: %prog analyzer outputfile [options] \nrun with --help to get list of options")
parser.add_option("-d","--directory",default='',help="Pick up files from a particular directory. can also pass from /eos/. Will initiate split by files (note you must also pass which index the file goes to, eg if its the first argument, use -d 0:directory_name)")
parser.add_option("-o","--outdir",default='bacon',help="output for analyzer. This will always be the output for job scripts.")
parser.add_option("-a","--args",dest="args",default=[],action="append",help='Pass executable args n:arg OR named arguments --name:arg (or -x:arg). Multiple args can be passed with <val1,val2...> or lists of integers with [min,max,stepsize]\n For example ... \
1. -a "0:<cat,dog>" -a "1:<3,4>" will produce the following runs "analyzer cat 3, analyzer cat 4, analyzer dog 3, analyzer dog 4. \n \
2. -a "--my_option:[1,3,1]" will produce runs "analyzer --my_option 1, analyzer --my_option 2". \n \
The two types can be mixed and the tool will always figure out all combinations of the ranges/lists given. You can also simply pass through options directly to the analyzer program with -a "--option_name=value" ')
parser.add_option("-v","--verbose",dest="verbose",default=False,action="store_true",help="Spit out more info")
parser.add_option("","--passSumEntries",dest="passSumEntries",default="",help="x:treename Get Entries in TTree treename and pass to argument x")
parser.add_option("","--passSumWeights",dest="passSumWeights",default="",help="x:treename:branch Get Weights from branch in treename and sum them, pass to x")
parser.add_option("","--blacklist",dest="blacklist",default=[],action="append",help="Add blacklist file types (search for this string in files and ignore them")
parser.add_option("","--sandbox",dest="sandbox",default=[],action="append",help="List of files to copy over to tmp directory - note, will look in current dir, otherwise you can specify the full path.")
# Make batch submission scripts options
parser.add_option("-n","--njobs",dest="njobs",type='int',default=-1,help="Split into n jobs, will automatically produce submission scripts")
parser.add_option("-q","--queue",default='1nh',help="submission queue")
parser.add_option("--dryRun",default=False,action="store_true",help="Do nothing, just print what will happen")
monitor_options = ['sub','check','resub','hadd']
# Monitor options (submit,check,resubmit failed) -- just pass outdir as usual but this time pass --monitor sub --monitor check or --monitor resub
parser.add_option("--monitor",default='',help="Monitor mode (%s) pass directory of jobs"%"/".join(monitor_options))
cwd = os.getcwd()
(options,args) = parser.parse_args()
# Outside monitor mode the two positional arguments (analyzer, outputfile) are mandatory.
if len(args)<2 and not options.monitor: sys.exit('Error -- must specify ANALYZER and OUTPUTNAME' )
njobs = options.njobs if options.njobs>0 else 1
# The two sum-passing modes write to the same argument slot; allow only one.
if options.passSumWeights and options.passSumEntries : sys.exit('Error -- must specify only one of passSumEntries or passSumWeights' )
r.gROOT.SetBatch(1)
def write_job(exec_line, out, analyzer, i, n):
    """Write the batch (bsub/LSF) submission script for job i.

    The generated script sets up the CMSSW runtime, stages the analyzer
    binary and any --sandbox files into a local scratch area, runs
    exec_line, hadds the per-file outputs into Output_job<i>.root, and
    moves the result back into 'out'.  It maintains <script>.run/.done/.fail
    marker files which the --monitor modes read.

    exec_line -- full shell command chain for this job
    out       -- output/submission directory (script is written there)
    analyzer  -- analyzer executable (may include a path)
    i         -- job index
    n         -- number of files this job runs over (header comment only)
    """
    cwd = os.getcwd()
    analyzer_short = analyzer.split("/")[-1]
    # the script cd's into scratch, so the analyzer must be called bare
    exec_line = exec_line.replace(analyzer,analyzer_short)
    sub_file = open('%s/sub_%s_job%d.sh'%(out,analyzer_short,i),'w')
    sub_file.write('#!/bin/bash\n')
    sub_file.write('# Job Number %d, running over %d files \n'%(i,n))
    sub_file.write('touch %s.run\n'%os.path.abspath(sub_file.name))
    sub_file.write('cd %s\n'%os.getcwd())
    sub_file.write('eval `scramv1 runtime -sh`\n')
    sub_file.write('cd -\n')
    sub_file.write('mkdir -p scratch\n')
    sub_file.write('cd scratch\n')
    sub_file.write('cp -p $CMSSW_BASE/bin/$SCRAM_ARCH/%s .\n'%analyzer)
    for sbfile in options.sandbox:
        # BUGFIX: was 'if os.path.abspath(...)', which always returns a
        # non-empty (truthy) string, so the full-path fallback below was
        # dead code.  Per the --sandbox help text, look in the current
        # directory first, else assume the user gave a full path.
        if os.path.exists("%s/%s"%(cwd,sbfile)):
            sub_file.write('cp -p %s/%s .\n'%(cwd,sbfile))
        else:
            sub_file.write('cp -p %s .\n'%sbfile)
    sub_file.write('mkdir -p %s\n'%(out))
    sub_file.write('if ( %s ) then\n'%exec_line)
    sub_file.write('\t hadd -f Output_job%d.root %s/*.root \n'%(i,(out)))
    sub_file.write('\t mv Output_job*.root %s\n'%os.path.abspath(out))
    sub_file.write('\t rm -rf ./bacon ./Output_job* \n')
    for sbfile in options.sandbox: sub_file.write('\t rm -rf %s \n'%sbfile)
    sub_file.write('\t touch %s.done\n'%os.path.abspath(sub_file.name))
    sub_file.write('else\n')
    sub_file.write('\t touch %s.fail\n'%os.path.abspath(sub_file.name))
    sub_file.write('fi\n')
    sub_file.write('rm -f %s.run\n'%os.path.abspath(sub_file.name))
    sub_file.close()
    os.system('chmod +x %s'%os.path.abspath(sub_file.name))
def trawlHadd(directory):
    # Walk 'directory', find every sub-directory that holds .root files, and
    # hadd all 'Output*.root' files found below each into <di>/<di_with_underscores>.root.
    # Files are staged (copied) through a local tmp_r/ scratch directory first.
    list_of_dirs=set()
    for root, dirs, files in os.walk(directory):
        for x in files:
            if '.root' in x:
                list_of_dirs.add(root)
    for di in list_of_dirs:
        # stage copies of every matching file below this directory into tmp_r/
        for root, dirs, files in os.walk(di):
            list_of_files=''
            for file in fnmatch.filter(files,'Output*.root'):
                list_of_files += ' '+os.path.join(root,'%s'%file)
            print root, ' hadding --> ', len(list_of_files.split())
            os.system('mkdir -p tmp_r')
            #print list_of_files
            for fi in list_of_files.split():
                # tmp_ prefix avoids name clashes between files from different roots
                os.system('cp %s tmp_r/tmp_%s'%(fi,fi.split('/')[-1]))
        # merge everything staged for this directory, then clear the scratch area
        outname = di.replace('/','_')
        exec_line = 'hadd -f %s/%s.root tmp_r/*'%(di,outname)
        if options.verbose: print exec_line
        os.system(exec_line)
        os.system('rm -rf tmp_r')
def submit_jobs(lofjobs):
    """Submit every script in 'lofjobs' to the batch queue.

    Stale .done/.fail/.log markers from a previous attempt are removed
    first so --monitor check reflects only the new submission.
    """
    for script in lofjobs:
        path = os.path.abspath(script)
        for marker in ('done', 'fail', 'log'):
            os.system('rm -f %s.%s' % (path, marker))
        os.system('bsub -q %s -o %s.log %s' % (options.queue, path, path))
# -- Monitor mode: operate on a previously created job directory and exit ------
if options.monitor:
    if options.monitor not in monitor_options: sys.exit('Error -- Unknown monitor mode %s'%options.monitor)
    dir = options.outdir
    if options.monitor == 'sub' or options.monitor == 'resub':
        # pick up job scripts in output directory (ends in .sh)
        lofjobs = []
        for root,dirs,files in os.walk(dir):
            for file in fnmatch.filter(files,'*.sh'):
                # for 'resub', only scripts that left a .fail marker are retried
                if options.monitor == 'resub' and not os.path.isfile('%s/%s.fail'%(root,file)): continue
                lofjobs.append('%s/%s'%(os.path.abspath(root),file))
        print 'Submitting %d jobs from directory %s'%(len(lofjobs),dir)
        submit_jobs(lofjobs)
    if options.monitor == 'check':
        # classify every script by the .run/.done/.fail marker files next to it
        failjobs = []
        runjobs = []
        donejobs = []
        number_of_jobs = 0
        for root,dirs,files in os.walk(dir):
            for file in fnmatch.filter(files,'*.sh'):
                if os.path.isfile('%s/%s.fail'%(root,file)): failjobs.append('%s/%s'%(root,file))
                if os.path.isfile('%s/%s.done'%(root,file)):
                    # NOTE(review): failjobs holds 'root/file' paths but this
                    # tests '%s.sh'%file (i.e. 'name.sh.sh'), which can never
                    # match, so a job with both markers is counted in both
                    # lists -- confirm intended behavior before relying on it.
                    if not '%s.sh'%file in failjobs : donejobs.append('%s/%s'%(root,file))
                if os.path.isfile('%s/%s.run'%(root,file)): runjobs.append('%s/%s'%(root,file))
                number_of_jobs+=1
        print 'Status of jobs directory ', dir
        print ' Total of %d jobs'%number_of_jobs
        print ' %d in status Fail -> (resub them with --monitor resub)'%len(failjobs)
        for job in failjobs : print '\t FAIL %s'%job
        print ' %d in status Running -> '%len(runjobs)
        for job in runjobs : print '\t RUN %s'%job
        print ' %d in status Done -> '%len(donejobs)
        for job in donejobs : print '\t DONE %s'%job
        print "\n %d/%d Running, %d/%d Done, %d/%d Failed (resub with --monitor resub)"\
        %(len(runjobs),number_of_jobs,len(donejobs),number_of_jobs,len(failjobs),number_of_jobs)
    if options.monitor == 'hadd': trawlHadd(dir)
    sys.exit('Finished Monitor -- %s'%options.monitor)
def parse_to_dict(l_list):
    """Turn -a argument specs ('key:value') into {position: ['', [values...]]}.

    key is either a positional index ('0', '1', ...) or a named flag
    ('--opt' / '-x').  value may be a single literal, a '<a,b,...>' list,
    or an '[lo,hi,step]' integer range expanded with numpy.arange.
    Named flags are re-keyed onto positions after the last positional
    argument, with the flag name folded into each of its value strings.

    Fixes over the original: no mutation of the dict while iterating its
    live keys() view (fatal on Python 3), no shadowing of builtins
    min/max, and isinstance() instead of type comparison.
    """
    if not l_list:
        return {}
    ret = {}
    npositional = 0  # number of integer-keyed (positional) arguments seen
    for item in l_list:
        key, _, varg = item.partition(':')
        if '-' not in key:
            key = int(key)
            npositional += 1
        if '[' not in item and '<' not in item:
            ret[key] = ['', [varg]]  # plain single value
        elif '[' in varg:
            # integer range [lo,hi,step), expanded eagerly
            lo, hi, step = varg.replace('[', '').replace(']', '').split(',')
            ret[key] = ['', arange(int(lo), int(hi), int(step))]
        elif '<' in varg:
            # explicit comma-separated list of values
            ret[key] = ['', varg.replace('<', '').replace('>', '').split(',')]
    # Move named (string-keyed) flags to positional slots after the real
    # positionals, folding the flag name into each value.
    offset = 0
    for k in list(ret):  # list() copy: ret is mutated during this loop
        if isinstance(k, str):
            entry = ret.pop(k)
            entry[1] = [k + ' ' + str(v) for v in entry[1]]
            ret[npositional + offset] = entry
            offset += 1
    return ret
def getFilesJob(dirin,job,tnjobs):
if tnjobs == 1 :
tnjobs = -1
job = 0
infiles = []
if "," in dirin : alldirs = dirin.split(',')
else : alldirs=[dirin]
infiles = []
for dir in alldirs:
if '/store/' in dir : infiles.extend(makeCaFiles(dir,options.blacklist,tnjobs,job))
else : infiles.extend(makeFiles(dir,options.blacklist,tnjobs,job))
if options.verbose: print "VERB -- Found following files for dir %s --> "%dir, infiles
return infiles
def getArgsJob(iterationsobject, job_id, njobs):
    """Tag each argument combination with whether it belongs to job 'job_id'.

    iterationsobject -- iterable of argument tuples (e.g. itertools.product)
    job_id           -- index of this job
    njobs            -- total number of jobs; <= 0 means no splitting
    Returns a list of (args, mine) pairs, assigned round-robin: combination
    i belongs to job (i % njobs).

    BUGFIX: the parameter was misspelled 'interationsobject' while the loop
    iterated 'iterationsobject', so the function silently read a module-level
    global of that name instead of its argument.  It now uses the parameter.
    """
    injobs = []
    ifile = 0
    for ff in iterationsobject:
        # round-robin split; with njobs <= 0 every combination is 'mine'
        mine = not (njobs > 0 and ifile % njobs != job_id)
        injobs.append((ff, mine))
        ifile += 1
    return injobs
# -- MAIN ----------------------------------------------------------------------
# Everything below runs at script level: build the analyzer command line
# template, then run / dry-run / write batch scripts for it.
os.system('mkdir -p %s/%s'%(cwd,options.outdir))
mindeces = []   # keys of multi-valued arguments (MULTARG placeholder slots)
analyzer = args[0]   # executable to run
outfile = args[1]    # output file name the analyzer produces
analyzer_args = parse_to_dict(options.args)
# Optionally sum the entries of a tree (or histogram) across all input files
# and pass the total as the analyzer argument at position 'pos'.
if options.passSumEntries:
    pos,treenam = options.passSumEntries.split(":")
    numEntries = 0
    if options.directory :
        # strip the leading 'index:' from -d and list every file (no job split)
        files = getFilesJob(options.directory.split(":")[1],0,-1)
        for fi in files:
            if options.verbose : print "VERB -- Accessing file %s, and checking for tree %s"%(fi[0],treenam)
            tf = r.TFile.Open(fi[0])
            try :
                tf.IsOpen()
            except:
                continue   # unreadable/missing file -- skip it
            hf = tf.Get(treenam) # first try to see if its a histogram
            try:
                numEntries+=int(hf.GetEntries())
            except:
                try:
                    # fall back to attribute-style access on the file
                    numEntries+=int(getattr(tf,treenam).GetEntries())
                except:
                    continue   # object absent in this file -- skip it
    else: numEntries = -1;
    if options.verbose : print "VERB -- Sum entries for jobs = %d, being passed to argument %s"%(numEntries,pos)
    analyzer_args[int(pos)]=['',[int(numEntries)]]
# Optionally sum a weight branch over all input files (via a one-bin
# histogram Draw trick) and pass the total as the argument at position 'pos'.
if options.passSumWeights:
    pos,treenam,branch = options.passSumWeights.split(":")
    sumWeights = 0
    if options.directory :
        # strip the leading 'index:' from -d and list every file (no job split)
        files = getFilesJob(options.directory.split(":")[1],0,-1)
        tmpH = r.TH1F("htmp","htmp",1,0,1)   # accumulates the weight sum
        for i,fi in enumerate(files):
            if options.verbose : print "VERB -- Accessing file %s, and checking for tree %s"%(fi[0],treenam)
            tf = r.TFile.Open(fi[0])
            try :
                tf.IsOpen()
            except:
                print "WARNING - Cannot open file - %s "%(fi[0])
                continue
            try:
                #numEntries+=int(getattr(tf,treenam).GetEntries())
                # Draw the branch into a single-bin histogram weighted by
                # 'branch': the bin content becomes this file's weight sum.
                tmpH_t = r.TH1F("htmp_%d"%i,"htmp",1,0,1)
                tr = tf.Get(treenam)
                tr.Draw("0.5>>htmp_%d"%(i),"%s"%(branch))
                tmpH.Add(tmpH_t)
            except:
                print "WARNING - NO TREE %s for sumWeights :( "%(treenam)
                continue
        sumWeights = tmpH.Integral()
    else: sys.exit("Lazy!, --passSumWeights only with directory option")
    if options.verbose : print "VERB -- Sum weights for jobs = %d, being passed to argument %s"%(sumWeights,pos)
    analyzer_args[int(pos)]=['',[float(sumWeights)]]
# -- Build the template command line for the analyzer --------------------------
exec_line = '%s'%analyzer
if options.directory :
    # -d was given as 'index:directory'; remember which slot takes the file name
    filepos,options.directory = options.directory.split(':')
    # positional index -> placeholder at that slot; named flag -> flag + placeholder first
    if not "-" in filepos: analyzer_args[int(filepos)]=['',"KEYWORD_BACONFILEINPUT"]
    else: analyzer_args[0] = ['',"%s KEYWORD_BACONFILEINPUT"%filepos]
# Assemble the argument slots in positional order; multi-valued arguments get
# a MULTARG_<key> placeholder that is substituted per combination later on.
sortedkeys = analyzer_args.keys()
if len(sortedkeys): sortedkeys.sort()
for key in sortedkeys:
    arg = analyzer_args[key][1]
    if 'KEYWORD_BACONFILEINPUT' in arg:
        # file-input slot: substituted once per input file below
        exec_line+= " %s "%arg
    elif len(arg)>1:
        mindeces.append(key)
        exec_line+= ' MULTARG_%d '%key
    else: exec_line+=' %s '%arg[0]
# check that from max to 0 all arguments are accounted for (could always add defaults above) !
for arg_c in range(0,max(analyzer_args.keys())):
    if arg_c not in analyzer_args.keys(): sys.exit("ERROR -- missing argument %d"%arg_c)
# -- Generate (and run / dry-run / script) the per-job command lines -----------
if not options.dryRun and njobs > 0:
    print ' Writing %d Submission Scripts to %s (submit after with --monitor sub)'%(njobs,options.outdir)
    print ' will use executable -- (default call pattern shown) \n\t%s'%exec_line
for job_i in range(njobs):
    # The combination stream must be recreated for every job: itertools.product
    # is a single-pass iterator and would be exhausted after the first job.
    listoflists = [analyzer_args[k][1] for k in sortedkeys ]
    # all combinations of the (possibly multi-valued) argument lists
    iterationsobject = product(*listoflists)
    if options.directory: files = getFilesJob(options.directory,job_i,njobs)
    else: files = getArgsJob(iterationsobject,job_i,njobs)# split arg combinations over jobs
    job_exec = ''
    nfiles_i = 0
    for fil_i,fil in enumerate(files):
        if not fil[1]: continue   # entry not assigned to this job
        if options.directory: exec_line_i = exec_line.replace('KEYWORD_BACONFILEINPUT'," "+fil[0]+" ")
        else:
            exec_line_i = exec_line
            # NOTE(review): MULTARG placeholders are numbered by argument key,
            # but 'i' here enumerates positions within the combination tuple;
            # these only coincide when the multi-valued args are keyed
            # 0,1,2,... -- confirm against mindeces before mixed usage.
            for i,m in enumerate(fil[0]):
                exec_line_i = exec_line_i.replace(" MULTARG_%d "%i," "+str(m)+" " )
        # chain each run with a move of its output to a job/file-unique name
        job_exec+=exec_line_i+'; mv -v %s %s/%s_job%d_file%d.root; '%(outfile,options.outdir,outfile.strip("*?,"),job_i,fil_i)
        nfiles_i += 1
    if options.verbose: print "VERB -- job exec line --> ",job_exec
    if options.dryRun :
        print ' will produce job %d/%d -> '%(job_i+1,njobs), job_exec
    elif options.njobs > 0:
        write_job(job_exec, options.outdir, analyzer, job_i, nfiles_i)
    else:
        # no splitting requested: run the command chain interactively here
        print "Running: ", job_exec
        os.system(job_exec)