Serving RAG as an API¶
Once your RAG pipeline works locally, the next step is exposing it as an HTTP API so other services or a frontend can call it.
This tutorial builds a minimal but production-ready FastAPI app.
Install¶
uv pip install fastapi "uvicorn[standard]" "psycopg[binary]" psycopg-pool openai python-dotenv
Minimal endpoint: POST /query¶
# app.py
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI
from pydantic import BaseModel
from rag.retrieve import retrieve # your existing retrieve()
from rag.generate import answer_question # your existing answer_question()
load_dotenv()
class QueryRequest(BaseModel):
    """Request body for the /query endpoints."""

    # Natural-language question to answer.
    question: str
    # Number of chunks to retrieve (top-k); defaults to 8.
    k: int = 8
class QueryResponse(BaseModel):
    """Response body for POST /query."""

    # Generated answer text.
    answer: str
    # Source identifiers of the retrieved chunks used to answer.
    sources: list[str]
@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan hook: code before ``yield`` runs at startup, after at shutdown."""
    # startup
    yield
    # shutdown


app = FastAPI(lifespan=lifespan)
@app.get("/health")
def health():
    """Liveness probe; always reports the service as up."""
    payload = {"status": "ok"}
    return payload
@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest):
    """Answer a question with retrieval-augmented generation.

    Retrieves the top-k chunks for the question, generates an answer
    grounded in them, and returns the answer plus its distinct sources.
    """
    chunks = retrieve(req.question, k=req.k)
    answer = answer_question(req.question, chunks=chunks)
    # sorted() makes the source list deterministic; iterating a bare set
    # would return a different order per process (hash randomization).
    sources = sorted({c["source"] for c in chunks})
    return QueryResponse(answer=answer, sources=sources)
Run locally:
uvicorn app:app --reload
Test:
curl -s -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "What is SSO?"}' | python3 -m json.tool
Streaming responses¶
For long answers, stream tokens back to the client so the UI feels responsive:
# app.py (add this import and endpoint)
import json
from fastapi.responses import StreamingResponse
from openai import OpenAI
# Module-level OpenAI client, created once and reused across requests.
client = OpenAI()
SYSTEM_PROMPT = "You are a helpful assistant. Answer using ONLY the provided context."
def _stream_answer(question: str, chunks: list[dict]):
    """Yield SSE-formatted chunks.

    Emits one ``data:`` event per generated token (a JSON object with a
    ``token`` key), followed by a final ``data: [DONE]`` sentinel.
    """
    context = "\n\n".join(c["content"] for c in chunks)
    user_content = f"Question:\n{question}\n\nContext:\n{context}"
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        stream=True,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
    )
    for chunk in stream:
        # Some stream events (e.g. usage or content-filter chunks) carry an
        # empty choices list; unguarded [0] would raise IndexError mid-stream.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content or ""
        if delta:
            yield f"data: {json.dumps({'token': delta})}\n\n"
    yield "data: [DONE]\n\n"
@app.post("/query/stream")
def query_stream(req: QueryRequest):
    """Stream the answer back as server-sent events (SSE)."""
    chunks = retrieve(req.question, k=req.k)
    return StreamingResponse(
        _stream_answer(req.question, chunks),
        media_type="text/event-stream",
        # Without these, reverse proxies (e.g. nginx) buffer the stream and
        # the client receives everything at once, defeating streaming.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
Connection pooling with psycopg_pool¶
Opening a new database connection per request is slow (~10–50 ms). Use a connection pool instead:
# app.py — updated with connection pool
import os
from contextlib import asynccontextmanager
import psycopg_pool
from fastapi import FastAPI
# Module-level pool handle; created at startup in lifespan().
pool: psycopg_pool.ConnectionPool | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the connection pool at startup and close it at shutdown."""
    global pool
    pool = psycopg_pool.ConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=2,
        max_size=10,
        # Opening in the constructor is deprecated in psycopg_pool >= 3.2;
        # open explicitly below instead.
        open=False,
    )
    # Fail fast: block until min_size connections are actually established.
    pool.open(wait=True)
    try:
        yield
    finally:
        # Always release connections, even if the app errors while running.
        pool.close()


app = FastAPI(lifespan=lifespan)
Use the pool in your retrieve function:
from fastapi import Depends
def get_conn():
    """FastAPI dependency: check a connection out of the pool for one request.

    The ``with`` block returns the connection to the pool once the request
    handler using this dependency has finished.
    """
    with pool.connection() as conn:
        yield conn
@app.post("/query", response_model=QueryResponse)
def query(req: QueryRequest, conn=Depends(get_conn)):
    """Answer a question using a pooled database connection."""
    chunks = retrieve(req.question, k=req.k, conn=conn)
    answer = answer_question(req.question, chunks=chunks)
    # sorted() keeps the source list deterministic across processes,
    # unlike iterating a bare set.
    sources = sorted({c["source"] for c in chunks})
    return QueryResponse(answer=answer, sources=sources)
Async version¶
For higher throughput, use psycopg async + openai async client:
# async_app.py
import os
from contextlib import asynccontextmanager
import psycopg_pool
from fastapi import FastAPI
from openai import AsyncOpenAI
# Module-level async OpenAI client, reused across requests.
async_client = AsyncOpenAI()
# Async pool handle; created at startup in lifespan().
async_pool: psycopg_pool.AsyncConnectionPool | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the async connection pool at startup and close it at shutdown."""
    global async_pool
    async_pool = psycopg_pool.AsyncConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=2,
        max_size=20,
        # Async pools cannot be opened in the constructor (no event loop
        # awaiting there); psycopg_pool deprecates implicit opening.
        open=False,
    )
    # Fail fast: wait until min_size connections are actually established.
    await async_pool.open(wait=True)
    try:
        yield
    finally:
        # Always release connections, even if the app errors while running.
        await async_pool.close()


app = FastAPI(lifespan=lifespan)
@app.post("/query", response_model=QueryResponse)
async def query(req: QueryRequest):
    """Async RAG endpoint: pooled DB retrieval plus an async OpenAI call."""
    async with async_pool.connection() as conn:
        chunks = await retrieve_async(req.question, k=req.k, conn=conn)
    # Generate OUTSIDE the `async with` so the pooled connection is returned
    # before the slow LLM call instead of being held idle for its duration.
    answer = await answer_question_async(req.question, chunks=chunks)
    # sorted() keeps the source list deterministic across processes.
    sources = sorted({c["source"] for c in chunks})
    return QueryResponse(answer=answer, sources=sources)
Minimal Dockerfile¶
FROM python:3.11-slim

# Unbuffered stdout/stderr so container logs appear immediately;
# skip .pyc files to keep the image smaller.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

WORKDIR /app

# Install uv (--no-cache-dir avoids baking pip's cache into the layer)
RUN pip install --no-cache-dir uv

# Install dependencies first so this layer stays cached until
# requirements.txt changes
COPY requirements.txt .
RUN uv pip install --system -r requirements.txt

COPY . .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Build and run:
docker build -t rag-api .
docker run -p 8000:8000 --env-file .env rag-api
Production checklist¶
- Rate limiting: use slowapi (a starlette-compatible rate limiter)
- Request ID logging: add an X-Request-ID header middleware for tracing
- Health check: hook the /health endpoint above to your load balancer
- Graceful shutdown: psycopg_pool closes cleanly in the lifespan context
- Secrets: load from .env locally; use environment injection in production (K8s secrets, ECS task definitions, etc.)
Request ID middleware example:
import uuid
from fastapi import Request
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Propagate the caller's X-Request-ID (or mint one) and echo it on the response."""
    incoming = request.headers.get("X-Request-ID")
    rid = incoming if incoming else str(uuid.uuid4())
    response = await call_next(request)
    response.headers["X-Request-ID"] = rid
    return response
Next steps¶
- Track latency and costs in production: Monitoring & Observability
- Add caching to reduce cost: Caching for RAG