-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_concurrency.py
57 lines (47 loc) · 1.86 KB
/
test_concurrency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import subprocess
import time
import os
import concurrent.futures
import json
import sys
LINK = "https://autograder-service-ece440spring2024-619f12.apps.shift.nerc.mghpcc.org/"
# request with correct solution
def make_request1(uname_pwd):
for http_retries in range(2):
try:
response = subprocess.run(["curl", "-m", "120", "--user", uname_pwd, "-X", "POST", "-F", "[email protected]", LINK], capture_output=True, text=True)
response_json = json.loads(response.stdout)
score = response_json.get('score')
return score
except:
pass
return 'fail'
# bogus request
def make_request2(uname_pwd):
for http_retries in range(1):
try:
response = subprocess.run(["curl","-m", "120", "--user", uname_pwd, "-X", "POST", "-F", "submission.zip=@./standalone_vm/submission.zip", LINK], capture_output=True, text=True)
response_json = json.loads(response.stdout)
score = response_json.get('score')
return score
except:
pass
return 'fail'
def main(uname_pwd):
number_of_requests = 100
with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_requests) as executor:
# Create a list of futures
futures = []
for i in range(number_of_requests):
if i % 2 == 0: # Even step
futures.append(executor.submit(make_request2, uname_pwd))
else: # Odd step
futures.append(executor.submit(make_request1, uname_pwd))
# Wait for the futures to complete and get the results
results = [future.result() for future in concurrent.futures.as_completed(futures)]
# Print results
for result in results:
print(result)
if __name__ == "__main__":
uname_pwd = sys.argv[1]
main(uname_pwd)