
AI Workflow: Celery + Redis + GPU

Build a production AI workflow system using Celery and Redis to orchestrate multi-step GPU inference tasks with queuing, retries, and priority scheduling.

You will build a production workflow orchestrator that manages multi-step AI tasks across a GPU server: queuing inference requests, chaining model outputs, handling retries on failure, and prioritising urgent jobs. A document processing company handling 5,000 documents daily uses this architecture to manage OCR, classification, extraction, and summarisation steps with 99.7% completion rates and automatic retry on the 0.3% that fail. The system runs on a single dedicated GPU server with Celery and Redis.

Workflow Architecture

Component      | Tool           | Role
Task queue     | Celery         | Task definition and orchestration
Message broker | Redis          | Queue storage and pub/sub
Result backend | Redis          | Task state and results
GPU workers    | Celery workers | Model inference execution
Monitoring     | Flower         | Task dashboard

Celery Configuration

from celery import Celery

app = Celery("gpu_workflows",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,         # Re-queue if worker crashes
    worker_prefetch_multiplier=1, # One task at a time per GPU worker
    task_track_started=True,
    task_time_limit=600,          # 10 min max per task
    task_soft_time_limit=540,     # Warn at 9 min
    task_default_queue="gpu",
    task_routes={
        "tasks.ocr_extract": {"queue": "gpu"},
        "tasks.llm_classify": {"queue": "gpu"},
        "tasks.llm_summarise": {"queue": "gpu"},
        "tasks.notify": {"queue": "cpu"},  # Non-GPU tasks on separate queue
    }
)

Setting worker_prefetch_multiplier=1 ensures each GPU worker handles one inference task at a time, preventing VRAM overflows from concurrent model execution.

Defining GPU Tasks

from celery import shared_task
import torch

@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def ocr_extract(self, document_path: str) -> dict:
    try:
        ocr = get_or_create_model("paddleocr")  # cached per worker process
        result = ocr.ocr(document_path, cls=True)
        # Keep only lines recognised with confidence above 0.7
        text = "\n".join(line[1][0] for line in result[0] if line[1][1] > 0.7)
        return {"text": text, "pages": 1, "status": "success"}
    except torch.cuda.OutOfMemoryError as exc:
        torch.cuda.empty_cache()             # Free VRAM before retrying
        raise self.retry(exc=exc, countdown=60)
    except Exception as exc:
        raise self.retry(exc=exc)            # Uses default_retry_delay=30

@shared_task(bind=True, max_retries=3)
def llm_classify(self, text: str, categories: list) -> dict:
    try:
        from openai import OpenAI
        client = OpenAI(base_url="http://localhost:8000/v1", api_key="none")
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "system",
                       "content": f"Classify into: {categories}. Return JSON: {{\"category\": \"\", \"confidence\": 0.0}}"},
                      {"role": "user", "content": text[:4000]}],
            max_tokens=100, temperature=0.0
        )
        return parse_json(response.choices[0].message.content)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)

@shared_task(bind=True, max_retries=3)
def llm_summarise(self, text: str, max_words: int = 150) -> dict:
    try:
        from openai import OpenAI
        client = OpenAI(base_url="http://localhost:8000/v1", api_key="none")
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[{"role": "system",
                       "content": f"Summarise in under {max_words} words."},
                      {"role": "user", "content": text[:8000]}],
            max_tokens=300, temperature=0.3
        )
        return {"summary": response.choices[0].message.content}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
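The tasks above call two helpers that are not shown: get_or_create_model and parse_json. Neither is a library function, so here is one plausible sketch of each, assuming a module-level model cache (safe with a single-process GPU worker) and a tolerant JSON extractor for LLM replies:

```python
import json
import re

_MODEL_CACHE: dict = {}

def get_or_create_model(name: str, loader=None):
    """Load a model once per worker process and reuse it.

    Reloading PaddleOCR on every task wastes seconds and VRAM; with
    --concurrency=1 a module-level cache is safe.
    """
    if name not in _MODEL_CACHE:
        if loader is None:
            if name == "paddleocr":
                from paddleocr import PaddleOCR  # heavy import, deferred
                loader = lambda: PaddleOCR(use_angle_cls=True, lang="en")
            else:
                raise KeyError(f"no loader registered for {name!r}")
        _MODEL_CACHE[name] = loader()
    return _MODEL_CACHE[name]

def parse_json(raw: str) -> dict:
    """Extract the first JSON object from an LLM reply.

    Models sometimes wrap JSON in prose or markdown fences, so pull
    out the outermost {...} span before parsing.
    """
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if match is None:
        raise ValueError(f"no JSON object in response: {raw[:200]}")
    return json.loads(match.group(0))
```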

Chaining Tasks into Workflows

from celery import chain, chord

def document_chain(document_path: str):
    """Signature for the full document processing pipeline."""
    return chain(
        ocr_extract.s(document_path),
        llm_classify.s(categories=["invoice", "contract", "letter", "report"]),
        llm_summarise.s(max_words=150),
        store_result.s(document_path=document_path)
    )

def process_document(document_path: str):
    """Standard-priority document processing."""
    return document_chain(document_path).apply_async(priority=5)

def process_batch(document_paths: list):
    """Process multiple documents in parallel, then aggregate."""
    workflow = chord(
        [document_chain(path) for path in document_paths],
        aggregate_results.s()
    )
    return workflow.apply_async()

def process_urgent(document_path: str):
    """Priority processing for urgent documents."""
    # With the Redis broker, lower numbers run first: 0 is highest priority
    return document_chain(document_path).apply_async(priority=0)

Celery chains pass each task's return value as the first argument of the next task. The chord pattern processes multiple documents in parallel, then fires the aggregation callback once all of them complete. Note that the Redis broker emulates priorities with multiple lists, and lower numbers are higher priority: by default the ten levels (0-9) are consolidated into four steps, with 0 being the most urgent.

Running GPU Workers

# Terminal 1: GPU worker (concurrency=1 for single GPU)
celery -A tasks worker -Q gpu --concurrency=1 --pool=solo -n gpu@%h

# Terminal 2: CPU worker for non-GPU tasks
celery -A tasks worker -Q cpu --concurrency=4 -n cpu@%h

# Terminal 3: Flower monitoring dashboard
celery -A tasks flower --port=5555

The GPU worker uses --concurrency=1 and --pool=solo to ensure single-threaded execution. CPU workers handle notification, storage, and aggregation tasks with higher concurrency.
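Calling apply_async returns an AsyncResult whose .state moves through PENDING, STARTED, RETRY and ends in SUCCESS or FAILURE. A small progress helper for a submitted batch (a sketch; only the .state attribute of celery.result.AsyncResult is assumed):

```python
def workflow_progress(results) -> dict:
    """Summarise a batch of AsyncResult-like objects by Celery state."""
    states = [r.state for r in results]
    return {
        "total": len(states),
        "succeeded": states.count("SUCCESS"),
        "failed": states.count("FAILURE"),
        "in_flight": sum(s in ("PENDING", "STARTED", "RETRY") for s in states),
    }
```

Poll it from a status endpoint rather than blocking on .get(), so the web tier never waits on the GPU queue.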

Monitoring and Production

Flower provides a real-time dashboard showing task throughput, failure rates, and queue depth. Alert when the gpu queue length in Redis exceeds 1,000, which indicates GPU worker saturation. This architecture handles 5,000+ documents daily on a single GPU; for higher throughput, add a second GPU worker on the same server or scale to a multi-GPU setup. Deploy on private infrastructure when document confidentiality matters.
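The queue-length alert can be scripted directly against Redis: with default broker settings each Celery queue is a Redis list named after the queue, so LLEN gives the backlog (a sketch; when priority steps are enabled Celery also creates per-priority lists, which this check ignores). The client is injected, so any object with an llen method works:

```python
def queue_report(redis_client, queues=("gpu", "cpu"), threshold=1000) -> dict:
    """Return per-queue backlog depth and whether it breaches the threshold."""
    report = {}
    for queue in queues:
        depth = redis_client.llen(queue)
        report[queue] = {"depth": depth, "alert": depth > threshold}
    return report
```

Run it from cron with a real client, e.g. redis.Redis(host="localhost", port=6379, db=0), and page on any alert: True.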

AI Workflow GPU Servers

Orchestrate multi-step AI pipelines with Celery and Redis on dedicated UK GPU infrastructure.

Browse GPU Servers
