跳转到内容

WebSockets

在掌握了服务器发送事件的单向通信之后,蝙蝠侠意识到他需要更强大的功能。当戈登局长想要在危机情况下与他实时聊天时,蝙蝠侠需要双向通信。

"SSE 很适合向我的仪表板推送更新,"蝙蝠侠想道,"但我需要双向通信来与我的盟友协调!"

为了实现双向实时通信,蝙蝠侠学习了如何使用 Robyn 的现代装饰器式 API 来处理 WebSocket:

py
from robyn import Robyn

app = Robyn(__file__)

@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
    
    try:
        while True:
            message = await websocket.receive_text()
            websocket_id = websocket.id
            await websocket.send_text(f"来自 {websocket_id} 的回显: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} 错误: {e}")

@websocket_endpoint.on_connect
async def on_connect(websocket):
    await websocket.send_text("已连接!")
    return "连接已建立"

@websocket_endpoint.on_close
async def on_close(websocket):
    print(f"WebSocket {websocket.id} 已断开连接")
    return "已断开连接"
py
from robyn import Robyn, WebSocketAdapter

app = Robyn(__file__)

@app.websocket("/web_socket")
async def websocket_endpoint(websocket: WebSocketAdapter):
    
    try:
        while True:
            message: str = await websocket.receive_text()
            websocket_id: str = websocket.id
            await websocket.send_text(f"来自 {websocket_id} 的回显: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} 错误: {e}")

@websocket_endpoint.on_connect
async def on_connect(websocket: WebSocketAdapter) -> str:
    await websocket.send_text("已连接!")
    return "连接已建立"

@websocket_endpoint.on_close
async def on_close(websocket: WebSocketAdapter) -> str:
    print(f"WebSocket {websocket.id} 已断开连接")
    return "已断开连接"

WebSocket 方法

现代 WebSocket API 提供了简洁、直观的方法来处理实时通信:

注意:在 Robyn 中 WebSocket 连接会自动接受 - 无需调用 accept()

  • websocket.id - 获取唯一连接 ID
  • websocket.send_text(data) - 发送文本消息
  • websocket.send_json(data) - 发送 JSON 消息
  • websocket.receive_text() - 接收文本消息
  • websocket.receive_json() - 接收 JSON 消息
  • websocket.broadcast(data) - 向所有连接的客户端广播
  • websocket.close() - 关闭连接
py
@app.websocket("/chat")
async def chat_handler(websocket):
    
    while True:
        message = await websocket.receive_text()
        websocket_id = websocket.id
        
        # 发送给此客户端
        await websocket.send_text(f"你说: {message}")
        
        # 广播给所有客户端
        await websocket.broadcast(f"用户 {websocket_id}: {message}")
py
@app.websocket("/api/ws")
async def api_handler(websocket):
    
    while True:
        data = await websocket.receive_json()
        
        response = {
            "id": websocket.id,
            "echo": data,
            "timestamp": time.time()
        }
        
        await websocket.send_json(response)

依赖注入

WebSocket 支持与 HTTP 路由相同的依赖注入系统,使用 global_dependenciesrouter_dependencies 参数:

py
from robyn import Robyn
import logging

app = Robyn(__file__)

# 配置依赖
app.inject_global(
    logger=logging.getLogger(__name__),
    database=DatabaseConnection(),
    metrics=MetricsCollector()
)

app.inject(
    cache=RedisCache(),
    auth_service=JWTAuthService()
)
py
@app.websocket("/chat")
async def chat_handler(websocket, global_dependencies=None, router_dependencies=None):
    # 访问注入的依赖
    logger = global_dependencies.get("logger")
    database = global_dependencies.get("database")
    cache = router_dependencies.get("cache")
    auth = router_dependencies.get("auth_service")
    
    logger.info(f"新的聊天连接: {websocket.id}")
    
    try:
        while True:
            message = await websocket.receive_text()
            
            # 使用依赖
            saved_msg = database.save_message(message, websocket.id)
            cache.invalidate(f"chat_history_{websocket.id}")
            
            await websocket.broadcast(f"用户 {websocket.id}: {message}")
    except Exception as e:
        logger.error(f"聊天错误: {e}")

@chat_handler.on_connect
async def chat_connect(websocket, global_dependencies=None, router_dependencies=None):
    logger = global_dependencies.get("logger")
    auth = router_dependencies.get("auth_service")
    
    # 认证连接
    if not auth.verify_websocket_token(websocket.query_params.get("token")):
        await websocket.close()
        return "未授权"
    
    logger.info(f"已认证连接: {websocket.id}")
    await websocket.send_text("欢迎来到聊天室!")
    return "已连接"

广播消息

蝙蝠侠学会了使用简洁的 broadcast() 方法向所有连接的客户端发送消息:

py
@app.websocket("/notifications")
async def notification_handler(websocket):
    
    while True:
        message = await websocket.receive_text()
        
        # 广播给所有连接的客户端
        await websocket.broadcast(f"通知: {message}")
        
        # 确认给发送者
        await websocket.send_text("通知已发送!")
py
@app.websocket("/game")
async def game_handler(websocket):
    
    while True:
        action = await websocket.receive_json()
        
        if action["type"] == "move":
            # 向所有玩家广播玩家移动
            await websocket.broadcast(json.dumps({
                "type": "player_moved",
                "player_id": websocket.id,
                "position": action["position"]
            }))
        elif action["type"] == "chat":
            # 广播聊天消息
            await websocket.broadcast(json.dumps({
                "type": "chat_message",
                "player_id": websocket.id,
                "message": action["message"]
            }))

下一步是什么?

掌握了实时通信后,蝙蝠侠准备扩展他的应用程序。他学习了如何使用视图和子路由器来组织不断增长的代码库,以便在正义联盟加入他的任务时保持代码的可维护性。

基于 MIT 许可发布