-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomm_ip.py
51 lines (47 loc) · 1.88 KB
/
comm_ip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#-*-coding:utf-8;-*-
#qpy:2
#qpy:console
from binascii import b2a_hex
import socket, struct
from collections import namedtuple
import lytro
magic = 0xfaaa55af
Response = namedtuple('Response', ['magic', 'size', 'seq', 'command'])
class IpTarget(lytro.Target):
def __init__(self, address, timeout=1):
self.s = socket.create_connection(address, timeout)
self.s.settimeout(timeout)
def read(self, command, size):
maxsize = 1<<15
if size > maxsize:
size = maxsize
prefix=struct.pack(b'<3I', magic, size, 1)
self.s.send(prefix+command)
#print(f"{command!r}")
assert 3*4+len(command)==28
response_s=self.s.recv(3*4+len(command))
#print(repr(response_s), b2a_hex(response_s))
response=Response(*struct.unpack(b'<3IB', response_s[:13]))
# Check that the response does match
#print(f"Response: {response!r} {response_s!r}")
assert response.magic == magic
assert response.command == command[0]
payload=self.s.recv(response.size)
while len(payload)<response.size:
#print(f"{len(payload)=}")
payload+=self.s.recv(response.size-len(payload))
return payload
def write(self, command, data):
prefix=struct.pack(b'<3I', magic, len(data), 0)
self.s.send(prefix+command+data)
print(f"{3*4+len(command)=}")
response=self.s.recv(3*4+len(command))
print(repr(response), b2a_hex(response))
# Check that the response does match
#print(f"Response: {response!r}")
assert response[:3*4+1]==struct.pack(b'<3IB', magic, len(data), 2, command[0])
self.s.recv(len(data)) # Read any response data; tcp proto mirrors our output
return response[3*4+len(command):]
def probe():
# TODO: Check if it's actually there? DNS-SD should be used.
return [("10.100.1.1", 5678)]