Skip to main content
Back to Blog
AI/MLWeb DevelopmentDatabases
24 April 202619 min readUpdated 24 April 2026

Efficient Storage and Querying of AI Inference Results with MongoDB

Introduction This tutorial outlines how to use FastAPI with MongoDB to handle image uploads, run vision model inference, store predictions, and set up query endpoints for analyt...

Efficient Storage and Querying of AI Inference Results with MongoDB

Introduction

This tutorial outlines how to use FastAPI with MongoDB to handle image uploads, run vision model inference, store predictions, and set up query endpoints for analytics. For those seeking an introduction to FastAPI with NoSQL databases, a broader tutorial is available elsewhere.

Computer vision models create structured outputs such as image captions, detected objects, and their confidence scores. While running these models is one task, storing the results so they can be queried, filtered, and utilized for applications is another.

In this guide, you'll develop an application that uploads images, stores them in a cloud object storage service, processes them using a vision model via an API, and saves the structured results in MongoDB. You'll also establish query endpoints to filter data by detected labels, confidence levels, and processing status.

What You'll Learn

  • How to store nested model outputs (e.g., captions, labels, confidence scores) in MongoDB documents.
  • Querying nested arrays using dot notation and $elemMatch for compound filters.
  • Writing aggregation pipelines to compute label frequency and average confidence.
  • Using FastAPI background tasks to run inference asynchronously, keeping upload responses fast.

The complete source code is available in the project repository on GitHub.

Data flows through the system as follows:

1. Client uploads an image via POST /upload  
2. The image is stored in cloud storage  
3. A "pending" document is inserted into MongoDB  
4. A background task processes the image via the API  
5. The API returns a caption and detected objects with confidence scores  
6. The MongoDB document is updated with inference results

Once images are processed, the API exposes five endpoints:

| Endpoint | Purpose | |----------------------|----------------------------------------------| | POST /upload | Upload an image and trigger inference | | GET /assets | List assets, filter by label, confidence, or status | | GET /assets/{id} | Retrieve a single asset with its inference results | | GET /insights/labels | Count how often each label appears across all images | | GET /insights/confidence | Compute average confidence per label |

Illustration for: | Endpoint             | Purpo...

By the end of this tutorial, you'll have a local API running with images stored in cloud storage, inference results in MongoDB, and query endpoints using dot notation, $elemMatch, and aggregation pipelines.

Key Takeaways

  • FastAPI background tasks keep the upload API responsive while running model inference asynchronously.
  • Store large image files in cloud storage and persist structured inference metadata in MongoDB documents.
  • Use MongoDB dot notation and $elemMatch for precise filtering on nested labels and confidence values.
  • Leverage MongoDB aggregation pipelines to compute label frequency and average confidence directly in the database.
  • This architecture is suitable for multimodal AI workloads that require synchronized object storage and queryable metadata.

Illustration for: - FastAPI background tasks kee...

Prerequisites

To follow this tutorial, you will need:

  • Python 3.10 or later installed. Instructions for setting up Python are available online.
  • A cloud storage account with access to Spaces. You can create Spaces with a quickstart guide available online. Obtain your access key and secret key from the control panel.
  • A MongoDB cluster or local MongoDB instance. Instructions for setting up a free-tier cluster or a local install are available.
  • An account with an API key for the inference service. The free tier is sufficient for this tutorial.
  • Familiarity with Python and REST APIs. Review relevant FastAPI documentation if needed.

Illustration for: - Python 3.10 or later install...

Step 1: Setting Up the Project and Configuring Cloud Storage

You will first set up the project, install dependencies, and configure the connection to cloud storage.

Create a project directory and set up a virtual environment:

mkdir multimodal-insights && cd multimodal-insights
python -m venv venv
source venv/bin/activate

Create a requirements.txt file with the project dependencies:

fastapi==0.115.6  
uvicorn==0.34.0  
python-multipart==0.0.20  
pymongo==4.11.3  
boto3==1.36.14  
requests==2.32.3  
pydantic-settings==2.7.1

Install the dependencies:

pip install -r requirements.txt

Create the app directory and an empty __init__.py file:

mkdir app && touch app/__init__.py

Create a .env file to store your credentials:

# Cloud Storage
SPACES_KEY=your_spaces_access_key  
SPACES_SECRET=your_spaces_secret_key  
SPACES_ENDPOINT=https://nyc3.digitaloceanspaces.com  
SPACES_BUCKET=your_bucket_name

# Groq API
GROQ_API_KEY=your_groq_api_key

# MongoDB
MONGODB_URI=mongodb+srv://username:password@cluster0.example.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0  
MONGODB_DB=multimodal_insights

Replace placeholders with your actual credentials. Next, create a configuration module to load these environment variables:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):  
    SPACES_KEY: str  
    SPACES_SECRET: str  
    SPACES_ENDPOINT: str  
    SPACES_BUCKET: str

    GROQ_API_KEY: str

    MONGODB_URI: str  
    MONGODB_DB: str = "multimodal_insights"

    class Config:  
        env_file = ".env"

settings = Settings()

The Settings class loads values from environment variables and the .env file. Now, implement a helper to upload files to cloud storage:

import uuid
import boto3
from app.config import settings

def get_spaces_client():  
    return boto3.client(  
        "s3",  
        endpoint_url=settings.SPACES_ENDPOINT,  
        aws_access_key_id=settings.SPACES_KEY,  
        aws_secret_access_key=settings.SPACES_SECRET,  
    )

def upload_file(file_bytes: bytes, original_filename: str, content_type: str) -> dict:  
    client = get_spaces_client()

    extension = original_filename.rsplit(".", 1)[-1] if "." in original_filename else "bin"  
    key = f"uploads/{uuid.uuid4().hex}.{extension}"

    client.put_object(  
        Bucket=settings.SPACES_BUCKET,  
        Key=key,  
        Body=file_bytes,  
        ContentType=content_type,  
        ACL="public-read",  
    )

    url = f"{settings.SPACES_ENDPOINT}/{settings.SPACES_BUCKET}/{key}"

    return {"key": key, "url": url}

This setup allows credentials to be loaded from the environment and files to be uploaded to cloud storage. Next, design the MongoDB document schema for storing model outputs.

Step 2: Designing the MongoDB Document Schema for Model Outputs

Vision models provide outputs suitable for nested documents. An image yields a caption and detected objects, each with a label and confidence score.

Store this within a single document for each image asset. Here's an example document structure:

{  
  "_id": ObjectId("..."),  
  "filename": "street-photo.jpg",  
  "spaces_key": "uploads/abc123.jpg",  
  "spaces_url": "https://your-storage-url/abc123.jpg",  
  "status": "completed",  
  "inference": {  
    "caption": "a busy city street with tall buildings and cars",  
    "labels": [  
      {"name": "cars", "confidence": 0.92},  
      {"name": "buildings", "confidence": 0.98},  
      {"name": "people", "confidence": 0.85}  
    ]  
  },  
  "error": null,  
  "created_at": ISODate("2025-01-15T10:30:00Z"),  
  "updated_at": ISODate("2025-01-15T10:30:05Z")  
}

The status field indicates the processing lifecycle: "pending", "completed", or "failed". The inference field remains null during processing and gets populated post-inference. Each label includes a name and a confidence score between 0 and 1.

With this structure, you can query nested fields using dot notation and $elemMatch for compound filters on array elements. Implement the database module to handle this structure:

from datetime import datetime, timezone
from bson import ObjectId  
from pymongo import MongoClient
from app.config import settings

client = MongoClient(settings.MONGODB_URI, appname="devrel-tutorial-multimodal-insights")  
db = client[settings.MONGODB_DB]  
assets_collection = db["assets"]

def create_indexes():  
    assets_collection.create_index("inference.labels.name")  
    assets_collection.create_index("inference.labels.confidence")  
    assets_collection.create_index("status")

def insert_pending_asset(filename: str, spaces_key: str, spaces_url: str) -> str:  
    doc = {  
        "filename": filename,  
        "spaces_key": spaces_key,  
        "spaces_url": spaces_url,  
        "status": "pending",  
        "inference": None,  
        "error": None,  
        "created_at": datetime.now(timezone.utc),  
        "updated_at": datetime.now(timezone.utc),  
    }  
    result = assets_collection.insert_one(doc)  
    return str(result.inserted_id)

def update_asset_inference(asset_id: str, inference: dict):  
    assets_collection.update_one(  
        {"_id": ObjectId(asset_id)},  
        {  
            "$set": {  
                "status": "completed",  
                "inference": inference,  
                "updated_at": datetime.now(timezone.utc),  
            }  
        },  
    )

def mark_asset_failed(asset_id: str, error_message: str):  
    assets_collection.update_one(  
        {"_id": ObjectId(asset_id)},  
        {  
            "$set": {  
                "status": "failed",  
                "error": error_message,  
                "updated_at": datetime.now(timezone.utc),  
            }  
        },  
    )

def get_asset(asset_id: str) -> dict | None:  
    doc = assets_collection.find_one({"_id": ObjectId(asset_id)})  
    if doc:  
        doc["id"] = str(doc.pop("_id"))  
    return doc

def query_assets(  
    label: str | None = None,  
    min_confidence: float | None = None,  
    status: str | None = None,  
    skip: int = 0,  
    limit: int = 20,  
) -> tuple[list[dict], int]:  
    query = {}

    if status:  
        query["status"] = status

    if label and min_confidence is not None:  
        query["inference.labels"] = {  
            "$elemMatch": {"name": label, "confidence": {"$gte": min_confidence}}  
        }  
    elif label:  
        query["inference.labels.name"] = label  
    elif min_confidence is not None:  
        query["inference.labels.confidence"] = {"$gte": min_confidence}

    total = assets_collection.count_documents(query)  
    cursor = assets_collection.find(query).sort("_id", 1).skip(skip).limit(limit)

    results = []  
    for doc in cursor:  
        doc["id"] = str(doc.pop("_id"))  
        results.append(doc)

    return results, total

def aggregate_label_counts() -> list[dict]:  
    pipeline = [  
        {"$match": {"status": "completed"}},  
        {"$unwind": "$inference.labels"},  
        {"$group": {"_id": "$inference.labels.name", "count": {"$sum": 1}}},  
        {"$sort": {"count": -1}},  
        {"$project": {"_id": 0, "label": "$_id", "count": 1}},  
    ]  
    return list(assets_collection.aggregate(pipeline))

def aggregate_avg_confidence() -> list[dict]:  
    pipeline = [  
        {"$match": {"status": "completed"}},  
        {"$unwind": "$inference.labels"},  
        {  
            "$group": {  
                "_id": "$inference.labels.name",  
                "average_confidence": {"$avg": "$inference.labels.confidence"},  
            }  
        },  
        {"$sort": {"average_confidence": -1}},  
        {  
            "$project": {  
                "_id": 0,  
                "label": "$_id",  
                "average_confidence": {"$round": ["$average_confidence", 4]},  
            }  
        },  
    ]  
    return list(assets_collection.aggregate(pipeline))

The MongoClient connects to your cluster, and ObjectId converts string IDs to MongoDB’s native type. Indices are created on fields most queried. CRUD functions handle document lifecycle, and the query_assets function builds a query dynamically. Aggregation functions compute label frequency and confidence metrics.

Pydantic models validate API inputs and outputs:

from pydantic import BaseModel

class Label(BaseModel):  
    name: str  
    confidence: float

class InferenceResult(BaseModel):  
    caption: str  
    labels: list[Label]

class UploadResponse(BaseModel):  
    asset_id: str  
    status: str  
    spaces_url: str

class AssetResponse(BaseModel):  
    id: str  
    filename: str  
    spaces_key: str  
    spaces_url: str  
    status: str  
    inference: InferenceResult | None = None  
    error: str | None = None

class AssetListResponse(BaseModel):  
    assets: list[AssetResponse]  
    total: int  
    skip: int  
    limit: int

class LabelCount(BaseModel):  
    label: str  
    count: int

class LabelConfidence(BaseModel):  
    label: str  
    average_confidence: float

These models ensure data integrity, with the database layer handling document operations and the response models enforcing consistent API responses.

Step 3: Wiring the Upload-to-Inference Pipeline with Background Tasks

The pipeline consists of three components: a module to call the inference API, a processing function to update MongoDB with results, and a FastAPI app to trigger these processes.

The inference module interacts with the API:

import base64  
import json
import requests
from app.config import settings

GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"  
VISION_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"

SYSTEM_PROMPT = """You are an image analysis assistant. Analyze the provided image and return a JSON object with exactly this structure:

{  
  "caption": "a one-sentence description of the image",  
  "labels": [  
    {"name": "object_name", "confidence": 0.95},  
    {"name": "another_object", "confidence": 0.82}  
  ]  
}

Rules:  
- The caption should be a single, concise sentence describing the image content.  
- The labels array should list every distinct object you can identify in the image.  
- Confidence should be a float between 0.0 and 1.0 representing how certain you are the object is present.  
- Use lowercase for all label names.  
- Return ONLY the JSON object, no other text."""

def run_inference(image_bytes: bytes) -> dict:  
    b64_image = base64.b64encode(image_bytes).decode("utf-8")

    response = requests.post(  
        GROQ_API_URL,  
        headers={  
            "Authorization": f"Bearer {settings.GROQ_API_KEY}",  
            "Content-Type": "application/json",  
        },  
        json={  
            "model": VISION_MODEL,  
            "messages": [  
                {"role": "system", "content": SYSTEM_PROMPT},  
                {  
                    "role": "user",  
                    "content": [  
                        {"type": "text", "text": "Analyze this image."},  
                        {  
                            "type": "image_url",  
                            "image_url": {  
                                "url": f"data:image/jpeg;base64,{b64_image}",  
                            },  
                        },  
                    ],  
                },  
            ],  
            "response_format": {"type": "json_object"},  
            "temperature": 0,  
        },  
        timeout=60,  
    )  
    response.raise_for_status()

    content = response.json()["choices"][0]["message"]["content"]  
    result = json.loads(content)

    labels = []  
    for label in result.get("labels", []):  
        labels.append(  
            {  
                "name": label["name"],  
                "confidence": round(float(label["confidence"]), 4),  
            }  
        )

    return {  
        "caption": result["caption"],  
        "labels": labels,  
    }

The function encodes the image and sends it to the API, parsing the response to extract structured data. The processing module integrates this with MongoDB:

import requests
from app.database import mark_asset_failed, update_asset_inference  
from app.inference import run_inference

def process_asset(asset_id: str, spaces_url: str):  
    try:  
        response = requests.get(spaces_url, timeout=30)  
        response.raise_for_status()  
        image_bytes = response.content

        inference_result = run_inference(image_bytes)  
        update_asset_inference(asset_id, inference_result)

    except Exception as exc:  
        mark_asset_failed(asset_id, str(exc))

This function runs as a background task after an image upload, ensuring any failures are recorded with a status of "failed".

Finally, create a FastAPI application to manage this process:

from contextlib import asynccontextmanager
from fastapi import BackgroundTasks, FastAPI, HTTPException, UploadFile
from app.database import (  
    aggregate_avg_confidence,  
    aggregate_label_counts,  
    create_indexes,  
    get_asset,  
    insert_pending_asset,  
    query_assets,  
)  
from app.models import (  
    AssetListResponse,  
    AssetResponse,  
    LabelConfidence,  
    LabelCount,  
    UploadResponse,  
)  
from app.pipeline import process_asset  
from app.spaces import upload_file

@asynccontextmanager  
async def lifespan(app: FastAPI):  
    create_indexes()  
    yield

app = FastAPI(  
    title="Multi-Modal Insights API",  
    description="Upload images, run inference, and query structured results from MongoDB.",  
    lifespan=lifespan,  
)

from fastapi import File

@app.post("/upload", response_model=UploadResponse)  
async def upload_image(background_tasks: BackgroundTasks, file: UploadFile = File(...)):  
    if not file.content_type or not file.content_type.startswith("image/"):  
        raise HTTPException(status_code=400, detail="File must be an image.")

    file_bytes = await file.read()  
    result = upload_file(file_bytes, file.filename, file.content_type)

    asset_id = insert_pending_asset(  
        filename=file.filename,  
        spaces_key=result["key"],  
        spaces_url=result["url"],  
    )

    background_tasks.add_task(process_asset, asset_id, result["url"])

    return UploadResponse(  
        asset_id=asset_id,  
        status="pending",  
        spaces_url=result["url"],  
    )

The POST /upload endpoint uses UploadFile to accept image uploads, validating them before uploading to cloud storage. A pending document is inserted into MongoDB, and the inference pipeline is scheduled using BackgroundTasks to run asynchronously.

Step 4: Building Query and Filtering Endpoints for Model-Generated Insights

With uploads and inference in place, the final step is to implement query endpoints to access the stored data.

Add these endpoints to your FastAPI application:

from fastapi import Query

@app.get("/assets", response_model=AssetListResponse)  
async def list_assets(  
    label: str | None = None,  
    min_confidence: float | None = None,  
    status: str | None = None,  
    skip: int = Query(default=0, ge=0),  
    limit: int = Query(default=20, ge=1, le=100),  
):  
    results, total = query_assets(  
        label=label,  
        min_confidence=min_confidence,  
        status=status,  
        skip=skip,  
        limit=limit,  
    )

    assets = []  
    for doc in results:  
        assets.append(AssetResponse(**doc))

    return AssetListResponse(assets=assets, total=total, skip=skip, limit=limit)

@app.get("/assets/{asset_id}", response_model=AssetResponse)  
async def get_single_asset(asset_id: str):  
    doc = get_asset(asset_id)  
    if not doc:  
        raise HTTPException(status_code=404, detail="Asset not found.")  
    return AssetResponse(**doc)

@app.get("/insights/labels", response_model=list[LabelCount])  
async def label_counts():  
    return aggregate_label_counts()

@app.get("/insights/confidence", response_model=list[LabelConfidence])  
async def label_confidence():  
    return aggregate_avg_confidence()

These endpoints enable complex queries and aggregations, making the API data-rich and useful for analysis. The GET /assets endpoint allows filtering by label, confidence, and status, while aggregation endpoints provide insights on label frequency and confidence.

Step 5: Testing the Full Workflow

With the application complete, it's time to test the workflow using FastAPI’s interactive documentation.

Start the server:

uvicorn app.main:app --reload

Visit http://127.0.0.1:8000/docs to access the Swagger UI. This interface allows you to interact with all the endpoints and validate the complete application workflow.

Upload images, check asset statuses, query with filters, and run aggregation endpoints to explore the system's capabilities.

If any uploads fail, check the error field for troubleshooting information.

Conclusion

You now have a FastAPI application that efficiently uploads images, processes them with a vision model, and stores results in MongoDB. The API supports complex queries and aggregations, making it ideal for multimodal AI applications.

This setup is broadly applicable to AI pipelines, with MongoDB's dot notation enabling direct queries on nested fields, and $elemMatch maintaining query accuracy for compound conditions. Aggregation pipelines offload computation to the database, keeping the API streamlined.

To expand this project, consider deploying the API on a cloud platform and scaling the workflow.