forked from cms-sw/cms-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
es_relval_stats.py
executable file
·122 lines (118 loc) · 4.16 KB
/
es_relval_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
import json
import re
import threading
from commands import getstatusoutput
from datetime import datetime
from glob import glob
from hashlib import sha1
from os.path import isdir,basename,exists,join
from sys import exit, argv
from time import sleep

from es_utils import send_payload
from cmsutils import cmsswIB2Week
def percentile(percentage, data, dlen):
    """Return the ``percentage``-th percentile of already-sorted ``data``.

    The 1-based rank is computed as ``(dlen + 1) * percentage / 100``.  A rank
    that falls before the first or at/after the last element is clamped to
    ``data[0]`` / ``data[-1]``; otherwise the result is linearly interpolated
    between the two neighbouring elements.  The fractional part of the rank
    is truncated to whole hundredths before interpolating.

    Args:
        percentage: Percentile to compute (e.g. 25 or 75).
        data: Sequence of numbers sorted in ascending order.
        dlen: Number of elements of ``data`` to consider (callers in this
            file pass ``len(data)``).

    Returns:
        An element of ``data`` when no interpolation is needed, otherwise a
        float between two neighbouring elements.

    Raises:
        ValueError: If ``data`` is empty.
    """
    if len(data) == 0:
        # Previously this surfaced as an opaque IndexError from data[-1].
        raise ValueError("percentile() needs at least one data point")

    rank = (dlen + 1) * percentage / 100.0
    whole = int(rank)
    if whole >= dlen:
        return data[-1]
    if whole == 0:
        return data[0]

    # Fraction of the way from data[whole - 1] to data[whole], in hundredths.
    hundredths = int((rank - whole) * 100)
    lower = data[whole - 1]
    if hundredths > 0:
        return (hundredths / 100.0) * (data[whole] - lower) + lower
    return lower
def process(wfnum, s, sfile, hostname, exit_code, details=False):
    """Summarise the resource samples of one workflow step and upload them.

    ``sfile`` is loaded as a JSON list of sample dicts; every sample must
    carry a ``"time"`` key.  Each sample is tagged with release, step,
    workflow and architecture and given an ``"@timestamp"`` of
    ``rel_msec + time * 1000`` (presumably ``time`` is a seconds offset --
    confirm against the producer of the stats file).  When ``details`` is
    true each tagged sample is passed to ``send_payload`` individually.

    A summary document is then built and passed to ``send_payload``:
      * for ``time``, ``num_threads``, ``processes`` and ``num_fds`` the
        largest observed value is stored under the field's own name;
      * for every field listed in ``ex_fields`` the ``_min``, ``_max``,
        ``_avg``, ``_median``, ``_25`` and ``_75`` values are stored;
      * all other fields are ignored.

    The module-level names ``release``, ``arch``, ``rel_sec``, ``rel_msec``,
    ``week`` and ``ex_fields`` are read (never assigned) here, so no
    ``global`` declaration is required.

    Args:
        wfnum: Workflow identifier (string; it is concatenated into the
            document id).
        s: Step name, e.g. ``"step1"``.
        sfile: Path of the JSON stats file for this step.
        hostname: Host name stored in the summary document.
        exit_code: Exit code stored in the summary document.
        details: When true, also upload every individual sample.

    Returns:
        None.  Any failure is printed rather than raised so that one broken
        stats file does not take down the worker thread's caller.
    """
    try:
        # Close the file deterministically; this runs in many threads.
        with open(sfile) as stats_file:
            stats = json.load(stats_file)

        xdata = {}
        for stat in stats:
            # Group the raw values per field before the sample is decorated
            # with the extra bookkeeping keys below.
            for item in stat:
                xdata.setdefault(item, []).append(stat[item])
            stat["@timestamp"] = rel_msec + (stat["time"] * 1000)
            stat["release"] = release
            stat["step"] = s
            stat["workflow"] = wfnum
            stat["architecture"] = arch
            idx = sha1(release + arch + wfnum + s + str(stat["time"])).hexdigest()
            del stat["time"]
            if details:
                try:
                    send_payload("relvals_stats_details-" + week, "runtime-stats-details", idx, json.dumps(stat))
                except Exception as e:
                    print(e)

        # Single formatted string: same text as the old multi-item print
        # statement, but written in one call so threads do not interleave.
        print("%s %s %s %s %s %s" % ("Working on ", release, arch, wfnum, s, len(stats)))

        sdata = {
            "release": release,
            "architecture": arch,
            "step": s,
            "@timestamp": rel_msec,
            "workflow": wfnum,
            "hostname": hostname,
            "exit_code": exit_code,
        }
        peak_fields = ("time", "num_threads", "processes", "num_fds")
        for x in xdata:
            data = sorted(xdata[x])
            if x in peak_fields:
                sdata[x] = data[-1]
                continue
            if x not in ex_fields:
                continue
            dlen = len(data)
            for t in ["min", "max", "avg", "median", "25", "75"]:
                sdata[x + "_" + t] = 0
            if dlen > 0:
                sdata[x + "_min"] = data[0]
                sdata[x + "_max"] = data[-1]
                if dlen > 1:
                    dlen2 = dlen // 2
                    if (dlen % 2) == 0:
                        sdata[x + "_median"] = int((data[dlen2 - 1] + data[dlen2]) / 2)
                    else:
                        sdata[x + "_median"] = data[dlen2]
                    sdata[x + "_avg"] = int(sum(data) / dlen)
                    for t in [25, 75]:
                        sdata[x + "_" + str(t)] = int(percentile(t, data, dlen))
                else:
                    # A single sample is its own average, median and quartiles.
                    for t in ["25", "75", "avg", "median"]:
                        sdata[x + "_" + t] = data[0]

        idx = sha1(release + arch + wfnum + s + str(rel_sec)).hexdigest()
        try:
            send_payload("relvals_stats_summary-" + week, "runtime-stats-summary", idx, json.dumps(sdata))
        except Exception as e:
            print(e)
    except Exception as e:
        # Deliberately best-effort: report and carry on.
        print(e)
# Driver: walk every workflow directory below <...>/pyRelValPartialLogs and
# hand each step that has a wf_stats-<step>.json file to process() on a
# worker thread, keeping at most `jobs` threads alive at a time.
#
# Usage: es_relval_stats.py <path ending in /pyRelValPartialLogs> [jobs]
if len(argv) < 2:
    print("Usage: %s <path>/pyRelValPartialLogs [jobs]" % argv[0])
    exit(1)
partial_log_dirpath = argv[1]

try:
    jobs = int(argv[2])
except (IndexError, ValueError):
    jobs = 6
if jobs < 1:
    # With jobs <= 0 the throttle loop below would never let a thread start.
    jobs = 1

items = partial_log_dirpath.split("/")
# Release and architecture are taken from fixed positions in the path, so the
# path needs at least six components and must end in pyRelValPartialLogs.
if len(items) < 6 or items[-1] != "pyRelValPartialLogs":
    exit(1)

# These module-level names are read by process(); keep their names stable.
release = items[-2]
arch = items[-6]
week, rel_sec = cmsswIB2Week(release)
rel_msec = rel_sec * 1000
ex_fields = ["rss", "vms", "pss", "uss", "shared", "data", "cpu"]

threads = []
for wf in sorted(glob(join(partial_log_dirpath, "*"))):
    if not isdir(wf):
        continue
    done_marker = join(wf, "wf_stats.done")
    if exists(done_marker):
        continue
    wfnum = basename(wf).split("_", 1)[0]

    hostname = ""
    hostname_path = join(wf, "hostname")
    if exists(hostname_path):
        with open(hostname_path) as handle:
            hostname = handle.read().split("\n")[0]

    # The first line of workflow.log ends in " exit: <code> <code> ...",
    # one code per step in order.
    exit_codes = {}
    workflow_log = join(wf, "workflow.log")
    if exists(workflow_log):
        with open(workflow_log) as handle:
            first_line = handle.readline()
        codes_text = re.sub(r".* exit: *", "", first_line).strip()
        try:
            codes = [int(tok) for tok in codes_text.split(" ") if tok]
        except ValueError:
            # Malformed header: keep going, steps will report exit code -1.
            print("Unable to parse exit codes from %s" % workflow_log)
            codes = []
        for istep, code in enumerate(codes, 1):
            exit_codes["step%s" % istep] = code

    # Known steps are those with a step*.log file; the key is the file name
    # up to the first underscore.
    steps = {}
    for log in glob(join(wf, "step*.log")):
        steps[basename(log).split("_")[0]] = ""
    # Attach the stats file (wf_stats-<step>.json) to each known step.
    for stats_path in glob(join(wf, "wf_stats-step*.json")):
        step = basename(stats_path)[len("wf_stats-"):-len(".json")]
        if step in steps:
            steps[step] = stats_path

    for s in sorted(steps):
        sfile = steps[s]
        if sfile == "":
            continue
        exit_code = exit_codes.get(s, -1)
        # Throttle: wait until fewer than `jobs` workers are still running.
        while True:
            threads = [t for t in threads if t.is_alive()]
            if len(threads) < jobs:
                break
            sleep(0.5)
        t = threading.Thread(target=process, args=(wfnum, s, sfile, hostname, exit_code))
        t.start()
        threads.append(t)

    # NOTE: the marker is written once the workers have been started, not
    # once they have finished.
    try:
        open(done_marker, "a").close()
    except (IOError, OSError) as err:
        print(err)

print("Active Threads: %s" % len(threads))
for t in threads:
    t.join()