You will build a production workflow orchestrator that manages multi-step AI tasks across a GPU server: queuing inference requests, chaining model outputs, handling retries on failure, and prioritising urgent jobs. A document processing company handling 5,000 documents daily uses this architecture to manage OCR, classification, extraction, and summarisation steps with 99.7% completion rates and automatic retry on the 0.3% that fail. The system runs on a single dedicated GPU server with Celery and Redis.
## Workflow Architecture
| Component | Tool | Role |
|---|---|---|
| Task queue | Celery | Task definition and orchestration |
| Message broker | Redis | Queue storage and pub/sub |
| Result backend | Redis | Task state and results |
| GPU workers | Celery workers | Model inference execution |
| Monitoring | Flower | Task dashboard |
## Celery Configuration

```python
from celery import Celery

app = Celery(
    "gpu_workflows",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,            # Re-queue if a worker crashes mid-task
    worker_prefetch_multiplier=1,   # One task at a time per GPU worker
    task_track_started=True,
    task_time_limit=600,            # Hard limit: 10 min per task
    task_soft_time_limit=540,       # Soft limit raised at 9 min
    task_default_queue="gpu",
    task_routes={
        "tasks.ocr_extract": {"queue": "gpu"},
        "tasks.llm_classify": {"queue": "gpu"},
        "tasks.llm_summarise": {"queue": "gpu"},
        "tasks.notify": {"queue": "cpu"},  # Non-GPU tasks on a separate queue
    },
)
```
Setting worker_prefetch_multiplier=1 ensures each GPU worker handles one inference task at a time, preventing VRAM overflows from concurrent model execution.
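The task definitions below call a `get_or_create_model` helper that the original does not show. A minimal sketch, assuming a module-level cache keyed by model name so each long-lived worker process loads a model once and reuses it (the loader registry and the PaddleOCR constructor arguments are illustrative):

```python
# Module-level cache: one model instance per worker process
_MODEL_CACHE: dict = {}
_LOADERS: dict = {}

def register_loader(name: str, loader) -> None:
    """Register a zero-argument callable that builds the named model."""
    _LOADERS[name] = loader

def get_or_create_model(name: str):
    """Load a model on first use; later calls reuse the cached instance."""
    if name not in _MODEL_CACHE:
        _MODEL_CACHE[name] = _LOADERS[name]()
    return _MODEL_CACHE[name]

def _load_paddleocr():
    # Imported lazily so CPU-only workers never pull in the OCR stack
    from paddleocr import PaddleOCR
    return PaddleOCR(use_angle_cls=True, lang="en")

register_loader("paddleocr", _load_paddleocr)
```

Because each Celery worker is a long-lived process, the model loads at first task execution and then stays resident in VRAM, which is what makes a single GPU worker with `worker_prefetch_multiplier=1` both safe and fast.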
## Defining GPU Tasks

```python
from celery import shared_task
import torch

@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def ocr_extract(self, document_path: str) -> dict:
    try:
        # get_or_create_model caches a PaddleOCR instance at module level
        # so each worker process loads it only once
        ocr = get_or_create_model("paddleocr")
        result = ocr.ocr(document_path, cls=True)
        # Keep lines recognised with confidence above 0.7
        text = "\n".join(line[1][0] for line in result[0] if line[1][1] > 0.7)
        return {"text": text, "pages": 1, "status": "success"}
    except torch.cuda.OutOfMemoryError as exc:
        torch.cuda.empty_cache()  # Free cached VRAM before retrying
        raise self.retry(exc=exc, countdown=60)
```
```python
@shared_task(bind=True, max_retries=3)
def llm_classify(self, payload: dict, categories: list) -> dict:
    try:
        from openai import OpenAI
        client = OpenAI(base_url="http://localhost:8000/v1", api_key="none")
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[
                {"role": "system",
                 "content": f"Classify into: {categories}. "
                            'Return JSON: {"category": "", "confidence": 0.0}'},
                # The previous chain step passes its result dict as the
                # first argument; truncate the text to fit the context window
                {"role": "user", "content": payload["text"][:4000]},
            ],
            max_tokens=100,
            temperature=0.0,
        )
        # Merge the classification into the payload for the next step
        return {**payload, **parse_json(response.choices[0].message.content)}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
```
```python
@shared_task(bind=True, max_retries=3)
def llm_summarise(self, payload: dict, max_words: int = 150) -> dict:
    try:
        from openai import OpenAI
        client = OpenAI(base_url="http://localhost:8000/v1", api_key="none")
        response = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[
                {"role": "system", "content": f"Summarise in under {max_words} words."},
                {"role": "user", "content": payload["text"][:8000]},
            ],
            max_tokens=300,
            temperature=0.3,
        )
        return {**payload, "summary": response.choices[0].message.content}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
```
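The `parse_json` helper used by `llm_classify` is not defined in the original. A minimal sketch, assuming the model may wrap its JSON output in markdown code fences (that fence-stripping behaviour is an assumption about how the model responds):

```python
import json
import re

def parse_json(raw: str) -> dict:
    """Extract and parse a JSON object from an LLM response.

    Models sometimes wrap JSON in markdown fences (```json ... ```),
    so strip those before parsing.
    """
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw.strip())
    return json.loads(cleaned)
```

With `temperature=0.0` and an explicit JSON template in the system prompt, malformed output is rare; when it does occur, the resulting `json.JSONDecodeError` propagates to the task's `except` block and triggers a retry.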
## Chaining Tasks into Workflows

```python
from celery import chain, chord

CATEGORIES = ["invoice", "contract", "letter", "report"]

def document_chain(document_path: str):
    """Build the OCR -> classify -> summarise -> store chain signature."""
    return chain(
        ocr_extract.s(document_path),
        llm_classify.s(categories=CATEGORIES),
        llm_summarise.s(max_words=150),
        store_result.s(document_path=document_path),
    )

def process_document(document_path: str):
    """Full document processing pipeline."""
    return document_chain(document_path).apply_async(priority=5)

def process_batch(document_paths: list):
    """Process multiple documents in parallel, then aggregate."""
    workflow = chord(
        [document_chain(path) for path in document_paths],
        aggregate_results.s(),
    )
    return workflow.apply_async()

def process_urgent(document_path: str):
    """Priority processing for urgent documents."""
    # With the Redis broker, 0 is the highest priority (the reverse of AMQP)
    return document_chain(document_path).apply_async(priority=0)
```
Celery chains pass each task's return value as the first argument of the next task, which is why every step after OCR accepts the accumulated payload dict. The chord pattern processes multiple documents in parallel, then fires the aggregation callback once all of them complete.
## Running GPU Workers

```shell
# Terminal 1: GPU worker (concurrency=1 for a single GPU)
celery -A tasks worker -Q gpu --concurrency=1 --pool=solo -n gpu@%h

# Terminal 2: CPU worker for non-GPU tasks
celery -A tasks worker -Q cpu --concurrency=4 -n cpu@%h

# Terminal 3: Flower monitoring dashboard
celery -A tasks flower --port=5555
```

The GPU worker uses --concurrency=1 with --pool=solo, which runs each task in the worker's main process and guarantees single-threaded execution on the GPU. CPU workers handle notification, storage, and aggregation tasks with higher concurrency.
## Monitoring and Production

Flower provides a real-time dashboard showing task throughput, failure rates, and queue depth. Set Redis alerts for queue lengths exceeding 1,000, which indicates GPU worker saturation. The architecture handles 5,000+ documents daily on a single GPU; for higher throughput, add a second GPU worker on the same server or scale to a multi-GPU setup. Deploy on private infrastructure to keep documents confidential end to end.
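The queue-length alert can be implemented with a small check against the broker. A sketch, assuming the queue name and threshold above (the alert hook is hypothetical):

```python
def check_queue_depth(redis_client, queue: str = "gpu", threshold: int = 1000) -> bool:
    """Return True when the broker queue is backed up past the threshold.

    Celery's Redis transport stores each queue as a Redis list, so LLEN
    reports the number of waiting messages.
    """
    return redis_client.llen(queue) > threshold

# With redis-py (client construction assumed):
# import redis
# r = redis.Redis(host="localhost", port=6379, db=0)
# if check_queue_depth(r):
#     page_on_call_engineer()  # hypothetical alert hook
```

Note that when Redis message priorities are enabled, Celery splits each queue into per-priority lists with suffixed names, so a production check should sum the lengths of all of them.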