Efficient Storage and Querying of AI Inference Results with MongoDB
Introduction
This tutorial outlines how to use FastAPI with MongoDB to handle image uploads, run vision model inference, store predictions, and set up query endpoints for analytics. For those seeking an introduction to FastAPI with NoSQL databases, a broader tutorial is available elsewhere.
Computer vision models create structured outputs such as image captions, detected objects, and their confidence scores. Running these models is one task; storing the results so they can be queried, filtered, and put to use by applications is another.
In this guide, you'll develop an application that uploads images, stores them in a cloud object storage service, processes them using a vision model via an API, and saves the structured results in MongoDB. You'll also establish query endpoints to filter data by detected labels, confidence levels, and processing status.
What You'll Learn
- How to store nested model outputs (e.g., captions, labels, confidence scores) in MongoDB documents.
- How to query nested arrays using dot notation and $elemMatch for compound filters.
- How to write aggregation pipelines to compute label frequency and average confidence.
- How to use FastAPI background tasks to run inference asynchronously, keeping upload responses fast.
The complete source code is available in the project repository on GitHub.
Data flows through the system as follows:
1. Client uploads an image via POST /upload
2. The image is stored in cloud storage
3. A "pending" document is inserted into MongoDB
4. A background task processes the image via the API
5. The API returns a caption and detected objects with confidence scores
6. The MongoDB document is updated with inference results
The API exposes five endpoints:
| Endpoint | Purpose |
|----------------------|----------------------------------------------|
| POST /upload | Upload an image and trigger inference |
| GET /assets | List assets, filter by label, confidence, or status |
| GET /assets/{id} | Retrieve a single asset with its inference results |
| GET /insights/labels | Count how often each label appears across all images |
| GET /insights/confidence | Compute average confidence per label |

By the end of this tutorial, you'll have a local API running with images stored in cloud storage, inference results in MongoDB, and query endpoints using dot notation, $elemMatch, and aggregation pipelines.
Key Takeaways
- FastAPI background tasks keep the upload API responsive while running model inference asynchronously.
- Store large image files in cloud storage and persist structured inference metadata in MongoDB documents.
- Use MongoDB dot notation and $elemMatch for precise filtering on nested labels and confidence values.
- Leverage MongoDB aggregation pipelines to compute label frequency and average confidence directly in the database.
- This architecture is suitable for multimodal AI workloads that require synchronized object storage and queryable metadata.

Prerequisites
To follow this tutorial, you will need:
- Python 3.10 or later installed. Instructions for setting up Python are available online.
- A cloud storage account with access to Spaces. You can create Spaces with a quickstart guide available online. Obtain your access key and secret key from the control panel.
- A MongoDB cluster or local MongoDB instance. Instructions for setting up a free-tier cluster or a local install are available.
- A Groq account with an API key for the inference service. The free tier is sufficient for this tutorial.
- Familiarity with Python and REST APIs. Review relevant FastAPI documentation if needed.

Step 1: Setting Up the Project and Configuring Cloud Storage
You will first set up the project, install dependencies, and configure the connection to cloud storage.
Create a project directory and set up a virtual environment:
mkdir multimodal-insights && cd multimodal-insights
python -m venv venv
source venv/bin/activate
Create a requirements.txt file with the project dependencies:
fastapi==0.115.6
uvicorn==0.34.0
python-multipart==0.0.20
pymongo==4.11.3
boto3==1.36.14
requests==2.32.3
pydantic-settings==2.7.1
Install the dependencies:
pip install -r requirements.txt
Create the app directory and an empty __init__.py file:
mkdir app && touch app/__init__.py
Create a .env file to store your credentials:
# Cloud Storage
SPACES_KEY=your_spaces_access_key
SPACES_SECRET=your_spaces_secret_key
SPACES_ENDPOINT=https://nyc3.digitaloceanspaces.com
SPACES_BUCKET=your_bucket_name
# Groq API
GROQ_API_KEY=your_groq_api_key
# MongoDB
MONGODB_URI=mongodb+srv://username:password@cluster0.example.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0
MONGODB_DB=multimodal_insights
Replace placeholders with your actual credentials. Next, create a configuration module to load these environment variables:
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    SPACES_KEY: str
    SPACES_SECRET: str
    SPACES_ENDPOINT: str
    SPACES_BUCKET: str
    GROQ_API_KEY: str
    MONGODB_URI: str
    MONGODB_DB: str = "multimodal_insights"

    class Config:
        env_file = ".env"


settings = Settings()
The Settings class loads values from environment variables and the .env file. Now, implement a helper to upload files to cloud storage:
import uuid

import boto3

from app.config import settings


def get_spaces_client():
    return boto3.client(
        "s3",
        endpoint_url=settings.SPACES_ENDPOINT,
        aws_access_key_id=settings.SPACES_KEY,
        aws_secret_access_key=settings.SPACES_SECRET,
    )


def upload_file(file_bytes: bytes, original_filename: str, content_type: str) -> dict:
    client = get_spaces_client()
    extension = original_filename.rsplit(".", 1)[-1] if "." in original_filename else "bin"
    key = f"uploads/{uuid.uuid4().hex}.{extension}"
    client.put_object(
        Bucket=settings.SPACES_BUCKET,
        Key=key,
        Body=file_bytes,
        ContentType=content_type,
        ACL="public-read",
    )
    url = f"{settings.SPACES_ENDPOINT}/{settings.SPACES_BUCKET}/{key}"
    return {"key": key, "url": url}
This setup allows credentials to be loaded from the environment and files to be uploaded to cloud storage. Next, design the MongoDB document schema for storing model outputs.
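Before moving on, you can smoke-test the helper from a Python shell. This is an optional sketch, assuming your .env file is populated; sample.jpg is a placeholder for any local image:

from app.spaces import upload_file

# Read any local test image; "sample.jpg" is a placeholder filename
with open("sample.jpg", "rb") as f:
    image_bytes = f.read()

result = upload_file(image_bytes, "sample.jpg", "image/jpeg")
print(result["key"])  # e.g. uploads/<random-hex>.jpg
print(result["url"])  # public URL of the uploaded object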
Step 2: Designing the MongoDB Document Schema for Model Outputs
Vision model outputs map naturally onto nested documents: an image yields a caption plus a list of detected objects, each with a label and a confidence score.
Store this within a single document for each image asset. Here's an example document structure:
{
  "_id": ObjectId("..."),
  "filename": "street-photo.jpg",
  "spaces_key": "uploads/abc123.jpg",
  "spaces_url": "https://your-storage-url/abc123.jpg",
  "status": "completed",
  "inference": {
    "caption": "a busy city street with tall buildings and cars",
    "labels": [
      {"name": "cars", "confidence": 0.92},
      {"name": "buildings", "confidence": 0.98},
      {"name": "people", "confidence": 0.85}
    ]
  },
  "error": null,
  "created_at": ISODate("2025-01-15T10:30:00Z"),
  "updated_at": ISODate("2025-01-15T10:30:05Z")
}
The status field tracks the processing lifecycle: "pending", "completed", or "failed". The inference field remains null while processing is underway and is populated once inference completes. Each label includes a name and a confidence score between 0 and 1.
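This structure lets you query nested fields with dot notation and apply compound filters to a single array element with $elemMatch. To make that concrete, here is how the two filters look in raw PyMongo (a sketch, using the db handle that the database module below will define):

from app.database import db

# Dot notation: matches if ANY element of inference.labels has this name
cursor = db["assets"].find({"inference.labels.name": "cars"})

# $elemMatch: both conditions must hold on the SAME array element;
# without it, "name" and "confidence" could be satisfied by different labels
cursor = db["assets"].find({
    "inference.labels": {
        "$elemMatch": {"name": "cars", "confidence": {"$gte": 0.9}}
    }
})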
Next, implement the database module to handle this structure:
from datetime import datetime, timezone

from bson import ObjectId
from pymongo import MongoClient

from app.config import settings

client = MongoClient(settings.MONGODB_URI, appname="devrel-tutorial-multimodal-insights")
db = client[settings.MONGODB_DB]
assets_collection = db["assets"]


def create_indexes():
    assets_collection.create_index("inference.labels.name")
    assets_collection.create_index("inference.labels.confidence")
    assets_collection.create_index("status")


def insert_pending_asset(filename: str, spaces_key: str, spaces_url: str) -> str:
    doc = {
        "filename": filename,
        "spaces_key": spaces_key,
        "spaces_url": spaces_url,
        "status": "pending",
        "inference": None,
        "error": None,
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    result = assets_collection.insert_one(doc)
    return str(result.inserted_id)


def update_asset_inference(asset_id: str, inference: dict):
    assets_collection.update_one(
        {"_id": ObjectId(asset_id)},
        {
            "$set": {
                "status": "completed",
                "inference": inference,
                "updated_at": datetime.now(timezone.utc),
            }
        },
    )


def mark_asset_failed(asset_id: str, error_message: str):
    assets_collection.update_one(
        {"_id": ObjectId(asset_id)},
        {
            "$set": {
                "status": "failed",
                "error": error_message,
                "updated_at": datetime.now(timezone.utc),
            }
        },
    )


def get_asset(asset_id: str) -> dict | None:
    doc = assets_collection.find_one({"_id": ObjectId(asset_id)})
    if doc:
        doc["id"] = str(doc.pop("_id"))
    return doc


def query_assets(
    label: str | None = None,
    min_confidence: float | None = None,
    status: str | None = None,
    skip: int = 0,
    limit: int = 20,
) -> tuple[list[dict], int]:
    query = {}
    if status:
        query["status"] = status
    if label and min_confidence is not None:
        query["inference.labels"] = {
            "$elemMatch": {"name": label, "confidence": {"$gte": min_confidence}}
        }
    elif label:
        query["inference.labels.name"] = label
    elif min_confidence is not None:
        query["inference.labels.confidence"] = {"$gte": min_confidence}
    total = assets_collection.count_documents(query)
    cursor = assets_collection.find(query).sort("_id", 1).skip(skip).limit(limit)
    results = []
    for doc in cursor:
        doc["id"] = str(doc.pop("_id"))
        results.append(doc)
    return results, total


def aggregate_label_counts() -> list[dict]:
    pipeline = [
        {"$match": {"status": "completed"}},
        {"$unwind": "$inference.labels"},
        {"$group": {"_id": "$inference.labels.name", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$project": {"_id": 0, "label": "$_id", "count": 1}},
    ]
    return list(assets_collection.aggregate(pipeline))


def aggregate_avg_confidence() -> list[dict]:
    pipeline = [
        {"$match": {"status": "completed"}},
        {"$unwind": "$inference.labels"},
        {
            "$group": {
                "_id": "$inference.labels.name",
                "average_confidence": {"$avg": "$inference.labels.confidence"},
            }
        },
        {"$sort": {"average_confidence": -1}},
        {
            "$project": {
                "_id": 0,
                "label": "$_id",
                "average_confidence": {"$round": ["$average_confidence", 4]},
            }
        },
    ]
    return list(assets_collection.aggregate(pipeline))
The MongoClient connects to your cluster, and ObjectId converts string IDs to MongoDB’s native type. Indexes are created on the most frequently queried fields: label names, confidence values, and status. The CRUD functions manage the document lifecycle, query_assets builds its filter dynamically from whichever parameters are supplied, and the aggregation functions compute label frequency and confidence metrics inside the database.
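Once a few assets have been processed, you can exercise these helpers directly from a Python shell; the output shown in the comments is illustrative:

from app.database import aggregate_label_counts, query_assets

# Completed assets where a single label is "cars" with confidence >= 0.9
results, total = query_assets(label="cars", min_confidence=0.9, status="completed")
print(total, [doc["filename"] for doc in results])

# Label frequency across all completed assets
print(aggregate_label_counts())
# e.g. [{"label": "buildings", "count": 12}, {"label": "cars", "count": 9}, ...]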
Pydantic models validate API inputs and outputs:
from pydantic import BaseModel


class Label(BaseModel):
    name: str
    confidence: float


class InferenceResult(BaseModel):
    caption: str
    labels: list[Label]


class UploadResponse(BaseModel):
    asset_id: str
    status: str
    spaces_url: str


class AssetResponse(BaseModel):
    id: str
    filename: str
    spaces_key: str
    spaces_url: str
    status: str
    inference: InferenceResult | None = None
    error: str | None = None


class AssetListResponse(BaseModel):
    assets: list[AssetResponse]
    total: int
    skip: int
    limit: int


class LabelCount(BaseModel):
    label: str
    count: int


class LabelConfidence(BaseModel):
    label: str
    average_confidence: float
These models validate data at the API boundary: the database layer handles raw document operations, while the response models guarantee consistently shaped API responses.
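FastAPI applies these models automatically when serializing responses, and you can also exercise them directly. A quick sketch with illustrative values:

from pydantic import ValidationError

from app.models import InferenceResult

result = InferenceResult.model_validate({
    "caption": "a busy city street",
    "labels": [{"name": "cars", "confidence": 0.92}],
})
print(result.labels[0].name)  # "cars"

try:
    InferenceResult.model_validate({"caption": "missing labels"})
except ValidationError as exc:
    print(exc.error_count(), "validation error(s)")  # 1 validation error(s)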
Step 3: Wiring the Upload-to-Inference Pipeline with Background Tasks
The pipeline consists of three components: a module to call the inference API, a processing function to update MongoDB with results, and a FastAPI app to trigger these processes.
The inference module interacts with the API:
import base64
import json

import requests

from app.config import settings

GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
VISION_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"

SYSTEM_PROMPT = """You are an image analysis assistant. Analyze the provided image and return a JSON object with exactly this structure:
{
  "caption": "a one-sentence description of the image",
  "labels": [
    {"name": "object_name", "confidence": 0.95},
    {"name": "another_object", "confidence": 0.82}
  ]
}
Rules:
- The caption should be a single, concise sentence describing the image content.
- The labels array should list every distinct object you can identify in the image.
- Confidence should be a float between 0.0 and 1.0 representing how certain you are the object is present.
- Use lowercase for all label names.
- Return ONLY the JSON object, no other text."""


def run_inference(image_bytes: bytes) -> dict:
    b64_image = base64.b64encode(image_bytes).decode("utf-8")
    response = requests.post(
        GROQ_API_URL,
        headers={
            "Authorization": f"Bearer {settings.GROQ_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": VISION_MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Analyze this image."},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{b64_image}",
                            },
                        },
                    ],
                },
            ],
            "response_format": {"type": "json_object"},
            "temperature": 0,
        },
        timeout=60,
    )
    response.raise_for_status()
    content = response.json()["choices"][0]["message"]["content"]
    result = json.loads(content)
    labels = []
    for label in result.get("labels", []):
        labels.append(
            {
                "name": label["name"],
                "confidence": round(float(label["confidence"]), 4),
            }
        )
    return {
        "caption": result["caption"],
        "labels": labels,
    }
The function encodes the image and sends it to the API, parsing the response to extract structured data. The processing module integrates this with MongoDB:
import requests

from app.database import mark_asset_failed, update_asset_inference
from app.inference import run_inference


def process_asset(asset_id: str, spaces_url: str):
    try:
        response = requests.get(spaces_url, timeout=30)
        response.raise_for_status()
        image_bytes = response.content
        inference_result = run_inference(image_bytes)
        update_asset_inference(asset_id, inference_result)
    except Exception as exc:
        mark_asset_failed(asset_id, str(exc))
This function runs as a background task after an image upload, ensuring any failures are recorded with a status of "failed".
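Note that the pipeline marks an asset as failed on the first error, including transient ones such as timeouts or rate limits. If you want more resilience, one option is to wrap run_inference in a small retry helper. This is a sketch rather than part of the tutorial's pipeline, and max_attempts is an assumed parameter:

import time

import requests

from app.inference import run_inference


def run_inference_with_retries(image_bytes: bytes, max_attempts: int = 3) -> dict:
    """Retry run_inference with exponential backoff on transient network/API errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_inference(image_bytes)
        except requests.RequestException:
            if attempt == max_attempts:
                raise  # let process_asset record the failure
            time.sleep(2 ** attempt)  # back off: 2s, then 4s, ...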
Finally, create a FastAPI application to manage this process:
from contextlib import asynccontextmanager

from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile

from app.database import (
    aggregate_avg_confidence,
    aggregate_label_counts,
    create_indexes,
    get_asset,
    insert_pending_asset,
    query_assets,
)
from app.models import (
    AssetListResponse,
    AssetResponse,
    LabelConfidence,
    LabelCount,
    UploadResponse,
)
from app.pipeline import process_asset
from app.spaces import upload_file


@asynccontextmanager
async def lifespan(app: FastAPI):
    create_indexes()
    yield


app = FastAPI(
    title="Multi-Modal Insights API",
    description="Upload images, run inference, and query structured results from MongoDB.",
    lifespan=lifespan,
)


@app.post("/upload", response_model=UploadResponse)
async def upload_image(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    if not file.content_type or not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image.")
    file_bytes = await file.read()
    result = upload_file(file_bytes, file.filename, file.content_type)
    asset_id = insert_pending_asset(
        filename=file.filename,
        spaces_key=result["key"],
        spaces_url=result["url"],
    )
    background_tasks.add_task(process_asset, asset_id, result["url"])
    return UploadResponse(
        asset_id=asset_id,
        status="pending",
        spaces_url=result["url"],
    )
The POST /upload endpoint uses UploadFile to accept image uploads, validating them before uploading to cloud storage. A pending document is inserted into MongoDB, and the inference pipeline is scheduled using BackgroundTasks to run asynchronously.
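Once the server is running (see Step 5), you can exercise the endpoint programmatically. This sketch uses the requests library, with street-photo.jpg standing in for any local image:

import requests

with open("street-photo.jpg", "rb") as f:
    resp = requests.post(
        "http://127.0.0.1:8000/upload",
        files={"file": ("street-photo.jpg", f, "image/jpeg")},
    )
print(resp.json())
# e.g. {"asset_id": "65a4f0...", "status": "pending", "spaces_url": "https://..."}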
Step 4: Building Query and Filtering Endpoints for Model-Generated Insights
With uploads and inference in place, the final step is to implement query endpoints to access the stored data.
Add these endpoints to your FastAPI application:
from fastapi import Query


@app.get("/assets", response_model=AssetListResponse)
async def list_assets(
    label: str | None = None,
    min_confidence: float | None = None,
    status: str | None = None,
    skip: int = Query(default=0, ge=0),
    limit: int = Query(default=20, ge=1, le=100),
):
    results, total = query_assets(
        label=label,
        min_confidence=min_confidence,
        status=status,
        skip=skip,
        limit=limit,
    )
    assets = []
    for doc in results:
        assets.append(AssetResponse(**doc))
    return AssetListResponse(assets=assets, total=total, skip=skip, limit=limit)


@app.get("/assets/{asset_id}", response_model=AssetResponse)
async def get_single_asset(asset_id: str):
    doc = get_asset(asset_id)
    if not doc:
        raise HTTPException(status_code=404, detail="Asset not found.")
    return AssetResponse(**doc)


@app.get("/insights/labels", response_model=list[LabelCount])
async def label_counts():
    return aggregate_label_counts()


@app.get("/insights/confidence", response_model=list[LabelConfidence])
async def label_confidence():
    return aggregate_avg_confidence()
These endpoints expose filtering and aggregation directly over the stored inference results. The GET /assets endpoint filters by label, confidence, and status, while the insights endpoints report label frequency and average confidence per label.
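As an illustration, here is how a client might combine the filters and insights endpoints (a sketch; the outputs depend on the images you upload):

import requests

BASE = "http://127.0.0.1:8000"

# Assets where a single label is "cars" with confidence >= 0.9
resp = requests.get(f"{BASE}/assets", params={"label": "cars", "min_confidence": 0.9})
print(resp.json()["total"])

# Label frequency and average confidence across completed assets
print(requests.get(f"{BASE}/insights/labels").json())
print(requests.get(f"{BASE}/insights/confidence").json())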
Step 5: Testing the Full Workflow
With the application complete, it's time to test the workflow using FastAPI’s interactive documentation.
Start the server:
uvicorn app.main:app --reload
Visit http://127.0.0.1:8000/docs to access the Swagger UI. This interface allows you to interact with all the endpoints and validate the complete application workflow.
Upload images, check asset statuses, query with filters, and run aggregation endpoints to explore the system's capabilities.
If any uploads fail, check the error field for troubleshooting information.
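For example, the status filter from Step 4 lets you list every failed asset along with its stored error message (a sketch):

import requests

resp = requests.get("http://127.0.0.1:8000/assets", params={"status": "failed"})
for asset in resp.json()["assets"]:
    print(asset["filename"], "->", asset["error"])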
Conclusion
You now have a FastAPI application that efficiently uploads images, processes them with a vision model, and stores results in MongoDB. The API supports complex queries and aggregations, making it ideal for multimodal AI applications.
This setup carries over to other AI pipelines: MongoDB's dot notation enables direct queries on nested fields, $elemMatch keeps compound conditions anchored to a single array element, and aggregation pipelines offload computation to the database, keeping the API streamlined.
To expand this project, consider deploying the API on a cloud platform and scaling the workflow.