
Serving RAG as an API

Once your RAG pipeline works locally, the next step is exposing it as an HTTP API so other services or a frontend can call it.

This tutorial builds a minimal FastAPI app, then adds the pieces that make it production-ready: streaming, connection pooling, and a container image.


Install

uv pip install fastapi "uvicorn[standard]" "psycopg[binary]" psycopg-pool openai python-dotenv
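
The Dockerfile at the end of this tutorial installs from requirements.txt, so keep the same dependencies listed there (pin exact versions against whatever you actually test with):

# requirements.txt
fastapi
uvicorn[standard]
psycopg[binary]
psycopg-pool
openai
python-dotenv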

Minimal endpoint: POST /query

# app.py
from __future__ import annotations

import os
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from fastapi import FastAPI
from pydantic import BaseModel

from rag.retrieve import retrieve        # your existing retrieve()
from rag.generate import answer_question  # your existing answer_question()

load_dotenv()


class QueryRequest(BaseModel):
    question: str
    k: int = 8


class QueryResponse(BaseModel):
    answer: str
    sources: list[str]


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: nothing yet; the connection-pool section below fills this in
    yield
    # shutdown


app = FastAPI(lifespan=lifespan)


@app.get("/health")
def health():
    return {"status": "ok"}


@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest):
    chunks = retrieve(req.question, k=req.k)
    answer = answer_question(req.question, chunks=chunks)
    sources = sorted({c["source"] for c in chunks})  # dedupe, stable order
    return QueryResponse(answer=answer, sources=sources)

Run locally:

uvicorn app:app --reload

Test:

curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"question": "What is SSO?"}' | python3 -m json.tool

Streaming responses

For long answers, stream tokens back to the client so the UI feels responsive:

# app.py (add this import and endpoint)
import json
from fastapi.responses import StreamingResponse
from openai import OpenAI

client = OpenAI()
SYSTEM_PROMPT = "You are a helpful assistant. Answer using ONLY the provided context."


def _stream_answer(question: str, chunks: list[dict]):
    """Yield SSE-formatted chunks."""
    context = "\n\n".join(c["content"] for c in chunks)
    user_content = f"Question:\n{question}\n\nContext:\n{context}"

    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        stream=True,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
    )
    for chunk in stream:
        if not chunk.choices:  # some stream chunks carry no choices
            continue
        delta = chunk.choices[0].delta.content or ""
        if delta:
            yield f"data: {json.dumps({'token': delta})}\n\n"
    yield "data: [DONE]\n\n"


@app.post("/query/stream")
def query_stream(req: QueryRequest):
    chunks = retrieve(req.question, k=req.k)
    return StreamingResponse(
        _stream_answer(req.question, chunks),
        media_type="text/event-stream",
    )
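
To watch the stream from a terminal, disable curl's output buffering with -N:

curl -N -X POST http://localhost:8000/query/stream \
  -H "Content-Type: application/json" \
  -d '{"question": "What is SSO?"}'

You should see data: {"token": "..."} events arrive incrementally, followed by a final data: [DONE].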

Connection pooling with psycopg_pool

Opening a new database connection per request is slow (~10–50 ms). Use a connection pool instead:

# app.py — updated with connection pool
import os
from contextlib import asynccontextmanager

import psycopg_pool
from fastapi import FastAPI

pool: psycopg_pool.ConnectionPool | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = psycopg_pool.ConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=2,
        max_size=10,
        open=False,  # opening in the constructor is deprecated in psycopg_pool 3.2+
    )
    pool.open()
    yield
    pool.close()


app = FastAPI(lifespan=lifespan)

Use the pool in your retrieve function:

from fastapi import Depends


def get_conn():
    """FastAPI dependency: borrow a pooled connection for the request's duration."""
    with pool.connection() as conn:
        yield conn


@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest, conn=Depends(get_conn)):
    chunks = retrieve(req.question, k=req.k, conn=conn)
    answer = answer_question(req.question, chunks=chunks)
    sources = sorted({c["source"] for c in chunks})  # dedupe, stable order
    return QueryResponse(answer=answer, sources=sources)
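
If your retrieve() does not yet accept a connection, here is a minimal sketch assuming a pgvector-backed chunks table with content, source, and embedding columns, embedded with text-embedding-3-small (table, column, and model names are all assumptions; adapt them to your schema):

# rag/retrieve.py (sketch only; table, column, and model names are assumptions)
from openai import OpenAI

client = OpenAI()


def retrieve(question: str, k: int = 8, conn=None) -> list[dict]:
    # Embed the question with the same model used at indexing time,
    # then run a pgvector cosine-distance search.
    emb = client.embeddings.create(model="text-embedding-3-small", input=question)
    vector = emb.data[0].embedding

    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT content, source
            FROM chunks
            ORDER BY embedding <=> %s::vector
            LIMIT %s
            """,
            (str(vector), k),
        )
        rows = cur.fetchall()

    return [{"content": content, "source": source} for content, source in rows]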

Async version

For higher throughput, use psycopg's async connection pool together with the async OpenAI client:

# async_app.py
import os
from contextlib import asynccontextmanager

import psycopg_pool
from fastapi import FastAPI
from openai import AsyncOpenAI

from rag.retrieve import retrieve_async            # your async retrieve()
from rag.generate import answer_question_async     # your async answer_question()
# QueryRequest / QueryResponse: the same Pydantic models as in app.py

async_client = AsyncOpenAI()
async_pool: psycopg_pool.AsyncConnectionPool | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global async_pool
    async_pool = psycopg_pool.AsyncConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=2,
        max_size=20,
        open=False,  # never open an async pool in the constructor; open it below
    )
    await async_pool.open()
    yield
    await async_pool.close()


app = FastAPI(lifespan=lifespan)


@app.post("/query", response_model=QueryResponse)
async def query(req: QueryRequest):
    async with async_pool.connection() as conn:
        chunks = await retrieve_async(req.question, k=req.k, conn=conn)

    answer = await answer_question_async(req.question, chunks=chunks)
    sources = list({c["source"] for c in chunks})
    return QueryResponse(answer=answer, sources=sources)
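
retrieve_async mirrors the sync sketch above, using psycopg's async cursor API and the AsyncOpenAI client (same hypothetical schema and model as before):

# rag/retrieve.py (async sketch; same hypothetical schema as the sync version)
from openai import AsyncOpenAI

async_client = AsyncOpenAI()


async def retrieve_async(question: str, k: int = 8, conn=None) -> list[dict]:
    # Same flow as retrieve(), but every I/O step is awaited.
    emb = await async_client.embeddings.create(
        model="text-embedding-3-small", input=question
    )
    vector = emb.data[0].embedding

    async with conn.cursor() as cur:
        await cur.execute(
            """
            SELECT content, source
            FROM chunks
            ORDER BY embedding <=> %s::vector
            LIMIT %s
            """,
            (str(vector), k),
        )
        rows = await cur.fetchall()

    return [{"content": content, "source": source} for content, source in rows]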

Minimal Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

# Install dependencies
COPY requirements.txt .
RUN uv pip install --system -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Build and run:

docker build -t rag-api .
docker run -p 8000:8000 --env-file .env rag-api
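
Since the Dockerfile runs COPY . ., add a .dockerignore so secrets and local caches never end up in the image (.env should be injected at runtime via --env-file, not baked in):

# .dockerignore
.env
.git
.venv
__pycache__/
*.pyc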

Production checklist

  • Rate limiting: use slowapi (a rate limiter for Starlette/FastAPI); a minimal setup is sketched after this list
  • Request ID logging: add X-Request-ID header middleware for tracing
  • Health check: the /health endpoint above — hook it to your load balancer
  • Graceful shutdown: psycopg_pool closes cleanly in the lifespan context
  • Secrets: load from .env locally; use environment injection in production (K8s secrets, ECS task def, etc.)
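
A minimal slowapi setup, as referenced in the checklist (the 10/minute limit is an arbitrary example, and slowapi requires a request: Request parameter on decorated endpoints):

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/query", response_model=QueryResponse)
@limiter.limit("10/minute")  # arbitrary example rate
def query(request: Request, req: QueryRequest, conn=Depends(get_conn)):
    chunks = retrieve(req.question, k=req.k, conn=conn)
    answer = answer_question(req.question, chunks=chunks)
    sources = sorted({c["source"] for c in chunks})
    return QueryResponse(answer=answer, sources=sources)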

Request ID middleware example:

import uuid
from fastapi import Request

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request.state.request_id = request_id  # expose to handlers and log filters
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

Next steps