-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsocket_drcom.py
70 lines (48 loc) · 1.63 KB
/
socket_drcom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from socket import *
import usocket as _socket
import os
import signal
import ffilib
alarm = ffilib.libc().func("I", "alarm", "I")
__socket = socket
del socket
log = lambda x: print(x)
_timeout_methods = ('accept', 'bind', 'connect', 'sendall', 'sendto',
'recvfrom')
class timeout(OSError):
pass
def timeout_handler(sig):
raise timeout()
signal.signal(14, timeout_handler)
class socketRecvfromFixed(__socket):
def __init__(self, *a, **b):
super().__init__(*a, **b)
def recvfrom(self, *a, **b):
s, addr = super().recvfrom(*a, **b)
addr = _socket.sockaddr(addr)
return (s, (_socket.inet_ntop(addr[0], addr[1]), addr[2]))
class socket(socketRecvfromFixed):
def __init__(self, *a, **b):
super().__init__(*a, **b)
self.killtimeout = 0
def setkilltimeout(self, timeout):
self.killtimeout = int(timeout)
def getkilltimeout(self):
return self.killtimeout
def _run_with_timeout(self, func, a, b):
alarm(self.killtimeout)
rst = func(*a, **b)
alarm(0)
return rst
def accept(self, *a, **b):
return self._run_with_timeout(super().accept, a, b)
def bind(self, *a, **b):
return self._run_with_timeout(super().bind, a, b)
def connect(self, *a, **b):
return self._run_with_timeout(super().connect, a, b)
def sendall(self, *a, **b):
return self._run_with_timeout(super().sendall, a, b)
def sendto(self, *a, **b):
return self._run_with_timeout(super().sendto, a, b)
def recvfrom(self, *a, **b):
return self._run_with_timeout(super().recvfrom, a, b)