-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcli.py
170 lines (123 loc) · 4.87 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import time
import asyncio
import websockets
from aiortc import RTCIceCandidate, RTCSessionDescription
from glonax.message import Engine, Instance, Motion
async def consume_signaling(pc, signaling):
    """Apply incoming signaling messages to *pc* until the peer says goodbye.

    Each object returned by ``signaling.receive()`` is handled by type:

    * ``RTCSessionDescription`` -- set as the remote description; when it
      is an offer, an answer is created, set locally and sent back.
    * ``RTCIceCandidate`` -- added to the peer connection.
    * the ``BYE`` sentinel -- prints "Exiting" and ends the loop.

    Any other object is ignored.
    """
    # The file never imports BYE at module level, so resolve it here;
    # without this the final branch raised NameError.
    from aiortc.contrib.signaling import BYE

    while True:
        obj = await signaling.receive()

        if isinstance(obj, RTCSessionDescription):
            await pc.setRemoteDescription(obj)

            if obj.type == "offer":
                # send answer
                await pc.setLocalDescription(await pc.createAnswer())
                await signaling.send(pc.localDescription)
        elif isinstance(obj, RTCIceCandidate):
            await pc.addIceCandidate(obj)
        elif obj is BYE:
            print("Exiting")
            break
# Reference instant for current_stamp(); stays None until the first call.
time_start = None


def current_stamp():
    """Return the microseconds elapsed since the first call to this function.

    The first call records the reference instant and returns 0.  The
    reference is taken from ``time.monotonic()`` so that wall-clock
    adjustments cannot make stamps jump or run backwards.
    """
    global time_start
    now = time.monotonic()
    if time_start is None:
        time_start = now
        return 0
    return int((now - time_start) * 1000000)
async def run_offer(pc, signaling):
    """Act as the offering peer of a data-channel ping/pong session.

    Connects the signaling transport, creates a data channel labelled
    "chat" on *pc*, and registers two channel handlers:

    * on "open", a background task starts sending ``"ping <stamp>"`` once
      per second, where the stamp comes from ``current_stamp()``;
    * on "message", the message is printed, and for string messages that
      start with "pong" the round-trip time in milliseconds is printed too.

    Finally the offer is created, set as the local description and sent,
    and control passes to ``consume_signaling`` until it returns.
    """
    await signaling.connect()

    channel = pc.createDataChannel("chat")
    print("created by local party")

    # Strong references to running ping tasks: the event loop itself only
    # keeps weak references, so an unreferenced task may be collected.
    ping_tasks = set()

    async def send_pings():
        while True:
            channel.send("ping %d" % current_stamp())
            await asyncio.sleep(1)

    @channel.on("open")
    def on_open():
        task = asyncio.ensure_future(send_pings())
        ping_tasks.add(task)
        # Drop the reference once the task has finished.
        task.add_done_callback(ping_tasks.discard)

    @channel.on("message")
    def on_message(message):
        print("<", message)

        if isinstance(message, str) and message.startswith("pong"):
            # The text after "pong " is the stamp in microseconds;
            # dividing the difference by 1000 yields milliseconds.
            elapsed_ms = (current_stamp() - int(message[5:])) / 1000
            print(" RTT %.2f ms" % elapsed_ms)

    # send offer
    await pc.setLocalDescription(await pc.createOffer())
    await signaling.send(pc.localDescription)

    await consume_signaling(pc, signaling)
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(description="Data channels ping/pong")
# parser.add_argument("role", choices=["offer", "answer"])
# parser.add_argument("--verbose", "-v", action="count")
# args = parser.parse_args()
# if args.verbose:
# logging.basicConfig(level=logging.DEBUG)
# # signaling = create_signaling(args)
# pc = RTCPeerConnection()
# coro = run_offer(pc, signaling)
# # run event loop
# loop = asyncio.get_event_loop()
# try:
# asyncio.create_task(coro)
# except KeyboardInterrupt:
# pass
# finally:
# loop.run_until_complete(pc.close())
# loop.run_until_complete(signaling.close())
import json
from glonax_agent.jsonrpc import JSONRPCRequest, JSONRPCResponse, JSONRPCError
from abc import ABC, abstractmethod
class RPCProxyBase(ABC):
@abstractmethod
async def remote_call(self, req: str) -> str:
pass
async def _remote_call(
self, method: str, params: list | dict = [], return_type=None
):
req = JSONRPCRequest(id=1, method=method, params=params)
resp = await self.remote_call(req.json())
data = json.loads(resp)
if "error" in data:
response = JSONRPCError(id=data["id"], **data["error"])
# TODO: raise custom exception
raise Exception(f"Error: {response.code} - {response.message}")
else:
response = JSONRPCResponse(**data)
if return_type:
# TODO: This is temporary
if return_type == str:
return return_type(response.result)
# TODO: check if return_type is dataclass
return return_type(**response.result)
class WebsocketRPC(RPCProxyBase):
    """RPC proxy whose transport is a websocket connection.

    The connection is opened when the ``async with`` block is entered and
    closed when it is left; ``remote_call`` uses that connection.
    """

    def __init__(self, uri: str):
        """Remember the websocket *uri*; no connection is made here."""
        self.uri = uri

    async def remote_call(self, req: str) -> str:
        """Send *req* over the websocket and return the next message received."""
        socket = self.websocket
        await socket.send(req)
        reply = await socket.recv()
        return reply

    async def __aenter__(self):
        """Connect to ``self.uri`` and return this proxy."""
        connection = await websockets.connect(self.uri)
        self.websocket = connection
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the websocket opened by ``__aenter__``."""
        socket = self.websocket
        await socket.close()
class GlonaxRPC(WebsocketRPC):
    """Typed wrappers around the remote methods used by this client."""

    async def echo(self, message: str) -> str:
        """Call the remote ``echo`` method with *message*; return the reply as ``str``."""
        arguments = [message]
        reply = await self._remote_call("echo", params=arguments, return_type=str)
        return reply

    async def glonax_instance(self) -> Instance:
        """Call the remote ``glonax_instance`` method; return an ``Instance``."""
        instance = await self._remote_call("glonax_instance", return_type=Instance)
        return instance

    async def glonax_engine(self) -> Engine:
        """Call the remote ``glonax_engine`` method; return an ``Engine``."""
        engine = await self._remote_call("glonax_engine", return_type=Engine)
        return engine

    async def glonax_motion(self) -> Motion:
        """Call the remote ``glonax_motion`` method; return a ``Motion``."""
        motion = await self._remote_call("glonax_motion", return_type=Motion)
        return motion

    async def apt(self, operation: str, package: str):
        """Call the remote ``apt`` method with *operation* and *package*.

        The reply is not returned to the caller.
        """
        arguments = [operation, package]
        await self._remote_call("apt", params=arguments)
async def main(
    uri: str = "wss://edge.laixer.equipment/api/app/d6d1a2db-52b9-4abb-8bea-f2d0537432e2/ws",
):
    """Run a short demo session against the RPC endpoint at *uri*.

    Opens a ``GlonaxRPC`` connection, prints the echo reply, the engine
    and the motion values, then issues ``apt("upgrade", "-")``.

    Args:
        uri: Websocket endpoint to connect to.  Defaults to the endpoint
            that used to be hard-coded in this function.
    """
    async with GlonaxRPC(uri) as rpc:
        print(await rpc.echo("Hello, World"))
        # print(await rpc.glonax_instance())
        print(await rpc.glonax_engine())
        print(await rpc.glonax_motion())
        await rpc.apt("upgrade", "-")


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())