
title: Build a Complete RAG with Postgres (pgvector) + OpenAI
description: End-to-end tutorial: ingest docs, chunk, embed, store in pgvector, retrieve, and answer with citations.


Build a Complete RAG with Postgres (pgvector) + OpenAI

In the earlier tutorial, you built a minimal RAG to understand the core mechanics.

Now we’ll build a complete, practical RAG system:

  1. Ingest documents (.txt / .md)
  2. Chunk them (token-aware)
  3. Generate embeddings
  4. Store chunks + embeddings in Postgres + pgvector
  5. Retrieve top‑K chunks for a query
  6. Generate a grounded answer with citations

This tutorial is framework-neutral (plain Python + SQL), with a production-friendly default stack.


Prerequisites

  • Python 3.11+
  • Docker
  • An OpenAI API key set as an environment variable: OPENAI_API_KEY

Install Python deps:

pip install openai psycopg[binary] tiktoken

1) Start Postgres + pgvector (Docker)

docker run --name buildrag-postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=buildrag \
  -p 5432:5432 \
  -d pgvector/pgvector:pg16

Set your database URL:

export DATABASE_URL="postgresql://postgres:postgres@localhost:5432/buildrag"

On Windows (PowerShell):

$env:DATABASE_URL="postgresql://postgres:postgres@localhost:5432/buildrag"

2) Create schema + indexes

Connect with any SQL client and run:

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS documents (
  id UUID PRIMARY KEY,
  source TEXT NOT NULL,
  title TEXT,
  metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS chunks (
  id BIGSERIAL PRIMARY KEY,
  doc_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
  chunk_index INT NOT NULL,
  section_path TEXT,
  content TEXT NOT NULL,
  metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
  embedding vector(1536) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED
);

CREATE INDEX IF NOT EXISTS chunks_doc_id_idx ON chunks (doc_id);
CREATE INDEX IF NOT EXISTS chunks_metadata_gin_idx ON chunks USING GIN (metadata);
CREATE INDEX IF NOT EXISTS chunks_tsv_gin_idx ON chunks USING GIN (tsv);

-- Fast cosine-distance ANN index (recommended default)
CREATE INDEX IF NOT EXISTS chunks_embedding_hnsw_idx
  ON chunks
  USING hnsw (embedding vector_cosine_ops);

Note

vector(1536) matches the output dimension of text-embedding-3-small, the model used below. If you change embedding models, update this dimension and re-embed existing rows.
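As a quick reference, these are the default output dimensions of OpenAI's embedding models (per OpenAI's documentation; verify against the model you actually use). A small sketch for deriving the column type:

```python
# Default output dimensions for OpenAI embedding models.
# The text-embedding-3-* models also accept a `dimensions` parameter
# that shortens vectors, in which case the column must match that value.
EMBEDDING_DIMENSIONS = {
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
    "text-embedding-ada-002": 1536,
}


def column_type(model: str) -> str:
    """Return the pgvector column type matching a model's output size."""
    return f"vector({EMBEDDING_DIMENSIONS[model]})"


print(column_type("text-embedding-3-small"))  # vector(1536)
```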


3) Prepare documents to ingest

Create a folder (anywhere) with .txt / .md files, for example:

mkdir -p ./documents
printf "# Account\n\nTo reset your password, click 'Forgot password' on the login page.\n" > ./documents/account.md
printf "Pricing:\nEnterprise supports SSO.\nTeam does not.\n" > ./documents/pricing.txt

Want PDFs or webpages?

  • PDFs: Parsing PDF Documents
  • Web pages: Parsing Webpages


4) Ingest: chunk + embed + store

This script:

  • reads files from ./documents
  • splits them into structure-aware, token-sized chunks
  • embeds each chunk
  • inserts the results into Postgres

from __future__ import annotations

import json
import os
import re
import uuid
from pathlib import Path

import psycopg
import tiktoken
from openai import OpenAI


EMBED_MODEL = "text-embedding-3-small"
ENCODING_NAME = "o200k_base"

client = OpenAI()

HEADING_RE = re.compile(r"^(#{1,6})\s+(.*)\s*$")


def vector_to_pg(vec: list[float]) -> str:
    # pgvector text format: [1,2,3]
    return "[" + ",".join(f"{x:.6f}" for x in vec) + "]"


def chunk_by_tokens(
    text: str,
    *,
    max_tokens: int = 300,
    overlap_tokens: int = 50,
    encoding_name: str = ENCODING_NAME,
) -> list[str]:
    enc = tiktoken.get_encoding(encoding_name)
    tokens = enc.encode(text)

    chunks: list[str] = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunks.append(enc.decode(tokens[start:end]))
        if end >= len(tokens):
            break
        start = max(0, end - overlap_tokens)
    return chunks


def split_markdown_sections(md: str) -> list[tuple[str, str]]:
    sections: list[tuple[str, str]] = []
    heading_stack: list[str] = []
    current_lines: list[str] = []
    current_path = "Document"

    def flush() -> None:
        nonlocal current_lines
        text = "\n".join(current_lines).strip()
        if text:
            sections.append((current_path, text))
        current_lines = []

    for line in md.splitlines():
        match = HEADING_RE.match(line)
        if match:
            flush()
            level = len(match.group(1))
            title = match.group(2).strip()
            heading_stack[:] = heading_stack[: max(0, level - 1)]
            heading_stack.append(title)
            current_path = " > ".join(heading_stack)
            current_lines.append(line)
        else:
            current_lines.append(line)

    flush()
    return sections


def chunk_document(text: str, *, is_markdown: bool) -> list[dict]:
    chunks: list[dict] = []
    if is_markdown:
        sections = split_markdown_sections(text)
    else:
        sections = [("Document", text)]

    for section_path, section_text in sections:
        for chunk in chunk_by_tokens(section_text):
            chunks.append({"section_path": section_path, "text": chunk})
    return chunks


def embed_texts(texts: list[str]) -> list[list[float]]:
    resp = client.embeddings.create(model=EMBED_MODEL, input=texts)
    return [item.embedding for item in resp.data]


def ingest_directory(directory: str) -> None:
    db_url = os.environ["DATABASE_URL"]
    root = Path(directory)
    files = sorted([p for p in root.glob("*") if p.suffix.lower() in {".md", ".txt"}])

    if not files:
        raise RuntimeError(f"No .md/.txt files found in {root.resolve()}")

    with psycopg.connect(db_url) as conn:
        with conn.cursor() as cur:
            for path in files:
                source = str(path)
                text = path.read_text(encoding="utf-8", errors="replace")
                is_markdown = path.suffix.lower() == ".md"

                doc_id = uuid.uuid4()
                cur.execute(
                    "INSERT INTO documents (id, source, title, metadata) VALUES (%s, %s, %s, %s::jsonb)",
                    (doc_id, source, path.stem, json.dumps({"filetype": path.suffix.lower()})),
                )

                chunk_objs = chunk_document(text, is_markdown=is_markdown)
                chunk_texts = [c["text"] for c in chunk_objs]

                embeddings = embed_texts(chunk_texts)

                for idx, (c, emb) in enumerate(zip(chunk_objs, embeddings)):
                    metadata = {"source": source, "chunker": "token_300_50", "embedding_model": EMBED_MODEL}
                    cur.execute(
                        """
                        INSERT INTO chunks (doc_id, chunk_index, section_path, content, metadata, embedding)
                        VALUES (%s, %s, %s, %s, %s::jsonb, %s::vector)
                        """,
                        (
                            doc_id,
                            idx,
                            c["section_path"],
                            c["text"],
                            json.dumps(metadata),
                            vector_to_pg(emb),
                        ),
                    )

                print(f"Ingested {path.name}: {len(chunk_objs)} chunks")

        conn.commit()


if __name__ == "__main__":
    ingest_directory("./documents")
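The window/overlap arithmetic inside chunk_by_tokens is easiest to see with plain indices instead of real tokens. A minimal sketch of the same loop, no tiktoken required:

```python
def sliding_windows(
    n_tokens: int, max_tokens: int = 300, overlap_tokens: int = 50
) -> list[tuple[int, int]]:
    """Return the (start, end) index pairs chunk_by_tokens walks through."""
    windows: list[tuple[int, int]] = []
    start = 0
    while start < n_tokens:
        end = min(start + max_tokens, n_tokens)
        windows.append((start, end))
        if end >= n_tokens:
            break
        start = end - overlap_tokens  # step back so adjacent chunks share context
    return windows


# A 700-token section yields three chunks; each re-reads 50 tokens of the last.
print(sliding_windows(700))  # [(0, 300), (250, 550), (500, 700)]
```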

5) Retrieve top‑K chunks for a question

from __future__ import annotations

import os
import psycopg
from openai import OpenAI


client = OpenAI()
EMBED_MODEL = "text-embedding-3-small"


def vector_to_pg(vec: list[float]) -> str:
    return "[" + ",".join(f"{x:.6f}" for x in vec) + "]"


def embed_query(q: str) -> list[float]:
    resp = client.embeddings.create(model=EMBED_MODEL, input=q)
    return resp.data[0].embedding


def retrieve(q: str, *, k: int = 8) -> list[dict]:
    db_url = os.environ["DATABASE_URL"]
    q_vec = embed_query(q)

    with psycopg.connect(db_url) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT
                  chunks.id,
                  documents.source,
                  chunks.section_path,
                  chunks.content
                FROM chunks
                JOIN documents ON documents.id = chunks.doc_id
                ORDER BY chunks.embedding <=> %s::vector
                LIMIT %s
                """,
                (vector_to_pg(q_vec), k),
            )
            rows = cur.fetchall()

    return [
        {"id": r[0], "source": r[1], "section_path": r[2], "content": r[3]}
        for r in rows
    ]
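pgvector's `<=>` operator returns cosine distance (1 minus cosine similarity), so smaller means more similar and the ORDER BY above puts the best matches first. A pure-Python sketch of the same quantity, useful for sanity-checking query results against the database:

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Compute the same quantity as pgvector's <=> operator: 1 - cos(a, b)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm


print(cosine_distance([1.0, 0.0], [1.0, 0.0]))  # 0.0 (same direction)
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # 1.0 (orthogonal)
```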

6) Answer with citations (grounded generation)

from __future__ import annotations

from openai import OpenAI


client = OpenAI()

SYSTEM_PROMPT = """You are a helpful assistant. Answer the user's question using ONLY the provided context.

Rules:
- If the context does not contain the answer, say: "I don't know based on the provided context."
- Do not use outside knowledge.
- Cite sources in this format: [source: {source}#chunk:{chunk_id}]
"""


def format_context(chunks: list[dict]) -> str:
    parts: list[str] = ["--- BEGIN CONTEXT ---"]
    for c in chunks:
        section = c.get("section_path") or ""
        section_part = f' section="{section}"' if section else ""
        parts.append(
            f'\n[chunk_id={c["id"]} source="{c["source"]}"{section_part}]\n{c["content"]}'
        )
    parts.append("\n--- END CONTEXT ---")
    return "\n".join(parts)


def answer(question: str, *, chunks: list[dict]) -> str:
    user_content = f"Question:\n{question}\n\nContext:\n{format_context(chunks)}"
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
    )
    return resp.choices[0].message.content or ""
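Because the prompt fixes the citation format, you can check that every citation in the answer refers to a chunk that was actually retrieved. A small sketch (the regex mirrors the format in SYSTEM_PROMPT; the sample text is made up):

```python
import re

# Matches the citation format requested in SYSTEM_PROMPT:
# [source: {source}#chunk:{chunk_id}]
CITATION_RE = re.compile(r"\[source: (?P<source>[^#\]]+)#chunk:(?P<chunk_id>\d+)\]")


def extract_citations(answer_text: str) -> list[tuple[str, int]]:
    """Pull (source, chunk_id) pairs out of a generated answer."""
    return [
        (m.group("source"), int(m.group("chunk_id")))
        for m in CITATION_RE.finditer(answer_text)
    ]


text = "Click 'Forgot password'. [source: documents/account.md#chunk:3]"
print(extract_citations(text))  # [('documents/account.md', 3)]
```

Comparing these pairs against the ids of the retrieved chunks catches hallucinated citations cheaply.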

Debugging checklist

If answers are wrong, debug in this order:

1) Inspect retrieved chunks (are they relevant?)
2) Fix parsing/boilerplate (web pages are common offenders)
3) Fix chunking (size/overlap/section headings)
4) Add metadata filters (product/version/source)
5) Improve retrieval (per-doc caps, hybrid, reranking)
6) Tighten the prompt and citation rules
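For step 5, a common hybrid pattern is reciprocal rank fusion (RRF): run the vector query and a full-text query against the tsv column separately, then merge the two ranked lists by rank rather than by raw score. A minimal sketch of the fusion step (the chunk ids are hypothetical):

```python
def rrf_merge(rankings: list[list[int]], k: int = 60) -> list[int]:
    """Fuse ranked lists of chunk ids with reciprocal rank fusion.

    Each id scores sum(1 / (k + rank)) across the lists it appears in,
    so items ranked well by multiple retrievers rise to the top.
    """
    scores: dict[int, float] = {}
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


vector_hits = [101, 42, 7]   # ids ordered by embedding <=> distance
keyword_hits = [42, 9, 101]  # ids ordered by ts_rank on the tsv column
print(rrf_merge([vector_hits, keyword_hits]))  # [42, 101, 9, 7]
```

Chunk 42 wins because it ranks well in both lists, which is exactly the behavior you want from hybrid retrieval.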


What’s next