WebSockets
在掌握了服务器发送事件的单向通信之后,蝙蝠侠意识到他需要更强大的功能。当戈登局长想要在危机情况下与他实时聊天时,蝙蝠侠需要双向通信。
"SSE 很适合向我的仪表板推送更新,"蝙蝠侠想道,"但我需要双向通信来与我的盟友协调!"
为了实现双向实时通信,蝙蝠侠学习了如何使用 Robyn 的现代装饰器式 API 来处理 WebSocket:
py
from robyn import Robyn
app = Robyn(__file__)
@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
try:
while True:
message = await websocket.receive_text()
websocket_id = websocket.id
await websocket.send_text(f"来自 {websocket_id} 的回显: {message}")
except Exception as e:
print(f"WebSocket {websocket.id} 错误: {e}")
@websocket_endpoint.on_connect
async def on_connect(websocket):
await websocket.send_text("已连接!")
return "连接已建立"
@websocket_endpoint.on_close
async def on_close(websocket):
print(f"WebSocket {websocket.id} 已断开连接")
return "已断开连接"
py
from robyn import Robyn, WebSocketAdapter
app = Robyn(__file__)
@app.websocket("/web_socket")
async def websocket_endpoint(websocket: WebSocketAdapter):
try:
while True:
message: str = await websocket.receive_text()
websocket_id: str = websocket.id
await websocket.send_text(f"来自 {websocket_id} 的回显: {message}")
except Exception as e:
print(f"WebSocket {websocket.id} 错误: {e}")
@websocket_endpoint.on_connect
async def on_connect(websocket: WebSocketAdapter) -> str:
await websocket.send_text("已连接!")
return "连接已建立"
@websocket_endpoint.on_close
async def on_close(websocket: WebSocketAdapter) -> str:
print(f"WebSocket {websocket.id} 已断开连接")
return "已断开连接"
WebSocket 方法
现代 WebSocket API 提供了简洁、直观的方法来处理实时通信:
注意:在 Robyn 中 WebSocket 连接会自动接受 - 无需调用 accept()
!
websocket.id
- 获取唯一连接 IDwebsocket.send_text(data)
- 发送文本消息websocket.send_json(data)
- 发送 JSON 消息websocket.receive_text()
- 接收文本消息websocket.receive_json()
- 接收 JSON 消息websocket.broadcast(data)
- 向所有连接的客户端广播websocket.close()
- 关闭连接
py
@app.websocket("/chat")
async def chat_handler(websocket):
while True:
message = await websocket.receive_text()
websocket_id = websocket.id
# 发送给此客户端
await websocket.send_text(f"你说: {message}")
# 广播给所有客户端
await websocket.broadcast(f"用户 {websocket_id}: {message}")
py
@app.websocket("/api/ws")
async def api_handler(websocket):
while True:
data = await websocket.receive_json()
response = {
"id": websocket.id,
"echo": data,
"timestamp": time.time()
}
await websocket.send_json(response)
依赖注入
WebSocket 支持与 HTTP 路由相同的依赖注入系统,使用 global_dependencies
和 router_dependencies
参数:
py
from robyn import Robyn
import logging
app = Robyn(__file__)
# 配置依赖
app.inject_global(
logger=logging.getLogger(__name__),
database=DatabaseConnection(),
metrics=MetricsCollector()
)
app.inject(
cache=RedisCache(),
auth_service=JWTAuthService()
)
py
@app.websocket("/chat")
async def chat_handler(websocket, global_dependencies=None, router_dependencies=None):
# 访问注入的依赖
logger = global_dependencies.get("logger")
database = global_dependencies.get("database")
cache = router_dependencies.get("cache")
auth = router_dependencies.get("auth_service")
logger.info(f"新的聊天连接: {websocket.id}")
try:
while True:
message = await websocket.receive_text()
# 使用依赖
saved_msg = database.save_message(message, websocket.id)
cache.invalidate(f"chat_history_{websocket.id}")
await websocket.broadcast(f"用户 {websocket.id}: {message}")
except Exception as e:
logger.error(f"聊天错误: {e}")
@chat_handler.on_connect
async def chat_connect(websocket, global_dependencies=None, router_dependencies=None):
logger = global_dependencies.get("logger")
auth = router_dependencies.get("auth_service")
# 认证连接
if not auth.verify_websocket_token(websocket.query_params.get("token")):
await websocket.close()
return "未授权"
logger.info(f"已认证连接: {websocket.id}")
await websocket.send_text("欢迎来到聊天室!")
return "已连接"
广播消息
蝙蝠侠学会了使用简洁的 broadcast()
方法向所有连接的客户端发送消息:
py
@app.websocket("/notifications")
async def notification_handler(websocket):
while True:
message = await websocket.receive_text()
# 广播给所有连接的客户端
await websocket.broadcast(f"通知: {message}")
# 确认给发送者
await websocket.send_text("通知已发送!")
py
@app.websocket("/game")
async def game_handler(websocket):
while True:
action = await websocket.receive_json()
if action["type"] == "move":
# 向所有玩家广播玩家移动
await websocket.broadcast(json.dumps({
"type": "player_moved",
"player_id": websocket.id,
"position": action["position"]
}))
elif action["type"] == "chat":
# 广播聊天消息
await websocket.broadcast(json.dumps({
"type": "chat_message",
"player_id": websocket.id,
"message": action["message"]
}))
下一步是什么?
掌握了实时通信后,蝙蝠侠准备扩展他的应用程序。他学习了如何使用视图和子路由器来组织不断增长的代码库,以便在正义联盟加入他的任务时保持代码的可维护性。