
Celery + GPU: Distributed AI Tasks

Complete guide to running distributed AI tasks with Celery on GPU servers, covering task routing, GPU worker configuration, result backends, task chaining, and scaling inference across multiple machines.

You will set up Celery to distribute AI inference tasks across GPU workers so that long-running model operations process asynchronously without blocking your application. By the end, you will have Celery workers on your GPU servers processing inference tasks from a shared queue with result tracking and task chaining.

Architecture

Celery uses a broker (Redis or RabbitMQ) to distribute tasks to workers. Each GPU server runs a Celery worker that pulls tasks, runs inference, and stores results in the result backend.

Component        Role            Server
Application      Submits tasks   Web server
Broker           Message queue   Redis server
Workers          GPU inference   GPU servers
Result Backend   Stores outputs  Redis server

Setup and Configuration

pip install celery redis openai

# celery_app.py
from celery import Celery

app = Celery(
    "ai_tasks",
    broker="redis://redis-server:6379/0",
    backend="redis://redis-server:6379/1"
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # Requeue if worker crashes
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_time_limit=300,           # 5 minute hard limit
    task_soft_time_limit=240,      # 4 minute soft limit
)

GPU Inference Tasks

Define Celery tasks that run inference against a vLLM backend on the GPU server.

# tasks.py
from celery_app import app
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_text(self, messages: list, max_tokens: int = 256,
                  temperature: float = 0.7) -> dict:
    try:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature
        )
        return {
            "content": response.choices[0].message.content,
            "tokens": response.usage.total_tokens,
            "model": response.model
        }
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task(bind=True, max_retries=2, default_retry_delay=10)
def batch_generate(self, prompts: list, max_tokens: int = 256) -> list:
    results = []
    try:
        for prompt in prompts:
            response = client.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens
            )
            results.append(response.choices[0].message.content)
    except Exception as exc:
        # Retry re-runs the whole batch; keep batches small to limit rework
        raise self.retry(exc=exc)
    return results

@app.task
def summarise_document(text: str) -> str:
    chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
    summaries = []
    for chunk in chunks:
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "user", "content": f"Summarise:\n{chunk}"}],
            max_tokens=256
        )
        summaries.append(response.choices[0].message.content)
    return " ".join(summaries)

Task Chaining and Workflows

Chain tasks together for multi-step AI pipelines like extract-summarise-classify.

from celery import chain, group, chord

# Sequential pipeline: each task's return value becomes the next task's
# first argument. Note generate_text returns a dict, so summarise_document
# needs an intermediate step (or adjusted signature) that extracts ["content"].
pipeline = chain(
    generate_text.s([{"role": "user", "content": "Extract key points from: ..."}]),
    summarise_document.s()
)
result = pipeline.apply_async()

# Parallel processing
parallel = group(
    generate_text.s([{"role": "user", "content": prompt}])
    for prompt in ["Summarise topic A", "Summarise topic B", "Summarise topic C"]
)
results = parallel.apply_async()

# Fan-out then combine
workflow = chord(
    [generate_text.s([{"role": "user", "content": p}]) for p in prompts],
    combine_results.s()
)
workflow.apply_async()

GPU-Aware Task Routing

Route specific tasks to workers with particular GPU capabilities.

# celery_app.py
app.conf.task_routes = {
    "tasks.generate_text": {"queue": "gpu-inference"},
    "tasks.batch_generate": {"queue": "gpu-batch"},
    "tasks.summarise_document": {"queue": "gpu-inference"},
}

# Start GPU worker listening to specific queues
# celery -A celery_app worker -Q gpu-inference -c 1 --hostname=gpu1@%h
# celery -A celery_app worker -Q gpu-batch -c 1 --hostname=gpu2@%h

Set concurrency to 1 per GPU worker (-c 1) to prevent VRAM contention. Scale by adding more GPU servers running additional workers. For the simpler Redis Queue approach, see the Redis queue guide.

Monitoring and Production

# Install Flower for Celery monitoring
pip install flower
celery -A celery_app flower --port=5555

# Or export Prometheus metrics
pip install celery-exporter

Integrate with Prometheus and Grafana for GPU utilisation alongside task metrics. For container orchestration, see Kubernetes GPU pods. For webhook result delivery, check the webhook integration guide. The self-hosting guide covers infrastructure, and our tutorials section has more distributed patterns. Set up the backend with the vLLM production guide.

Distribute AI Tasks Across Dedicated GPUs

Run Celery workers on bare-metal GPU servers for distributed inference. Scale horizontally, process asynchronously.

Browse GPU Servers


admin

We benchmark, deploy, and optimise GPU infrastructure for AI workloads. All data in our guides comes from real-world testing on our UK-based dedicated GPU servers.
