
Monitoring & Observability for RAG

You can't improve what you can't measure.

In production, three signals matter most:

| Signal  | Why it matters                                               |
|---------|--------------------------------------------------------------|
| Latency | Users abandon slow interfaces; SLAs require targets          |
| Cost    | LLM API calls are billed per token; surprises are expensive  |
| Quality | Silent regressions: answer quality drops without errors      |

This tutorial shows how to instrument each one.


Install

uv pip install openai psycopg[binary] python-dotenv

Structured JSON request logging

Log every request as a structured JSON record. This makes it easy to query in your log aggregator (Datadog, Grafana Loki, CloudWatch, etc.):

# rag/logging.py
from __future__ import annotations

import hashlib
import json
import logging
import sys
from datetime import datetime, timezone

# Set up a JSON logger
logger = logging.getLogger("rag")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)


def log_request(
    *,
    question: str,
    chunk_ids: list[int],
    latency_ms: float,
    prompt_tokens: int,
    completion_tokens: int,
    cache_hit: bool = False,
    model: str = "gpt-4o-mini",
) -> None:
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "question_hash": hashlib.sha256(question.encode()).hexdigest()[:16],
        "chunk_ids": chunk_ids,
        "latency_ms": round(latency_ms, 1),
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
        "cache_hit": cache_hit,
        "model": model,
    }
    # Compact separators so the log line matches the sample output below
    logger.info(json.dumps(record, separators=(",", ":")))

Output looks like:

{"ts":"2026-02-25T10:00:00Z","question_hash":"3f2a8b1c...","chunk_ids":[42,17,88],"latency_ms":512.3,"prompt_tokens":1840,"completion_tokens":312,"total_tokens":2152,"cache_hit":false,"model":"gpt-4o-mini"}

Stage-level latency timer

Break latency down by stage (embed, retrieve, generate) to find bottlenecks:

# rag/timer.py
from __future__ import annotations

import time
from contextlib import contextmanager
from typing import Iterator


class RequestTimer:
    """Collect per-stage latencies for a single request."""

    def __init__(self) -> None:
        self._stages: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the stage even if it raised, so partial timings survive errors
            self._stages[name] = round((time.perf_counter() - start) * 1000, 1)

    def total_ms(self) -> float:
        return round(sum(self._stages.values()), 1)

    def to_dict(self) -> dict[str, float]:
        return {**self._stages, "total_ms": self.total_ms()}

Usage:

from rag.timer import RequestTimer
from rag.embed import embed_texts
from rag.retrieve import retrieve
from rag.generate import answer_question

def rag_query(question: str) -> dict:
    timer = RequestTimer()

    with timer.stage("embed"):
        query_embedding = embed_texts([question])[0]

    with timer.stage("retrieve"):
        chunks = retrieve(query_embedding, k=8)  # reuse the embedding from the previous stage

    with timer.stage("generate"):
        result = answer_question(question, chunks=chunks)

    print(timer.to_dict())
    # → {"embed": 45.2, "retrieve": 12.8, "generate": 489.1, "total_ms": 547.1}

    return {"answer": result, "latency": timer.to_dict()}

Cost tracking

Track cost per request using token counts + pricing:

# rag/cost.py
from __future__ import annotations

# Prices per 1M tokens (as of early 2026 — check openai.com/pricing for updates)
_PRICES: dict[str, dict[str, float]] = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o":      {"input": 2.50, "output": 10.00},
    "text-embedding-3-small": {"input": 0.02, "output": 0.0},
}


def cost_usd(
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
) -> float:
    """Return estimated cost in USD for a single API call."""
    prices = _PRICES.get(model, {"input": 0.0, "output": 0.0})
    return (
        prompt_tokens * prices["input"] / 1_000_000
        + completion_tokens * prices["output"] / 1_000_000
    )


# Example
c = cost_usd("gpt-4o-mini", prompt_tokens=1840, completion_tokens=312)
print(f"${c:.6f}")  # → $0.000463

rag_requests table in Postgres

For production analytics, write every request to a Postgres table. This lets you query "daily cost", "p99 latency", "cache hit rate", etc.:

CREATE TABLE rag_requests (
    id               BIGSERIAL PRIMARY KEY,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    question_hash    TEXT NOT NULL,
    chunk_ids        INTEGER[],
    latency_ms       REAL NOT NULL,
    embed_ms         REAL,
    retrieve_ms      REAL,
    generate_ms      REAL,
    prompt_tokens    INTEGER,
    completion_tokens INTEGER,
    cost_usd         REAL,
    cache_hit        BOOLEAN NOT NULL DEFAULT false,
    model            TEXT
);

CREATE INDEX ON rag_requests (created_at);

Insert one row per request from Python:

import psycopg


def record_request(
    conn: psycopg.Connection,
    *,
    question_hash: str,
    chunk_ids: list[int],
    latency: dict[str, float],
    prompt_tokens: int,
    completion_tokens: int,
    cost: float,
    cache_hit: bool = False,
    model: str = "gpt-4o-mini",
) -> None:
    conn.execute(
        """
        INSERT INTO rag_requests (
            question_hash, chunk_ids,
            latency_ms, embed_ms, retrieve_ms, generate_ms,
            prompt_tokens, completion_tokens, cost_usd,
            cache_hit, model
        ) VALUES (%(qh)s, %(cids)s, %(lat)s, %(emb)s, %(ret)s, %(gen)s,
                  %(pt)s, %(ct)s, %(cost)s, %(ch)s, %(model)s)
        """,
        {
            "qh": question_hash,
            "cids": chunk_ids,
            "lat": latency.get("total_ms"),
            "emb": latency.get("embed"),
            "ret": latency.get("retrieve"),
            "gen": latency.get("generate"),
            "pt": prompt_tokens,
            "ct": completion_tokens,
            "cost": cost,
            "ch": cache_hit,
            "model": model,
        },
    )
    conn.commit()
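
Putting the pieces together: one handler times each stage, prices the call, and persists the row. A sketch reusing the modules above, assuming answer_question also returns the API usage object (earlier snippets return only the answer):

import hashlib

import psycopg

from rag.cost import cost_usd
from rag.embed import embed_texts
from rag.generate import answer_question
from rag.retrieve import retrieve
from rag.timer import RequestTimer

def rag_query_tracked(conn: psycopg.Connection, question: str) -> str:
    timer = RequestTimer()

    with timer.stage("embed"):
        query_embedding = embed_texts([question])[0]

    with timer.stage("retrieve"):
        chunks = retrieve(query_embedding, k=8)

    with timer.stage("generate"):
        answer, usage = answer_question(question, chunks=chunks)  # assumed return shape

    record_request(
        conn,
        question_hash=hashlib.sha256(question.encode()).hexdigest()[:16],
        chunk_ids=[c.id for c in chunks],
        latency=timer.to_dict(),
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
        cost=cost_usd("gpt-4o-mini", usage.prompt_tokens, usage.completion_tokens),
    )
    return answer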

Useful queries:

-- Daily cost
SELECT date_trunc('day', created_at) AS day, round(sum(cost_usd)::numeric, 4) AS cost
FROM rag_requests GROUP BY 1 ORDER BY 1 DESC;

-- p50/p95/p99 latency
SELECT
  percentile_cont(0.50) WITHIN GROUP (ORDER BY latency_ms) AS p50,
  percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms) AS p95,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99
FROM rag_requests
WHERE created_at > now() - interval '24 hours';

-- Cache hit rate
SELECT round(avg(cache_hit::int)::numeric, 3) AS hit_rate
FROM rag_requests WHERE created_at > now() - interval '1 hour';

Implicit quality signals

When you can't run LLM judges on every request, watch these proxy metrics:

| Signal                     | How to measure                                      | What it indicates                           |
|----------------------------|-----------------------------------------------------|---------------------------------------------|
| "I don't know" rate        | Count responses containing "I don't know based on"  | Retrieval is missing relevant chunks        |
| Answer length distribution | Histogram of completion_tokens                      | Very short answers often mean poor context  |
| Empty chunk IDs            | Requests where chunk_ids = []                       | Retrieval completely failed                 |
| Latency spikes             | p99 > 3× median                                     | Model or DB issues                          |

Log all of these in rag_requests and set up alerts on anomalies.
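
For example, the empty-retrieval rate falls straight out of the existing schema. A sketch, with an illustrative 5% threshold:

import psycopg

def empty_retrieval_rate(conn: psycopg.Connection, hours: int = 1) -> float:
    """Fraction of recent requests whose retrieval returned no chunks."""
    row = conn.execute(
        """
        SELECT coalesce(avg((coalesce(cardinality(chunk_ids), 0) = 0)::int), 0)
        FROM rag_requests
        WHERE created_at > now() - make_interval(hours => %s)
        """,
        (hours,),
    ).fetchone()
    return float(row[0])

# conn: an open psycopg.Connection
if empty_retrieval_rate(conn, hours=1) > 0.05:
    print("Over 5% of requests in the last hour retrieved zero chunks")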


Going further: OpenTelemetry and Langfuse

For distributed tracing across services, add OpenTelemetry:

uv pip install opentelemetry-sdk opentelemetry-exporter-otlp
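
A minimal tracing setup then wraps each stage in a span. A sketch; the exporter reads the standard OTEL_EXPORTER_OTLP_ENDPOINT environment variable, and embed_texts / retrieve / answer_question are the functions from earlier:

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from rag.embed import embed_texts
from rag.generate import answer_question
from rag.retrieve import retrieve

# One-time setup at process start
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("rag")

def rag_query_traced(question: str) -> str:
    with tracer.start_as_current_span("rag.query") as span:
        with tracer.start_as_current_span("rag.embed"):
            query_embedding = embed_texts([question])[0]
        with tracer.start_as_current_span("rag.retrieve"):
            chunks = retrieve(query_embedding, k=8)
        with tracer.start_as_current_span("rag.generate"):
            answer = answer_question(question, chunks=chunks)
        span.set_attribute("rag.chunk_count", len(chunks))
        return answer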

For a purpose-built RAG observability platform, Langfuse provides tracing, cost tracking, and a prompt management UI with minimal instrumentation overhead.


Next steps