-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathchannel.py
58 lines (48 loc) · 2.01 KB
/
channel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
import aioredis
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from redismpx import Multiplexer
# Module-level multiplexer used by handle_ws() to create one subscription
# per websocket.
# Pass to Multiplexer the same connection options that
# aioredis.create_redis() would accept.
mpx = Multiplexer('redis://localhost')
# Connection used for publishing; stays None until the first call to
# handle_ws() creates it, after which it is reused by later calls.
pub_conn = None
async def handle_ws(ws):
    """Serve one websocket client of the pub/sub demo.

    Accepts the websocket, lazily creates the module-level publishing
    connection, and creates a subscription on the shared multiplexer whose
    messages are forwarded to this websocket as text. It then reads
    commands from the client until it disconnects:

    * ``+channel`` adds ``channel`` to the subscription,
    * ``-channel`` removes ``channel`` from the subscription,
    * ``!channel`` publishes the *next* received message to ``channel``.

    Empty messages and messages with any other prefix are ignored. The
    subscription is always closed when the handler exits, whether because
    the client disconnected or because an unexpected error occurred.

    Args:
        ws: The websocket connection passed in by the websocket route.
    """
    global pub_conn

    # Imported here so this handler carries its own dependency; starlette
    # is already a dependency of this module.
    from starlette.websockets import WebSocketDisconnect

    await ws.accept()

    # Create a separate connection for publishing messages:
    if pub_conn is None:
        pub_conn = await aioredis.create_redis('redis://localhost')

    # Define a callback that sends messages to this websocket
    async def on_message(channel: bytes, message: bytes):
        await ws.send_text(f"ch: [{channel.decode()}] msg: [{message.decode()}]\n")

    # Create a subscription for this websocket
    sub = mpx.new_channel_subscription(
        on_message,
        lambda e: print(f"Network Error: {type(e)}: {e}"),
        lambda s: print(f"Subscription now active: {s}"))

    # Everything after the subscription exists runs under try/finally so the
    # subscription is released on every exit path (disconnect while waiting
    # for a command, disconnect while waiting for the payload of a
    # "!channel" command, failure to send the greeting, cancellation, ...).
    try:
        await ws.send_text('# Use +channel to join a channel, -channel to leave.')
        await ws.send_text('# Sending !channel will send the next message to said channel.')
        await ws.send_text('# To see a message sent to a given channel, you must have joined it beforehand.')

        # Keep reading from the websocket, use the messages sent by the user
        # to add and remove channels from the subscription.
        while True:
            msg = await ws.receive_text()
            if not msg:
                # An empty frame has no prefix character; ignore it instead
                # of failing on msg[0].
                continue

            prefix, chan = msg[0], msg[1:]
            if prefix == "+":
                sub.add(chan)
            elif prefix == "-":
                sub.remove(chan)
            elif prefix == '!':
                # Send the next message to the given channel
                await pub_conn.publish(chan, await ws.receive_text())
    except WebSocketDisconnect:
        # Normal end of the session. Only the disconnect is handled here, so
        # task cancellation and genuine errors are no longer swallowed.
        print('ws disconnected')
    finally:
        sub.close()
# ASGI application: a single websocket route at /ws served by handle_ws.
app = Starlette(
    debug=True,
    routes=[WebSocketRoute('/ws', endpoint=handle_ws)],
)