forked from Conflux-Chain/conflux-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrottle_test.py
executable file
·107 lines (88 loc) · 3.5 KB
/
throttle_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3
import os
import time
from jsonrpcclient.exceptions import ReceivedErrorResponseError
from test_framework.test_framework import ConfluxTestFramework
from conflux.rpc import RpcClient
class ThrottleRpcTests(ConfluxTestFramework):
    """Check that a node rejects local RPC calls once they exceed the configured throttle.

    ``setup_chain`` writes a ``throttling.toml`` containing limits for
    ``cfx_epochNumber`` and ``cfx_gasPrice``; ``run_test`` then calls those
    RPCs through ``RpcClient`` and verifies both the successful calls and
    the error code/message of the rejected ones.
    """

    # JSON-RPC error code this test expects on every rejected call.
    THROTTLED_ERROR_CODE = -32072
    # Error data expected on the first rejection(s) starts with this prefix.
    THROTTLED_PREFIX = "throttled in "
    # Error data expected once further rejections are no longer tolerated.
    ALREADY_THROTTLED = "already throttled, please try again later"

    def set_test_params(self):
        """Run a single node; the throttling config path is finalized in ``setup_chain``."""
        self.num_nodes = 1
        self.conf_parameters = {
            # Placeholder value; overwritten with the real path in setup_chain().
            "throttling_conf": "\"../throttling.toml\""
        }

    def setup_chain(self):
        """Write the throttling configuration file, then run the base chain setup."""
        # Prepare the throttling configuration file.
        throttle_conf = os.path.join(self.options.tmpdir, "throttling.toml")

        # Note: cfx_getBestBlockHash should not be used for this test,
        # because it has already been called to start a node.
        # NOTE(review): the five comma-separated values are presumably
        # max tokens, initial tokens, recharge rate per second, cost per call
        # and number of tolerated throttled calls -- confirm against the
        # node's throttling documentation.
        with open(throttle_conf, "w", encoding="utf-8") as fp:
            fp.write("[rpc_local]\n")
            fp.write("cfx_epochNumber=\"300,200,1,100,1\"\n")
            fp.write("cfx_gasPrice=\"5,5,2,1,0\"\n")
            # In python tests, we only call the local RPC interface,
            # so the public rpc section is added just as a placeholder.
            fp.write("[rpc]")

        self.conf_parameters["throttling_conf"] = "'{}'".format(throttle_conf)
        super().setup_chain()

    def setup_network(self):
        """Start the nodes; this test sets up no additional network topology."""
        self.setup_nodes()

    def run_test(self):
        """Run both throttling scenarios against the single node."""
        client = RpcClient(self.nodes[0])
        self.test_throttled(client)
        self.test_recharged(client)

    def _expect_throttled(self, rpc_call, exact=None):
        """Invoke ``rpc_call`` and require that it is rejected as throttled.

        Args:
            rpc_call: zero-argument callable that issues the RPC.
            exact: when given, the error data must equal this string;
                otherwise it only has to start with ``THROTTLED_PREFIX``.

        Raises:
            AssertionError: if the call succeeds, or if the error code or
                error data differ from what is expected.
        """
        try:
            rpc_call()
        except ReceivedErrorResponseError as e:
            assert e.response.code == self.THROTTLED_ERROR_CODE
            if exact is None:
                assert e.response.data.startswith(self.THROTTLED_PREFIX)
            else:
                assert e.response.data == exact
        else:
            # A bare `assert "message"` is always true; raise explicitly so a
            # call that was NOT throttled actually fails the test.
            raise AssertionError("should be throttled")

    def test_throttled(self, client):
        """Exhaust the ``cfx_epochNumber`` quota and check each rejection stage."""
        # allow 2 times
        assert client.epoch_number() == 0
        assert client.epoch_number() == 0

        # throttled
        self._expect_throttled(client.epoch_number)

        # allow to tolerate 1 time even throttled
        self._expect_throttled(client.epoch_number)

        # already throttled
        self._expect_throttled(client.epoch_number, exact=self.ALREADY_THROTTLED)

    def test_recharged(self, client):
        """Exhaust the ``cfx_gasPrice`` quota, wait one second, and check it refills."""
        # allow 5 times
        for _ in range(5):
            assert client.gas_price() is not None

        # throttled
        self._expect_throttled(client.gas_price)

        # do not tolerate once throttled
        self._expect_throttled(client.gas_price, exact=self.ALREADY_THROTTLED)

        # sleep 1 second to recharge tokens
        time.sleep(1)

        # 2 tokens recharged
        assert client.gas_price() is not None
        assert client.gas_price() is not None

        # throttled again
        self._expect_throttled(client.gas_price)
if __name__ == "__main__":
    # Script entry point: build the test case and hand control to its main().
    throttle_tests = ThrottleRpcTests()
    throttle_tests.main()