You will build a real-time AI application using Python WebSockets that streams LLM responses bidirectionally between clients and a GPU-hosted model. By the end, you will have a WebSocket server on your dedicated GPU server that handles multiple concurrent connections with token-by-token delivery.
Why WebSockets for AI
HTTP Server-Sent Events work for one-directional streaming, but WebSockets enable bidirectional communication. This matters for AI applications where clients send follow-up messages mid-stream, cancel generation, or receive status updates alongside token output.
| Feature | HTTP/SSE | WebSocket |
|---|---|---|
| Direction | Server to client | Bidirectional |
| Mid-stream input | New request needed | Same connection |
| Connection overhead | Per request | Once |
| Cancel generation | Close connection | Send cancel message |
| Multiple streams | Multiple connections | Multiplexed |
WebSocket Server
Build a WebSocket server using FastAPI’s native WebSocket support connected to a vLLM backend.
```python
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")


class ConnectionManager:
    def __init__(self):
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)


manager = ConnectionManager()


@app.websocket("/ws/chat")
async def chat_ws(ws: WebSocket):
    await manager.connect(ws)
    task: asyncio.Task | None = None
    try:
        while True:
            data = await ws.receive_json()
            if data.get("type") == "message":
                # Run generation as a task so this loop stays free to
                # receive a cancel frame while tokens are streaming.
                task = asyncio.create_task(handle_message(ws, data))
            elif data.get("type") == "cancel":
                if task and not task.done():
                    task.cancel()
                await ws.send_json({"type": "cancelled"})
    except WebSocketDisconnect:
        if task and not task.done():
            task.cancel()
        manager.disconnect(ws)


async def handle_message(ws: WebSocket, data: dict):
    messages = data.get("messages", [])
    await ws.send_json({"type": "start"})
    stream = await client.chat.completions.create(
        model="meta-llama/Llama-3.1-8B-Instruct",
        messages=messages,
        max_tokens=data.get("max_tokens", 1024),
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            await ws.send_json({"type": "token", "content": content})
    await ws.send_json({"type": "done"})
```

Running `handle_message` as a task matters: if the handler awaited it inline, the receive loop would be blocked for the duration of generation and a `cancel` frame could not take effect mid-stream.
For the HTTP/SSE alternative, see the FastAPI server guide. For the vLLM setup, follow the production deployment guide.
Python WebSocket Client
Build a client that connects, sends messages, and processes streaming tokens.
```python
import asyncio
import json

import websockets


async def chat(uri: str, message: str):
    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps({
            "type": "message",
            "messages": [{"role": "user", "content": message}],
            "max_tokens": 512,
        }))
        while True:
            response = json.loads(await ws.recv())
            if response["type"] == "start":
                print("--- Generating ---")
            elif response["type"] == "token":
                print(response["content"], end="", flush=True)
            elif response["type"] == "done":
                print("\n--- Generation complete ---")
                break


asyncio.run(chat("ws://localhost:8080/ws/chat", "Explain CUDA memory hierarchy."))
```
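Real connections drop, so production clients typically reconnect with exponential backoff. A minimal sketch (the `with_backoff` helper and the flaky stand-in are illustrative, not part of the library; in practice the callable would be something like `lambda: chat(uri, message)`):

```python
import asyncio


async def with_backoff(connect, retries: int = 5, base: float = 0.01, cap: float = 1.0):
    """Retry an async connect callable, doubling the delay after each failure."""
    delay = base
    for attempt in range(retries):
        try:
            return await connect()
        except OSError:  # connection refused / dropped
            if attempt == retries - 1:
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, cap)


# Demo with a stand-in that fails twice before "connecting".
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("connection refused")
    return "connected"

print(asyncio.run(with_backoff(flaky)))  # connected
```

Capping the delay (`cap`) keeps a long outage from pushing retry intervals into minutes.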
Handling Concurrent Connections
WebSocket connections are long-lived. Manage GPU resources by limiting concurrent inference requests.
```python
# Excess requests wait here instead of overloading the GPU.
inference_semaphore = asyncio.Semaphore(4)  # max 4 concurrent GPU requests


async def handle_message(ws: WebSocket, data: dict):
    async with inference_semaphore:
        messages = data.get("messages", [])
        await ws.send_json({"type": "start"})
        stream = await client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=messages,
            max_tokens=data.get("max_tokens", 1024),
            stream=True,
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                await ws.send_json({"type": "token", "content": content})
        await ws.send_json({"type": "done"})
```
The semaphore prevents GPU overload when many clients connect simultaneously. For queue-based concurrency control, see the Redis queue guide.
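The limiting behavior is easy to verify in isolation; a minimal sketch with `asyncio.sleep` standing in for the streaming inference call (`fake_inference` is illustrative, not part of the server above):

```python
import asyncio


async def fake_inference(sem: asyncio.Semaphore, active: list, peak: list):
    async with sem:
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0.01)  # stand-in for GPU inference
        active[0] -= 1


async def main() -> int:
    sem = asyncio.Semaphore(4)
    active, peak = [0], [0]
    # 16 "clients" arrive at once; only 4 ever run concurrently.
    await asyncio.gather(*(fake_inference(sem, active, peak) for _ in range(16)))
    return peak[0]


print(asyncio.run(main()))  # 4
```

All sixteen tasks start immediately, but the observed peak concurrency never exceeds the semaphore's limit.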
Browser Client
Connect from JavaScript in the browser for real-time chat interfaces.
```javascript
const ws = new WebSocket("ws://your-gpu-server:8080/ws/chat");

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: "message",
    messages: [{ role: "user", content: "Hello, AI!" }],
    max_tokens: 512
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case "token":
      document.getElementById("output").textContent += data.content;
      break;
    case "done":
      console.log("Generation complete");
      break;
  }
};
```
For a complete React chat UI using this WebSocket approach, see the React chat guide.
Production Deployment
Deploy with Uvicorn behind Nginx with WebSocket proxy support.
```shell
# Run the server. A single worker keeps the in-memory
# ConnectionManager in one process; multiple workers would
# each hold their own connection list.
uvicorn app:app --host 0.0.0.0 --port 8080 --workers 1
```

```nginx
# Nginx WebSocket proxy
location /ws/ {
    proxy_pass http://127.0.0.1:8080;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;
}
```
The extended proxy_read_timeout prevents Nginx from closing WebSocket connections that sit idle between generations. For a gRPC alternative that can offer lower latency, see the gRPC inference guide. Add GPU monitoring to track connection counts and inference latency. The self-hosting guide covers infrastructure, and our tutorials section has additional real-time patterns.
Build Real-Time AI on Dedicated GPUs
Deploy WebSocket AI applications on bare-metal GPU servers. Bidirectional streaming with minimal latency overhead.
Browse GPU Servers