You will set up Celery to distribute AI inference tasks across GPU workers so that long-running model operations process asynchronously without blocking your application. By the end, you will have Celery workers on your GPU servers processing inference tasks from a shared queue with result tracking and task chaining.
Architecture
Celery uses a broker (Redis or RabbitMQ) to distribute tasks to workers. Each GPU server runs a Celery worker that pulls tasks, runs inference, and stores results in the result backend.
| Component | Role | Server |
|---|---|---|
| Application | Submits tasks | Web server |
| Broker | Message queue | Redis server |
| Workers | GPU inference | GPU servers |
| Result Backend | Stores outputs | Redis server |
Setup and Configuration
```bash
pip install celery redis openai
```
```python
# celery_app.py
from celery import Celery

app = Celery(
    "ai_tasks",
    broker="redis://redis-server:6379/0",
    backend="redis://redis-server:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,             # Requeue if the worker crashes mid-task
    worker_prefetch_multiplier=1,    # One task at a time per worker
    task_time_limit=300,             # 5-minute hard limit
    task_soft_time_limit=240,        # 4-minute soft limit
)
```
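With `task_soft_time_limit` set, Celery raises `SoftTimeLimitExceeded` inside the task shortly before the hard limit terminates it, giving you a chance to clean up. A minimal sketch of catching it, using a hypothetical `long_inference` task (the cleanup behaviour is up to you):

```python
from celery.exceptions import SoftTimeLimitExceeded

from celery_app import app


@app.task(bind=True)
def long_inference(self, prompt: str):
    try:
        ...  # call the model here; this may exceed the 4-minute soft limit
    except SoftTimeLimitExceeded:
        # Soft limit reached: clean up and re-raise so the task is marked
        # failed before the 5-minute hard limit kills the worker process
        raise
```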
GPU Inference Tasks
Define Celery tasks that run inference against a vLLM backend on the GPU server.
```python
# tasks.py
from celery_app import app
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")


@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_text(self, messages: list, max_tokens: int = 256,
                  temperature: float = 0.7) -> dict:
    try:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        return {
            "content": response.choices[0].message.content,
            "tokens": response.usage.total_tokens,
            "model": response.model,
        }
    except Exception as exc:
        raise self.retry(exc=exc)


@app.task(bind=True, max_retries=2)
def batch_generate(self, prompts: list, max_tokens: int = 256) -> list:
    results = []
    for prompt in prompts:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
        )
        results.append(response.choices[0].message.content)
    return results


@app.task
def summarise_document(text: str) -> str:
    chunks = [text[i:i + 4000] for i in range(0, len(text), 4000)]
    summaries = []
    for chunk in chunks:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": f"Summarise:\n{chunk}"}],
            max_tokens=256,
        )
        summaries.append(response.choices[0].message.content)
    return " ".join(summaries)
```
Task Chaining and Workflows
Chain tasks together for multi-step AI pipelines like extract-summarise-classify.
```python
from celery import chain, group, chord

from celery_app import app
from tasks import generate_text, summarise_document


@app.task
def extract_content(result: dict) -> str:
    # generate_text returns a dict, while summarise_document expects a string,
    # so this small glue task bridges the two in a chain. Define it alongside
    # the other tasks so the GPU workers register it.
    return result["content"]


# Sequential pipeline
pipeline = chain(
    generate_text.s([{"role": "user", "content": "Extract key points from: ..."}]),
    extract_content.s(),
    summarise_document.s(),
)
result = pipeline.apply_async()

# Parallel processing
parallel = group(
    generate_text.s([{"role": "user", "content": prompt}])
    for prompt in ["Summarise topic A", "Summarise topic B", "Summarise topic C"]
)
results = parallel.apply_async()

# Fan-out, then combine the results with a callback task
# (prompts is an existing list of user prompts)
workflow = chord(
    [generate_text.s([{"role": "user", "content": p}]) for p in prompts],
    combine_results.s(),
)
workflow.apply_async()
```
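The chord callback `combine_results` is not defined above; it receives the list of results from all header tasks once they finish. A minimal sketch, where the joining logic is an assumption:

```python
# tasks.py
from celery_app import app


@app.task
def combine_results(results: list) -> str:
    # Each item is the dict returned by generate_text; join the text fields
    return "\n\n".join(item["content"] for item in results)
```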
GPU-Aware Task Routing
Route specific tasks to workers with particular GPU capabilities.
```python
# celery_app.py
app.conf.task_routes = {
    "tasks.generate_text": {"queue": "gpu-inference"},
    "tasks.batch_generate": {"queue": "gpu-batch"},
    "tasks.summarise_document": {"queue": "gpu-inference"},
}
```

```bash
# Start GPU workers listening to specific queues
celery -A celery_app worker -Q gpu-inference -c 1 --hostname=gpu1@%h
celery -A celery_app worker -Q gpu-batch -c 1 --hostname=gpu2@%h
```
Set concurrency to 1 per GPU worker (`-c 1`) to prevent VRAM contention. Scale by adding more GPU servers running additional workers. For the simpler Redis Queue approach, see the Redis queue guide.
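You can also override the queue per call instead of relying on `task_routes`, which is handy for sending an ad-hoc job to a specific worker pool. A short sketch using standard `apply_async` options:

```python
from tasks import batch_generate

# Route this particular call to the gpu-batch queue regardless of task_routes
batch_generate.apply_async(
    args=[["Prompt one", "Prompt two"]],
    queue="gpu-batch",
)
```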
Monitoring and Production
```bash
# Install Flower for Celery monitoring
pip install flower
celery -A celery_app flower --port=5555

# Or export Prometheus metrics
pip install celery-exporter
```
Integrate with Prometheus and Grafana to chart GPU utilisation alongside task metrics. For container orchestration, see Kubernetes GPU pods. For webhook result delivery, check the webhook integration guide. The self-hosting guide covers infrastructure, and our tutorials section has more distributed patterns. Set up the backend with the vLLM production guide.
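Beyond Flower, you can check worker health programmatically with Celery's inspection API, for example from a health-check endpoint. A minimal sketch (what you do with the numbers is up to you):

```python
from celery_app import app

# Query all connected workers over the broker
inspector = app.control.inspect()

active = inspector.active() or {}      # Tasks currently executing per worker
reserved = inspector.reserved() or {}  # Tasks prefetched but not yet started

for worker, tasks in active.items():
    print(f"{worker}: {len(tasks)} running, {len(reserved.get(worker, []))} reserved")
```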