title: Build a Complete RAG with Postgres (pgvector) + OpenAI
description: "End-to-end tutorial: ingest docs, chunk, embed, store in pgvector, retrieve, and answer with citations."
Build a Complete RAG with Postgres (pgvector) + OpenAI
In the earlier tutorial, you built a minimal RAG to understand the core mechanics.
Now we’ll build a complete, practical RAG system:
- Ingest documents (.txt/.md)
- Chunk them (token-aware)
- Generate embeddings
- Store chunks + embeddings in Postgres + pgvector
- Retrieve top‑K chunks for a query
- Generate a grounded answer with citations
This tutorial is framework-neutral (plain Python + SQL), with a production-friendly default stack.
Prerequisites
- Python 3.11+
- Docker
- An OpenAI API key set as an environment variable: OPENAI_API_KEY
Install Python deps:
pip install openai psycopg[binary] tiktoken
1) Start Postgres + pgvector (Docker)
docker run --name buildrag-postgres \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=buildrag \
-p 5432:5432 \
-d pgvector/pgvector:pg16
Set your database URL:
export DATABASE_URL="postgresql://postgres:postgres@localhost:5432/buildrag"
On Windows (PowerShell):
$env:DATABASE_URL="postgresql://postgres:postgres@localhost:5432/buildrag"
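To sanity-check connectivity before moving on (optional; assumes the Python deps above are installed):
python -c "import os, psycopg; psycopg.connect(os.environ['DATABASE_URL']).close(); print('ok')"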
2) Create schema + indexes
Connect with any SQL client (for example: docker exec -it buildrag-postgres psql -U postgres -d buildrag) and run:
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS documents (
id UUID PRIMARY KEY,
source TEXT NOT NULL,
title TEXT,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS chunks (
id BIGSERIAL PRIMARY KEY,
doc_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INT NOT NULL,
section_path TEXT,
content TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
embedding vector(1536) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', content)) STORED
);
CREATE INDEX IF NOT EXISTS chunks_doc_id_idx ON chunks (doc_id);
CREATE INDEX IF NOT EXISTS chunks_metadata_gin_idx ON chunks USING GIN (metadata);
CREATE INDEX IF NOT EXISTS chunks_tsv_gin_idx ON chunks USING GIN (tsv);
-- Fast cosine-distance ANN index (recommended default)
CREATE INDEX IF NOT EXISTS chunks_embedding_hnsw_idx
ON chunks
USING hnsw (embedding vector_cosine_ops);
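HNSW exposes two build-time knobs and one query-time knob. The values below are pgvector's documented defaults and are shown only as a starting point, not a tuned recommendation:
-- Build-time (optional): higher m / ef_construction improve recall at the cost of slower builds.
-- CREATE INDEX ... USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);
-- Query-time, per session: raise above the default of 40 for better recall, slower queries.
SET hnsw.ef_search = 40;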
Note
vector(1536) matches the 1536-dimensional output of text-embedding-3-small, the model used below. If you switch embedding models, update this dimension and re-embed your data.
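For instance, both text-embedding-3 models accept a dimensions parameter, so a hypothetical move to 1024-dimensional embeddings could look like this (run before ingesting any data; existing rows would otherwise need re-embedding):
ALTER TABLE chunks ALTER COLUMN embedding TYPE vector(1024);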
3) Prepare documents to ingest
Create a folder (anywhere) with .txt / .md files, for example:
mkdir -p ./documents
echo "# Account\n\nTo reset your password, click 'Forgot password' on the login page." > ./documents/account.md
echo "Pricing:\nEnterprise supports SSO.\nTeam does not." > ./documents/pricing.txt
Want PDFs or webpages?
- PDFs: Parsing PDF Documents
- Web pages: Parsing Webpages
4) Ingest: chunk + embed + store
This script:
- reads files from ./documents
- splits into structure-aware, token-sized chunks
- embeds each chunk
- inserts into Postgres
from __future__ import annotations
import json
import os
import re
import uuid
from pathlib import Path
import psycopg
import tiktoken
from openai import OpenAI
EMBED_MODEL = "text-embedding-3-small"
ENCODING_NAME = "o200k_base"
client = OpenAI()
HEADING_RE = re.compile(r"^(#{1,6})\s+(.*?)\s*$")
def vector_to_pg(vec: list[float]) -> str:
# pgvector text format: [1,2,3]
return "[" + ",".join(f"{x:.6f}" for x in vec) + "]"
def chunk_by_tokens(
text: str,
*,
max_tokens: int = 300,
overlap_tokens: int = 50,
encoding_name: str = ENCODING_NAME,
) -> list[str]:
enc = tiktoken.get_encoding(encoding_name)
tokens = enc.encode(text)
chunks: list[str] = []
start = 0
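    # Slide a fixed-size window across the token stream; stepping the start
    # back by overlap_tokens makes consecutive chunks share some context.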
while start < len(tokens):
end = min(start + max_tokens, len(tokens))
chunks.append(enc.decode(tokens[start:end]))
if end >= len(tokens):
break
start = max(0, end - overlap_tokens)
return chunks
def split_markdown_sections(md: str) -> list[tuple[str, str]]:
sections: list[tuple[str, str]] = []
heading_stack: list[str] = []
current_lines: list[str] = []
current_path = "Document"
def flush() -> None:
nonlocal current_lines
text = "\n".join(current_lines).strip()
if text:
sections.append((current_path, text))
current_lines = []
for line in md.splitlines():
match = HEADING_RE.match(line)
if match:
flush()
level = len(match.group(1))
title = match.group(2).strip()
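            # A level-N heading truncates the stack to depth N-1, so the
            # breadcrumb path always mirrors the current heading hierarchy.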
heading_stack[:] = heading_stack[: max(0, level - 1)]
heading_stack.append(title)
current_path = " > ".join(heading_stack)
current_lines.append(line)
else:
current_lines.append(line)
flush()
return sections
def chunk_document(text: str, *, is_markdown: bool) -> list[dict]:
chunks: list[dict] = []
if is_markdown:
sections = split_markdown_sections(text)
else:
sections = [("Document", text)]
for section_path, section_text in sections:
for chunk in chunk_by_tokens(section_text):
chunks.append({"section_path": section_path, "text": chunk})
return chunks
def embed_texts(texts: list[str]) -> list[list[float]]:
resp = client.embeddings.create(model=EMBED_MODEL, input=texts)
return [item.embedding for item in resp.data]
def ingest_directory(directory: str) -> None:
db_url = os.environ["DATABASE_URL"]
root = Path(directory)
files = sorted([p for p in root.glob("*") if p.suffix.lower() in {".md", ".txt"}])
if not files:
raise RuntimeError(f"No .md/.txt files found in {root.resolve()}")
with psycopg.connect(db_url) as conn:
with conn.cursor() as cur:
for path in files:
source = str(path)
text = path.read_text(encoding="utf-8", errors="replace")
is_markdown = path.suffix.lower() == ".md"
doc_id = uuid.uuid4()
cur.execute(
"INSERT INTO documents (id, source, title, metadata) VALUES (%s, %s, %s, %s::jsonb)",
(doc_id, source, path.stem, json.dumps({"filetype": path.suffix.lower()})),
)
chunk_objs = chunk_document(text, is_markdown=is_markdown)
chunk_texts = [c["text"] for c in chunk_objs]
embeddings = embed_texts(chunk_texts)
for idx, (c, emb) in enumerate(zip(chunk_objs, embeddings)):
metadata = {"source": source, "chunker": "token_300_50", "embedding_model": EMBED_MODEL}
cur.execute(
"""
INSERT INTO chunks (doc_id, chunk_index, section_path, content, metadata, embedding)
VALUES (%s, %s, %s, %s, %s::jsonb, %s::vector)
""",
(
doc_id,
idx,
c["section_path"],
c["text"],
json.dumps(metadata),
vector_to_pg(emb),
),
)
print(f"Ingested {path.name}: {len(chunk_objs)} chunks")
conn.commit()
if __name__ == "__main__":
ingest_directory("./documents")
5) Retrieve top‑K chunks for a question
from __future__ import annotations
import os
import psycopg
from openai import OpenAI
client = OpenAI()
EMBED_MODEL = "text-embedding-3-small"
def vector_to_pg(vec: list[float]) -> str:
return "[" + ",".join(f"{x:.6f}" for x in vec) + "]"
def embed_query(q: str) -> list[float]:
resp = client.embeddings.create(model=EMBED_MODEL, input=q)
return resp.data[0].embedding
def retrieve(q: str, *, k: int = 8) -> list[dict]:
db_url = os.environ["DATABASE_URL"]
q_vec = embed_query(q)
with psycopg.connect(db_url) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT
chunks.id,
documents.source,
chunks.section_path,
chunks.content
FROM chunks
JOIN documents ON documents.id = chunks.doc_id
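                -- <=> is pgvector's cosine-distance operator; smaller distance = closer match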
ORDER BY chunks.embedding <=> %s::vector
LIMIT %s
""",
(vector_to_pg(q_vec), k),
)
rows = cur.fetchall()
return [
{"id": r[0], "source": r[1], "section_path": r[2], "content": r[3]}
for r in rows
]
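A quick smoke test, assuming the documents from step 3 were ingested:
if __name__ == "__main__":
    for hit in retrieve("How do I reset my password?", k=3):
        print(hit["source"], "|", hit["section_path"], "|", hit["content"][:80])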
6) Answer with citations (grounded generation)
from __future__ import annotations
from openai import OpenAI
client = OpenAI()
SYSTEM_PROMPT = """You are a helpful assistant. Answer the user's question using ONLY the provided context.
Rules:
- If the context does not contain the answer, say: "I don't know based on the provided context."
- Do not use outside knowledge.
- Cite sources in this format: [source: {source}#chunk:{chunk_id}]
"""
def format_context(chunks: list[dict]) -> str:
parts: list[str] = ["--- BEGIN CONTEXT ---"]
for c in chunks:
section = c.get("section_path") or ""
section_part = f' section="{section}"' if section else ""
parts.append(
f'\n[chunk_id={c["id"]} source="{c["source"]}"{section_part}]\n{c["content"]}'
)
parts.append("\n--- END CONTEXT ---")
return "\n".join(parts)
def answer(question: str, *, chunks: list[dict]) -> str:
user_content = f"Question:\n{question}\n\nContext:\n{format_context(chunks)}"
resp = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
)
return resp.choices[0].message.content or ""
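Wiring steps 5 and 6 together is a few lines. A minimal sketch, assuming the retrieval code above was saved as retrieve.py (the module name is an assumption):
from retrieve import retrieve  # hypothetical module name for the step-5 code

question = "Does the Team plan support SSO?"
chunks = retrieve(question, k=8)
print(answer(question, chunks=chunks))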
Debugging checklist
If answers are wrong, debug in this order:
1) Inspect retrieved chunks (are they relevant?)
2) Fix parsing/boilerplate (web pages are common offenders)
3) Fix chunking (size/overlap/section headings)
4) Add metadata filters (product/version/source); see the SQL sketch after this list
5) Improve retrieval (per-doc caps, hybrid, reranking)
6) Tighten the prompt and citation rules
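For item 4, filters are plain SQL predicates layered onto the same vector query, since everything lives in Postgres. A sketch using the filetype key the ingest script stores on documents (the vector literal is a placeholder for a real query embedding):
SELECT chunks.id, chunks.content
FROM chunks
JOIN documents ON documents.id = chunks.doc_id
WHERE documents.metadata->>'filetype' = '.md'  -- restrict to markdown sources
ORDER BY chunks.embedding <=> '[...]'::vector  -- substitute the query embedding
LIMIT 8;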
What’s next
- Improve chunking choices: Chunking Strategies
- Improve retrieval quality: Retrieval Strategies
- Add hybrid retrieval and SQL-first patterns: SQL for RAG