flask-websockets is an extension library for Flask, a popular web micro-framework, that adds real-time communication capabilities to your Flask application. It implements the WebSocket protocol and provides both low-level control over connections and a high-level API for subscribing connections to rooms.
flask-websockets supports most popular HTTP WSGI servers such as Werkzeug, Gunicorn, Eventlet and Gevent.
import time
from threading import Thread
from flask import Flask
from flask_websockets import WebSocket, WebSockets
# Create the Flask application object for this module.
app = Flask(__name__)
# Bind the flask-websockets extension to the app; `websockets` is used below
# to register the route and to subscribe/publish.
websockets = WebSockets(app)
@websockets.route("/ws")
def websocket_route(ws: WebSocket) -> None:
    """Handle a connection on "/ws" by subscribing it to "server_time".

    The subscription is held inside a ``with`` block for as long as
    ``ws.iter_data()`` keeps yielding; incoming data is read and discarded.

    Args:
        ws: The WebSocket connection passed in by the route decorator.
    """
    rooms = ["server_time"]
    with websockets.subscribe(ws, rooms):
        # Keep consuming from the websocket so the handler does not return
        # (and the connection is not dropped); the payloads are ignored.
        for _ in ws.iter_data():
            pass
def publish_server_time() -> None:
    """Publish the current server time to "server_time" once per second.

    Loops forever and never returns; each iteration sends ``time.time()``
    formatted as a string, then sleeps for one second.
    """
    while True:
        timestamp = str(time.time())
        websockets.publish(timestamp, ["server_time"])
        time.sleep(1)
# Run the publisher on a daemon thread: its loop never returns, so a
# non-daemon thread would keep the process alive after the server stops.
Thread(target=publish_server_time, daemon=True).start()

# Start the Flask server on localhost:6969.
app.run(host="localhost", port=6969)