RTX 3050 - Order Now
Home / Blog / Tutorials / Connect MongoDB to AI Pipeline on GPU
Tutorials

Connect MongoDB to AI Pipeline on GPU

Connect MongoDB to your GPU-hosted AI inference pipeline for document enrichment and vector search. This guide covers change streams for real-time processing, Atlas Vector Search with self-hosted embeddings, and building AI-powered queries on your document data.

What You’ll Connect

After this guide, your MongoDB database will enrich documents with AI-generated embeddings, summaries, and classifications from your own GPU server — and support semantic vector search across your entire collection. MongoDB change streams trigger processing on your vLLM endpoint on dedicated GPU hardware whenever documents are created or updated, keeping AI enrichment in sync with your data.

The integration uses MongoDB change streams to watch for document changes and a Python worker that calls your OpenAI-compatible API for AI processing. Embeddings store directly in MongoDB documents, enabling vector search with Atlas Search or a separate vector index. All AI inference runs on your GPU — MongoDB just stores the results.

Prerequisites

  • A GigaGPU server running a self-hosted LLM (setup guide)
  • MongoDB 6.0+ (self-hosted replica set or Atlas)
  • Python 3.10+ with pymongo and requests
  • Network access between MongoDB and your GPU endpoint

Integration Steps

Set up a MongoDB change stream that watches your target collection for insert and update operations. The change stream provides a real-time feed of document changes without polling. A Python worker consumes the stream, extracts the text fields that need AI processing, calls your GPU endpoint, and updates the document with the AI-generated fields.

Define your enrichment schema — add fields like embedding (array of floats), ai_summary (string), ai_category (string), and enriched_at (timestamp) to your documents. The worker populates these fields after AI processing. Create a vector search index on the embedding field for similarity queries.

Build query helpers that combine traditional MongoDB filters with vector similarity search. Your application generates a query embedding from user input, then uses $vectorSearch (Atlas) or a nearest-neighbour query to find semantically similar documents, optionally filtered by other document fields.

Code Example

MongoDB change stream worker with AI enrichment from your self-hosted models:

from pymongo import MongoClient
import requests, time

MONGO_URI = "mongodb://localhost:27017"
EMBEDDING_URL = "http://gpu-server:8001/v1/embeddings"
VLLM_URL = "http://gpu-server:8000/v1/chat/completions"
GPU_KEY = "your-api-key"

client = MongoClient(MONGO_URI)
db = client["myapp"]
collection = db["documents"]

def enrich_document(doc):
    """Generate embedding and summary for a MongoDB document."""
    text = doc.get("content", "") or doc.get("title", "")
    if not text:
        return

    # Generate embedding
    resp = requests.post(EMBEDDING_URL, json={
        "input": [text[:8000]], "model": "bge-large"
    }, headers={"Authorization": f"Bearer {GPU_KEY}"})
    embedding = resp.json()["data"][0]["embedding"]

    # Generate summary
    resp = requests.post(VLLM_URL, json={
        "model": "meta-llama/Llama-3-8b-chat-hf",
        "messages": [{"role": "user",
            "content": f"Summarise in one sentence:\n{text[:4000]}"}],
        "max_tokens": 100, "temperature": 0.2
    }, headers={"Authorization": f"Bearer {GPU_KEY}"})
    summary = resp.json()["choices"][0]["message"]["content"]

    collection.update_one({"_id": doc["_id"]}, {"$set": {
        "embedding": embedding, "ai_summary": summary,
        "enriched_at": time.time()
    }})

def watch_and_enrich():
    """Watch collection for changes and enrich new documents."""
    pipeline = [{"$match": {
        "operationType": {"$in": ["insert", "update"]},
        "fullDocument.enriched_at": {"$exists": False}
    }}]

    with collection.watch(pipeline, full_document="updateLookup") as stream:
        for change in stream:
            doc = change["fullDocument"]
            print(f"Enriching document: {doc['_id']}")
            enrich_document(doc)

def vector_search(query_text, limit=10, filters=None):
    """Semantic search across enriched documents."""
    resp = requests.post(EMBEDDING_URL, json={
        "input": [query_text], "model": "bge-large"
    }, headers={"Authorization": f"Bearer {GPU_KEY}"})
    query_vec = resp.json()["data"][0]["embedding"]

    pipeline = [{"$vectorSearch": {
        "index": "vector_index",
        "path": "embedding",
        "queryVector": query_vec,
        "numCandidates": limit * 10,
        "limit": limit
    }}]
    if filters:
        pipeline[0]["$vectorSearch"]["filter"] = filters

    return list(collection.aggregate(pipeline))

if __name__ == "__main__":
    watch_and_enrich()

Testing Your Integration

Start the change stream worker and insert a test document into your collection. Verify the document is enriched within seconds with an embedding array, summary, and timestamp. Run a vector search query and verify semantically relevant documents rank higher than keyword matches. Insert multiple documents to test batch processing throughput.

Test resilience by stopping and restarting the worker. MongoDB change streams use resume tokens, so the worker should pick up where it left off without missing documents. Test with large documents to ensure text truncation handles the embedding model’s token limit gracefully.

Production Tips

Run the change stream worker as a managed process with automatic restart on failure. Use a resume token stored in a separate collection to survive worker restarts without reprocessing. For backfilling existing documents, run a batch job that processes documents missing the enriched_at field in chunks of 100.

Create compound indexes that combine vector search with traditional field filters for efficient hybrid queries. Monitor the enrichment lag — the time between document insertion and AI enrichment completion — and scale GPU capacity when lag exceeds your SLA. Build an AI chatbot that queries your enriched MongoDB collection. Explore more tutorials or get started with GigaGPU to power your data pipeline.

Need a Dedicated GPU Server?

Deploy from RTX 3050 to RTX 5090. Full root access, NVMe storage, 1Gbps — UK datacenter.

Browse GPU Servers

admin

We benchmark, deploy, and optimise GPU infrastructure for AI workloads. All data in our guides comes from real-world testing on our UK-based dedicated GPU servers.

Ready to deploy your AI workload?

Dedicated GPU servers from our UK datacenter. NVMe storage, 1Gbps networking, full root access.

Browse GPU Servers Contact Sales

Have a question? Need help?