跳转到内容

服务器发送事件 (SSE)

在学习了表单数据处理之后,蝙蝠侠意识到他需要一种方法来向他的犯罪监控仪表板推送实时更新。犯罪分子不会等待蝙蝠侠刷新浏览器!

他发现了服务器发送事件 (SSE) - 一个完美的解决方案,用于通过 HTTP 进行服务器到客户端的单向通信。SSE 允许蝙蝠侠将实时数据流式传输到他的仪表板,而无需完整双向通信的复杂性。

"这正是我的犯罪警报系统所需要的!"蝙蝠侠惊呼道。"我可以在检测到新犯罪时立即向仪表板推送更新。"

服务器发送事件非常适合:

  • 实时通知
  • 实时数据源
  • 进度更新
  • 聊天应用程序(仅服务器到客户端)
  • 仪表板更新
  • 日志流式传输

如何工作?

蝙蝠侠可以使用 SSEResponseSSEMessage 类创建服务器发送事件流。他可以根据需要使用常规生成器和异步生成器:

  • 常规生成器:非常适合简单的数据流或使用同步操作时
  • 异步生成器:当蝙蝠侠需要在流中执行异步操作(如数据库查询或 API 调用)时的理想选择
python
from robyn import Robyn, SSEResponse, SSEMessage
import time

app = Robyn(__file__)

@app.get("/events")
def stream_events(request):
    def event_generator():
        for i in range(10):
            yield SSEMessage(f"事件 {i}", id=str(i))
            time.sleep(1)
    
    return SSEResponse(event_generator())
python
from robyn import Robyn, SSEResponse, SSEMessage
import json
import time

app = Robyn(__file__)

@app.get("/events/json")
def stream_json_events(request):
    def json_event_generator():
        for i in range(5):
            data = {
                "id": i,
                "message": f"更新 {i}",
                "timestamp": time.time()
            }
            yield SSEMessage(
                json.dumps(data), 
                event="update", 
                id=str(i)
            )
            time.sleep(2)
    
    return SSEResponse(json_event_generator())
python
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import time

app = Robyn(__file__)

@app.get("/events/async")
async def stream_async_events(request):
    async def async_event_generator():
        for i in range(8):
            # 模拟异步工作,如数据库调用
            await asyncio.sleep(0.5)
            yield SSEMessage(
                f"异步消息 {i} - {time.strftime('%H:%M:%S')}", 
                event="async", 
                id=str(i)
            )
    
    return SSEResponse(async_event_generator())

异步生成器

当蝙蝠侠需要在他的 SSE 流中执行异步操作时(如从数据库获取数据或进行 API 调用),他使用带有 async defawait 的异步生成器。这使他能够并发处理多个流,而不会阻塞其他操作。

关键区别是为生成器函数使用 async def,并在生成器内部为异步操作使用 await

python
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import json
import time

app = Robyn(__file__)

@app.get("/events/database")
async def stream_database_events(request):
    async def database_event_generator():
        for i in range(10):
            # 模拟异步数据库查询
            await asyncio.sleep(0.3)
            
            # 模拟从数据库获取数据
            data = {
                "crime_id": i,
                "location": f"哥谭市第{i}区",
                "severity": "高" if i % 2 == 0 else "低",
                "timestamp": time.time()
            }
            
            yield SSEMessage(
                json.dumps(data),
                event="crime_alert",
                id=str(i)
            )
    
    return SSEResponse(database_event_generator())
python
from robyn import Robyn, SSEResponse, SSEMessage
import asyncio
import aiohttp
import json

app = Robyn(__file__)

@app.get("/events/api")
async def stream_api_events(request):
    async def api_event_generator():
        async with aiohttp.ClientSession() as session:
            for i in range(5):
                try:
                    # 进行异步 API 调用
                    async with session.get(f"https://api.example.com/data/{i}") as response:
                        data = await response.json()
                        
                        yield SSEMessage(
                            json.dumps(data),
                            event="api_update",
                            id=str(i)
                        )
                except Exception as e:
                    yield SSEMessage(
                        f"获取数据错误: {str(e)}",
                        event="error",
                        id=f"error_{i}"
                    )
                
                await asyncio.sleep(1)
    
    return SSEResponse(api_event_generator())

接下来做什么?

蝙蝠侠已经掌握了服务器发送事件,现在可以向他的犯罪仪表板流式传输实时更新。虽然 SSE 非常适合服务器到客户端的单向通信,但蝙蝠侠意识到他需要双向通信来实现更多交互功能,比如与他的盟友进行实时聊天。

接下来,他想探索如何使用 WebSocket 处理双向通信,以获得更多交互功能。

如果蝙蝠侠需要处理意外情况,他将学习异常处理以使他的应用程序更加健壮。

为了将他的犯罪监控系统扩展到多个进程,蝙蝠侠将探索多核扩展

基于 MIT 许可发布