-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsock.lua
2926 lines (2528 loc) · 80.1 KB
/
sock.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--[=[
Portable async socket API for Windows (IOCP) and Linux (epoll).
Written by Cosmin Apreutesei. Public Domain.
TLS support in sock_libtls.lua.
ADDRESS LOOKUP
[try_]getaddrinfo(...) -> ai look-up a hostname
ai:free() free the address list
ai:next() -> ai|nil get next address in list
ai:addrs() -> iter() -> ai iterate addresses
ai:type() -> s socket type: 'tcp', ...
ai:family() -> s address family: 'inet', ...
ai:protocol() -> s protocol: 'tcp', 'icmp', ...
ai:name() -> s cannonical name
ai:tostring() -> s formatted address
ai.addr -> sa first address object
sa:family() -> s address family: 'inet', ...
sa:port() -> n address port
sa:tostring() -> s 'ip:port'
sa:addr() -> ip IP address object
ip:tobinary() -> uint8_t[4|16], 4|16 IP address in binary form
ip:tostring() -> s IP address in string form
SOCKETS
tcp([opt], [family], [protocol]) -> tcp make a TCP socket
udp([opt], [family], [protocol]) -> udp make a UDP socket
rawsocket([opt], [family], [protocol]) -> raw make a raw socket
[try_]connect([tcp, ]host, port, [timeout]) -> tcp (create tcp socket and) connect
listen([tcp, ]host, port, ...) -> tcp (create tcp socket and) listen
s:socktype() -> s socket type: 'tcp', ...
s:family() -> s address family: 'inet', ...
s:protocol() -> s protocol: 'tcp', 'icmp', ...
s:[try_]close() send FIN and/or RST and free socket
s:closed() -> t|f check if the socket is closed
s:onclose(fn) exec fn after the socket is closed
s:[try_]bind([host], [port], [aflags]) bind socket to an address
s:[try_]bind(sa) bind socket to an address
s:[try_]bind('unix:FILE') bind socket to a unix domain socket
s:[try_]setopt(opt, val) set socket option (`'so_*'` or `'tcp_*'`)
s:[try_]getopt(opt) -> val get socket option
tcp|udp:[try_]connect(host, port, [aflags], ...) connect to an address
tcp:[try_]send(s|buf, [len]) -> true send bytes to connected address
udp:[try_]send(s|buf, [len]) -> len send bytes to connected address
tcp|udp:[try_]recv(buf, maxlen) -> len receive bytes
tcp:[try_]listen([backlog, ]host, port, [onaccept], [aflags]) put socket in listening mode
tcp:[try_]accept() -> ctcp | nil,err,[retry] accept a client connection
tcp:[try_]recvn(buf, n) -> buf, n receive n bytes
tcp:[try_]recvall() -> buf, len receive until closed
tcp:[try_]recvall_read() -> read make a buffered read function
udp:[try_]sendto(host, port, s|buf, [len], [aflags]) -> len send a datagram to an address
udp:[try_]recvnext(buf, maxlen, [flags]) -> len, sa receive the next datagram
tcp:[try_]shutdown(['r'|'w'|'rw']) send FIN
s:debug([protocol]) enable debugging
THREADS
thread(func[, fmt, ...]) -> co create a coroutine for async I/O
resume(thread, ...) resume thread
yield(...) -> ... safe yield (see [coro])
suspend() -> ... suspend thread
cowrap(f) -> wrapper see coro.safewrap()
currentthread() -> co, is_main current coroutine and whether it's the main one
threadstatus(co) -> s coroutine.status()
transfer(co, ...) -> ... see coro.transfer()
cofinish(co, ...) -> ... see coro.finish()
threadenv([co]) -> t get (current) thread's own enviornment
ownthreadenv([co], [create]) -> t get/create (current) thread's own environment
onthreadfinish(co, f) run `f(thread)` when thread finishes
SCHEDULER
poll([ignore_interrupts]) poll for I/O
start([ignore_interrupts]) keep polling until all threads finish
stop() stop polling
run(f, ...) -> ... run a function inside a thread
TIMERS
wait_job() -> sj make an interruptible async wait job
sj:wait_until(t) -> ... wait until clock()
sj:wait(s) -> ... wait for `s` seconds
sj:resume(...) resume the waiting thread
sj:cancel() calls sj:resume(sj.CANCEL)
sj.CANCEL magic arg that can cancel runat()
wait_until(t) -> ... wait until clock() value
wait(s) -> ... wait for s seconds
runat(t, f) -> sj run `f` at clock `t`
runafter(s, f) -> sj run `f` after `s` seconds
runevery(s, f) -> sj run `f` every `s` seconds
runagainevery(s, f) -> sj run `f` now and every `s` seconds afterwards
s:wait_job() -> sj wait job that is auto-canceled on socket close
s:wait_until(t) -> ... wait_until() on auto-canceled wait job
s:wait(s) -> ... wait() on auto-canceled wait job
THREAD SETS
threadset() -> ts
ts:thread(f, [fmt, ...]) -> co
ts:join() -> {{ok=,ret=,thread=},...}
MULTI-THREADING (WITH OS THREADS)
iocp([iocp_h]) -> iocp_h get/set IOCP handle (Windows)
epoll_fd([epfd]) -> epfd get/set epoll fd (Linux)
------------------------------------------------------------------------------
All function return `nil, err` on error (but raise on user error
or unrecoverable OS failure). Some error messages are normalized
across platforms, like 'access_denied' and 'address_already_in_use'
so they can be used in conditionals.
I/O functions only work inside threads created with `thread()`.
`host, port` args are passed to `getaddrinfo()` (with the optional `af` arg),
which means that an already resolved address can be passed as `ai, nil`
in place of `host, port`.
ADDRESS LOOKUP ---------------------------------------------------------------
getaddrinfo(...) -> ai
Look-up a hostname. Returns an "address info" object which is a OS-allocated
linked list of one or more addresses resolved with the system's `getaddrinfo()`.
The args can be either:
* an existing `ai` object which is passed through, or
* `host, port, [socket_type], [family], [protocol], [aflags]`, or
* `'unix:PATH', [socket_type]
where:
* `host` can be a hostname, ip address or `'*'` which means "all interfaces".
* `port` can be a port number, a service name or `0` which means "any available port".
* `'unix:PATH'` creates an object representing a unix domain socket.
* `socket_type` can be `'tcp'`, `'udp'`, `'raw'` or `0` (the default, meaning "all").
* `family` can be `'inet'`, `'inet6'` or `'unix'` or `0` (the default, meaning "all").
* `protocol` can be `'ip'`, `'ipv6'`, `'tcp'`, `'udp'`, `'raw'`, `'icmp'`,
`'igmp'` or `'icmpv6'` or `0` (the default is either `'tcp'`, `'udp'`
or `'raw'`, based on socket type).
* `aflags` are a `bor()` list of `passive`, `cannonname`,
`numerichost`, `numericserv`, `all`, `v4mapped`, `addrconfig`
which map to `getaddrinfo()` flags.
NOTE: `getaddrinfo()` is blocking! Use resolve() to resolve hostnames first!
SOCKETS ----------------------------------------------------------------------
tcp([opt], [family], [protocol]) -> tcp
Make a TCP socket. The default family is `'inet'`.
udp([opt], [family], [protocol]) -> udp
Make an UDP socket. The default family is `'inet'`.
rawsocket([opt], [family], [protocol]) -> raw`
Make a raw socket. The default family is `'inet'`.
s:[try_]close()
Close the connection and free the socket.
For TCP sockets, if 1) there's unread incoming data (i.e. recv() hasn't
returned 0 yet), or 2) `so_linger` socket option was set with a zero timeout,
then a TCP RST packet is sent to the client, otherwise a FIN is sent.
s:[try_]bind([host], [port], [af])
s:[try_]bind('unix:FILE')
Bind socket to an interface/port (which default to '*' and 0 respectively
meaning all interfaces and a random port).
s:setexpires(['r'|'w'], clock|nil)
s:settimeout(['r'|'w'], seconds|nil)
Set or clear the expiration clock for all subsequent I/O operations.
If the expiration clock is reached before an operation completes,
`nil, 'timeout'` is returned.
tcp|udp:[try_]connect(host, port, [af], ...)
Connect to an address, binding the socket to `('*', 0)` if not bound already.
For UDP sockets, this has the effect of filtering incoming packets so that
only those coming from the connected address get through the socket. Also,
you can call connect() multiple times (use `('*', 0)` to switch back to
unfiltered mode).
tcp:[try_]send(s|buf, [len], [flags]) -> true
Send bytes to the connected address.
Partial writes are signaled with `nil, err, writelen`.
Trying to send zero bytes is allowed but it's a no-op (doesn't go to the OS).
udp:[try_]send(s|buf, [len], [flags]) -> len
Send bytes to the connected address.
Empty packets (zero bytes) are allowed.
tcp|udp:[try_]recv(buf, maxlen, [flags]) -> len
Receive bytes from the connected address.
With TCP, returning 0 means that the socket was closed on the other side.
With UDP it just means that an empty packet was received.
tcp:[try_]listen([backlog, ]host, port, [onaccept], [af])
Put the socket in listening mode, binding the socket if not bound already
(in which case `host` and `port` args are ignored). The `backlog` defaults
to `1/0` which means "use the maximum allowed".
tcp:[try_]accept() -> ctcp | nil,err,[retry]
Accept a client connection. The connection socket has additional fields:
`remote_addr`, `remote_port`, `local_addr`, `local_port`.
A third return value indicates that the error is a network error and thus
the call be retried.
tcp:[try_]recvn(buf, len) -> true
Repeat recv until `len` bytes are received.
Partial reads are signaled with `nil, err, readlen`.
tcp:[try_]recvall() -> buf,len | nil,err,buf,len
Receive until closed into an accumulating buffer. If an error occurs
before the socket is closed, the partial buffer and length is returned after it.
tcp:recvall_read() -> read
Receive all data into a buffer and make a `read` function that consumes it.
Useful for APIs that require an input `read` function that cannot yield.
udp:[try_]sendto(host, port, s|buf, [maxlen], [flags], [af]) -> len
Send a datagram to a specific destination, regardless of whether the socket
is connected or not.
udp:[try_]recvnext(buf, maxlen, [flags]) -> len, sa
Receive the next incoming datagram, wherever it came from, along with the
source address. If the socket is connected, packets are still filtered though.
tcp:[try_]shutdown(['r'|'w'|'rw'])
Shutdown the socket for receiving, sending or both (default). Does not block.
Sends a TCP FIN packet to indicate refusal to send/receive any more data
on the connection. The FIN packet is only sent after all the current pending
data is sent (unlike RST which is sent immediately). When a FIN is received
recv() returns 0.
Calling close() without shutdown may send a RST (see the notes on `close()`
for when that can happen) which may cause any data that is pending either
on the sender side or on the receiving side to be discarded (that's how TCP
works: RST has that data-cutting effect).
Required for lame protocols like HTTP with pipelining: a HTTP server
that wants to close the connection before honoring all the received
pipelined requests needs to call `s:shutdown'w'` (which sends a FIN to
the client) and then continue to receive (and discard) everything until
a recv that returns 0 comes in (which is a FIN from the client, as a reply
to the FIN from the server) and only then it can close the connection without
messing up the client.
SCHEDULING -------------------------------------------------------------------
Scheduling is based on synchronous coroutines provided by coro.lua which
allows coroutine-based iterators that perform socket I/O to be written.
thread(func[, fmt, ...]) -> co
Create a coroutine for performing async I/O. The coroutine must be resumed
to start. When the coroutine finishes, the control is transfered to
the I/O thread (the thread that called `start()`).
Full-duplex I/O on a socket can be achieved by performing reads in one thread
and writes in another.
resume(thread, ...)
Resume a thread, which means transfer control to it, but also temporarily
change the I/O thread to be this thread so that the first suspending call
(send, recv, wait, suspend, etc.) gives control back to this thread.
_This_ is the trick to starting multiple threads before starting polling.
suspend() -> ...
Suspend current thread, transfering to the polling thread (but see resume()).
poll([ignore_interrupts]) -> true | false,'empty'
Poll for the next I/O event and resume the coroutine that waits for it.
start([ignore_interrupts])
Start polling. Stops when no more I/O or `stop()` was called.
stop()
Tell the loop to stop dequeuing and return.
wait_until(t)
Wait until a clock() value without blocking other threads.
wait(s) -> ...
Wait `s` seconds without blocking other threads.
wait_job() -> sj
Make an interruptible waiting job. Put the current thread wait using
`sj:wait()` or `sj:wait_until()` and then from another thread call
`sj:resume()` to resume the waiting thread. Any arguments passed to
`sj:resume()` will be returned by `wait()`.
MULTI-THREADING --------------------------------------------------------------
iocp([iocp_handle]) -> iocp_handle
Get/set the global IOCP handle (Windows).
IOCPs can be shared between OS threads and having a single IOCP for all
threads (as opposed to having one IOCP per thread/Lua state) enables the
kernel to better distribute the completion events between threads.
To share the IOCP with another Lua state running on a different thread,
get the IOCP handle with `iocp()`, copy it over to the other state,
then set it with `iocp(copied_iocp)`.
epoll_fd([epfd]) -> epfd
Get/set the global epoll fd (Linux).
Epoll fds can be shared between OS threads and having a single epfd for all
threads is more efficient for the kernel than having one epfd per thread.
To share the epfd with another Lua state running on a different thread,
get the epfd with `epoll_fd()`, copy it over to the other state,
then set it with `epoll_fd(copied_epfd)`.
]=]
if not ... then require'sock_test'; return end
require'glue'
require'heap'
local coro = require'coro'
coro.live = live
coro.pcall = pcall
local
assert, isstr, clock, max, abs, min, bor, band, cast, u8p, fill, str, errno =
assert, isstr, clock, max, abs, min, bor, band, cast, u8p, fill, str, errno
local coro_create = coro.create
local coro_safewrap = coro.safewrap
local coro_transfer = coro.transfer
local coro_finish = coro.finish
do
local debug_getinfo = debug.getinfo
local string_format = string.format
local function trace_line(level, t)
local info = debug_getinfo(level + 1, 'nS')
local line = string_format('%s: %d',
info.source:match('[^\\/]-$'), info.linedefined)
t[line] = (t[line] or 0) + 1
return line
end
local coro_create0 = coro_create
local coro_safewrap0 = coro_safewrap
local counts --{line->count}
local threads --{line->{thread->true}}
local weak_keys
local function trace_coro_at(line, th)
local t = threads[line]
if not t then
t = setmetatable({}, weak_keys)
threads[line] = t
end
t[th] = true
end
function trace_coro()
counts = {}
threads = {}
weak_keys = {__mode = 'k'}
function coro_create(...)
local line = trace_line(3, counts)
local th = coro_create0(...)
trace_coro_at(line, th)
return th
end
function coro_safewrap(...)
local line = trace_line(3, counts)
local f, th = coro_safewrap0(...)
trace_coro_at(line, th)
return f, th
end
end
function coro_counts()
local all = cat(imap(sort(keys(counts), function(k1, k2)
if counts[k1] == counts[k2] then
return k1 < k2
end
return counts[k1] < counts[k2]
end), function(k)
return format('%5d %s', counts[k], k)
end), '\n')
local live = cat(imap(sort(keys(threads), function(k1, k2)
local n1 = count(threads[k1])
local n2 = count(threads[k2])
if n1 == n2 then
return k1 < k2
end
return n1 < n2
end), function(k)
return format('%5d %s', count(threads[k]), k)
end), '\n')
return all, live
end
end
assert(Windows or Linux or OSX, 'unsupported platform')
local C = Windows and ffi.load'ws2_32' or C
local socket = {debug_prefix = 'S'} --common socket methods
local tcp = {type = 'tcp_socket'}
local udp = {type = 'udp_socket'}
local raw = {type = 'raw_socket'}
local wait_job_class = {type = 'wait_job', debug_prefix = 'W'}
function issocket(s)
local mt = getmetatable(s)
return istab(mt) and rawget(mt, 'issocket') or false
end
--forward declarations
local check, _poll, wait_io, cancel_wait_io, create_socket, wrap_socket, waiting
--NOTE: close() returns `false` on error but it should be ignored.
function socket:try_close()
if not self.s then return true end
_sock_unregister(self)
local ok, err = self:_close()
local ps = self.listen_socket
if ps then
ps.n = ps.n - 1
ps.sockets[self] = nil
end
if self.sockets then --close all accepted sockets if any.
for s in pairs(self.sockets) do
s:try_close()
end
assert(isempty(self.sockets))
end
if self._after_close then
self:_after_close()
end
if not ok then return false, err end
log('', 'sock', 'closed', '%-4s r:%d w:%d%s', self, self.r, self.w,
self.n and ' live:'..self.n or '')
live(self, nil)
return true
end
function socket:closed()
return not self.s
end
function socket:onclose(fn)
after(self, '_after_close', fn)
end
--getaddrinfo() --------------------------------------------------------------
cdef[[
struct sockaddr_in {
short family_num;
uint8_t port_bytes[2];
uint8_t ip_bytes[4];
char _zero[8];
};
struct sockaddr_in6 {
short family_num;
uint8_t port_bytes[2];
unsigned long flowinfo;
uint8_t ip_bytes[16];
unsigned long scope_id;
};
struct sockaddr_un {
short family_num;
char path[108];
};
typedef struct sockaddr {
union {
struct {
short family_num;
uint8_t port_bytes[2];
};
struct sockaddr_in addr_in;
struct sockaddr_in6 addr_in6;
struct sockaddr_un addr_un;
};
} sockaddr;
]]
local sockaddr_ct = ctype'sockaddr'
-- working around ABI blindness of C programmers...
if Windows then
cdef[[
struct addrinfo {
int flags;
int family_num;
int socktype_num;
int protocol_num;
size_t addrlen;
char *name_ptr;
struct sockaddr *addr;
struct addrinfo *next_ptr;
struct sockaddr addrs[?];
};
]]
else
cdef[[
struct addrinfo {
int flags;
int family_num;
int socktype_num;
int protocol_num;
size_t addrlen;
struct sockaddr *addr;
char *name_ptr;
struct addrinfo *next_ptr;
struct sockaddr addrs[?];
};
]]
end
cdef[[
int getaddrinfo(const char *node, const char *service,
const struct addrinfo *hints, struct addrinfo **res);
void freeaddrinfo(struct addrinfo *);
]]
local socketargs
do
local families = {
inet = Windows and 2 or Linux and 2,
inet6 = Windows and 23 or Linux and 10,
unix = Linux and 1,
}
local family_map = index(families)
local AF_INET = families.inet
local AF_INET6 = families.inet6
local AF_UNIX = families.unix
local socket_types = {
tcp = Windows and 1 or Linux and 1,
udp = Windows and 2 or Linux and 2,
raw = Windows and 3 or Linux and 3,
}
local socket_type_map = index(socket_types)
local protocols = {
ip = Windows and 0 or Linux and 0,
icmp = Windows and 1 or Linux and 1,
igmp = Windows and 2 or Linux and 2,
tcp = Windows and 6 or Linux and 6,
udp = Windows and 17 or Linux and 17,
raw = Windows and 255 or Linux and 255,
ipv6 = Windows and 41 or Linux and 41,
icmpv6 = Windows and 58 or Linux and 58,
}
local protocol_map = index(protocols)
local flag_bits = {
passive = Windows and 0x00000001 or 0x0001,
cannonname = Windows and 0x00000002 or 0x0002,
numerichost = Windows and 0x00000004 or 0x0004,
numericserv = Windows and 0x00000008 or 0x0400,
all = Windows and 0x00000100 or 0x0010,
v4mapped = Windows and 0x00000800 or 0x0008,
addrconfig = Windows and 0x00000400 or 0x0020,
}
local default_protocols = {
[socket_types.tcp] = protocols.tcp,
[socket_types.udp] = protocols.udp,
[socket_types.raw] = protocols.raw,
}
function socketargs(socket_type, family, protocol)
local st = socket_types[socket_type] or socket_type or 0
local af = families[family] or family or 0
local pr = protocols[protocol] or protocol
or (af ~= AF_UNIX and default_protocols[st]) or 0
return st, af, pr
end
local hints = new('struct addrinfo', 0)
local addrs = new'struct addrinfo*[1]'
local addrinfo_ct = ctype'struct addrinfo'
local getaddrinfo_error
if Windows then
function getaddrinfo_error()
return check()
end
else
cdef'const char *gai_strerror(int ecode);'
function getaddrinfo_error(err)
return nil, str(C.gai_strerror(err))
end
end
function try_getaddrinfo(host, port, socket_type, family, protocol, flags)
if host == '*' then host = '0.0.0.0' end --all.
if host:starts'unix:' then
local ai = addrinfo_ct(1)
local sa = ai.addrs[0]
local path = host:sub(6)
sa.family_num = AF_UNIX
assert(#path < sizeof(sa.addr_un.path))
copy(sa.addr_un.path, path)
ai.socktype_num = socket_types[socket_type] or socket_type or 0
ai.family_num = AF_UNIX
ai.addrlen = sizeof(hints)
ai.addr = sa
return ai, true --second retval is to prevent calling free() on it
elseif isctype(addrinfo_ct, host) then
return host, true --pass-through and return "not owned" flag
elseif istab(host) then
local t = host
host, port, family, socket_type, protocol, flags =
t.host, t.port or port, t.family, t.socket_type, t.protocol, t.flags
end
assert(host, 'host required')
assert(port, 'port required')
fill(hints, sizeof(hints))
hints.socktype_num, hints.family_num, hints.protocol_num
= socketargs(socket_type, family, protocol)
hints.flags = flags and bitflags(flags, flag_bits, nil, true) or 0
local ret = C.getaddrinfo(host, port and tostring(port), hints, addrs)
if ret ~= 0 then return getaddrinfo_error(ret) end
return gc(addrs[0], C.freeaddrinfo)
end
function getaddrinfo(...)
return assert(try_getaddrinfo(...))
end
local ai = {}
function ai:free()
gc(self, nil)
C.freeaddrinfo(self)
end
function ai:next(ai)
local ai = ai and ai.next_ptr or self
return ai ~= nil and ai or nil
end
function ai:addrs()
return ai.next, self
end
function ai:type () return socket_type_map[self.socktype_num] end
function ai:family () return family_map [self.family_num ] end
function ai:protocol () return protocol_map [self.protocol_num] end
function ai:name () return str(self.name_ptr) end
function ai:tostring () return self.addr:tostring() end
local sa = {}
function sa:family() return family_map[self.family_num] end
function sa:port()
if self.family_num == AF_INET or self.family_num == AF_INET6 then
return self.port_bytes[0] * 0x100 + self.port_bytes[1]
else
return 0
end
end
function sa:addr()
return self.family_num == AF_INET and self.addr_in
or self.family_num == AF_INET6 and self.addr_in6
or self.family_num == AF_UNIX and self.addr_un
or error'NYI'
end
function sa:tostring()
return self:addr():tostring()..(self:port() ~= 0 and ':'..self:port() or '')
end
metatype('struct sockaddr', {__index = sa})
local sa_in4 = {}
function sa_in4:tobinary()
return self.ip_bytes, 4
end
function sa_in4:tostring()
local b = self.ip_bytes
return format('%d.%d.%d.%d', b[0], b[1], b[2], b[3])
end
metatype('struct sockaddr_in', {__index = sa_in4})
local sa_in6 = {}
function sa_in6:tobinary()
return self.ip_bytes, 16
end
function sa_in6:tostring()
local b = self.ip_bytes
return format('%x:%x:%x:%x:%x:%x:%x:%x',
b[ 0]*0x100+b[ 1], b[ 2]*0x100+b[ 3], b[ 4]*0x100+b[ 5], b[ 6]*0x100+b[ 7],
b[ 8]*0x100+b[ 9], b[10]*0x100+b[11], b[12]*0x100+b[13], b[14]*0x100+b[15])
end
metatype('struct sockaddr_in6', {__index = sa_in6})
metatype(addrinfo_ct, {__index = ai})
function socket:socktype () return socket_type_map[self._st] end
function socket:family () return family_map [self._af] end
function socket:protocol () return protocol_map [self._pr] end
function socket:addr(host, port, flags)
return getaddrinfo(host, port, self._st, self._af, self._pr, flags)
end
local sa_un = {}
function sa_un:tostring()
return str(self.path)
end
metatype('struct sockaddr_un', {__index = sa_un})
end
--Winsock2 & IOCP ------------------------------------------------------------
if Windows then
cdef[[
// required types from `winapi.types` ----------------------------------------
typedef unsigned long ULONG;
typedef unsigned long DWORD;
typedef int BOOL;
typedef unsigned short WORD;
typedef BOOL *LPBOOL;
typedef int *LPINT;
typedef DWORD *LPDWORD;
typedef void VOID;
typedef VOID *LPVOID;
typedef const VOID *LPCVOID;
typedef uint64_t ULONG_PTR, *PULONG_PTR;
typedef VOID *PVOID;
typedef char CHAR;
typedef CHAR *LPSTR;
typedef VOID *HANDLE;
typedef struct {
unsigned long Data1;
unsigned short Data2;
unsigned short Data3;
unsigned char Data4[8];
} GUID, *LPGUID;
// IOCP ----------------------------------------------------------------------
typedef struct _OVERLAPPED {
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
PVOID Pointer;
HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
HANDLE CreateIoCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
ULONG_PTR CompletionKey,
DWORD NumberOfConcurrentThreads
);
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
PULONG_PTR lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds
);
BOOL CancelIoEx(
HANDLE hFile,
LPOVERLAPPED lpOverlapped
);
// Sockets -------------------------------------------------------------------
typedef uintptr_t SOCKET;
typedef HANDLE WSAEVENT;
typedef unsigned int GROUP;
typedef struct _WSAPROTOCOL_INFOW WSAPROTOCOL_INFOW, *LPWSAPROTOCOL_INFOW;
SOCKET WSASocketW(
int af,
int type,
int protocol,
LPWSAPROTOCOL_INFOW lpProtocolInfo,
GROUP g,
DWORD dwFlags
);
int closesocket(SOCKET s);
typedef struct WSAData {
WORD wVersion;
WORD wHighVersion;
char szDescription[257];
char szSystemStatus[129];
unsigned short iMaxSockets; // to be ignored
unsigned short iMaxUdpDg; // to be ignored
char *lpVendorInfo; // to be ignored
} WSADATA, *LPWSADATA;
int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);
int WSACleanup(void);
int WSAGetLastError();
int getsockopt(
SOCKET s,
int level,
int optname,
char *optval,
int *optlen
);
int setsockopt(
SOCKET s,
int level,
int optname,
const char *optval,
int optlen
);
typedef struct _WSABUF {
ULONG len;
CHAR *buf;
} WSABUF, *LPWSABUF;
int WSAIoctl(
SOCKET s,
DWORD dwIoControlCode,
LPVOID lpvInBuffer,
DWORD cbInBuffer,
LPVOID lpvOutBuffer,
DWORD cbOutBuffer,
LPDWORD lpcbBytesReturned,
LPOVERLAPPED lpOverlapped,
void* lpCompletionRoutine
);
typedef BOOL (*LPFN_CONNECTEX) (
SOCKET s,
const sockaddr* name,
int namelen,
PVOID lpSendBuffer,
DWORD dwSendDataLength,
LPDWORD lpdwBytesSent,
LPOVERLAPPED lpOverlapped
);
typedef BOOL (*LPFN_ACCEPTEX) (
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
);
int connect(
SOCKET s,
const sockaddr *name,
int namelen
);
int WSASend(
SOCKET s,
LPWSABUF lpBuffers,
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesSent,
DWORD dwFlags,
LPOVERLAPPED lpOverlapped,
void* lpCompletionRoutine
);
int WSARecv(
SOCKET s,
LPWSABUF lpBuffers,
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesRecvd,
LPDWORD lpFlags,
LPOVERLAPPED lpOverlapped,
void* lpCompletionRoutine
);
int WSASendTo(
SOCKET s,
LPWSABUF lpBuffers,
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesSent,
DWORD dwFlags,
const sockaddr *lpTo,
int iTolen,
LPOVERLAPPED lpOverlapped,
void* lpCompletionRoutine
);
int WSARecvFrom(
SOCKET s,
LPWSABUF lpBuffers,
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesRecvd,
LPDWORD lpFlags,
sockaddr* lpFrom,
LPINT lpFromlen,
LPOVERLAPPED lpOverlapped,
void* lpCompletionRoutine
);
void GetAcceptExSockaddrs(
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
sockaddr** LocalSockaddr,
LPINT LocalSockaddrLength,
sockaddr** RemoteSockaddr,
LPINT RemoteSockaddrLength
);
]]
local WSAGetLastError = C.WSAGetLastError
local nbuf = new'DWORD[1]' --global buffer shared between many calls.
--error handling
do
cdef[[
DWORD FormatMessageA(
DWORD dwFlags,
LPCVOID lpSource,
DWORD dwMessageId,
DWORD dwLanguageId,
LPSTR lpBuffer,
DWORD nSize,
va_list *Arguments
);
]]
local FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
local error_msgs = {
[10013] = 'access_denied', --WSAEACCES
[10048] = 'address_already_in_use', --WSAEADDRINUSE
[10053] = 'connection_aborted', --WSAECONNABORTED
[10054] = 'connection_reset', --WSAECONNRESET
[10061] = 'connection_refused', --WSAECONNREFUSED
[ 1225] = 'connection_refused', --ERROR_CONNECTION_REFUSED
[ 109] = 'eof', --ERROR_BROKEN_PIPE, ReadFile (masked)
}
local buf
function check(ret, err)
if ret then return ret end
local err = err or WSAGetLastError()
local msg = error_msgs[err]
if not msg then
buf = buf or new('char[?]', 256)
local sz = ffi.C.FormatMessageA(