-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathload_test.py
228 lines (199 loc) · 7.19 KB
/
load_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import sys
import os.path
sys.path.append("../")
from pathlib import Path
import os
import random
import uuid
import requests
import argparse
from app.main import ENDPOINTS
from enum import Enum
from utils import to_oauth_request_form
from concurrent.futures import ThreadPoolExecutor, as_completed
BASE_PATH = Path(__file__).parent.resolve()
class RequestType(Enum):
DEFAULT = 0
REGISTER = 1
LOGIN = 2
PROTECTED_ROOT = 3
FIT = 4
PREDICT = 5
# Fixed User Credentials
user_credentials_template = {
"password": "Test1@123",
"password_confirm": "Test1@123",
"validation_link": "tabpfn",
}
def parse_args():
"""
Parse command line arguments
1. Server URL
2. Number of total users
- default: 10 users
3. Number of parallel requests
- default: 10 users hitting the server at the same time.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--server_url",
type=str,
default="https://tabpfn-server-wjedmz7r5a-ez.a.run.app",
help="Server URL",
)
parser.add_argument("--num_users", type=int, default=10, help="Number of Users")
parser.add_argument(
"--num_requests", type=int, default=10, help="Number of parallel requests"
)
return parser.parse_args()
def generate_user_email_address():
"""
Generate a random email address for a user:
This can generate around 16^32 unique email addresses as of now and
can be increased by increasing the length of the UUID. So the probability
of generating the same email address for the second time is very low.
Approximately, there would be a possibility of
1 - (16^32! / (16^32^100 * (16^32 - 100)!)) for atleast 1 collision for
100 generated UUIDs which is significantly low due to increasing combinations
"""
random_uuid = uuid.uuid4().hex.replace("%", "")
domain_name = random.choice(["gmail.com", "outlook.org", "yahoo.in", "hotmail.com"])
random_email = "user" + f"{random_uuid}@{domain_name}"
return random_email
def api_request(url, data=None, headers=None, type=RequestType.DEFAULT, files=None):
"""
Make an API request to the server:
type:
1. Register API
2. Login API
3. Protected Root API
4. Fit API
5. Upload Test Set API / Predict API
"""
response = None
if type == RequestType.REGISTER:
response = requests.post(url, params=data)
elif type == RequestType.LOGIN:
response = requests.post(url, data=data)
elif type == RequestType.PROTECTED_ROOT:
response = requests.get(url, headers=headers)
elif type == RequestType.FIT:
response = requests.post(url, headers=headers, files=files)
elif type == RequestType.PREDICT:
response = requests.post(url, headers=headers, params=data, files=files)
else:
raise NotImplementedError(f"Response type {type} is not implemented")
return response.json()
def process_user(user, SERVER_URL):
"""
Process a user:
1. Register the user
2. Login the user
3. Access the protected root
4. Fit the model with train set
5. Upload a test set
6. Predict on the test set
Inputs:
user: User ID: 1, 2, 3, ...
"""
email = generate_user_email_address()
# Combine user data with user_credentials_template and unique email
user_data = {"email": email, **user_credentials_template, user: user}
# Register API: Registers a new user
register_api = SERVER_URL + ENDPOINTS.register.path
_ = api_request(register_api, user_data, type=RequestType.REGISTER)
# print(response_register)
# Login API: Authenticates the user JUST REGISTERED
request_data = to_oauth_request_form(email, user_credentials_template["password"])
login_api = SERVER_URL + ENDPOINTS.login.path
response_login = api_request(login_api, request_data, type=RequestType.LOGIN)
# print(response_login)
# Protected Root API: Access the protected root
user_access_token = response_login["access_token"]
headers = {"Authorization": f"Bearer {user_access_token}"}
protected_api = SERVER_URL + ENDPOINTS.protected_root.path
_ = api_request(
protected_api, headers=headers, type=RequestType.PROTECTED_ROOT
)
# print(response_protected)
# Fit API: Train the model with train set
fit_api = SERVER_URL + ENDPOINTS.fit.path
## Get the path of the train set
x_train_path = os.path.join(BASE_PATH, "datasets", "X_train.csv")
y_train_path = os.path.join(BASE_PATH, "datasets", "y_train.csv")
files = {
"x_file": ("X_train.csv", open(x_train_path, "rb"), "application/csv"),
"y_file": ("y_train.csv", open(y_train_path, "rb"), "application/csv"),
}
fit_response = api_request(
fit_api,
headers=headers,
type=RequestType.FIT,
files=files,
)
# Upload Test Set API: Upload a test set
## Get the path of the train set
x_test_path = os.path.join(BASE_PATH, "datasets", "X_test.csv")
files_test = {"x_file": ("X_test.csv", open(x_test_path, "rb"), "application/csv")}
data = {"train_set_uid": fit_response["train_set_uid"]}
# THIS IS FOR INTERNAL USE ONLY
# upload_test_set_api = SERVER_URL + ENDPOINTS.upload_test_set.path
# upload_test_set_response = api_request(
# upload_test_set_api,
# data=data,
# headers=headers,
# type=RequestType.PREDICT,
# files=files_test
# )
# print(upload_test_set_response)
# USE ANY OF THE FOLLOWING 2 APIs: PREDCIT OR PREDICT PROBA
# Predict Proba API: Predict probabilities on the test set
# predict_proba_api = SERVER_URL + ENDPOINTS.predict_proba.path
# predict_response = api_request(
# predict_proba_api,
# data=data,
# headers=headers,
# type=RequestType.PREDICT,
# files=files_test
# )
# Predict API: Predict on the test set
predict_api = SERVER_URL + ENDPOINTS.predict.path
predict_response = api_request(
predict_api,
data=data,
headers=headers,
type=RequestType.PREDICT,
files=files_test,
)
# print(predict_response)
return predict_response
def main():
"""
Usage:
`python load_test.py --num_users=100 --num_requests=10 --server_url='http://0.0.0.0/'
"""
# Generate a list of parsed arguments
args = parse_args()
# Set the server URL
SERVER_URL = args.server_url
# Number of total users
num_users = args.num_users
# Generate a list of users
users = range(1, num_users + 1)
# Number of parallel requests
num_parallel_requests = args.num_requests
with ThreadPoolExecutor(max_workers=num_parallel_requests) as executor:
# Submit tasks for each user
futures = {
executor.submit(process_user, user, SERVER_URL): user for user in users
}
# Wait for all tasks to complete
for future in as_completed(futures):
user = futures[future]
try:
result = future.result()
print(f"User {user} processed successfully. Result: {result}")
except Exception as e:
print(f"Error processing user {user}: {e}")
if __name__ == "__main__":
main()