Monitoring & Observability for RAG¶
You can't improve what you can't measure.
In production, three signals matter most:
| Signal | Why it matters |
|---|---|
| Latency | Users abandon slow interfaces; SLAs require targets |
| Cost | LLM API calls are billed per token; surprises are expensive |
| Quality | Silent regressions — answer quality drops without errors |
This tutorial shows how to instrument each one.
Install¶
uv pip install openai psycopg[binary] python-dotenv
Structured JSON request logging¶
Log every request as a structured JSON record. This makes it easy to query in your log aggregator (Datadog, Grafana Loki, CloudWatch, etc.):
# rag/logging.py
from __future__ import annotations
import hashlib
import json
import logging
import sys
from datetime import datetime, timezone
# Set up a JSON logger that writes one record per line to stdout.
logger = logging.getLogger("rag")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Without this, records also propagate to the root logger: as soon as the
# application configures root handlers, every request would be logged twice.
logger.propagate = False


def log_request(
    *,
    question: str,
    chunk_ids: list[int],
    latency_ms: float,
    prompt_tokens: int,
    completion_tokens: int,
    cache_hit: bool = False,
    model: str = "gpt-4o-mini",
) -> None:
    """Log one RAG request as a single-line JSON record.

    The raw question is never logged — only a truncated SHA-256 hash, so
    identical questions can be grouped in the aggregator without storing
    user text.

    Args:
        question: User question; hashed before logging.
        chunk_ids: IDs of the retrieved chunks used as context.
        latency_ms: End-to-end request latency in milliseconds.
        prompt_tokens: Tokens sent to the model.
        completion_tokens: Tokens generated by the model.
        cache_hit: True when the answer was served from a cache.
        model: Generation model name.
    """
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        # 16 hex chars (64 bits) suffice to group duplicate questions
        # without bloating the log line.
        "question_hash": hashlib.sha256(question.encode()).hexdigest()[:16],
        "chunk_ids": chunk_ids,
        "latency_ms": round(latency_ms, 1),
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
        "cache_hit": cache_hit,
        "model": model,
    }
    logger.info(json.dumps(record))
Output looks like:
{"ts":"2026-02-25T10:00:00Z","question_hash":"3f2a8b1c...","chunk_ids":[42,17,88],"latency_ms":512.3,"prompt_tokens":1840,"completion_tokens":312,"total_tokens":2152,"cache_hit":false,"model":"gpt-4o-mini"}
Stage-level latency timer¶
Break latency down by stage (embed, retrieve, generate) to find bottlenecks:
# rag/timer.py
from __future__ import annotations
import time
from contextlib import contextmanager
from typing import Iterator
class RequestTimer:
    """Collect per-stage latencies (milliseconds) for a single request."""

    def __init__(self) -> None:
        # stage name -> elapsed milliseconds, rounded to 0.1 ms
        self._stages: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        """Time the enclosed block and record it under *name*.

        Uses try/finally so the stage is recorded even when the block
        raises — failing requests are exactly the ones whose latency you
        want in the logs. The exception still propagates to the caller.
        """
        start = time.perf_counter()
        try:
            yield
        finally:
            self._stages[name] = round((time.perf_counter() - start) * 1000, 1)

    def total_ms(self) -> float:
        """Sum of all recorded stage latencies, in milliseconds."""
        return round(sum(self._stages.values()), 1)

    def to_dict(self) -> dict[str, float]:
        """Per-stage latencies plus a computed "total_ms" key."""
        return {**self._stages, "total_ms": self.total_ms()}
Usage:
from rag.timer import RequestTimer
from rag.embed import embed_texts
from rag.retrieve import retrieve
from rag.generate import answer_question
def rag_query(question: str) -> dict:
    """Answer *question* end-to-end, timing each pipeline stage.

    Returns {"answer": <generated answer>, "latency": per-stage ms dict}.
    """
    stage_timer = RequestTimer()

    with stage_timer.stage("embed"):
        # NOTE(review): the embedding is computed but not passed on here —
        # presumably retrieve() embeds internally; confirm before removing.
        query_embedding = embed_texts([question])[0]

    with stage_timer.stage("retrieve"):
        chunks = retrieve(question, k=8)

    with stage_timer.stage("generate"):
        result = answer_question(question, chunks=chunks)

    print(stage_timer.to_dict())
    # → {"embed": 45.2, "retrieve": 12.8, "generate": 489.1, "total_ms": 547.1}
    return {"answer": result, "latency": stage_timer.to_dict()}
Cost tracking¶
Track cost per request using token counts + pricing:
# rag/cost.py
from __future__ import annotations
# Prices per 1M tokens (as of early 2026 — check openai.com/pricing for updates)
# Prices per 1M tokens (as of early 2026 — check openai.com/pricing for updates)
_PRICES: dict[str, dict[str, float]] = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "text-embedding-3-small": {"input": 0.02, "output": 0.0},
}


def cost_usd(
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
) -> float:
    """Return estimated cost in USD for a single API call.

    Unknown models are priced at $0 — the call is still recorded, just
    with no cost attributed to it.
    """
    pricing = _PRICES.get(model)
    if pricing is None:
        pricing = {"input": 0.0, "output": 0.0}
    # Price table is per 1M tokens, so scale each side down accordingly.
    return (
        prompt_tokens * pricing["input"] / 1_000_000
        + completion_tokens * pricing["output"] / 1_000_000
    )


# Example
c = cost_usd("gpt-4o-mini", prompt_tokens=1840, completion_tokens=312)
print(f"${c:.6f}")  # → $0.000463
rag_requests table in Postgres¶
For production analytics, write every request to a Postgres table. This lets you query "daily cost", "p99 latency", "cache hit rate", etc.:
-- One row per RAG request: the source table for daily-cost, latency-percentile
-- and cache-hit-rate analytics queries.
CREATE TABLE rag_requests (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
question_hash TEXT NOT NULL,  -- truncated SHA-256 of the question; raw text is never stored
chunk_ids INTEGER[],          -- retrieved chunk IDs; empty array means retrieval found nothing
latency_ms REAL NOT NULL,     -- end-to-end request latency
embed_ms REAL,                -- per-stage latencies (nullable when a stage was not timed)
retrieve_ms REAL,
generate_ms REAL,
prompt_tokens INTEGER,
completion_tokens INTEGER,
cost_usd REAL,                -- estimated from token counts and model pricing
cache_hit BOOLEAN NOT NULL DEFAULT false,
model TEXT
);
-- Analytics queries filter on a recent time window, so index the timestamp.
CREATE INDEX ON rag_requests (created_at);
import psycopg
def record_request(
    conn: psycopg.Connection,
    *,
    question_hash: str,
    chunk_ids: list[int],
    latency: dict[str, float],
    prompt_tokens: int,
    completion_tokens: int,
    cost: float,
    cache_hit: bool = False,
    model: str = "gpt-4o-mini",
) -> None:
    """Insert one row into rag_requests and commit.

    *latency* is the RequestTimer.to_dict() payload; any per-stage key
    that is missing simply stores NULL in the matching column.
    """
    params = {
        "qh": question_hash,
        "cids": chunk_ids,
        "lat": latency.get("total_ms"),
        "emb": latency.get("embed"),
        "ret": latency.get("retrieve"),
        "gen": latency.get("generate"),
        "pt": prompt_tokens,
        "ct": completion_tokens,
        "cost": cost,
        "ch": cache_hit,
        "model": model,
    }
    sql = """
    INSERT INTO rag_requests (
        question_hash, chunk_ids,
        latency_ms, embed_ms, retrieve_ms, generate_ms,
        prompt_tokens, completion_tokens, cost_usd,
        cache_hit, model
    ) VALUES (%(qh)s, %(cids)s, %(lat)s, %(emb)s, %(ret)s, %(gen)s,
              %(pt)s, %(ct)s, %(cost)s, %(ch)s, %(model)s)
    """
    conn.execute(sql, params)
    conn.commit()
Useful queries:
-- Daily cost
SELECT date_trunc('day', created_at) AS day, round(sum(cost_usd)::numeric, 4) AS cost
FROM rag_requests GROUP BY 1 ORDER BY 1 DESC;
-- p50/p95/p99 latency
SELECT
percentile_cont(0.50) WITHIN GROUP (ORDER BY latency_ms) AS p50,
percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms) AS p95,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99
FROM rag_requests
WHERE created_at > now() - interval '24 hours';
-- Cache hit rate
SELECT round(avg(cache_hit::int)::numeric, 3) AS hit_rate
FROM rag_requests WHERE created_at > now() - interval '1 hour';
Implicit quality signals¶
When you can't run LLM judges on every request, watch these proxy metrics:
| Signal | How to measure | What it indicates |
|---|---|---|
| "I don't know" rate | Count responses containing "I don't know based on" | Retrieval is missing relevant chunks |
| Answer length distribution | Histogram of completion_tokens | Very short answers often mean poor context |
| Empty chunk IDs | Requests where chunk_ids = [] | Retrieval completely failed |
| Latency spikes | p99 > 3× median | Model or DB issues |
Log all of these in rag_requests and set up alerts on anomalies.
Going further: OpenTelemetry and Langfuse¶
For distributed tracing across services, add OpenTelemetry:
uv pip install opentelemetry-sdk opentelemetry-exporter-otlp
For a purpose-built RAG observability platform, Langfuse provides tracing, cost tracking, and a prompt management UI with minimal instrumentation overhead.
Next steps¶
- Add automated quality gates in CI: Testing RAG Components
- Reduce costs by caching repeated queries: Caching for RAG