WebSockets
After mastering Server-Sent Events for one-way communication, Batman realized he needed something more powerful. When Commissioner Gordon wanted to chat with him in real-time during crisis situations, Batman needed bidirectional communication.
"SSE is great for pushing updates to my dashboard," Batman thought, "but I need two-way communication for coordinating with my allies!"
To handle real-time bidirectional communication, Batman learned how to work with WebSockets using Robyn's modern decorator-based API:
from robyn import Robyn
app = Robyn(__file__)
@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
# Connections are auto-accepted in Robyn - no need to call accept()!
try:
while True:
message = await websocket.receive_text()
websocket_id = websocket.id
await websocket.send_text(f"Echo from {websocket_id}: {message}")
except Exception as e:
print(f"WebSocket {websocket.id} error: {e}")
@websocket_endpoint.on_connect
async def on_connect(websocket):
await websocket.send_text("Connected!")
return "Connection established"
@websocket_endpoint.on_close
async def on_close(websocket):
print(f"WebSocket {websocket.id} disconnected")
return "Disconnected"
from robyn import Robyn, WebSocketAdapter
app = Robyn(__file__)
@app.websocket("/web_socket")
async def websocket_endpoint(websocket: WebSocketAdapter):
# Connections are auto-accepted - ready to use immediately!
try:
while True:
message: str = await websocket.receive_text()
websocket_id: str = websocket.id
await websocket.send_text(f"Echo from {websocket_id}: {message}")
except Exception as e:
print(f"WebSocket {websocket.id} error: {e}")
@websocket_endpoint.on_connect
async def on_connect(websocket: WebSocketAdapter) -> str:
await websocket.send_text("Connected!")
return "Connection established"
@websocket_endpoint.on_close
async def on_close(websocket: WebSocketAdapter) -> str:
print(f"WebSocket {websocket.id} disconnected")
return "Disconnected"
WebSocket Methods
The modern WebSocket API provides clean, intuitive methods for handling real-time communication:
Note: WebSocket connections are automatically accepted in Robyn - no need to call accept()
!
websocket.id
- Get the unique connection IDwebsocket.send_text(data)
- Send text messageswebsocket.send_json(data)
- Send JSON messageswebsocket.receive_text()
- Receive text messageswebsocket.receive_json()
- Receive JSON messageswebsocket.broadcast(data)
- Broadcast to all connected clientswebsocket.close()
- Close the connection
@app.websocket("/chat")
async def chat_handler(websocket):
while True:
message = await websocket.receive_text()
websocket_id = websocket.id
# Send to this client
await websocket.send_text(f"You said: {message}")
# Broadcast to all clients
await websocket.broadcast(f"User {websocket_id}: {message}")
@app.websocket("/api/ws")
async def api_handler(websocket):
while True:
data = await websocket.receive_json()
response = {
"id": websocket.id,
"echo": data,
"timestamp": time.time()
}
await websocket.send_json(response)
Dependency Injection
WebSockets support the same dependency injection system as HTTP routes, using global_dependencies
and router_dependencies
parameters:
from robyn import Robyn
import logging
app = Robyn(__file__)
# Configure dependencies
app.inject_global(
logger=logging.getLogger(__name__),
database=DatabaseConnection(),
metrics=MetricsCollector()
)
app.inject(
cache=RedisCache(),
auth_service=JWTAuthService()
)
@app.websocket("/chat")
async def chat_handler(websocket, global_dependencies=None, router_dependencies=None):
# Access injected dependencies
logger = global_dependencies.get("logger")
database = global_dependencies.get("database")
cache = router_dependencies.get("cache")
auth = router_dependencies.get("auth_service")
logger.info(f"New chat connection: {websocket.id}")
try:
while True:
message = await websocket.receive_text()
# Use dependencies
saved_msg = database.save_message(message, websocket.id)
cache.invalidate(f"chat_history_{websocket.id}")
await websocket.broadcast(f"User {websocket.id}: {message}")
except Exception as e:
logger.error(f"Chat error: {e}")
@chat_handler.on_connect
async def chat_connect(websocket, global_dependencies=None, router_dependencies=None):
logger = global_dependencies.get("logger")
auth = router_dependencies.get("auth_service")
# Authenticate connection
if not auth.verify_websocket_token(websocket.query_params.get("token")):
await websocket.close()
return "Unauthorized"
logger.info(f"Authenticated connection: {websocket.id}")
await websocket.send_text("Welcome to the chat!")
return "Connected"
Broadcasting Messages
Batman learned to send messages to all connected clients using the clean broadcast()
method:
@app.websocket("/notifications")
async def notification_handler(websocket):
while True:
message = await websocket.receive_text()
# Broadcast to all connected clients
await websocket.broadcast(f"Notification: {message}")
# Confirm to sender
await websocket.send_text("Notification sent!")
@app.websocket("/game")
async def game_handler(websocket):
while True:
action = await websocket.receive_json()
if action["type"] == "move":
# Broadcast player movement to all players
await websocket.broadcast(json.dumps({
"type": "player_moved",
"player_id": websocket.id,
"position": action["position"]
}))
elif action["type"] == "chat":
# Broadcast chat message
await websocket.broadcast(json.dumps({
"type": "chat_message",
"player_id": websocket.id,
"message": action["message"]
}))
Query Parameters and Headers
WebSockets can access query parameters and headers for authentication and configuration:
@app.websocket("/secure_chat")
async def secure_chat(websocket):
# Access query parameters
token = websocket.query_params.get("token")
room_id = websocket.query_params.get("room")
if not token or not authenticate_token(token):
await websocket.close()
return
await websocket.send_text(f"Joined room: {room_id}")
while True:
message = await websocket.receive_text()
# Broadcast only to users in the same room
await websocket.broadcast(f"Room {room_id} - {websocket.id}: {message}")
@app.websocket("/api/stream")
async def stream_handler(websocket):
# Access headers
user_agent = websocket.headers.get("User-Agent")
api_key = websocket.headers.get("X-API-Key")
if not validate_api_key(api_key):
await websocket.close()
return
print(f"Client connected: {user_agent}")
# Stream data based on client info
while True:
data = await get_stream_data()
await websocket.send_json(data)
Connection Management
Batman learned to programmatically manage WebSocket connections with the close()
method. This is useful for enforcing business rules, handling authentication failures, or managing resource limits:
@app.websocket("/admin_panel")
async def admin_handler(websocket):
while True:
command = await websocket.receive_text()
if command == "shutdown":
await websocket.send_text("Shutting down connection...")
await websocket.close()
break
elif command.startswith("kick_user"):
user_id = command.split(":")[1]
await websocket.broadcast(f"User {user_id} has been kicked")
# Close specific user's connection (implementation depends on your user tracking)
else:
await websocket.send_text(f"Command executed: {command}")
connection_counts = {}
@app.websocket("/limited_service")
async def limited_handler(websocket):
client_ip = websocket.client.get("ip", "unknown")
# Check connection limit
if connection_counts.get(client_ip, 0) >= 5:
await websocket.send_text("Connection limit exceeded")
await websocket.close()
return
connection_counts[client_ip] = connection_counts.get(client_ip, 0) + 1
try:
while True:
message = await websocket.receive_text()
await websocket.send_text(f"Processed: {message}")
finally:
# Clean up on disconnect
connection_counts[client_ip] -= 1
Error Handling
Robust WebSocket applications need proper error handling for network issues, client disconnections, and application errors:
from robyn import WebSocketDisconnect
@app.websocket("/robust_chat")
async def robust_chat(websocket):
try:
while True:
message = await websocket.receive_text()
await websocket.send_text(f"Echo: {message}")
except WebSocketDisconnect:
print(f"Client {websocket.id} disconnected gracefully")
except Exception as e:
print(f"Unexpected error for {websocket.id}: {e}")
await websocket.close()
import asyncio
@app.websocket("/monitored_service")
async def monitored_service(websocket, global_dependencies=None):
logger = global_dependencies.get("logger")
try:
while True:
# Set timeout for receive
try:
message = await asyncio.wait_for(
websocket.receive_text(),
timeout=30.0
)
await websocket.send_text(f"Processed: {message}")
except asyncio.TimeoutError:
await websocket.send_text("Keep-alive ping")
except WebSocketDisconnect:
logger.info(f"Client {websocket.id} disconnected")
except Exception as e:
logger.error(f"Error in {websocket.id}: {e}")
try:
await websocket.send_text("Service error occurred")
await websocket.close()
except:
pass # Connection might already be closed
What's next?
With real-time communication mastered, Batman was ready to scale his application. He learned about organizing his growing codebase with views and subrouters to keep everything maintainable as the Justice League joined his mission.