Skip to content

WebSockets

After mastering Server-Sent Events for one-way communication, Batman realized he needed something more powerful. When Commissioner Gordon wanted to chat with him in real-time during crisis situations, Batman needed bidirectional communication.

"SSE is great for pushing updates to my dashboard," Batman thought, "but I need two-way communication for coordinating with my allies!"

To handle real-time bidirectional communication, Batman learned how to work with WebSockets using Robyn's modern decorator-based API:

py
from robyn import Robyn

app = Robyn(__file__)

@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
    # Connections are auto-accepted in Robyn - no need to call accept()!

    try:
        while True:
            message = await websocket.receive_text()
            websocket_id = websocket.id
            await websocket.send_text(f"Echo from {websocket_id}: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} error: {e}")

@websocket_endpoint.on_connect
async def on_connect(websocket):
    await websocket.send_text("Connected!")
    return "Connection established"

@websocket_endpoint.on_close
async def on_close(websocket):
    print(f"WebSocket {websocket.id} disconnected")
    return "Disconnected"
py
from robyn import Robyn, WebSocketAdapter

app = Robyn(__file__)

@app.websocket("/web_socket")
async def websocket_endpoint(websocket: WebSocketAdapter):
    # Connections are auto-accepted - ready to use immediately!

    try:
        while True:
            message: str = await websocket.receive_text()
            websocket_id: str = websocket.id
            await websocket.send_text(f"Echo from {websocket_id}: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} error: {e}")

@websocket_endpoint.on_connect
async def on_connect(websocket: WebSocketAdapter) -> str:
    await websocket.send_text("Connected!")
    return "Connection established"

@websocket_endpoint.on_close
async def on_close(websocket: WebSocketAdapter) -> str:
    print(f"WebSocket {websocket.id} disconnected")
    return "Disconnected"

WebSocket Methods

The modern WebSocket API provides clean, intuitive methods for handling real-time communication:

Note: WebSocket connections are automatically accepted in Robyn - no need to call accept()!

  • websocket.id - Get the unique connection ID
  • websocket.send_text(data) - Send text messages
  • websocket.send_json(data) - Send JSON messages
  • websocket.receive_text() - Receive text messages
  • websocket.receive_json() - Receive JSON messages
  • websocket.broadcast(data) - Broadcast to all connected clients
  • websocket.close() - Close the connection
py
@app.websocket("/chat")
async def chat_handler(websocket):
    
    while True:
        message = await websocket.receive_text()
        websocket_id = websocket.id
        
        # Send to this client
        await websocket.send_text(f"You said: {message}")
        
        # Broadcast to all clients
        await websocket.broadcast(f"User {websocket_id}: {message}")
py
@app.websocket("/api/ws")
async def api_handler(websocket):
    
    while True:
        data = await websocket.receive_json()
        
        response = {
            "id": websocket.id,
            "echo": data,
            "timestamp": time.time()
        }
        
        await websocket.send_json(response)

Dependency Injection

WebSockets support the same dependency injection system as HTTP routes, using global_dependencies and router_dependencies parameters:

py
from robyn import Robyn
import logging

app = Robyn(__file__)

# Configure dependencies
app.inject_global(
    logger=logging.getLogger(__name__),
    database=DatabaseConnection(),
    metrics=MetricsCollector()
)

app.inject(
    cache=RedisCache(),
    auth_service=JWTAuthService()
)
py
@app.websocket("/chat")
async def chat_handler(websocket, global_dependencies=None, router_dependencies=None):
    # Access injected dependencies
    logger = global_dependencies.get("logger")
    database = global_dependencies.get("database")
    cache = router_dependencies.get("cache")
    auth = router_dependencies.get("auth_service")
    
    logger.info(f"New chat connection: {websocket.id}")
    
    try:
        while True:
            message = await websocket.receive_text()
            
            # Use dependencies
            saved_msg = database.save_message(message, websocket.id)
            cache.invalidate(f"chat_history_{websocket.id}")
            
            await websocket.broadcast(f"User {websocket.id}: {message}")
    except Exception as e:
        logger.error(f"Chat error: {e}")

@chat_handler.on_connect
async def chat_connect(websocket, global_dependencies=None, router_dependencies=None):
    logger = global_dependencies.get("logger")
    auth = router_dependencies.get("auth_service")
    
    # Authenticate connection
    if not auth.verify_websocket_token(websocket.query_params.get("token")):
        await websocket.close()
        return "Unauthorized"
    
    logger.info(f"Authenticated connection: {websocket.id}")
    await websocket.send_text("Welcome to the chat!")
    return "Connected"

Broadcasting Messages

Batman learned to send messages to all connected clients using the clean broadcast() method:

py
@app.websocket("/notifications")
async def notification_handler(websocket):
    
    while True:
        message = await websocket.receive_text()
        
        # Broadcast to all connected clients
        await websocket.broadcast(f"Notification: {message}")
        
        # Confirm to sender
        await websocket.send_text("Notification sent!")
py
@app.websocket("/game")
async def game_handler(websocket):
    
    while True:
        action = await websocket.receive_json()
        
        if action["type"] == "move":
            # Broadcast player movement to all players
            await websocket.broadcast(json.dumps({
                "type": "player_moved",
                "player_id": websocket.id,
                "position": action["position"]
            }))
        elif action["type"] == "chat":
            # Broadcast chat message
            await websocket.broadcast(json.dumps({
                "type": "chat_message",
                "player_id": websocket.id,
                "message": action["message"]
            }))

Query Parameters and Headers

WebSockets can access query parameters and headers for authentication and configuration:

py
@app.websocket("/secure_chat")
async def secure_chat(websocket):
    # Access query parameters
    token = websocket.query_params.get("token")
    room_id = websocket.query_params.get("room")
    
    if not token or not authenticate_token(token):
        await websocket.close()
        return
        
    await websocket.send_text(f"Joined room: {room_id}")
    
    while True:
        message = await websocket.receive_text()
        # Broadcast only to users in the same room
        await websocket.broadcast(f"Room {room_id} - {websocket.id}: {message}")
py
@app.websocket("/api/stream")
async def stream_handler(websocket):
    # Access headers
    user_agent = websocket.headers.get("User-Agent")
    api_key = websocket.headers.get("X-API-Key")
    
    if not validate_api_key(api_key):
        await websocket.close()
        return
        
    print(f"Client connected: {user_agent}")
    
    # Stream data based on client info
    while True:
        data = await get_stream_data()
        await websocket.send_json(data)

Connection Management

Batman learned to programmatically manage WebSocket connections with the close() method. This is useful for enforcing business rules, handling authentication failures, or managing resource limits:

py
@app.websocket("/admin_panel")
async def admin_handler(websocket):
    
    while True:
        command = await websocket.receive_text()
        
        if command == "shutdown":
            await websocket.send_text("Shutting down connection...")
            await websocket.close()
            break
        elif command.startswith("kick_user"):
            user_id = command.split(":")[1]
            await websocket.broadcast(f"User {user_id} has been kicked")
            # Close specific user's connection (implementation depends on your user tracking)
        else:
            await websocket.send_text(f"Command executed: {command}")
py
connection_counts = {}

@app.websocket("/limited_service")
async def limited_handler(websocket):
    client_ip = websocket.client.get("ip", "unknown")
    
    # Check connection limit
    if connection_counts.get(client_ip, 0) >= 5:
        await websocket.send_text("Connection limit exceeded")
        await websocket.close()
        return
        
    connection_counts[client_ip] = connection_counts.get(client_ip, 0) + 1
    
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Processed: {message}")
    finally:
        # Clean up on disconnect
        connection_counts[client_ip] -= 1

Error Handling

Robust WebSocket applications need proper error handling for network issues, client disconnections, and application errors:

py
from robyn import WebSocketDisconnect

@app.websocket("/robust_chat")
async def robust_chat(websocket):
    
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
            
    except WebSocketDisconnect:
        print(f"Client {websocket.id} disconnected gracefully")
    except Exception as e:
        print(f"Unexpected error for {websocket.id}: {e}")
        await websocket.close()
py
import asyncio

@app.websocket("/monitored_service")
async def monitored_service(websocket, global_dependencies=None):
    logger = global_dependencies.get("logger")
    
    try:
        while True:
            # Set timeout for receive
            try:
                message = await asyncio.wait_for(
                    websocket.receive_text(), 
                    timeout=30.0
                )
                await websocket.send_text(f"Processed: {message}")
                
            except asyncio.TimeoutError:
                await websocket.send_text("Keep-alive ping")
                
    except WebSocketDisconnect:
        logger.info(f"Client {websocket.id} disconnected")
    except Exception as e:
        logger.error(f"Error in {websocket.id}: {e}")
        try:
            await websocket.send_text("Service error occurred")
            await websocket.close()
        except:
            pass  # Connection might already be closed

What's next?

With real-time communication mastered, Batman was ready to scale his application. He learned about organizing his growing codebase with views and subrouters to keep everything maintainable as the Justice League joined his mission.

Released under the MIT License.