What You’ll Connect
After this guide, your applications will submit AI inference requests to a RabbitMQ queue and receive results asynchronously — decoupling request submission from GPU processing. Workers on your dedicated GPU server consume messages from the queue, process them through vLLM, and publish results back. This pattern handles traffic spikes gracefully, survives server restarts without losing requests, and scales horizontally by adding more GPU workers.
The integration uses RabbitMQ as a reliable message broker between your application and GPU inference workers. Producers publish inference tasks (text to summarise, images to classify, documents to embed) to a task queue. GPU workers consume tasks at their processing speed, call your OpenAI-compatible API, and publish results to a results queue or callback URL.
Prerequisites
- A GigaGPU server running a self-hosted LLM (setup guide)
- RabbitMQ 3.12+ installed (Docker: docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management)
- Python 3.10+ with pika and requests
- Network access between RabbitMQ, your app, and the GPU endpoint
Integration Steps
Deploy RabbitMQ and create two queues: ai_tasks for incoming inference requests and ai_results for completed results. Configure the task queue as durable with message persistence so requests survive broker restarts. Set a prefetch count on the worker to control how many tasks it processes concurrently — matching your GPU’s batch processing capacity.
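This first step can be sketched as a small helper run against an open pika channel. The queue names match the code example later in this guide; the prefetch default of 4 is an illustrative value, not a RabbitMQ or vLLM requirement.

```python
def declare_queues(ch, prefetch=4):
    """Declare the task/result queues and cap in-flight deliveries.

    `ch` is an open pika channel. Set `prefetch` to the number of
    requests your GPU can process concurrently (4 here is illustrative).
    """
    # durable=True: the queue definition survives a broker restart
    ch.queue_declare(queue="ai_tasks", durable=True)
    ch.queue_declare(queue="ai_results", durable=True)
    # The broker delivers at most `prefetch` unacknowledged tasks per worker
    ch.basic_qos(prefetch_count=prefetch)
```

Note that durable queues alone do not persist message bodies: producers must also publish with delivery_mode=2, as the code example below does.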
Build a GPU worker that connects to RabbitMQ, consumes messages from the task queue, calls your inference endpoint, and publishes results. The worker acknowledges messages only after successful processing, so failed tasks automatically re-queue. For batch efficiency, the worker can accumulate messages over a short window and send them to vLLM as a batch.
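The accumulate-and-flush window described above can be sketched with a small helper class. The class name and the 8-task / 50 ms thresholds are illustrative assumptions; tune them to your model's batch size and latency budget.

```python
import time

class MicroBatcher:
    """Accumulate tasks for a short window, then flush them as one batch."""

    def __init__(self, max_size=8, max_wait=0.05):
        self.max_size = max_size   # flush when this many tasks are waiting...
        self.max_wait = max_wait   # ...or when the oldest task is this old (s)
        self.items = []
        self.first_at = None

    def add(self, task):
        if not self.items:
            self.first_at = time.monotonic()  # start the window on first task
        self.items.append(task)

    def ready(self):
        if not self.items:
            return False
        return (len(self.items) >= self.max_size
                or time.monotonic() - self.first_at >= self.max_wait)

    def flush(self):
        batch, self.items = self.items, []
        return batch
```

A worker using this would add each incoming task (keeping its delivery tag), and once ready() is true, flush, send the whole batch to vLLM in one request, then acknowledge all of the batch's delivery tags.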
Add a producer client that your applications use to submit inference tasks. The producer publishes a JSON message with the task type, input data, and a correlation ID. The consumer can either publish results to the results queue or call a webhook URL specified in the task message.
Code Example
RabbitMQ producer and GPU worker for AI inference from your self-hosted models:
import pika, json, uuid, requests

RABBIT_URL = "amqp://guest:guest@localhost:5672"
GPU_URL = "http://localhost:8000/v1/chat/completions"
GPU_KEY = "your-api-key"

# --- Producer: submit AI tasks ---
def submit_task(text, task_type="summarise", callback_queue="ai_results"):
    conn = pika.BlockingConnection(pika.URLParameters(RABBIT_URL))
    ch = conn.channel()
    ch.queue_declare(queue="ai_tasks", durable=True)
    task = {
        "id": str(uuid.uuid4()),
        "type": task_type,
        "text": text,
        "callback_queue": callback_queue
    }
    ch.basic_publish(
        exchange="", routing_key="ai_tasks",
        body=json.dumps(task),
        properties=pika.BasicProperties(delivery_mode=2)  # persistent
    )
    conn.close()
    return task["id"]

# --- GPU Worker: process AI tasks ---
def process_task(task):
    prompts = {
        "summarise": f"Summarise this text:\n{task['text']}",
        "classify": f"Classify this text: {task['text']}",
    }
    if task["type"] not in prompts:
        # "embed" tasks belong on the /v1/embeddings endpoint instead
        raise ValueError(f"Unsupported task type: {task['type']}")
    resp = requests.post(GPU_URL, json={
        "model": "meta-llama/Llama-3-8b-chat-hf",
        "messages": [{"role": "user", "content": prompts[task["type"]]}],
        "max_tokens": 500, "temperature": 0.2
    }, headers={"Authorization": f"Bearer {GPU_KEY}"}, timeout=120)
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"]

def worker():
    conn = pika.BlockingConnection(pika.URLParameters(RABBIT_URL))
    ch = conn.channel()
    ch.queue_declare(queue="ai_tasks", durable=True)
    ch.queue_declare(queue="ai_results", durable=True)  # results survive restarts too
    ch.basic_qos(prefetch_count=4)  # match your GPU's concurrent capacity

    def on_message(ch, method, props, body):
        task = json.loads(body)
        print(f"Processing task {task['id']} ({task['type']})")
        try:
            result = process_task(task)
        except Exception as exc:
            print(f"Task {task['id']} failed: {exc}")
            # Reject without ack so RabbitMQ re-queues the task
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
            return
        # Publish the result to the queue named in the task
        ch.basic_publish(
            exchange="", routing_key=task["callback_queue"],
            body=json.dumps({"id": task["id"], "result": result}),
            properties=pika.BasicProperties(delivery_mode=2)  # persistent
        )
        # Ack only after the result is published, so failures re-deliver
        ch.basic_ack(delivery_tag=method.delivery_tag)

    ch.basic_consume(queue="ai_tasks", on_message_callback=on_message)
    print("GPU worker ready. Waiting for tasks...")
    ch.start_consuming()

if __name__ == "__main__":
    worker()
Testing Your Integration
Start the GPU worker and submit a test task using the producer function. Check the results queue for the completed response. Submit 100 tasks rapidly and verify they process in order without loss. Stop the worker mid-processing, restart it, and verify unacknowledged tasks re-deliver automatically.
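To check the results queue during testing, a minimal blocking consumer might look like this. The match_result helper and the polling loop are illustrative; because auto_ack discards non-matching messages, treat this as a test utility only, not a production consumer.

```python
import json, time

def match_result(body, task_id):
    """Return the result payload if this message belongs to task_id, else None."""
    msg = json.loads(body)
    return msg["result"] if msg.get("id") == task_id else None

def wait_for_result(task_id, rabbit_url="amqp://guest:guest@localhost:5672"):
    """Block until the result for task_id appears on ai_results (test helper)."""
    import pika  # local import: only needed once a broker is reachable
    conn = pika.BlockingConnection(pika.URLParameters(rabbit_url))
    ch = conn.channel()
    ch.queue_declare(queue="ai_results", durable=True)
    while True:
        _method, _props, body = ch.basic_get(queue="ai_results", auto_ack=True)
        if body is not None:
            result = match_result(body, task_id)
            if result is not None:
                conn.close()
                return result
        else:
            time.sleep(0.2)  # queue empty; poll again shortly
```

Combined with the producer, a round-trip test is then task_id = submit_task("..."), followed by wait_for_result(task_id).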
Monitor the RabbitMQ management UI (port 15672) to watch queue depth, consumer throughput, and message rates. Verify that the prefetch count matches your GPU’s concurrent processing capacity — too high wastes GPU memory, too low leaves the GPU idle between tasks.
Production Tips
Set up dead letter queues for tasks that fail repeatedly — after three retries, route the message to a DLQ for manual inspection rather than blocking the main queue. Add priority queues so urgent tasks (real-time user requests) process before batch jobs (overnight document processing). Monitor queue depth alerts to trigger GPU scaling when backlog grows.
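One common way to wire this up is a retry queue with a message TTL that dead-letters back to the main queue; the sketch below assumes that pattern, and all queue names, the 10 s TTL, and the x-death-based retry counter are illustrative choices, not the only ones.

```python
def declare_with_dlq(ch):
    """Task queue that dead-letters rejected messages into a retry queue,
    which holds them briefly and routes them back to the main queue."""
    ch.queue_declare(queue="ai_tasks_dlq", durable=True)  # parking lot for inspection
    ch.queue_declare(queue="ai_tasks_retry", durable=True, arguments={
        "x-message-ttl": 10000,                   # hold failed tasks for 10 s
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "ai_tasks",  # then re-deliver to the main queue
    })
    ch.queue_declare(queue="ai_tasks", durable=True, arguments={
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "ai_tasks_retry",  # nacked tasks go here
    })

def retry_count(headers):
    """How many times this task has cycled through the retry loop, read from
    the x-death header RabbitMQ adds on each dead-lettering hop."""
    deaths = (headers or {}).get("x-death") or []
    return sum(d.get("count", 0) for d in deaths if d.get("queue") == "ai_tasks")
```

The worker would then call basic_nack(requeue=False) on failure, and when retry_count(props.headers) reaches three, publish the task straight to ai_tasks_dlq instead. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so delete the plain ai_tasks queue first or apply the dead-letter settings via a policy.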
For multi-GPU deployments, multiple workers consume from the same queue with RabbitMQ distributing tasks round-robin. Each worker runs on a different GPU server, providing horizontal scaling. Add task-type routing with topic exchanges so specialised GPU workers handle specific inference types. Explore more tutorials or get started with GigaGPU to power your message-driven AI pipeline.
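A topic-exchange topology for task-type routing might be declared as follows; the exchange, queue, and routing-key names are illustrative assumptions, chosen so that chat-capable workers and embedding workers each consume only their own queue.

```python
def declare_routing(ch):
    """Topic-exchange topology so specialised workers receive only the
    task types they can handle."""
    ch.exchange_declare(exchange="ai.tasks", exchange_type="topic", durable=True)
    ch.queue_declare(queue="ai_tasks_text", durable=True)   # chat-model workers
    ch.queue_declare(queue="ai_tasks_embed", durable=True)  # embedding workers
    # Route by the task-type segment of the routing key
    ch.queue_bind(queue="ai_tasks_text", exchange="ai.tasks", routing_key="task.summarise")
    ch.queue_bind(queue="ai_tasks_text", exchange="ai.tasks", routing_key="task.classify")
    ch.queue_bind(queue="ai_tasks_embed", exchange="ai.tasks", routing_key="task.embed")

def routing_key(task_type):
    """Producer-side key, e.g. 'summarise' -> 'task.summarise'."""
    return f"task.{task_type}"
```

Producers then publish to the ai.tasks exchange with routing_key(task["type"]) instead of publishing directly to a queue on the default exchange.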