forked from saltstack/pr-commands
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprcommands.py
183 lines (159 loc) · 4.97 KB
/
prcommands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import os
import json
import re
import requests
import pprint
import logging
import time
import functools
import hmac
import hashlib
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Optional: falls back to the public SaltStack Jenkins API endpoint.
# NOTE(review): the functions visible in this file hard-code their Jenkins
# URLs and do not appear to read `uri` -- confirm whether it is still needed.
uri = os.environ.get('JENKINS_URI', 'https://jenkinsci.saltstack.com/api/json')
# Required: a missing variable raises KeyError when the module is imported.
user = os.environ['JENKINS_USER']
password = os.environ['JENKINS_PASS']
# Required: shared secret used as the HMAC key for webhook validation.
github_secret = os.environ['GITHUB_SECRET']
# Optional: defaults to 'INFO'.
# NOTE(review): the value is only read here; nothing visible in this file
# applies it to `log` -- presumably a caller does, confirm.
log_level = os.environ.get('LOG_LEVEL', 'INFO')
class ValidationError(Exception):
    '''
    Signals that an incoming request failed validation.
    '''
def validate_github_request(signature, payload, secret=github_secret):
    '''
    Check a webhook payload against the HMAC-SHA1 signature sent with it.

    signature
        Hex digest supplied with the request.
    payload
        Request body, as ``str`` or ``bytes``.
    secret
        HMAC key, as ``str`` or ``bytes``. Defaults to the ``GITHUB_SECRET``
        value read when the module was imported.

    Returns ``True`` when the signature matches; raises ``ValidationError``
    otherwise (including when ``signature`` is not a string).
    '''
    # NOTE(review): GitHub's X-Hub-Signature header value carries a ``sha1=``
    # prefix; this comparison expects the bare hex digest, so presumably the
    # caller strips the prefix -- confirm.
    key = secret if isinstance(secret, bytes) else secret.encode()
    message = payload if isinstance(payload, bytes) else payload.encode()
    digest = hmac.new(key, message, hashlib.sha1).hexdigest()
    # Constant-time comparison: a plain ``==`` can leak, through response
    # timing, how much of a forged signature was correct. Encoding with
    # 'replace' turns any non-ASCII character into '?', which can never match
    # a hex digit, so odd input fails validation instead of raising.
    if isinstance(signature, str) and hmac.compare_digest(
        digest.encode('ascii'), signature.encode('ascii', 'replace')
    ):
        return True
    raise ValidationError("Signature did not validate")
def timedcache(method, timeout=300):
    '''
    Cache the return value of a function for the specified number of
    seconds.

    method
        Callable to wrap.
    timeout
        Age in seconds at which a cached value is considered stale and is
        recomputed on the next call.

    Returns a wrapper with the same call signature as ``method``. Results are
    keyed on the ``repr`` of the positional arguments and of the keyword
    arguments sorted by name, so arguments need not be hashable and the order
    in which keyword arguments are passed does not matter. Entries are
    replaced when stale but never removed.
    '''
    cache = {}

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        key = (repr(args), repr(sorted(kwargs.items())))
        entry = cache.get(key)
        # A monotonic clock keeps the age check correct even if the system
        # wall clock is adjusted while a value is cached.
        if entry is None or time.monotonic() - entry[1] >= timeout:
            value = method(*args, **kwargs)
            # Stamp after the call so the time spent computing the value is
            # not counted against its lifetime.
            entry = (value, time.monotonic())
            cache[key] = entry
        return entry[0]

    return wrapper
def parse_body(body):
    '''
    Parse the body of a github issue comment and look for 're-run' test
    commands.

    The body is lower-cased and examined line by line; only the first
    're-run' on a line is considered. For each command found, yields a list
    of words: ``['re-run', <word>]``, or ``['re-run', 'full', <word>]`` when
    the word after 're-run' is 'full' (the trailing word is absent if the
    line ends at 'full'). A line where nothing follows 're-run' is skipped.
    '''
    for line in body.lower().split('\n'):
        words = line.split()
        try:
            idx = words.index('re-run')
        except ValueError:
            continue
        following = words[idx + 1:]
        if not following:
            # 're-run' is the last word on the line: there is nothing to
            # run, and indexing past the end would raise IndexError.
            continue
        if following[0] == 'full':
            yield words[idx:idx + 3]
        else:
            yield words[idx:idx + 2]
def get_pr_jobs():
    '''
    Yield each job listed in the Jenkins "Pull Requests" view.

    The view is fetched as JSON using HTTP basic auth with the module-level
    Jenkins credentials. Because this is a generator, the request is only
    made once iteration starts. Raises ``RuntimeError`` if Jenkins answers
    with anything other than a 200 status.
    '''
    view_url = 'https://jenkinsci.saltstack.com/view/Pull%20Requests/api/json'
    credentials = requests.auth.HTTPBasicAuth(user, password)
    response = requests.get(
        view_url,
        headers={'accept': 'application/json'},
        auth=credentials,
    )
    if response.status_code != 200:
        raise RuntimeError("Received non 200 status code from jenkins")
    yield from response.json()['jobs']
# TODO: This won't work until all branches have the params config merged
# forward?
# NOTE(review): a later ``def job_has_params`` in this module rebinds the
# name, so this API-backed, cached variant is not the one callers end up
# with.
@timedcache
def job_has_params(job_url):
    '''
    Ask Jenkins whether a job accepts build parameters.

    Fetches the JSON description at ``job_url``, follows the ``url`` of the
    last entry in its ``jobs`` list, and returns ``True`` when that entry's
    ``property`` list contains a
    ``hudson.model.ParametersDefinitionProperty``; otherwise ``False``.

    Raises ``RuntimeError`` when either request returns a non-200 status.
    '''
    res = requests.get(
        '{}/api/json'.format(job_url.rstrip('/'))
    )
    if res.status_code != 200:
        raise RuntimeError("Received non 200 status code from jenkins")
    last_job_url = res.json()['jobs'][-1]['url']
    # Strip the trailing slash here too, as is done for job_url above, so the
    # request path does not contain a doubled '/'.
    res = requests.get(
        '{}/api/json'.format(last_job_url.rstrip('/'))
    )
    if res.status_code != 200:
        raise RuntimeError("Received non 200 status code from jenkins")
    data = res.json()
    # A missing 'property' list, or an entry without '_class', counts as
    # "no parameters" rather than raising KeyError.
    return any(
        prop.get('_class') == 'hudson.model.ParametersDefinitionProperty'
        for prop in data.get('property', [])
    )
def job_has_params(job_url):
    '''
    Determine whether a Jenkins job accepts build parameters.

    The decision is based solely on the last path segment of ``job_url``
    (ignoring trailing slashes): 'pr-doc' and 'pr-lint' give ``False``,
    anything else gives ``True``.
    '''
    job_name = job_url.rstrip('/').rsplit('/')[-1]
    return job_name not in ('pr-doc', 'pr-lint')
def filter_jobs(jobs, keyword):
    '''
    Yield the jobs whose name contains ``keyword``.

    The special keyword 'all' matches every job without looking at its name.
    '''
    for candidate in jobs:
        if keyword == 'all' or keyword in candidate['name']:
            yield candidate
def build_job(job_url, pr_number, run_full, has_params):
    '''
    Request jenkins to build a job

    job_url
        Base URL of the Jenkins job; trailing slashes are ignored.
    pr_number
        Pull request number; the build targets the ``PR-<pr_number>``
        sub-job.
    run_full
        When truthy (and ``has_params`` is set) the build is requested with
        ``runFull=true``, otherwise ``runFull=false``.
    has_params
        Whether to use the ``buildWithParameters`` endpoint instead of the
        plain ``build`` endpoint.

    Raises ``RuntimeError`` if the crumb request does not return 200. A build
    request that does not return 201 is logged, not raised.
    '''
    log.debug("job_url: %s pr_num: %s run_full: %s has_params: %s",
              job_url, pr_number, run_full, has_params)
    pr_job = '{}/job/PR-{}'.format(job_url.rstrip('/'), pr_number)
    if has_params:
        pr_url = '{}/buildWithParameters?runFull={}'.format(
            pr_job,
            'true' if run_full else 'false'
        )
    else:
        pr_url = '{}/build'.format(pr_job)
    # Fetch the CSRF crumb and submit the build through one session so both
    # requests share cookies: Jenkins releases that bind a crumb to the web
    # session that issued it reject a crumb presented from a fresh session.
    with requests.Session() as session:
        session.auth = requests.auth.HTTPBasicAuth(user, password)
        res = session.get(
            'https://jenkinsci.saltstack.com/crumbIssuer/api/json',
        )
        if res.status_code != 200:
            raise RuntimeError("Jenkins returned non 200 response")
        crumb = res.json()
        res = session.post(
            pr_url,
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
                # Jenkins names the header it expects the crumb in.
                crumb['crumbRequestField']: crumb['crumb']
            },
        )
    if res.status_code == 201:
        log.info("Build started: %s", pr_url)
    else:
        # Best effort: a failed trigger is reported but not raised, so the
        # remaining jobs of the same command still get requested.
        log.warning("Build request received non 201 status: %s", res.status_code)
def run_cmd(cmd, pr_number):
    '''
    Trigger a build for every pull request job selected by a PR command.

    ``cmd`` is a sequence of words: its last word is the keyword used to
    filter the Jenkins PR jobs, and a second word of 'full' requests a full
    run. ``pr_number`` is passed through to each build request.
    '''
    selected = filter_jobs(get_pr_jobs(), cmd[-1])
    for job in selected:
        job_url = job['url']
        has_params = job_has_params(job_url)
        run_full = cmd[1] == 'full'
        build_job(job_url, pr_number, run_full, has_params)