侧边栏壁纸
  • 累计撰写 57 篇文章
  • 累计创建 5 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

pgvector向量库学习教程

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

pgvector向量库学习教程

适用人群:后端开发、大模型应用开发、检索工程师
前置基础:掌握PostgreSQL基础增删改查、了解向量/嵌入、余弦相似度基础概念
学习周期:5天(理论+SQL实操+Python代码实操+生产调优)
pgvector版本:v0.8.2(2025年最新稳定版)
PostgreSQL版本:12-17(全版本适配)


教程总览

本教程从零基础到生产级调优,系统覆盖 pgvector 向量库的全部核心知识。聚焦国内大模型落地主流用法,剔除无效概念,每一章都配有完整的SQL示例、Python代码和实操指南。

阶段 主题 核心内容
第一阶段 前置认知与环境搭建 向量数据库概念、pgvector定位、距离算法原理、多环境部署、客户端连接
第二阶段 基础语法与向量CRUD 四种向量数据类型、建表/插入/查询/更新/删除、相似度检索、混合查询
第三阶段 向量索引原理与实操 HNSW/IVFFlat索引原理、参数调优、索引选型决策
第四阶段 Python工程化与RAG案例 psycopg对接、LangChain/LlamaIndex集成、端到端RAG实战
第五阶段 生产调优与运维 内核参数调优、冷热存储、监控备份、高频坑点规避
第六阶段 高阶拓展与选型 混合检索、分布式方案、向量库横向对比、未来展望

第一阶段:前置认知与环境搭建

1.1 基础概念扫盲

1.1.1 向量数据库是什么?为什么需要向量数据库?

在人工智能与大模型时代,数据的核心形态正在发生深刻变革——非结构化数据(文本、图片、音频、视频)占据了企业数据的 80% 以上。传统关系型数据库擅长处理结构化数据(精确匹配、范围查询、聚合统计),但面对"这段文字和那段文字语义上相似吗?"这类问题,传统数据库几乎无能为力。

向量数据库的本质,是将非结构化数据经过 Embedding 模型转化为高维浮点数数组(即向量),然后在这个高维向量空间中执行"近邻搜索"(Approximate Nearest Neighbor, ANN),从而实现语义级别的检索。

举个直观的例子:假设你有 100 万篇文档,用户输入一句"如何提升客户满意度?",你希望找出语义最相关的 10 篇文档。传统做法是用关键词搜索(LIKE ‘%客户%’ OR LIKE ‘%满意度%’),但这样会漏掉"怎样让用户更开心"这类语义相同但措辞完全不同的内容。向量数据库的做法是:把每篇文档和用户的查询都转化成向量(比如 1536 维的浮点数组),然后在高维空间中计算距离——距离越近,语义越相似。

为什么不能直接在传统数据库里做? 理论上你可以在 PostgreSQL 里存一个 float8[] 数组,然后手写欧氏距离的计算公式。但问题在于:

  • 性能灾难:对 100 万条 1536 维向量做暴力扫描(brute-force),每次查询需要计算 100 万次距离,每次距离计算涉及 1536 次浮点乘加运算,查询延迟可能高达数秒甚至数十秒。
  • 无索引支持:传统 B-Tree 索引无法有效加速高维向量的近邻搜索,因为向量空间的维度过高,B-Tree 的多维分裂策略完全失效(维度灾难)。
  • 缺少专业优化:向量检索领域有大量专门的算法优化(HNSW 图索引、IVF 倒排索引、PQ 量化压缩等),通用数据库不会内置这些。

专用向量库 vs pgvector:全方位对比

对比维度 专用向量库(Milvus/Qdrant/Chroma/FAISS) pgvector
部署复杂度 需要独立部署,通常需要额外的 etcd、MinIO、Pulsar 等依赖(如 Milvus) 作为 PostgreSQL 扩展,无需额外组件
事务保证 大多数向量库不支持 ACID 事务,或仅支持弱一致性 完整继承 PostgreSQL 的 MVCC 事务保证
混合查询 通常需要额外的元数据过滤层,与向量检索的融合不够自然 天然支持 SQL 的 WHERE + ORDER BY 联合查询
生态整合 需要额外的数据同步管道,与应用数据库之间存在数据一致性挑战 向量数据与业务数据存储在同一数据库,无需数据同步
运维成本 需要单独监控、备份、扩容一套新的存储系统 复用现有的 PostgreSQL 运维体系
海量向量性能 专为向量检索优化,百亿级向量场景下表现优异(Milvus 可支撑十亿级以上) 适合千万级到亿级向量,百亿级场景下性能不及专用库
高级向量特性 支持更多索引类型(DiskANN、GPU 加速等)、更丰富的量化策略 目前仅支持 HNSW 和 IVFFlat 两种索引
扩展上限 专为向量场景设计,可以深度优化内存管理和磁盘布局 受限于 PostgreSQL 的存储引擎和页面大小(8KB)

pgvector 的核心优势总结

  1. 零额外架构负担:不需要引入新的中间件,对于已经使用 PostgreSQL 的团队来说,启用一个扩展即可获得向量检索能力。这对于中小规模团队尤其重要——少维护一套分布式系统,就少了一大半运维烦恼。

  2. ACID 事务保证:向量数据和结构化数据在同一个事务中写入、更新、回滚,不存在数据不一致的问题。想象一个场景:用户上传文档,你需要同时在 documents 表写入元数据,在 embeddings 表写入向量——pgvector 可以保证这两步要么同时成功,要么同时回滚。

  3. 混合查询天然支持SELECT * FROM docs WHERE category = 'tech' AND created_at > '2026-01-01' ORDER BY embedding <=> $1 LIMIT 10 这一条 SQL 同时实现了结构化过滤和语义检索,这在专用向量库中通常需要额外的过滤后处理步骤。

  4. 生态完美融合:PostgreSQL 的备份工具(pg_dump/pg_restore)、连接池(PgBouncer)、高可用方案(Patroni/流复制)、监控体系(pg_stat)全部可以直接复用。

pgvector 的局限性

  1. 海量数据性能瓶颈:当向量数量达到数十亿甚至百亿级别时,pgvector 的检索延迟会明显高于 Milvus 等专用方案。专用向量库针对大规模场景做了大量底层优化(如 DiskANN 的磁盘友好索引、GPU 并行计算等),而 pgvector 受限于 PostgreSQL 的存储引擎。

  2. 内存管理受限:HNSW 索引需要大量内存来存储图结构,PostgreSQL 的 shared_buffers 配置和操作系统内存管理策略会影响性能上限。

  3. 缺少高级量化技术:专用库通常支持 Product Quantization(PQ)、Scalar Quantization(SQ)等多种量化压缩方案来降低内存占用,pgvector 在这方面选择较少(虽然 halfvec 和 bit 类型提供了部分替代方案)。

选型建议

  • 如果你的数据量在千万级到亿级,团队已有 PostgreSQL 基础设施,且需要混合查询能力 → pgvector 是首选
  • 如果你的数据量达到数十亿级以上,对检索延迟有极致要求,或需要 GPU 加速 → 考虑 Milvus/Qdrant 等专用向量库。
  • 如果你的场景是快速原型验证、中小规模 RAG 应用 → pgvector 完全够用,且开发效率最高。

1.1.2 pgvector 核心定位与发展历程

pgvector 是一个开源的 PostgreSQL 扩展(extension),它的核心定位是:让 PostgreSQL 成为一个全能型数据库——既能处理传统的关系型数据,又能处理 AI 时代的向量数据。它由 Andrew Kane 于 2021 年首次发布,以 MIT 许可证开源,目前已成为 PostgreSQL 生态中最主流的向量检索方案。

版本演进与关键里程碑

版本 发布时间 关键特性
v0.1.0 2021年4月 首次发布,提供基础的 vector 数据类型、L2 距离运算符 <->、余弦距离 <=>、负内积 <#>,以及暴力扫描检索
v0.2.0 2021年7月 引入 IVFFlat 索引,这是 pgvector 首个近似最近邻索引,大幅提升大规模向量检索性能
v0.3.0 2022年4月 支持更大的向量维度(从 2000 维提升到 16000 维),增加了 vector_dims() 函数
v0.4.0 2023年4月 引入 HNSW 索引——这是向量检索领域最重要的索引算法之一,查询性能相比 IVFFlat 有质的飞跃
v0.5.0 2023年9月 HNSW 索引性能优化,支持更多距离度量的索引构建,增加并行索引构建能力
v0.6.0 2024年1月 支持 PostgreSQL 16,优化 HNSW 索引的内存使用,提升构建速度
v0.7.0 2024年5月 里程碑版本:新增 halfvec(半精度向量)、sparsevec(稀疏向量)、bit(二进制向量)三种数据类型,支持余弦距离的二进制向量运算
v0.8.0 2024年10月 新增 iterative scans(迭代扫描)——一种全新的索引扫描策略,可以在精度和速度之间动态调整
v0.8.1-v0.8.2 2025年 稳定性修复、性能微调、文档完善

核心设计理念:pgvector 始终遵循"PostgreSQL 原生"的设计哲学。它不试图重新发明一套存储引擎或查询语言,而是深度融入 PostgreSQL 的类型系统、索引框架和查询优化器。这意味着:

  • 向量数据类型通过 PostgreSQL 的类型扩展机制注册,可以像 integertext 一样在 CREATE TABLE 中使用。
  • 向量索引通过 PostgreSQL 的 AM(Access Method)接口实现,可以使用标准的 CREATE INDEX 语法创建。
  • 向量运算符通过 PostgreSQL 的运算符框架注册,查询优化器可以自动选择是否使用向量索引。

这种设计的最大好处是:你学到的所有 PostgreSQL 知识(事务、连接、权限、备份、监控)都可以直接用于 pgvector。


1.1.3 向量基础三要素详解

一、嵌入向量(Embedding)的本质与生成原理

Embedding 是将离散的高维数据(如自然语言文本、图片像素、用户行为序列)映射到连续的低维稠密向量空间的过程。这个映射的核心思想源自分布式语义假说:语义相似的实体,在向量空间中的距离也应该更近

以文本 Embedding 为例,当你将"如何提升客户满意度"输入 OpenAI 的 text-embedding-3-small 模型时,模型内部经历了以下过程:

  1. 分词(Tokenization):将文本切分为子词单元(subword tokens),如"如何/提升/客户/满意度"可能被进一步拆分为更小的词片。
  2. 编码(Encoding):通过多层 Transformer 架构,每个 token 被编码为包含上下文信息的向量表示。模型的自注意力机制会捕获词与词之间的语义关系。
  3. 池化(Pooling):将所有 token 的向量通过平均池化或 [CLS] token 等方式聚合为一个固定维度的向量(如 1536 维)。
  4. 归一化(Normalization):部分模型会对最终向量做 L2 归一化,使其成为单位向量(模长为 1),这样余弦距离就等价于内积。

最终输出的是一个包含 1536 个浮点数的数组,例如:[0.0234, -0.0187, 0.0543, ..., 0.0091]。这个向量就是原始文本在语义空间中的"坐标"。

常见的 Embedding 模型及其维度

模型名称 提供商 默认维度 最大 Token 数 特点
text-embedding-3-small OpenAI 1536 8191 轻量高效,支持维度截断
text-embedding-3-large OpenAI 3072 8191 更高精度,适合对质量要求高的场景
text-embedding-v3 通义千问 1024 8192 中文效果优秀,支持动态维度
BGE-M3 BAAI 1024 8192 开源模型,支持多语言和稠密/稀疏/多向量三种表示
bge-large-zh-v1.5 BAAI 1024 512 中文场景经典模型
m3e-base Moka AI 768 512 中文轻量级模型
jina-embeddings-v3 Jina AI 可变 8192 支持任务特定维度调整

二、向量维度的选择与影响

向量维度是一个需要在"表达精度"和"计算/存储效率"之间权衡的参数:

  • 维度越高,语义信息越丰富,检索精度越高,但存储成本和计算开销也线性增长。
  • 维度越低,存储和计算效率更高,但可能丢失部分语义信息。

存储成本计算(以 vector(n) 类型为例,每个维度占用 4 字节的 float32):

单条向量存储大小 ≈ 4 × n + 8 字节(PostgreSQL 的 varlena 头部开销)

示例:
- 768 维向量:4 × 768 + 8 = 3,080 字节 ≈ 3 KB
- 1024 维向量:4 × 1024 + 8 = 4,104 字节 ≈ 4 KB
- 1536 维向量:4 × 1536 + 8 = 6,152 字节 ≈ 6 KB
- 3072 维向量:4 × 3072 + 8 = 12,296 字节 ≈ 12 KB

100 万条 1536 维向量的纯数据存储:约 6 GB
1000 万条 1536 维向量的纯数据存储:约 60 GB

如果使用 halfvec(半精度,2 字节/维),存储成本减半;如果使用 bit(二进制,1 bit/维),存储成本降低约 32 倍。

三、三类距离算法的数学原理与适用场景

(1)欧氏距离(L2 Distance / Euclidean Distance)

数学公式:

d(A, B) = √(Σᵢ (aᵢ - bᵢ)²)

即两个向量在各维度上差值的平方和再开方。这是最直觉的距离度量——想象两个点在 n 维空间中的直线距离。

直观理解:把两个向量想象成 n 维空间中的两个点,欧氏距离就是连接这两点的直线长度。如果两个文本的 Embedding 向量在空间中靠得很近,说明它们的语义相似。

pgvector 运算符<-> (注意:pgvector 返回的是欧氏距离的平方,省去了开方运算以提高性能,但不影响排序结果)

-- 计算两个向量的欧氏距离
SELECT '[1, 2, 3]'::vector <-> '[4, 5, 6]'::vector AS l2_distance;
-- 结果:5.196152(即 √27 = √((4-1)² + (5-2)² + (6-3)²))

适用场景

  • 通用场景的默认选择
  • 当 Embedding 模型的输出未经过归一化时,欧氏距离是最稳妥的选择
  • 图像处理中的特征向量比较

(2)余弦距离(Cosine Distance)

数学公式:

cosine_similarity(A, B) = (A · B) / (||A|| × ||B||) = Σ(aᵢ × bᵢ) / (√Σaᵢ² × √Σbᵢ²)

cosine_distance(A, B) = 1 - cosine_similarity(A, B)

余弦相似度衡量的是两个向量方向的相似程度(夹角的余弦值),而不关心它们的绝对大小(模长)。余弦距离 = 1 - 余弦相似度。

直观理解:想象两个向量是从原点出发的两根箭头。如果它们指向几乎相同的方向,余弦相似度接近 1,余弦距离接近 0;如果它们完全正交(垂直),余弦相似度为 0,余弦距离为 1;如果方向完全相反,余弦相似度为 -1,余弦距离为 2。

pgvector 运算符<=>

-- 计算两个向量的余弦距离
SELECT '[1, 2, 3]'::vector <=> '[2, 4, 6]'::vector AS cosine_distance;
-- 结果:0(因为两个向量方向完全相同,[2,4,6] = 2 × [1,2,3])

适用场景

  • 文本语义检索的首选距离度量(文本 Embedding 通常关注语义方向而非绝对大小)
  • 当 Embedding 模型输出经过 L2 归一化时,余弦距离等价于欧氏距离(此时两者结果一致)
  • 大多数 RAG 应用推荐使用余弦距离

(3)内积(Inner Product / Dot Product)

数学公式:

inner_product(A, B) = A · B = Σ(aᵢ × bᵢ)

内积同时考虑了向量的方向和模长。两个向量方向越一致、模长越大,内积值越大。

注意:pgvector 中 <#> 运算符返回的是负内积(negative inner product),即 -Σ(aᵢ × bᵢ)。这是为了让所有距离度量都遵循"值越小越相似"的统一语义(因为内积本身是"值越大越相似")。

-- 计算两个向量的负内积
SELECT '[1, 2, 3]'::vector <#> '[4, 5, 6]'::vector AS negative_inner_product;
-- 结果:-32(即 -(1×4 + 2×5 + 3×6) = -(4+10+18) = -32)
-- 值为 -32 意味着内积为 32,负内积为 -32
-- 值越小(越负)表示内积越大,向量越相似

适用场景

  • 当 Embedding 已经过归一化时,内积等价于余弦相似度
  • 推荐系统中的协同过滤
  • 需要同时考虑方向和"强度"的场景

三种距离的关系总结

距离度量 pgvector 运算符 返回值含义 越相似时值 推荐场景
欧氏距离 <-> 空间距离 越接近 0 通用、未归一化向量
余弦距离 <=> 方向差异 越接近 0 文本语义检索(首选)
负内积 <#> 负的内积值 越小(越负) 归一化向量、推荐系统

重要提示:当向量已做 L2 归一化(即 ||v|| = 1)时,三种距离是等价的(存在单调映射关系),此时选择哪种运算符对检索结果排序没有影响。但在创建索引时,需要确保索引使用的距离度量与查询时一致。


1.1.4 大模型 RAG 中 pgvector 的完整落地链路

RAG(Retrieval-Augmented Generation,检索增强生成)是 pgvector 最典型的应用场景。其核心思想是:先从知识库中检索出与用户问题最相关的文档片段,再将这些片段作为上下文拼接给大模型,让大模型基于这些"参考资料"生成准确的回答。

完整的 RAG 链路分为离线索引阶段在线检索生成阶段

┌──────────────────── 离线索引阶段 ────────────────────┐
│                                                       │
│  原始文档 → 文档切片 → Embedding向量化 → 写入pgvector   │
│                                                       │
└───────────────────────────────────────────────────────┘

┌────────────── 在线检索生成阶段 ───────────────────────┐
│                                                       │
│  用户提问 → Query向量化 → pgvector语义检索 →            │
│  Top-K文档片段 → 拼接Prompt → LLM生成回答              │
│                                                       │
└───────────────────────────────────────────────────────┘

每一步的详细说明

步骤 1:文档切片(Chunking)

原始文档(PDF、Word、网页、数据库记录等)通常很长,无法直接作为一个整体进行向量化和检索。原因有二:

  • Embedding 模型有输入长度限制(通常 512-8192 tokens),超出部分会被截断。
  • 即使不截断,过长的文本向量化后会"稀释"语义信息,导致检索质量下降。

因此,需要将文档切分为合适大小的片段(chunk)。常见策略:

切片策略 说明 适用场景
固定长度切片 按字符数或 Token 数切分,如每 512 tokens 一个 chunk 通用场景,实现简单
重叠切片 相邻 chunk 之间有重叠(如 overlap=100 tokens),保证语义连贯性 语义跨越切分边界时
语义切片 根据段落、章节等自然边界切分 结构化文档(论文、报告)
递归切片 先尝试按大粒度切分,超出长度时再递归细分 LangChain 默认策略
# 使用 LangChain 的递归文本切分器
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,        # 每个 chunk 最大 512 个字符
    chunk_overlap=50,      # 相邻 chunk 重叠 50 个字符
    separators=["\n\n", "\n", "。", ".", " ", ""]  # 按优先级尝试切分
)

chunks = splitter.split_text(document_content)
# 输出示例:
# chunk[0]: "第一章 引言\n本文介绍了基于pgvector的向量检索方案..."
# chunk[1]: "...pgvector作为PostgreSQL扩展,提供了原生的向量数据类型支持..."
# chunk[2]: "第二章 系统架构\n本系统采用pgvector作为向量存储引擎..."

步骤 2:向量化嵌入(Embedding)

将每个文本 chunk 通过 Embedding 模型转化为高维向量:

from openai import OpenAI

client = OpenAI()

def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """将文本转换为向量嵌入"""
    response = client.embeddings.create(
        model=model,
        input=text
    )
    return response.data[0].embedding  # 返回 1536 维的浮点数列表

# 对每个 chunk 生成向量
chunk_embeddings = []
for chunk in chunks:
    embedding = get_embedding(chunk)
    chunk_embeddings.append({
        'content': chunk,
        'embedding': embedding,
        'metadata': {'source': 'document.pdf', 'page': 1}
    })

步骤 3:写入 pgvector

将向量和对应的文本内容、元数据一起写入 PostgreSQL:

import psycopg
from psycopg.adapt import Dumper

conn = psycopg.connect("postgresql://user:pass@localhost:5432/mydb")
cur = conn.cursor()

# 批量写入
for item in chunk_embeddings:
    cur.execute("""
        INSERT INTO documents (content, metadata, embedding)
        VALUES (%s, %s, %s::vector)
    """, (item['content'], json.dumps(item['metadata']), item['embedding']))

conn.commit()

步骤 4:语义检索

当用户提问时,将问题向量化,然后在 pgvector 中执行近邻搜索:

user_query = "pgvector支持哪些索引类型?"
query_embedding = get_embedding(user_query)

# 语义检索 Top-5 最相关的文档片段
cur.execute("""
    SELECT 
        content,
        metadata,
        1 - (embedding <=> %s::vector) AS similarity_score
    FROM documents
    ORDER BY embedding <=> %s::vector
    LIMIT 5
""", (query_embedding, query_embedding))

results = cur.fetchall()

步骤 5:拼接上下文给 LLM

将检索到的文档片段拼接为 Prompt 的上下文部分:

# 构建 RAG Prompt
context = "\n\n---\n\n".join([row[0] for row in results])

prompt = f"""你是一个知识库助手。请基于以下参考资料回答用户的问题。
如果参考资料中没有相关信息,请明确告知。

参考资料:
{context}

用户问题:{user_query}

请用中文回答:"""

# 调用 LLM
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": prompt}]
)

print(response.choices[0].message.content)

完整链路的数据流

用户输入: "pgvector支持哪些索引类型?"
    ↓
Query Embedding: [0.023, -0.015, ..., 0.089] (1536维)
    ↓
pgvector 语义检索 (ORDER BY embedding <=> query_vec LIMIT 5)
    ↓
检索结果:
  [0.95] "pgvector支持两种索引类型:HNSW和IVFFlat..."
  [0.89] "HNSW索引是一种基于图的近似最近邻搜索算法..."
  [0.85] "IVFFlat索引通过将向量空间划分为多个聚类..."
  [0.82] "创建HNSW索引的SQL语法为:CREATE INDEX..."
  [0.78] "索引选择建议:对于百万级以下的数据量..."
    ↓
拼接 Prompt → LLM 生成回答:
  "pgvector 支持两种主要的索引类型:HNSW(Hierarchical Navigable Small World)
   和 IVFFlat(Inverted File with Flat storage)。HNSW 是一种基于图的..."

这条完整的链路就是 RAG 的核心工作流,而 pgvector 在其中扮演的角色就是语义检索引擎——负责高效地从知识库中找到最相关的文档片段。


1.2 多环境安装部署(全覆盖实操)

1.2.1 Docker 一键部署(最快入门方案)

Docker 部署是最快的入门方式,适合学习、开发和测试环境。

方式一:docker run 快速启动

# 使用官方 pgvector 镜像(基于 PostgreSQL 17)
docker run -d \
  --name pgvector-demo \
  -e POSTGRES_USER=vector_user \
  -e POSTGRES_PASSWORD=vector_pass \
  -e POSTGRES_DB=vector_db \
  -p 5432:5432 \
  -v pgvector_data:/var/lib/postgresql/data \
  pgvector/pgvector:pg17

# 参数说明:
# -d             : 后台运行
# --name         : 容器名称
# -e POSTGRES_USER     : 数据库用户名
# -e POSTGRES_PASSWORD : 数据库密码
# -e POSTGRES_DB       : 默认数据库名
# -p 5432:5432   : 端口映射(宿主机:容器)
# -v pgvector_data     : 数据持久化卷(避免容器删除后数据丢失)
# pgvector/pgvector:pg17 : 基于 PostgreSQL 17 的 pgvector 镜像

# 等待容器启动完成(约 5-10 秒)
sleep 5

# 连接并验证
docker exec -it pgvector-demo psql -U vector_user -d vector_db -c "CREATE EXTENSION vector;"

方式二:docker-compose 配置(推荐生产开发环境)

创建 docker-compose.yml 文件:

version: '3.8'

services:
  postgres:
    image: pgvector/pgvector:pg17
    container_name: pgvector-dev
    environment:
      POSTGRES_USER: vector_user
      POSTGRES_PASSWORD: vector_pass
      POSTGRES_DB: vector_db
      # 性能调优环境变量
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --locale=C"
    ports:
      - "5432:5432"
    volumes:
      # 数据持久化
      - pgvector_data:/var/lib/postgresql/data
      # 初始化脚本(可选)
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    # 资源配置
    deploy:
      resources:
        limits:
          memory: 4G
          cpus: '2'
    # 健康检查
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U vector_user -d vector_db"]
      interval: 10s
      timeout: 5s
      retries: 5
    # 重启策略
    restart: unless-stopped

volumes:
  pgvector_data:
    driver: local

创建初始化脚本 init.sql(docker-compose 首次启动时自动执行):

-- 启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 创建一个示例表用于验证
CREATE TABLE IF NOT EXISTS test_vectors (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(3)
);

-- 插入测试数据
INSERT INTO test_vectors (content, embedding) VALUES
    ('hello world', '[1, 2, 3]'),
    ('goodbye world', '[4, 5, 6]'),
    ('pgvector tutorial', '[7, 8, 9]');

启动:

# 启动服务
docker-compose up -d

# 查看日志确认启动成功
docker-compose logs -f postgres

# 验证
docker exec -it pgvector-dev psql -U vector_user -d vector_db -c "
    SELECT id, content, embedding, 
           embedding <-> '[3, 1, 2]'::vector AS distance
    FROM test_vectors 
    ORDER BY embedding <-> '[3, 1, 2]'::vector;
"

# 停止服务
docker-compose down

# 停止并清除数据(慎用)
docker-compose down -v

可选镜像标签

镜像标签 说明
pgvector/pgvector:pg17 PostgreSQL 17 + pgvector(推荐)
pgvector/pgvector:pg16 PostgreSQL 16 + pgvector
pgvector/pgvector:pg15 PostgreSQL 15 + pgvector
pgvector/pgvector:pg14 PostgreSQL 14 + pgvector

1.2.2 服务器原生部署

一、源码编译安装(通用方法,适用于所有 Linux 发行版)

前提条件

  • 已安装 PostgreSQL 12-17(开发版本均可)
  • 已安装 postgresql-server-dev 开发包(提供 pg_config 和头文件)
  • 已安装编译工具链(gcc、make)

Ubuntu/Debian 系统

# 1. 安装 PostgreSQL 和开发依赖
sudo apt update
sudo apt install -y postgresql-17 postgresql-server-dev-17 build-essential git

# 如果还没安装 PostgreSQL,先安装
# sudo apt install -y postgresql-17

# 2. 确认 pg_config 可用
pg_config --version
# 输出应类似:PostgreSQL 17.x

# 3. 克隆 pgvector 源码
cd /tmp
git clone --branch v0.8.2 https://github.com/pgvector/pgvector.git
cd pgvector

# 4. 编译
make

# 5. 安装(需要 root 权限或 sudo)
sudo make install

# 6. 验证安装文件
ls -la $(pg_config --pkglibdir)/vector.so
# 应该看到 vector.so 文件存在

CentOS/RHEL/AlmaLinux 系统

# 1. 安装 PostgreSQL 仓库(以 PG 17 为例)
sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpm

# 2. 安装 PostgreSQL 和开发依赖
sudo dnf install -y postgresql17-server postgresql17-devel
sudo dnf install -y gcc make git

# 3. 初始化数据库(如果是新安装)
sudo /usr/pgsql-17/bin/postgresql-17-setup initdb
sudo systemctl start postgresql-17
sudo systemctl enable postgresql-17

# 4. 确保 pg_config 在 PATH 中
export PATH=/usr/pgsql-17/bin:$PATH
pg_config --version

# 5. 克隆并编译 pgvector
cd /tmp
git clone --branch v0.8.2 https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install

macOS(Homebrew)

# 使用 Homebrew 安装
brew install pgvector

# 或者从源码编译
brew install postgresql@17
git clone --branch v0.8.2 https://github.com/pgvector/pgvector.git
cd pgvector
make
make install

二、包管理器安装(更简便,但版本可能不是最新)

Ubuntu/Debian(PGDG 仓库)

# 添加 PostgreSQL 官方仓库
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt update

# 安装 pgvector 包
sudo apt install -y postgresql-17-pgvector
# 其他 PG 版本:postgresql-16-pgvector, postgresql-15-pgvector, postgresql-14-pgvector

CentOS/RHEL(PGDG 仓库)

# 安装 PGDG 仓库(如果还没安装)
sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpm

# 安装 pgvector
sudo dnf install -y pgvector_17
# 其他版本:pgvector_16, pgvector_15, pgvector_14

三、配置 PostgreSQL 加载 pgvector

安装完成后,需要在 PostgreSQL 中启用扩展:

# 切换到 postgres 用户(Linux 默认超级用户)
sudo -u postgres psql

# 或者使用你的自定义用户连接
psql -U your_user -d your_database
-- 启用 pgvector 扩展
CREATE EXTENSION vector;

-- 如果需要同时启用半精度向量等功能(v0.7.0+自动包含)
-- 无需额外操作,CREATE EXTENSION vector 已包含所有类型

-- 可选:启用相关扩展以获得更好的性能
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;  -- 查询性能分析

-- 验证
SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';
-- 应输出:vector | 0.8.2

注意:如果你在 CREATE EXTENSION vector 时遇到权限错误,确保你以数据库超级用户身份执行,或确保当前用户有 CREATE 权限。


1.2.3 云数据库适配

阿里云 RDS PostgreSQL

阿里云 RDS PostgreSQL 已原生支持 pgvector 扩展,无需手动编译安装。

操作步骤:

  1. 确认 RDS 版本:pgvector 支持 PostgreSQL 14 及以上版本的 RDS 实例。建议创建 PostgreSQL 17 版本的实例。

  2. 启用扩展

    -- 使用高权限账号连接 RDS 实例
    -- 注意:RDS 通常不允许使用 postgres 超级用户,需要使用创建实例时设定的管理员账号
    psql -h your-rds-endpoint.pg.rds.aliyuncs.com -U your_admin_user -d your_database
    

– 在目标数据库中启用扩展
CREATE EXTENSION IF NOT EXISTS vector;

– 验证
SELECT * FROM pg_extension WHERE extname = ‘vector’;

3. **RDS 控制台配置**(可选优化):
   - 在"参数设置"中调整 `shared_preload_libraries`,确保包含 pgvector 相关配置。
   - 调整 `shared_buffers` 为实例内存的 25%(如 8GB 实例设为 2GB),这对向量索引的缓存性能至关重要。
   - 调整 `effective_cache_size` 为实例内存的 75%。

**腾讯云数据库 PostgreSQL**:

腾讯云 TDSQL-C PostgreSQL 和云数据库 PostgreSQL 均已支持 pgvector。

操作步骤:

1. **确认实例版本**:需要 PostgreSQL 14 及以上版本。

2. **启用扩展**:
```sql
-- 通过控制台或命令行连接实例
psql -h your-instance.postgres.tencentcloudtdsql.com -U tdsqliam_user -d your_database

-- 启用扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 验证
SELECT extname, extversion FROM pg_extension;
  1. 腾讯云控制台操作(备选方案):

    • 登录腾讯云控制台 → 云数据库 PostgreSQL

    • 选择目标实例 → “数据库管理” → “插件管理”

    • 找到 vector 插件,点击"安装"

    • 选择目标数据库,确认安装

华为云 RDS PostgreSQL

华为云 RDS PostgreSQL 同样支持 pgvector:

-- 操作步骤与阿里云类似
CREATE EXTENSION IF NOT EXISTS vector;

AWS RDS/Aurora PostgreSQL

AWS RDS for PostgreSQL(15.3 及以上版本)和 Aurora PostgreSQL(3.3.4 及以上版本)已原生支持 pgvector:

-- 通过 psql 或其他客户端连接
-- 启用扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 如果使用 AWS CDK/Terraform 创建实例,可以在参数组中预配置:
-- rds:extension-name: vector

1.2.4 环境校验 SQL

安装并启用 pgvector 后,执行以下 SQL 全面验证环境是否正常:

-- ============================================================
-- 环境校验脚本:验证 pgvector 安装是否成功
-- ============================================================

-- 1. 检查扩展是否已启用
SELECT extname, extversion, extnamespace::regnamespace 
FROM pg_extension 
WHERE extname = 'vector';
-- 预期输出:vector | 0.8.2 | public

-- 2. 检查可用的向量相关数据类型
SELECT typname, typcategory
FROM pg_type
WHERE typname IN ('vector', 'halfvec', 'sparsevec', 'bit');
-- 预期输出:vector, halfvec, sparsevec 均存在

-- 3. 检查可用的距离运算符
SELECT oprname, oprleft::regtype, oprright::regtype
FROM pg_operator
WHERE oprname IN ('<->', '<=>', '<#>')
  AND oprleft::regtype::text = 'vector';
-- 预期输出三行:<->, <=>, <#>

-- 4. 创建测试表并执行基本操作
CREATE TABLE IF NOT EXISTS _vector_test (
    id serial PRIMARY KEY,
    embedding vector(3)
);

-- 5. 插入测试数据
INSERT INTO _vector_test (embedding) VALUES 
    ('[1, 0, 0]'),
    ('[0, 1, 0]'),
    ('[0, 0, 1]'),
    ('[1, 1, 1]');

-- 6. 测试三种距离计算
SELECT 
    id,
    embedding,
    embedding <-> '[1, 0, 0]'::vector AS l2_dist,
    embedding <=> '[1, 0, 0]'::vector AS cosine_dist,
    embedding <#> '[1, 0, 0]'::vector AS neg_inner_prod
FROM _vector_test
ORDER BY embedding <-> '[1, 0, 0]'::vector;

-- 预期结果:
-- id=1: l2_dist=0, cosine_dist=0, neg_inner_prod=-1(完全匹配)
-- id=4: l2_dist=1.414, cosine_dist=0.423, neg_inner_prod=-1(方向相似但模长不同)

-- 7. 测试向量维度函数
SELECT vector_dims('[1, 2, 3, 4, 5]'::vector);
-- 预期输出:5

-- 8. 测试 halfvec 类型(v0.7.0+)
SELECT '[1.5, 2.5, 3.5]'::halfvec(3);
-- 预期输出:[1.5, 2.5, 3.5]

-- 9. 测试 sparsevec 类型(v0.7.0+)
SELECT '{1:1.5, 3:2.5}/5'::sparsevec;
-- 预期输出:{1:1.5, 3:2.5}/5(5维稀疏向量,第1和第3位有值)

-- 10. 清理测试表
DROP TABLE IF EXISTS _vector_test;

-- 11. 查看 pgvector 版本详情
SELECT extversion FROM pg_extension WHERE extname = 'vector';

-- 如果所有测试通过,说明 pgvector 环境搭建成功!

1.3 客户端连接工具准备

1.3.1 psql 命令行(开发者首选)

psql 是 PostgreSQL 自带的交互式命令行工具,也是使用 pgvector 最轻量的方式。

# 连接到带 pgvector 的数据库
psql -h localhost -p 5432 -U vector_user -d vector_db

# 常用 psql 命令
\dt          -- 列出所有表
\di          -- 列出所有索引
\d table_name  -- 查看表结构
\x           -- 切换扩展显示模式(适合查看长向量)
\timing      -- 开启查询计时(用于性能测试)
\q           -- 退出

连接 pgvector 数据库无需任何特殊配置,因为 pgvector 是服务端扩展,客户端只需正常连接 PostgreSQL 即可。向量数据在 psql 中以文本格式显示(如 [0.1, 0.2, 0.3])。

1.3.2 pgAdmin 图形界面

pgAdmin 是 PostgreSQL 官方推荐的图形化管理工具。

  1. 下载安装:从 https://www.pgadmin.org/download/ 下载对应操作系统版本。

  2. 新建连接

    • 右键 “Servers” → “Create” → “Server…”

    • General 标签页:输入连接名称(如 “pgvector-dev”)

    • Connection 标签页:

      • Host: localhost(或你的服务器地址)
      • Port: 5432
      • Database: vector_db
      • Username: vector_user
      • Password: vector_pass
    • 点击 “Save”

  3. 使用 pgvector

    • 在左侧导航栏展开 Databases → vector_db → Extensions,可看到已启用的 vector 扩展。

    • 打开 “Query Tool” 即可编写和执行 SQL。

    • 注意:pgAdmin 的数据浏览器可以显示向量字段内容,但对于高维向量(如 1536 维),文本显示会很长。建议在查询中使用 vector_dims() 或截取部分维度查看。

1.3.3 DBeaver

DBeaver 是一款通用的数据库管理工具,支持 PostgreSQL 且对 pgvector 兼容良好。

  1. 下载:从 https://dbeaver.io/download/ 下载 Community Edition(免费)。

  2. 新建连接

    • Database → New Database Connection → PostgreSQL

    • 填写 Host、Port、Database、Username、Password

    • 点击 “Test Connection” 确认连通

  3. pgvector 使用提示

    • DBeaver 的 SQL 编辑器完全支持 pgvector 的语法高亮和自动补全。

    • 在结果面板中,向量字段显示为文本字符串。可以双击单元格查看完整内容。

    • 建议使用 DBeaver 的 “ER Diagram” 功能可视化你的向量表结构。

1.3.4 Python 驱动(应用开发必备)

psycopg(v3,推荐)

psycopg3 是 PostgreSQL 的 Python 驱动,性能优秀且 API 设计现代。

# 安装
pip install "psycopg[binary]"   # 二进制版本,无需编译
# 或
pip install psycopg              # 纯 Python 版本
import psycopg
import numpy as np

# 连接数据库
conn = psycopg.connect(
    host="localhost",
    port=5432,
    dbname="vector_db",
    user="vector_user",
    password="vector_pass"
)

cur = conn.cursor()

# 创建表
cur.execute("""
    CREATE TABLE IF NOT EXISTS docs (
        id bigserial PRIMARY KEY,
        content text,
        embedding vector(3)
    )
""")

# 插入向量(Python 列表直接转为字符串格式)
embedding = [0.1, 0.2, 0.3]
cur.execute(
    "INSERT INTO docs (content, embedding) VALUES (%s, %s::vector)",
    ("测试文档", embedding)
)

# 查询
cur.execute("""
    SELECT content, embedding 
    FROM docs 
    ORDER BY embedding <-> %s::vector 
    LIMIT 5
""", ([0.1, 0.2, 0.3],))

for row in cur.fetchall():
    print(f"内容: {row[0]}, 向量: {row[1]}")

conn.commit()
conn.close()

使用 numpy 数组(推荐,性能更好):

import numpy as np
import psycopg

# numpy float32 数组可以直接作为向量参数
embedding = np.array([0.1, 0.2, 0.3], dtype=np.float32)

cur.execute(
    "INSERT INTO docs (content, embedding) VALUES (%s, %s::vector)",
    ("测试文档", embedding.tolist())  # 转为 Python 列表后传入
)

asyncpg(异步场景)

pip install asyncpg
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(
        host='localhost', port=5432,
        database='vector_db',
        user='vector_user', password='vector_pass'
    )

    # 需要注册向量类型的编码器/解码器
    await conn.set_type_codec(
        'vector',
        encoder=lambda v: str(v),
        decoder=lambda v: [float(x) for x in v.strip('[]').split(',')]
    )

    # 插入
    embedding = [0.1, 0.2, 0.3]
    await conn.execute(
        "INSERT INTO docs (content, embedding) VALUES ($1, $2::vector)",
        "测试文档", embedding
    )

    # 查询
    rows = await conn.fetch("""
        SELECT content, embedding 
        FROM docs 
        ORDER BY embedding <-> $1::vector 
        LIMIT 5
    """, [0.1, 0.2, 0.3])

    for row in rows:
        print(row)

    await conn.close()

asyncio.run(main())

SQLAlchemy + pgvector 集成

pip install pgvector sqlalchemy
from sqlalchemy import create_engine, Column, BigInteger, Text
from sqlalchemy.orm import declarative_base, Session
from pgvector.sqlalchemy import Vector

Base = declarative_base()

class Document(Base):
    __tablename__ = 'documents'

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    content = Column(Text)
    embedding = Column(Vector(1024))  # pgvector 的 SQLAlchemy 类型

engine = create_engine("postgresql://vector_user:vector_pass@localhost:5432/vector_db")
Base.metadata.create_all(engine)

# 使用 ORM 插入
with Session(engine) as session:
    doc = Document(
        content="这是测试文档",
        embedding=[0.1, 0.2, 0.3] + [0.0] * 1021  # 需要是 1024 维
    )
    session.add(doc)
    session.commit()

1.3.5 ORM 框架推荐

框架/库 语言 pgvector 支持方式 适用场景
LangChain Python 内置 PGVector VectorStore RAG 应用快速原型
LlamaIndex Python 内置 PGVectorStore 知识库构建
pgvector-python Python 提供 psycopg/SQLAlchemy/asyncpg 集成 通用开发
Prisma TypeScript 通过 raw query 支持 Node.js 项目
Drizzle ORM TypeScript 原生支持 pgvector 类型 类型安全的 Node.js 项目

第二阶段:pgvector 基础语法与向量 CRUD

2.1 向量数据类型详解

pgvector 提供了四种向量数据类型,覆盖了从稠密到稀疏、从全精度到二进制的完整场景。理解每种类型的特点、存储格式和适用场景,是高效使用 pgvector 的基础。

2.1.1 vector(n) 定长向量——主流用法

vector(n) 是 pgvector 最核心、最常用的数据类型,表示一个 n 维的稠密浮点向量。

参数说明

  • n:向量维度,取值范围 1-16000
  • 每个维度存储为 4 字节的 float32(单精度浮点数)
  • 存储大小 = 4 × n + 8 字节(8 字节为 PostgreSQL varlena 头部)

常见 Embedding 模型的维度选择

-- OpenAI text-embedding-3-small:1536 维
CREATE TABLE openai_docs (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(1536)
);

-- 通义千问 text-embedding-v3:1024 维
CREATE TABLE qwen_docs (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(1024)
);

-- BGE-M3:1024 维
CREATE TABLE bge_docs (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(1024)
);

-- OpenAI text-embedding-3-large:3072 维
CREATE TABLE openai_large_docs (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(3072)
);

存储成本估算

假设 100 万条文档,使用 text-embedding-3-small(1536 维):

每条向量存储:4 × 1536 + 8 = 6,152 字节 ≈ 6 KB
100 万条向量总存储:6,152 × 1,000,000 = 6.15 GB(纯数据)
加上 HNSW 索引(通常为数据量的 1.5-2 倍):约 15-18 GB 总磁盘占用

维度不指定的情况:如果不指定维度(即 vector 而非 vector(1536)),则该列可以存储任意维度的向量。但这不推荐,因为:

  • 不同维度的向量之间无法计算距离
  • 无法在该列上创建 HNSW/IVFFlat 索引(索引要求固定维度)
  • 查询时可能出现维度不匹配的运行时错误
-- 不推荐:不指定维度
CREATE TABLE bad_design (
    embedding vector  -- 可以存储任意维度,但无法建索引
);

-- 推荐:明确指定维度
CREATE TABLE good_design (
    embedding vector(1536)  -- 固定 1536 维,可以建索引
);

2.1.2 halfvec(n) 半精度向量

halfvec(n) 是 v0.7.0 新增的数据类型,使用 IEEE 754 半精度浮点数(float16)存储每个维度,存储空间减半

核心特性

  • 每个维度占用 2 字节(而非 vector 的 4 字节)
  • 最大维度:16000(与 vector 相同)
  • 存储大小 = 2 × n + 8 字节
  • 精度范围:约 ±65504,有效精度约 3.3 位十进制数字

存储对比

1536 维向量:
- vector(1536):4 × 1536 + 8 = 6,152 字节
- halfvec(1536):2 × 1536 + 8 = 3,080 字节(节省 50%)

100 万条 1536 维向量:
- vector:约 6.15 GB
- halfvec:约 3.08 GB(节省约 3 GB 磁盘 + 内存)

使用示例

-- 创建使用半精度向量的表
CREATE TABLE docs_halfvec (
    id bigserial PRIMARY KEY,
    content text,
    embedding halfvec(1536)
);

-- 插入数据(语法与 vector 完全相同)
INSERT INTO docs_halfvec (content, embedding) 
VALUES ('测试文档', '[0.1234, 0.5678, -0.9012]');

-- 距离查询(运算符相同)
SELECT content, 
       embedding <=> '[0.1, 0.5, -0.9]'::halfvec AS cosine_dist
FROM docs_halfvec
ORDER BY embedding <=> '[0.1, 0.5, -0.9]'::halfvec
LIMIT 5;

-- vector 与 halfvec 之间可以相互转换
SELECT '[1.5, 2.5, 3.5]'::vector(3)::halfvec(3);
-- 将 vector 转为 halfvec

精度与性能的权衡

维度 vector(float32) halfvec(float16)
有效十进制精度 约 7 位 约 3.3 位
数值范围 ±3.4 × 10^38 ±65504
每维存储 4 字节 2 字节
索引支持 HNSW, IVFFlat HNSW, IVFFlat
距离运算符 <->, <=>, <#> <->, <=>, <#>

何时使用 halfvec

  • 数据量大(亿级以上),需要降低存储和内存开销
  • 对检索精度要求不是极端严格(半精度通常足以满足语义检索需求)
  • Embedding 模型的输出值域在 halfvec 范围内(绝大多数模型满足)

何时不使用 halfvec

  • 向量值可能超出 ±65504 的范围(极少见)
  • 需要极高精度的数值计算场景
  • 需要将向量数据导出到不支持 float16 的系统

2.1.3 sparsevec(nnz, dim) 稀疏向量

sparsevec 是 v0.7.0 新增的稀疏向量类型,专门用于存储大部分元素为零的高维向量。

存储格式{index1:value1, index2:value2, ...}/total_dimensions

  • 只存储非零元素的索引和值
  • nnz(number of non-zeros):非零元素的数量,最大 16000
  • 索引从 1 开始(与 PostgreSQL 数组一致)

适用场景

  1. BM25 特征向量:传统文本检索中,每个词项对应一个维度,文档向量中大部分位置为零。
  2. BGE-M3 的稀疏表示:BGE-M3 模型同时输出稠密向量和稀疏向量,稀疏向量用于关键词级别的精确匹配。
  3. TF-IDF 向量:词频-逆文档频率向量天然是稀疏的。
  4. One-hot 编码:类别特征的独热编码。

使用示例

-- 创建稀疏向量表
CREATE TABLE docs_sparse (
    id bigserial PRIMARY KEY,
    content text,
    -- 假设使用 BM25 特征,词汇表 10000 维,但每个文档只有约 100 个非零项
    sparse_embedding sparsevec(100, 10000)  
    -- sparsevec(最大非零元素数, 总维度)
);

-- 插入稀疏向量:10000维向量,只有第2、第5、第100位有值
INSERT INTO docs_sparse (content, sparse_embedding) 
VALUES (
    '这是一篇关于 PostgreSQL 的文档',
    '{2:0.85, 5:1.23, 100:0.67}/10000'::sparsevec
);

-- 稀疏向量的距离查询
SELECT content,
       sparse_embedding <-> '{2:0.9, 5:1.0, 50:0.5}/10000'::sparsevec AS l2_dist
FROM docs_sparse
ORDER BY sparse_embedding <-> '{2:0.9, 5:1.0, 50:0.5}/10000'::sparsevec
LIMIT 5;

存储效率对比(假设 10000 维向量,100 个非零元素):

使用 vector(10000) 稠密存储:4 × 10000 + 8 = 40,008 字节 ≈ 40 KB
使用 sparsevec(100, 10000) 稀疏存储:约 (4 + 4) × 100 + 头部 ≈ 800 + 头部 字节 ≈ 1 KB
存储节省约 97.5%!

注意事项

  • 稀疏向量之间的索引支持有限,大规模稀疏向量检索可能需要全表扫描
  • 如果非零元素比例超过 30%,建议直接使用稠密 vector 类型

2.1.4 bit(n) 二进制向量

bit(n) 是 v0.7.0 新增的二进制向量类型,每个维度只占 1 bit(0 或 1),实现极致的存储效率。

核心特性

  • 每个维度占用 1 bit
  • 最大维度:64000
  • 存储大小 = ceil(n / 8) + 8 字节(向上取整到字节)
  • 距离度量:汉明距离(Hamming Distance)——两个二进制向量中不同位的数量

存储效率

1024 维向量:
- vector(1024):4 × 1024 + 8 = 4,104 字节
- bit(1024):1024 / 8 + 8 = 136 字节(节省约 97%)

1536 维向量:
- vector(1536):6,152 字节
- bit(1536):1536 / 8 + 8 = 200 字节(节省约 97%)

适用场景

  1. 哈希检索:通过 Locality-Sensitive Hashing(LSH)将高维向量压缩为二进制哈希码,实现极速粗筛。
  2. 图片指纹/感知哈希:图片相似性搜索中,将图片特征压缩为 256 或 512 bit 的指纹。
  3. 布隆过滤器类应用:快速判断某个元素是否存在于集合中。
  4. 大规模粗筛:在百亿级向量场景中,先用 bit 类型做粗筛,再用 vector 类型做精排。

使用示例

-- 创建二进制向量表
CREATE TABLE image_fingerprints (
    id bigserial PRIMARY KEY,
    image_url text,
    fingerprint bit(256)  -- 256位图片指纹
);

-- 插入二进制向量(使用 B'...' 格式或十六进制格式)
INSERT INTO image_fingerprints (image_url, fingerprint) VALUES
    ('https://example.com/photo1.jpg', B'101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010'),
    ('https://example.com/photo2.jpg', B'1100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100110011001100');

-- 使用汉明距离检索相似图片
-- 汉明距离运算符:<~>(pgvector 0.7.0+)
SELECT image_url,
       fingerprint <~> B'1011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010'::bit(256) AS hamming_dist
FROM image_fingerprints
ORDER BY fingerprint <~> B'1011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010101100101011001010110010'::bit(256)
LIMIT 5;

-- 也可以使用 Jaccard 距离:<%>
SELECT image_url,
       fingerprint <%> B'10110...'::bit(256) AS jaccard_dist
FROM image_fingerprints
ORDER BY fingerprint <%> B'10110...'::bit(256)
LIMIT 5;

四种向量类型全面对比

特性 vector(n) halfvec(n) sparsevec(nnz,dim) bit(n)
每维存储 4 bytes 2 bytes 仅存非零 1 bit
最大维度 16,000 16,000 16,000 非零 64,000
数据类型 float32 float16 float32(非零值) 0/1
HNSW 索引 支持 支持 支持 支持
IVFFlat 索引 支持 支持 支持 支持
典型场景 通用语义检索 大规模语义检索 BM25/稀疏特征 哈希/指纹粗筛
版本要求 0.1.0+ 0.7.0+ 0.7.0+ 0.7.0+

2.1.5 向量数据写入规范

Python 列表/Numpy 数组转向量的方式

# 方式 1:Python 列表 → 字符串格式(最通用)
embedding_list = [0.1, 0.2, 0.3, 0.4, 0.5]
vector_str = str(embedding_list)  # "[0.1, 0.2, 0.3, 0.4, 0.5]"
# 在 SQL 中使用:INSERT ... VALUES (... , vector_str::vector)

# 方式 2:Numpy 数组 → 列表 → 字符串
import numpy as np
embedding_np = np.array([0.1, 0.2, 0.3, 0.4, 0.5], dtype=np.float32)
vector_str = str(embedding_np.tolist())  # "[0.1, 0.2, 0.3, 0.4, 0.5]"

# 方式 3:使用 pgvector-python 库的自动转换(推荐)
from pgvector.psycopg import register_vector
import psycopg

conn = psycopg.connect("postgresql://...")
register_vector(conn)  # 注册后,可以直接传入列表/numpy数组

cur = conn.cursor()
cur.execute(
    "INSERT INTO docs (embedding) VALUES (%s)",
    (embedding_list,)  # 直接传入 Python 列表,无需手动转换
)

浮点精度限制

pgvector 的 vector 类型使用 float32(单精度),这意味着:

  • 有效十进制精度约为 7 位数字
  • 超出 7 位的精度会被截断或四舍五入
  • 如果你的 Embedding 模型输出 float64(双精度),写入 pgvector 时会自动转为 float32
-- 精度测试
SELECT '[0.123456789012345]'::vector(1);
-- 输出:[0.12345679]  (只保留了约 7 位有效数字)

-- 如果你的模型输出需要更高精度,考虑使用 double precision 数组辅助存储
-- 但注意:距离运算和索引仍然基于 float32

空向量处理

-- pgvector 不支持空向量(维度为 0)
SELECT '[]'::vector;  -- ERROR: vector must have at least 1 dimension

-- 如果需要表示"无向量"的状态,使用 NULL
CREATE TABLE docs (
    id bigserial PRIMARY KEY,
    content text,
    embedding vector(1536) DEFAULT NULL  -- 允许为空
);

-- 插入时不提供向量
INSERT INTO docs (content) VALUES ('待向量化的文档');
-- embedding 字段为 NULL

-- 查询时过滤掉 NULL 向量
SELECT * FROM docs WHERE embedding IS NOT NULL;

2.2 基础向量 CRUD SQL 实操

2.2.1 建表:设计一个 RAG 标准表结构

一个生产级的 RAG 向量表应该包含以下核心字段:

-- ============================================================
-- RAG 标准文档向量表设计
-- 适用于知识库、文档问答等场景
-- ============================================================

CREATE TABLE knowledge_base (
    -- 主键:自增长整型,用于唯一标识每条记录
    id bigserial PRIMARY KEY,

    -- 文档级字段
    doc_id text NOT NULL,                    -- 原始文档的唯一标识(如文件名、URL、数据库ID)
    title text,                              -- 文档标题

    -- 切片级字段
    chunk_id text NOT NULL,                  -- 切片的唯一标识(如 doc_id + chunk_index)
    chunk_index integer NOT NULL DEFAULT 0,  -- 切片在原文档中的序号
    content text NOT NULL,                   -- 切片文本内容
    content_length integer,                  -- 文本字符长度(冗余字段,便于查询过滤)

    -- 元数据字段(JSONB 提供灵活的键值存储)
    metadata jsonb DEFAULT '{}'::jsonb,      -- 灵活元数据,如:
    -- {
    --   "source": "company_handbook.pdf",
    --   "page": 42,
    --   "category": "HR",
    --   "language": "zh",
    --   "author": "张三",
    --   "tags": ["入职", "规章"]
    -- }

    -- 向量字段:存储 Embedding 向量
    embedding vector(1024),                  -- 1024 维向量(适配 BGE-M3/通义千问等模型)

    -- 审计字段
    created_at timestamptz NOT NULL DEFAULT now(),   -- 创建时间(带时区)
    updated_at timestamptz NOT NULL DEFAULT now()    -- 更新时间
);

-- 为常用查询字段创建索引
CREATE UNIQUE INDEX idx_knowledge_base_chunk_id ON knowledge_base (chunk_id);
CREATE INDEX idx_knowledge_base_doc_id ON knowledge_base (doc_id);
CREATE INDEX idx_knowledge_base_metadata ON knowledge_base USING GIN (metadata);
CREATE INDEX idx_knowledge_base_created_at ON knowledge_base (created_at);
CREATE INDEX idx_knowledge_base_category ON knowledge_base ((metadata->>'category'));

-- 添加表注释
COMMENT ON TABLE knowledge_base IS 'RAG 知识库文档向量表';
COMMENT ON COLUMN knowledge_base.embedding IS '文档切片的 Embedding 向量,1024 维';
COMMENT ON COLUMN knowledge_base.metadata IS 'JSONB 格式的灵活元数据';

-- 创建一个自动更新 updated_at 的触发器
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tr_knowledge_base_updated_at
    BEFORE UPDATE ON knowledge_base
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

表设计要点说明

  1. bigserial 主键:使用 64 位自增整数,足够支撑百亿级数据量。避免使用 UUID 作为主键(UUID 随机性会导致索引页分裂,影响写入性能)。
  2. doc_id + chunk_id 分离doc_id 标识原始文档,方便按文档批量操作(如删除某个文档的所有切片);chunk_id 唯一标识每个切片。
  3. metadata 使用 JSONB:JSONB 格式支持 GIN 索引,可以高效地查询任意元数据字段,比预定义列更灵活。
  4. embedding 字段允许 NULL:支持先插入文本内容,再异步生成和回填向量的工作流。

2.2.2 插入向量数据

单行插入

-- 基本单行插入
INSERT INTO knowledge_base (doc_id, chunk_id, chunk_index, title, content, metadata, embedding)
VALUES (
    'doc_001',
    'doc_001_chunk_0',
    0,
    'PostgreSQL 入门指南',
    'PostgreSQL 是一个强大的开源关系型数据库管理系统,以其可靠性、功能丰富和性能著称。',
    '{"source": "pg_guide.pdf", "page": 1, "category": "database", "language": "zh"}'::jsonb,
    '[0.023, -0.015, 0.087, 0.042, -0.033]'::vector  -- 实际应为 1024 维,此处简化
);

-- 插入后获取生成的 id
INSERT INTO knowledge_base (doc_id, chunk_id, content, embedding)
VALUES ('doc_002', 'doc_002_chunk_0', 'pgvector 支持多种向量数据类型', '[0.1, 0.2, 0.3]'::vector)
RETURNING id, chunk_id;

批量插入(INSERT INTO … VALUES 多行)

-- 批量插入多个切片(同一文档的连续切片)
INSERT INTO knowledge_base (doc_id, chunk_id, chunk_index, title, content, metadata, embedding)
VALUES 
    ('doc_001', 'doc_001_chunk_1', 1, 'PostgreSQL 入门指南',
     'PostgreSQL 支持丰富的数据类型,包括整数、浮点数、字符串、JSON、数组等。',
     '{"source": "pg_guide.pdf", "page": 2, "category": "database"}'::jsonb,
     '[0.031, -0.022, 0.095, 0.051, -0.041]'::vector),

    ('doc_001', 'doc_001_chunk_2', 2, 'PostgreSQL 入门指南',
     'PostgreSQL 的索引机制非常强大,支持 B-Tree、Hash、GiST、GIN 等多种索引类型。',
     '{"source": "pg_guide.pdf", "page": 3, "category": "database"}'::jsonb,
     '[0.045, -0.018, 0.073, 0.063, -0.029]'::vector),

    ('doc_001', 'doc_001_chunk_3', 3, 'PostgreSQL 入门指南',
     '事务是 PostgreSQL 的核心特性之一,支持完整的 ACID 保证。',
     '{"source": "pg_guide.pdf", "page": 4, "category": "database"}'::jsonb,
     '[0.052, -0.011, 0.081, 0.038, -0.045]'::vector);

COPY 方式批量导入(适合大批量数据)

-- 准备 CSV 文件:vectors_data.csv
-- 格式:doc_id,chunk_id,chunk_index,content,embedding
-- 内容示例:
-- doc_003,doc_003_chunk_0,0,"第一条文档内容","[0.1, 0.2, 0.3, ...]"
-- doc_003,doc_003_chunk_1,1,"第二条文档内容","[0.4, 0.5, 0.6, ...]"

-- 使用 COPY 命令导入(在 psql 中执行)
\COPY knowledge_base (doc_id, chunk_id, chunk_index, content, embedding) 
FROM '/path/to/vectors_data.csv' 
WITH (FORMAT csv, HEADER true);

-- 或使用 PostgreSQL 的 COPY 命令(需要超级用户权限)
COPY knowledge_base (doc_id, chunk_id, chunk_index, content, embedding) 
FROM '/path/to/vectors_data.csv' 
WITH (FORMAT csv, HEADER true);

Python 批量插入(实际应用中最常见的方式)

import psycopg
from pgvector.psycopg import register_vector
import numpy as np

conn = psycopg.connect("postgresql://vector_user:vector_pass@localhost:5432/vector_db")
register_vector(conn)
cur = conn.cursor()

# 模拟 1000 条文档数据
documents = []
for i in range(1000):
    embedding = np.random.rand(1024).astype(np.float32)  # 模拟 1024 维向量
    documents.append((
        f'doc_{i//10:04d}',           # doc_id
        f'doc_{i//10:04d}_chunk_{i%10}',  # chunk_id
        i % 10,                        # chunk_index
        f'这是第 {i} 个文档切片的文本内容',  # content
        embedding                      # embedding
    ))

# 方式 1:executemany 批量插入
cur.executemany("""
    INSERT INTO knowledge_base (doc_id, chunk_id, chunk_index, content, embedding)
    VALUES (%s, %s, %s, %s, %s)
""", documents)
conn.commit()

# 方式 2:使用 COPY(更快,适合 10 万级以上数据)
from psycopg import sql
with cur.copy("COPY knowledge_base (doc_id, chunk_id, chunk_index, content, embedding) FROM STDIN") as copy:
    for doc in documents:
        # COPY 需要将向量转为字符串格式
        vec_str = '[' + ','.join(str(x) for x in doc[4]) + ']'
        copy.write_row((doc[0], doc[1], doc[2], doc[3], vec_str))
conn.commit()

print(f"成功插入 {len(documents)} 条记录")
cur.close()
conn.close()

2.2.3 查询向量数据

-- 1. 查看前 5 条记录(包括向量字段)
SELECT id, chunk_id, title, content, embedding
FROM knowledge_base
LIMIT 5;

-- 输出示例:
-- id | chunk_id           | title              | content              | embedding
--  1 | doc_001_chunk_0    | PostgreSQL 入门指南  | PostgreSQL 是一个...  | [0.023, -0.015, ...]

-- 2. 查看向量的维度
SELECT id, chunk_id, vector_dims(embedding) AS dims
FROM knowledge_base
WHERE embedding IS NOT NULL
LIMIT 5;

-- 输出:
-- id | chunk_id           | dims
--  1 | doc_001_chunk_0    | 1024

-- 3. 格式化查看向量的部分维度(只看前 5 个元素)
-- 注意:pgvector 没有内置的切片语法,需要通过转换为数组实现
SELECT id, chunk_id, 
       embedding::text AS embedding_text
FROM knowledge_base
WHERE id = 1;

-- 4. 统计各文档的切片数量
SELECT doc_id, title, COUNT(*) AS chunk_count
FROM knowledge_base
GROUP BY doc_id, title
ORDER BY chunk_count DESC;

-- 5. 查看向量的 L2 范数(模长)
SELECT id, chunk_id, 
       embedding <-> '0'::vector AS l2_norm  -- 与零向量的距离即模长
       -- 注意:需要维度匹配,'0' 不正确,应该用全零的 1024 维向量
FROM knowledge_base
WHERE id = 1;

-- 正确计算向量模长(使用 vector 的算术运算)
-- pgvector 没有直接的 norm 函数,但可以通过 l2_distance(zero_vector) 计算
-- 或者使用 sqrt(embedding <#> embedding 的负值),不过最直接的方式是:
SELECT id, chunk_id,
       sqrt((embedding <#> embedding) * -1) AS l2_norm
FROM knowledge_base
WHERE id = 1;
-- 原理:向量与自身的内积 = ||v||²,负内积 = -||v||²,取负后开方 = ||v||

-- 6. 查看向量字段的统计信息
SELECT 
    COUNT(*) AS total_rows,
    COUNT(embedding) AS non_null_vectors,
    COUNT(*) - COUNT(embedding) AS null_vectors
FROM knowledge_base;

2.2.4 更新与删除向量数据

-- ============================================================
-- 更新操作
-- ============================================================

-- 1. 单独更新向量字段(如重新生成 Embedding 后回填)
UPDATE knowledge_base
SET embedding = '[0.099, -0.088, 0.077, 0.066, -0.055]'::vector,
    updated_at = now()
WHERE chunk_id = 'doc_001_chunk_0';

-- 2. 批量更新某个文档的所有切片向量(如更换 Embedding 模型后)
UPDATE knowledge_base
SET embedding = NULL,        -- 先清空旧向量
    updated_at = now()
WHERE doc_id = 'doc_001';
-- 然后异步重新生成向量并回填

-- 3. 更新元数据
UPDATE knowledge_base
SET metadata = metadata || '{"reviewed": true, "reviewer": "李四"}'::jsonb
WHERE doc_id = 'doc_001';

-- ============================================================
-- 删除操作
-- ============================================================

-- 4. 按结构化条件删除(如删除某个文档的所有切片)
DELETE FROM knowledge_base
WHERE doc_id = 'doc_001';
-- 这个操作会删除 doc_001 的所有切片记录,包括它们的向量数据

-- 5. 按元数据条件删除
DELETE FROM knowledge_base
WHERE metadata->>'category' = 'deprecated';

-- 6. 按时间条件删除(如清理过期数据)
DELETE FROM knowledge_base
WHERE created_at < '2025-01-01'::timestamptz;

-- ============================================================
-- 事务一致性演示
-- ============================================================

-- 场景:更新文档元数据的同时更新向量,保证原子性
BEGIN;

-- 步骤 1:更新文档元数据
UPDATE knowledge_base
SET metadata = jsonb_set(metadata, '{version}', '"2.0"')
WHERE doc_id = 'doc_002';

-- 步骤 2:更新对应的向量
UPDATE knowledge_base
SET embedding = '[0.111, 0.222, 0.333, 0.444, 0.555]'::vector
WHERE chunk_id = 'doc_002_chunk_0';

-- 如果任何一步出错,整个事务回滚
COMMIT;
-- 如果出错:ROLLBACK;

-- 演示回滚
BEGIN;
    -- 尝试插入一条重复 chunk_id 的记录(会触发唯一索引冲突)
    INSERT INTO knowledge_base (doc_id, chunk_id, content, embedding)
    VALUES ('doc_999', 'doc_001_chunk_0', '冲突测试', '[0,0,0]'::vector);
    -- ERROR: duplicate key value violates unique constraint
    -- 此时事务中的所有操作都不会被提交
ROLLBACK;
-- 数据库状态与 BEGIN 之前完全一致

2.3 相似度检索基础用法

2.3.1 三类距离运算符实操

-- ============================================================
-- 准备测试数据
-- ============================================================
CREATE TABLE demo_vectors (
    id serial PRIMARY KEY,
    label text,
    embedding vector(5)
);

INSERT INTO demo_vectors (label, embedding) VALUES
    ('A: 原点附近',   '[0.0, 0.0, 0.0, 0.0, 0.0]'),
    ('B: 正方向',     '[1.0, 1.0, 1.0, 1.0, 1.0]'),
    ('C: 负方向',     '[-1.0, -1.0, -1.0, -1.0, -1.0]'),
    ('D: 同向但模长不同', '[2.0, 2.0, 2.0, 2.0, 2.0]'),
    ('E: 正交方向',   '[1.0, 0.0, 0.0, 0.0, 0.0]');

-- 查询向量:使用 B 向量作为查询基准
-- ============================================================

-- 1. 欧氏距离(L2 Distance)<->
-- 计算的是空间中的直线距离,值越小越相似
SELECT 
    label,
    embedding,
    embedding <-> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector AS l2_distance
FROM demo_vectors
ORDER BY embedding <-> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector;

-- 预期结果:
-- B: 正方向          | [1,1,1,1,1]     | 0.000  (完全匹配,距离为0)
-- D: 同向但模长不同    | [2,2,2,2,2]     | 2.236  (方向相同但距离较远)
-- E: 正交方向        | [1,0,0,0,0]     | 2.000  (只有第一个维度匹配)
-- A: 原点附近        | [0,0,0,0,0]     | 2.236  (对角线距离)
-- C: 负方向          | [-1,-1,-1,-1,-1] | 4.472  (最远)

-- 2. 余弦距离 <=>
-- 计算的是方向差异,忽略模长,值越小方向越一致
SELECT 
    label,
    embedding,
    embedding <=> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector AS cosine_distance
FROM demo_vectors
WHERE id != 1  -- 排除零向量(零向量无法计算余弦距离)
ORDER BY embedding <=> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector;

-- 预期结果:
-- B: 正方向          | 0.000  (方向完全一致)
-- D: 同向但模长不同    | 0.000  (方向也完全一致!余弦距离忽略模长)
-- E: 正交方向        | 0.553  (部分维度同向)
-- C: 负方向          | 2.000  (方向完全相反)

-- 注意:B 和 D 的余弦距离都是 0,因为 [2,2,2,2,2] 与 [1,1,1,1,1] 方向完全相同

-- 3. 负内积 <#>
-- 内积同时考虑方向和模长,pgvector 返回的是负内积,值越小(越负)表示越相似
SELECT 
    label,
    embedding,
    embedding <#> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector AS neg_inner_product
FROM demo_vectors
ORDER BY embedding <#> '[1.0, 1.0, 1.0, 1.0, 1.0]'::vector;

-- 预期结果:
-- D: 同向但模长不同    | -10.000  (内积=10,最相似,因为模长最大且方向一致)
-- B: 正方向          | -5.000   (内积=5)
-- A: 原点附近        | 0.000    (内积=0)
-- E: 正交方向        | -1.000   (内积=1)
-- C: 负方向          | 5.000    (内积=-5,方向相反)

-- 注意排序:负内积值越小(越负)排越前面,表示内积越大越相似

关键区别总结

查询向量 Q = [1, 1, 1, 1, 1],对比向量 B = [1, 1, 1, 1, 1] 和 D = [2, 2, 2, 2, 2]

- 欧氏距离:B(0.0) < D(2.236)   → B 更近(因为空间距离更近)
- 余弦距离:B(0.0) = D(0.0)      → 一样近(因为方向相同)
- 负内积:  D(-10) < B(-5)       → D 更相似(因为方向相同且模长更大)

2.3.2 Top-K 检索

-- ============================================================
-- Top-K 语义相似度检索
-- ============================================================

-- 基本 Top-K 查询:找出与查询向量最相似的 5 条记录
-- 假设查询向量为 [0.03, -0.02, 0.08, ...](1024 维,此处简化为 5 维演示)
SELECT 
    id,
    doc_id,
    chunk_id,
    title,
    content,
    embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS cosine_distance
FROM knowledge_base
WHERE embedding IS NOT NULL
ORDER BY embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector
LIMIT 5;

-- ============================================================
-- 带相似度阈值过滤的 Top-K
-- ============================================================

-- 只返回余弦距离小于 0.3 的结果(即相似度大于 70%)
SELECT 
    id,
    chunk_id,
    content,
    embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS cosine_distance,
    1 - (embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector) AS similarity_score
FROM knowledge_base
WHERE embedding IS NOT NULL
  AND embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector < 0.3  -- 阈值过滤
ORDER BY embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector
LIMIT 10;

-- ============================================================
-- 使用 CTE(Common Table Expression)使查询更清晰
-- ============================================================

WITH query AS (
    -- 定义查询向量(可以来自函数参数或子查询)
    SELECT '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS q_vec
),
ranked AS (
    SELECT 
        kb.id,
        kb.chunk_id,
        kb.content,
        kb.metadata,
        kb.embedding <=> query.q_vec AS cosine_dist,
        1 - (kb.embedding <=> query.q_vec) AS similarity
    FROM knowledge_base kb, query
    WHERE kb.embedding IS NOT NULL
    ORDER BY kb.embedding <=> query.q_vec
    LIMIT 10
)
SELECT * FROM ranked
WHERE similarity > 0.7  -- 在 Top-10 中进一步筛选相似度 > 70% 的结果
ORDER BY similarity DESC;

2.3.3 混合检索:结构化条件 + 向量检索

混合检索是 pgvector 相比专用向量库的杀手级优势——你可以在一条 SQL 中同时实现结构化过滤和语义检索,无需额外的数据同步或后处理步骤。

-- ============================================================
-- 场景 1:在特定分类中进行语义检索
-- ============================================================

-- 只在"数据库"分类的文档中搜索与"索引优化"语义最相关的 5 条记录
SELECT 
    id,
    chunk_id,
    title,
    content,
    metadata->>'category' AS category,
    1 - (embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector) AS similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
  AND metadata->>'category' = 'database'     -- 结构化过滤:分类为 database
ORDER BY embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector
LIMIT 5;

-- ============================================================
-- 场景 2:时间范围 + 语义检索
-- ============================================================

-- 在最近 30 天新增的文档中搜索
SELECT 
    id,
    chunk_id,
    title,
    content,
    created_at,
    1 - (embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector) AS similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
  AND created_at >= now() - interval '30 days'  -- 时间范围过滤
ORDER BY embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector
LIMIT 5;

-- ============================================================
-- 场景 3:多条件联合过滤 + 语义检索
-- ============================================================

-- 在中文文档中,搜索 HR 分类下最近审核过的内容
SELECT 
    id,
    chunk_id,
    title,
    content,
    metadata->>'category' AS category,
    metadata->>'language' AS language,
    1 - (embedding <=> '[0.02, 0.06, -0.01, 0.04, 0.03]'::vector) AS similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
  AND metadata->>'language' = 'zh'               -- 语言为中文
  AND metadata->>'category' = 'HR'               -- 分类为 HR
  AND metadata->>'reviewed' = 'true'             -- 已审核
ORDER BY embedding <=> '[0.02, 0.06, -0.01, 0.04, 0.03]'::vector
LIMIT 5;

-- ============================================================
-- 场景 4:先向量检索后结构化过滤(Pre-filtering vs Post-filtering)
-- ============================================================

-- 方式 A:Post-filtering(先向量检索取 Top-50,再从中过滤)
-- 适用于:过滤条件选择性较高(大部分记录满足条件)的情况
WITH vector_results AS (
    SELECT 
        id, chunk_id, content, metadata,
        1 - (embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector) AS similarity
    FROM knowledge_base
    WHERE embedding IS NOT NULL
    ORDER BY embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector
    LIMIT 50  -- 先取较大的 Top-K
)
SELECT * FROM vector_results
WHERE metadata->>'category' = 'database'  -- 再过滤
ORDER BY similarity DESC
LIMIT 5;

-- 方式 B:Pre-filtering(WHERE 子句同时包含结构化和向量条件)
-- 适用于:PostgreSQL 查询优化器会自动选择最优执行计划
SELECT 
    id, chunk_id, content, metadata,
    1 - (embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector) AS similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
  AND metadata->>'category' = 'database'
ORDER BY embedding <=> '[0.05, 0.03, -0.02, 0.08, 0.01]'::vector
LIMIT 5;

-- 建议:对于大多数场景,直接使用方式 B(让 PostgreSQL 优化器决定执行计划)
-- 如果发现过滤后结果太少,可以增大向量检索的候选集(在子查询中取更大的 LIMIT)

2.3.4 余弦相似度分数计算

在实际应用中,我们通常需要将距离转化为更直观的"相似度分数"(0-1 范围,1 表示完全相同):

-- ============================================================
-- 余弦相似度分数 = 1 - 余弦距离
-- ============================================================

-- 余弦距离范围:[0, 2]
-- 余弦相似度 = 1 - 余弦距离,范围:[-1, 1]
-- 对于归一化向量,余弦距离范围:[0, 1](不会出现负值)

-- 基本用法
SELECT 
    id,
    chunk_id,
    content,
    embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS cosine_distance,
    1 - (embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector) AS cosine_similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
ORDER BY cosine_similarity DESC
LIMIT 5;

-- 输出示例:
-- cosine_distance | cosine_similarity | 含义
-- 0.05            | 0.95             | 非常相似
-- 0.15            | 0.85             | 高度相似
-- 0.35            | 0.65             | 中度相似
-- 0.60            | 0.40             | 较低相似
-- 0.85            | 0.15             | 不太相似

-- ============================================================
-- 将相似度映射到百分制(更直观)
-- ============================================================

SELECT 
    id,
    chunk_id,
    content,
    ROUND(
        (1 - (embedding <=> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector)) * 100, 
        2
    ) AS similarity_percent
FROM knowledge_base
WHERE embedding IS NOT NULL
ORDER BY similarity_percent DESC
LIMIT 5;

-- 输出示例:
-- similarity_percent | 含义
-- 95.23             | 95.23% 相似
-- 85.67             | 85.67% 相似

-- ============================================================
-- 欧氏距离转相似度(需要归一化)
-- ============================================================

-- 欧氏距离没有固定的上界,无法直接转为 0-1 相似度
-- 常用的转换方式:similarity = 1 / (1 + distance)
SELECT 
    id,
    chunk_id,
    embedding <-> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS l2_distance,
    1.0 / (1.0 + (embedding <-> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector)) AS l2_similarity
FROM knowledge_base
WHERE embedding IS NOT NULL
ORDER BY l2_similarity DESC
LIMIT 5;

-- ============================================================
-- 内积转相似度
-- ============================================================

-- 对于归一化向量,负内积 = -cosine_similarity
-- 所以 cosine_similarity = -(neg_inner_product) = inner_product
SELECT 
    id,
    chunk_id,
    embedding <#> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector AS neg_inner_product,
    -(embedding <#> '[0.03, -0.02, 0.08, 0.04, -0.03]'::vector) AS inner_product_score
FROM knowledge_base
WHERE embedding IS NOT NULL
ORDER BY inner_product_score DESC
LIMIT 5;

完整的 RAG 检索 SQL 模板(可直接用于生产环境):

-- ============================================================
-- 生产级 RAG 检索模板
-- 参数:$1 = 查询向量, $2 = 分类过滤(可选), $3 = Top-K
-- ============================================================

WITH search_results AS (
    SELECT 
        kb.id,
        kb.doc_id,
        kb.chunk_id,
        kb.title,
        kb.content,
        kb.metadata,
        1 - (kb.embedding <=> $1::vector) AS similarity_score,
        kb.created_at
    FROM knowledge_base kb
    WHERE kb.embedding IS NOT NULL
      AND ($2::text IS NULL OR kb.metadata->>'category' = $2::text)  -- 可选分类过滤
    ORDER BY kb.embedding <=> $1::vector
    LIMIT $3::integer
)
SELECT 
    id,
    doc_id,
    chunk_id,
    title,
    content,
    metadata,
    ROUND(similarity_score::numeric, 4) AS similarity_score,
    created_at
FROM search_results
WHERE similarity_score >= 0.5  -- 最低相似度阈值
ORDER BY similarity_score DESC;

2.4 新手高频报错解决

错误 1:维度不匹配(最常见)

ERROR:  different vector dimensions, in [1024] and [1536]

原因:插入或查询时,向量的维度与列定义的维度不一致。

-- 表定义为 1024 维
CREATE TABLE docs (embedding vector(1024));

-- 错误:插入 1536 维向量
INSERT INTO docs (embedding) VALUES ('[0.1, 0.2, ...(共1536个值)]'::vector);
-- ERROR: different vector dimensions, in [1024] and [1536]

-- 错误:查询向量维度不匹配
SELECT * FROM docs ORDER BY embedding <=> '[0.1, 0.2, 0.3]'::vector LIMIT 5;
-- ERROR: different vector dimensions, in [1024] and [3]

解决方案

-- 1. 确认表的向量维度
SELECT column_name, udt_name, character_maximum_length
FROM information_schema.columns
WHERE table_name = 'docs' AND column_name = 'embedding';

-- 或者使用 pgvector 的 vector_dims 函数
SELECT vector_dims(embedding) FROM docs LIMIT 1;

-- 2. 确保插入和查询的向量维度与定义一致
-- 如果确实需要改变维度(如更换了 Embedding 模型),需要重建表:
-- 步骤 A:新增列
ALTER TABLE docs ADD COLUMN embedding_new vector(1536);
-- 步骤 B:重新生成所有向量并填入新列
-- 步骤 C:删除旧列,重命名新列
ALTER TABLE docs DROP COLUMN embedding;
ALTER TABLE docs RENAME COLUMN embedding_new TO embedding;
-- 步骤 D:重建索引

错误 2:extension 未启用

ERROR:  type "vector" does not exist
LINE 1: CREATE TABLE docs (embedding vector(1024));
                                     ^

原因:当前数据库没有安装 pgvector 扩展。

解决方案

-- 1. 首先启用扩展(需要数据库 CREATE 权限)
CREATE EXTENSION IF NOT EXISTS vector;

-- 2. 如果提示 "permission denied",使用超级用户:
-- sudo -u postgres psql -d your_database -c "CREATE EXTENSION vector;"

-- 3. 如果提示 "could not open extension control file",说明 pgvector 没有安装
-- 需要先按照 1.2 节的步骤安装 pgvector

-- 4. 验证扩展是否启用
SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';

-- 注意:CREATE EXTENSION 是针对单个数据库的
-- 如果你在 database_A 中启用了,database_B 中仍然需要单独启用
-- 如果希望所有新数据库自动启用,可以在 template1 中启用:
-- sudo -u postgres psql -d template1 -c "CREATE EXTENSION vector;"

错误 3:向量格式错误

ERROR:  invalid input syntax for type vector: "1, 2, 3"

常见格式错误及正确写法

-- 错误:缺少方括号
SELECT '1, 2, 3'::vector;
-- ERROR: invalid input syntax

-- 正确:必须有方括号
SELECT '[1, 2, 3]'::vector;

-- 错误:使用了花括号(这是 PostgreSQL 数组的格式)
SELECT '{1, 2, 3}'::vector;
-- ERROR: invalid input syntax

-- 错误:使用了圆括号
SELECT '(1, 2, 3)'::vector;
-- ERROR: invalid input syntax

-- 错误:包含非数字字符
SELECT '[1, 2, abc]'::vector;
-- ERROR: invalid input syntax

-- 错误:维度为 0(空向量)
SELECT '[]'::vector;
-- ERROR: vector must have at least 1 dimension

-- 正确:标准格式
SELECT '[1.0, 2.0, 3.0]'::vector;          -- 浮点数
SELECT '[1, 2, 3]'::vector;                -- 整数(自动转为 float32)
SELECT '[1e-5, 2.5e3, -0.001]'::vector;    -- 科学计数法

-- 正确:指定维度
SELECT '[1, 2, 3]'::vector(3);             -- 明确指定 3 维

-- 错误:指定维度与实际不匹配
SELECT '[1, 2, 3]'::vector(5);
-- ERROR: different vector dimensions, in [5] and [3]

Python 中的常见格式错误

# 错误:直接传入 numpy 数组(某些驱动不支持)
import numpy as np
embedding = np.array([0.1, 0.2, 0.3])
cur.execute("INSERT INTO docs (embedding) VALUES (%s)", (embedding,))
# 可能报错

# 正确:转为 Python 列表
cur.execute("INSERT INTO docs (embedding) VALUES (%s::vector)", (embedding.tolist(),))

# 正确:转为字符串格式
vector_str = '[' + ','.join(str(x) for x in embedding) + ']'
cur.execute("INSERT INTO docs (embedding) VALUES (%s::vector)", (vector_str,))

# 最推荐:使用 pgvector-python 注册自动转换
from pgvector.psycopg import register_vector
register_vector(conn)
cur.execute("INSERT INTO docs (embedding) VALUES (%s)", (embedding.tolist(),))

错误 4:浮点精度问题

-- 现象:插入的向量值与查询出来的不完全一致
INSERT INTO test_vec (embedding) VALUES ('[0.123456789012345678]'::vector(1));
SELECT embedding FROM test_vec;
-- 结果:[0.12345679]  (精度被截断到约 7 位有效数字)

-- 这不是 bug!vector 类型使用 float32(单精度浮点),精度约为 7 位十进制数字
-- float32 能精确表示的范围:±1.17549435e-38 到 ±3.40282347e+38

-- 如果确实需要更高精度(不推荐,会增加存储和计算成本),可以:
-- 1. 将高精度值存储为额外的 text 列(仅用于审计/追溯)
-- 2. 接受 float32 精度限制(绝大多数 AI 应用场景足够)

-- 精度对检索的影响测试
SELECT 
    '[0.1234567]'::vector(1) <-> '[0.1234568]'::vector(1) AS diff_7th_digit;
-- 结果:约 1e-7,对检索排序几乎没有影响

-- 结论:float32 的精度对语义检索完全够用,不必纠结精度损失

错误 5:字符编码冲突

ERROR:  invalid byte sequence for encoding "UTF8": 0xXX

原因:数据库编码不是 UTF-8,或者客户端连接编码设置不正确。

解决方案

-- 1. 检查数据库编码
SHOW server_encoding;
-- 应为 UTF8

-- 2. 检查客户端编码
SHOW client_encoding;
-- 应为 UTF8

-- 3. 如果数据库不是 UTF-8,需要重建数据库(创建时指定)
CREATE DATABASE vector_db 
    ENCODING 'UTF8' 
    LC_COLLATE 'en_US.UTF-8' 
    LC_CTYPE 'en_US.UTF-8'
    TEMPLATE template0;

-- 4. 如果客户端编码不匹配,在连接时指定
-- psql "host=localhost dbname=vector_db user=vector_user client_encoding=UTF8"

-- 5. 在 Python 中确保使用 UTF-8 连接
import psycopg
conn = psycopg.connect(
    "host=localhost dbname=vector_db user=vector_user",
    client_encoding="UTF8"
)

-- 6. 处理特殊字符
-- 确保 Embedding 模型返回的文本内容使用 UTF-8 编码
-- 在写入前清理文本中的非法字符
import re
def clean_text(text: str) -> str:
    # 移除非法 UTF-8 字符
    return text.encode('utf-8', errors='ignore').decode('utf-8')

错误 6:索引创建失败

ERROR:  column "embedding" does not have 64-bit aligned values

ERROR:  index row size XXXX exceeds maximum 2712 for index "xxx"

原因:向量维度过高导致索引行超出 PostgreSQL 页面大小限制(8KB)。

-- HNSW 索引对于高维向量的限制:
-- vector(n) 的 HNSW 索引:每个维度需要约 4 字节存储,加上图结构的指针开销
-- 通常 vector(2000) 以上的维度需要注意索引行大小

-- 解决方案 1:使用 halfvec 类型减少存储(2 字节/维)
ALTER TABLE docs ALTER COLUMN embedding TYPE halfvec(2000);

-- 解决方案 2:对于超高维向量,使用 IVFFlat 索引(对行大小限制更宽松)
CREATE INDEX ON docs USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- 解决方案 3:降维(如果 Embedding 模型支持)
-- OpenAI text-embedding-3-* 支持维度截断,可以直接请求 1024 维而非 3072 维

-- 解决方案 4:拆分向量列(不推荐,仅作为最后手段)
-- 将 3072 维拆分为两个 1536 维的列

错误排查清单

当你遇到问题时,按以下清单逐项检查:

-- 1. pgvector 扩展是否已启用?
SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';

-- 2. 表的向量维度是多少?
SELECT attname, atttypid::regtype 
FROM pg_attribute 
WHERE attrelid = 'knowledge_base'::regclass 
  AND atttypid::regtype::text LIKE 'vector%';

-- 3. 查询向量维度是否匹配?
SELECT vector_dims('[0.1, 0.2, 0.3]'::vector);  -- 应为表的定义维度

-- 4. 是否有 NULL 向量?
SELECT COUNT(*) AS null_count 
FROM knowledge_base 
WHERE embedding IS NULL;

-- 5. 数据库中是否有不同维度的向量混存?
SELECT vector_dims(embedding), COUNT(*) 
FROM knowledge_base 
WHERE embedding IS NOT NULL 
GROUP BY vector_dims(embedding);
-- 如果输出多行,说明存在维度不一致的问题

-- 6. 索引状态检查
SELECT indexname, indexdef 
FROM pg_indexes 
WHERE tablename = 'knowledge_base';

-- 7. 表和索引大小
SELECT 
    pg_size_pretty(pg_total_relation_size('knowledge_base')) AS total_size,
    pg_size_pretty(pg_relation_size('knowledge_base')) AS table_size,
    pg_size_pretty(pg_indexes_size('knowledge_base')) AS index_size;

阶段总结:完成前两个阶段后,你已经具备了使用 pgvector 的全部基础知识——理解了向量数据库的定位与原理,完成了环境搭建与验证,掌握了四种向量数据类型的使用,能够熟练执行向量的增删改查和相似度检索。在接下来的阶段中,我们将深入索引优化、高级检索策略和生产级调优等进阶主题。


第三阶段:向量索引原理与索引实操

本阶段将从底层原理出发,系统讲解 pgvector 的向量索引机制。你将理解为什么向量检索需要专门的索引结构、HNSW 和 IVFFlat 两种索引是如何工作的,以及如何在实际项目中做出正确的索引选型和参数配置。


3.1 无索引检索痛点解析

3.1.1 暴力全表扫描的原理

当你不使用任何索引执行向量近邻查询时,PostgreSQL 只能采用暴力全表扫描(Brute Force / Exact Search)的方式:

对于查询向量 q,逐一计算它与表中每一个向量 v_i 的距离 d(q, v_i),
然后将所有距离排序,返回距离最小的 Top-K 个结果。

其时间复杂度为 O(n * d),其中 n 是表中向量的总数,d 是每个向量的维度。

这意味着:

  • 如果表中有 100 万条 768 维的向量记录,每次查询需要执行 100 万次 768 维的浮点运算。
  • 每增加一倍的向量数量,查询耗时近似翻倍。
  • 向量维度越高,每次距离计算的开销越大。

在 PostgreSQL 的执行计划中,这表现为一次完整的 Seq Scan

EXPLAIN ANALYZE
SELECT *, embedding <=> '[0.1, 0.2, ...]' AS distance
FROM items
ORDER BY distance
LIMIT 10;

-- 典型的执行计划输出:
-- Limit  (cost=... rows=10)
--   ->  Sort  (cost=...)
--         Sort Key: (embedding <=> '[...]'::vector)
--         ->  Seq Scan on items  (cost=...)

3.1.2 性能实测:无索引检索的耗时对比

以下是在一台标准云服务器(4核8G内存,SSD磁盘,768维向量)上的大致性能数据:

向量数量 无索引检索平均耗时 内存占用(仅向量数据) 用户体验
1 万 ~50ms ~30 MB 可接受
10 万 ~500ms ~300 MB 明显延迟
100 万 ~5,000ms (5秒) ~3 GB 不可接受
1000 万 ~50,000ms (50秒) ~30 GB 完全不可用

关键洞察:1万条以下,暴力检索尚可接受;一旦超过10万条,不建索引的查询延迟将严重影响用户体验。在生产环境中,几乎必须使用向量索引。

可以用以下 SQL 进行实际测试:

-- 生成测试数据
CREATE TABLE perf_test (
    id serial PRIMARY KEY,
    embedding vector(768)
);

-- 插入10万条随机向量
INSERT INTO perf_test (embedding)
SELECT random_vector(768)
FROM generate_series(1, 100000);

-- 辅助函数:生成随机向量
CREATE OR REPLACE FUNCTION random_vector(dim int)
RETURNS vector
LANGUAGE plpgsql AS $$
DECLARE
    arr float8[];
    i int;
BEGIN
    arr := ARRAY[]::float8[];
    FOR i IN 1..dim LOOP
        arr := arr || random();
    END LOOP;
    RETURN arr::vector;
END;
$$;

-- 无索引测试
EXPLAIN ANALYZE
SELECT id, embedding <=> (SELECT embedding FROM perf_test LIMIT 1) AS dist
FROM perf_test
ORDER BY dist
LIMIT 10;

3.1.3 为什么传统 B-tree 索引无法加速向量检索

传统数据库索引(B-tree、B+tree)的核心思想是利用数据的有序性来缩小搜索范围。对于标量数据(整数、字符串),数据天然具有全序关系,可以构建有效的平衡树结构。

但向量是高维空间中的点,存在以下本质困难:

  1. 不存在有意义的全序关系:向量是多维的,不存在"大于""小于"的统一定义。不同的排序方式(按第一维、按模长等)对近邻搜索没有帮助。

  2. 维度灾难(Curse of Dimensionality):随着维度增加,高维空间中任意两点之间的距离趋于均匀分布。这意味着在低维空间中有效的空间划分策略(如 KD-Tree、R-Tree),在高维空间中几乎完全失效——几乎所有数据点到查询点的距离都差不多。

  3. 剪枝失效:B-tree 之所以高效,是因为可以通过比较中间节点的值,排除掉大量不可能包含答案的数据。但在高维向量空间中,很难找到一个超平面或超球面将"近邻"和"远邻"干净地分开。

因此,专门为高维近似最近邻(ANN, Approximate Nearest Neighbor)搜索设计的索引结构应运而生——这正是 HNSW 和 IVFFlat 的设计动机。


3.2 pgvector 三大索引类型深度解析

pgvector 支持三种检索方式:HNSW 索引IVFFlat 索引无索引暴力检索。下面逐一深入讲解。

3.2.1 HNSW 索引(Hierarchical Navigable Small World)

一、核心设计思想

HNSW(分层可导航小世界图)是目前最先进、应用最广泛的 ANN 索引算法之一。它的核心思想是构建一个多层的图结构:

  • 底层(第 0 层):包含所有向量节点,每个节点与其最近的若干邻居相连,形成一张稠密的"小世界网络"。
  • 高层:只有少量节点被"提升"到更高的层,形成越来越稀疏的图结构,充当"高速公路"的角色。
  • 搜索过程类似互联网的路由:从顶层的稀疏图开始快速定位到大致区域,然后逐层向下到更稠密的图中精炼结果。

形象地理解:想象你在一个陌生城市找一家咖啡馆。你先看全国地图(顶层),定位到城市;再看城市地图(中间层),定位到街区;最后看街区地图(底层),找到具体位置。每一层都帮你大幅缩小搜索范围。

二、构建过程

当一个新的向量节点被插入时,HNSW 的构建过程如下:

  1. 确定该节点的最高层级 l:通过一个指数衰减的随机函数决定。大多数节点只存在于第 0 层,少数节点出现在第 1 层,极少数出现在第 2 层,以此类推。这保证了高层节点稀疏。

    l = floor(-ln(uniform(0,1)) * mL)
    其中 mL = 1 / ln(M),M 为参数 m
    
  2. 从顶层入口点开始贪心搜索:从当前图的最高层入口节点出发,在每一层中找到距离新节点最近的 ef_construction 个候选节点。

  3. 在每层建立连接:对于新节点存在的每一层(从 l 到第 0 层),从候选列表中选择最近的节点建立双向连接。如果连接数超过 m(或第 0 层的 2m),则裁剪掉最远的连接。

  4. 更新入口点:如果新节点的最高层级高于当前入口点的层级,则将入口点更新为新节点。

三、搜索过程

查询时的搜索流程:

输入:查询向量 q,参数 ef_search
1. 从最高层的入口节点开始
2. 对于每一层(从最高层到第1层):
   - 贪心搜索,找到该层中距离 q 最近的节点
3. 在第0层执行 beam search:
   - 维护一个大小为 ef_search 的候选优先队列
   - 维护一个大小为 K 的结果列表
   - 不断从候选队列中取出最近节点,探索其邻居
   - 如果邻居比当前结果列表中最远的更近,则加入结果
4. 返回结果列表中的 Top-K

四、核心参数详解

参数 m(默认值 16)

m 控制每个节点在每一层的最大双向连接数(第 0 层的最大连接数为 2 * m)。

  • m 越大,图的连通性越好,搜索精度越高,但内存占用和构建时间显著增加。
  • m 越小,索引越紧凑,但可能丢失重要的近邻连接,导致召回率下降。
m 值 适用场景 内存影响 召回率影响
8 内存受限、向量数量少 略有下降
16 通用默认值,平衡性能与质量 中等 良好
32 高召回率要求 显著提升
48 极端精度要求 很高 边际提升

参数 ef_construction(默认值 64)

ef_construction 控制构建索引时候选列表的大小。

  • 值越大,构建出的图质量越高(邻居选择更精确),但构建速度越慢。
  • 值越小,构建越快,但图的质量可能下降。
ef_construction 适用场景 构建速度 图质量
40 快速原型、大数据量快速构建 一般
64 通用默认值 中等 良好
128-256 生产环境高质量索引 优秀
512+ 极端质量要求(通常不必要) 非常慢 边际提升

五、运行时搜索参数

-- 调整 HNSW 搜索时的候选集大小(会话级别)
SET hnsw.ef_search = 100;  -- 默认值为 40

ef_search 越大,搜索结果越精确,但查询速度越慢。建议根据业务需求在 40-200 之间调整。

六、适用场景总结

  • 读多写少的 RAG(检索增强生成)场景:这是 90% 用户的默认选择。
  • 中小规模数据:百万级以内的向量数据。
  • 对检索延迟要求高:需要毫秒级响应的在线服务。
  • 对召回率要求高:HNSW 在合理参数下可轻松达到 95%+ 的召回率。

七、优势与劣势

优势 劣势
检索速度极快(毫秒级) 索引构建慢(需维护图结构)
召回率高(轻松 >95%) 内存占用较大(需存储邻接关系)
支持增量插入(新向量可即时检索) 大规模数据(千万级以上)内存开销显著
无需预先训练/聚类 删除节点后图结构可能退化

八、完整创建 SQL

-- 使用余弦距离创建 HNSW 索引(最常用)
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- 使用 L2(欧氏)距离创建 HNSW 索引
CREATE INDEX ON items USING hnsw (embedding vector_l2_ops)
WITH (m = 16, ef_construction = 64);

-- 使用内积距离创建 HNSW 索引
CREATE INDEX ON items USING hnsw (embedding vector_ip_ops)
WITH (m = 16, ef_construction = 64);

-- 高质量索引配置(适合生产环境)
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);

-- 索引创建后查询时自动生效
SELECT *, embedding <=> '[0.1, 0.2, ...]' AS distance
FROM items
ORDER BY distance
LIMIT 10;

3.2.2 IVFFlat 索引(Inverted File with Flat compression)

一、核心设计思想

IVFFlat(倒排文件索引 + 扁平压缩)的核心思想源自信息检索中的倒排索引。它通过 K-Means 聚类 将整个向量空间划分为多个区域(Voronoi 单元),每个区域有一个聚类中心。查询时只需在最近的几个区域内搜索,从而大幅减少距离计算次数。

形象理解:把向量空间想象成一个国家,K-Means 聚类就是把国家划分为多个省份。查询时先看你的查询向量落在哪个省附近,然后只在那几个省里搜索,不用跑遍全国。

二、构建过程

IVFFlat 索引的构建分为以下步骤:

  1. K-Means 聚类:对所有已有向量执行 K-Means 聚类,生成 lists 个聚类中心(质心)。每个聚类中心代表一个 Voronoi 区域的"中心点"。

  2. 分配向量到聚类:将每个向量分配到距离它最近的聚类中心所在的列表(list/inverted list)。

  3. 存储结构:每个聚类中心维护一个列表,包含所有属于该聚类的原始向量。这就是"Flat"的含义——列表内的向量保持原始形式,没有进一步压缩。

重要提示:IVFFlat 索引的质量严重依赖于构建时的数据。如果表为空或数据量太少,K-Means 聚类无法产生有代表性的聚类中心,索引质量会很差。因此,必须在有足够数据之后再创建 IVFFlat 索引

三、搜索过程

输入:查询向量 q,参数 nprobe
1. 计算 q 到所有聚类中心的距离
2. 选出距离最近的 nprobe 个聚类中心
3. 在这 nprobe 个聚类对应的倒排列表中,逐一计算 q 与每个向量的距离
4. 合并所有结果,返回 Top-K

nprobe 是控制搜索精度与速度权衡的关键旋钮:

  • nprobe = 1:只搜索最近的 1 个聚类,速度最快,但可能遗漏分布在邻近聚类中的近邻。
  • nprobe = lists:搜索所有聚类,等价于暴力搜索,100% 精确但最慢。

四、核心参数详解

参数 lists(聚类中心数)

lists 决定了向量空间被划分成多少个区域。选择 lists 的一般原则:

向量规模 推荐 lists 值 计算公式
1 万 - 10 万 10 - 100 rows / 1000
10 万 - 100 万 100 - 1000 rows / 1000
100 万 - 1000 万 1000 - 3162 sqrt(rows)
1000 万以上 3162 - 10000 sqrt(rows)

经验公式:

  • 百万级以下lists = rows / 1000(保证每个聚类约 1000 个向量)
  • 百万级以上lists = sqrt(rows)(平方根法则,平衡聚类大小与数量)

lists 过小:每个聚类内向量太多,搜索退化为接近暴力扫描。
lists 过大:每个聚类内向量太少,需要探测更多聚类,开销增加。

参数 probes / nprobe

-- 设置查询时探测的聚类数(会话级别)
SET ivfflat.probes = 10;  -- 默认值为 1
  • 推荐起始值:probes = sqrt(lists)
  • 对于 lists = 100,推荐 probes = 10
  • 对于 lists = 1000,推荐 probes = 30-50
probes 值 召回率 查询速度
1 低(~60-70%) 极快
5-10 中等(~85-92%)
10-50 高(~95%+) 中等
= lists 100%(等价暴力搜索)

五、适用场景总结

  • 写多读少的场景:索引构建速度快,适合频繁重建。
  • 百万级以上向量:大数据量下内存效率优于 HNSW。
  • 对内存敏感:IVFFlat 不需要存储图结构,内存占用显著低于 HNSW。
  • 可以容忍预热期:需要先积累足够数据再建索引。

六、优势与劣势

优势 劣势
构建速度远快于 HNSW 需要有足够数据才能构建高质量索引
内存占用低(不存图结构) 不支持真正的增量优化(新数据可能不属于已有聚类)
大数据量下性价比好 召回率调优需要仔细调整 probes
适合批量写入场景 空表建索引质量极差

七、完整创建 SQL

-- 使用余弦距离创建 IVFFlat 索引
-- 注意:必须确保表中已有足够数据
CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- 使用 L2 距离创建 IVFFlat 索引
CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops)
WITH (lists = 100);

-- 百万级数据的推荐配置
CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 1000);

-- 设置查询参数
SET ivfflat.probes = 10;

-- 查询
SELECT *, embedding <=> '[0.1, 0.2, ...]' AS distance
FROM items
ORDER BY distance
LIMIT 10;

3.2.3 Brute Force(无索引暴力检索)

暴力检索不需要任何额外配置,就是直接对全表执行距离计算并排序。

何时适用

  • 数据量极小(万级以内):此时暴力检索的耗时在几十毫秒内,建索引的维护成本反而不划算。
  • 要求 100% 召回率:暴力检索是精确搜索,不会遗漏任何近邻。在评估 ANN 索引质量时,暴力检索的结果是"金标准"(ground truth)。
  • 临时分析/调试:开发阶段用于验证索引结果的正确性。
-- 无索引查询(暴力检索),无需任何额外配置
SELECT *, embedding <=> '[0.1, 0.2, ...]' AS distance
FROM items
ORDER BY distance
LIMIT 10;

3.3 索引精细化配置与创建 SQL

3.3.1 HNSW 参数调优详解

m 值选择指南

-- 场景一:通用平衡配置(推荐大多数场景)
CREATE INDEX idx_embedding_hnsw ON items
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- 场景二:高召回率配置(推荐对精度要求高的场景)
-- 更高的 m 值带来更好的图连通性,但内存占用增加约一倍
CREATE INDEX idx_embedding_hnsw ON items
USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);

-- 场景三:低内存配置(资源受限时使用)
-- m=8 可显著降低内存,但在高维数据上可能损失 2-5% 的召回率
CREATE INDEX idx_embedding_hnsw ON items
USING hnsw (embedding vector_cosine_ops)
WITH (m = 8, ef_construction = 40);

ef_construction 选择指南

值范围 构建速度 索引质量 适用场景
32-40 一般 快速原型验证
64(默认) 中等 良好 通用生产环境
128-256 优秀 高质量生产环境
400+ 极慢 边际提升 极端精度要求

运行时 ef_search 调整

-- 在会话级别调整搜索精度(不影响索引构建)
-- 值越大,结果越精确,但查询越慢

-- 低延迟优先(在线服务,允许轻微精度损失)
SET hnsw.ef_search = 40;   -- 默认值

-- 平衡模式(推荐大多数场景)
SET hnsw.ef_search = 80;

-- 高精度优先(离线分析、对召回率要求高的场景)
SET hnsw.ef_search = 200;

-- 也可以在单条查询中通过子查询设置
SET LOCAL hnsw.ef_search = 150;  -- 仅在当前事务中生效

不同参数组合的性能对比(基于 10 万条 768 维向量的参考数据):

配置 索引构建时间 索引大小 查询延迟 召回率
m=8, ef_c=40 ~30秒 ~180 MB ~2ms ~90%
m=16, ef_c=64(默认) ~90秒 ~350 MB ~3ms ~95%
m=32, ef_c=128 ~300秒 ~680 MB ~5ms ~98%
m=32, ef_c=256 ~600秒 ~680 MB ~5ms ~99%

3.3.2 IVFFlat 参数调优详解

lists 数量选择

-- 首先确认表中向量数量
SELECT COUNT(*) FROM items;

-- 假设结果为 500,000 行
-- 推荐 lists = 500000 / 1000 = 500
CREATE INDEX idx_embedding_ivfflat ON items
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 500);

-- 假设结果为 5,000,000 行(百万级以上,用平方根法则)
-- 推荐 lists = sqrt(5000000) ≈ 2236
CREATE INDEX idx_embedding_ivfflat ON items
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 2236);

probes 调整

-- 设置会话级别的 probes 参数
-- 推荐起始值:probes = sqrt(lists)

-- lists=100 时
SET ivfflat.probes = 10;

-- lists=500 时
SET ivfflat.probes = 22;

-- lists=2236 时
SET ivfflat.probes = 47;

-- 逐步调优:从 sqrt(lists) 开始,增加直到召回率满足要求
SET ivfflat.probes = 50;  -- 提高召回率
SET ivfflat.probes = 5;   -- 降低延迟

训练数据集选择——为什么不能用空表建 IVFFlat 索引

IVFFlat 的核心是 K-Means 聚类,而 K-Means 需要从实际数据中学习聚类中心。如果表中没有数据或数据量太少:

  • 空表:K-Means 无法执行,索引创建会失败或产生无意义的聚类中心。
  • 数据太少(比如只有几百条但 lists = 1000):每个聚类只有不到 1 个向量,聚类中心完全由随机初始化决定,毫无代表性。

最佳实践

-- 方案一:先导入数据,再创建索引(推荐)
-- 步骤1:先批量导入所有数据
INSERT INTO items (embedding) VALUES (...), (...), ...;
-- 步骤2:数据导入完成后创建索引
CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- 方案二:如果数据是持续写入的,分两步走
-- 步骤1:数据量达到预期 50% 以上时先创建初始索引
CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- 步骤2:数据量翻倍后重建索引
REINDEX INDEX CONCURRENTLY idx_items_ivfflat;

3.3.3 索引维护操作

索引重建

-- 普通重建(会锁表,阻塞写入)
REINDEX INDEX idx_embedding_hnsw;

-- 并发重建(不阻塞读写,推荐生产环境使用)
-- PostgreSQL 12+ 支持
REINDEX INDEX CONCURRENTLY idx_embedding_hnsw;

-- 重建整个表的所有索引
REINDEX TABLE items;

-- 重建整个数据库的索引(谨慎使用)
REINDEX DATABASE mydb;

何时需要重建索引

  • 大量数据被删除或更新后(尤其是 IVFFlat,因为聚类可能过时)。
  • 索引膨胀严重时。
  • 调整索引参数后(需要 DROP 后重建,或使用 REINDEX)。

索引删除

-- 普通删除
DROP INDEX idx_embedding_hnsw;

-- 如果存在则删除(避免报错)
DROP INDEX IF EXISTS idx_embedding_hnsw;

-- 并发删除(PostgreSQL 不提供 DROP INDEX CONCURRENTLY,
-- 但 DROP INDEX 本身持有排他锁的时间很短,通常不是问题)

索引膨胀检测

-- 安装 pgstattuple 扩展(需要超级用户权限)
CREATE EXTENSION IF NOT EXISTS pgstattuple;

-- 检测索引的内部统计信息
SELECT * FROM pgstatindex('idx_embedding_hnsw');

-- 关键字段:
-- avg_leaf_density: 叶页面利用率(低于 70% 可能需要重建)
-- internal_pages / leaf_pages: 内部页和叶页数量
-- empty_pages: 空页面数量(过多表示膨胀)

-- 对于 HNSW 索引,使用 pgstatindex 查看:
SELECT * FROM pgstatindex('idx_embedding_hnsw');

-- 简单的索引大小监控
SELECT
    indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE relname = 'items';

并行创建索引

-- 设置并行工作进程数(默认通常为 2)
SET max_parallel_maintenance_workers = 4;

-- 确保有足够的共享内存
SET maintenance_work_mem = '2GB';  -- 根据可用内存调整

-- 然后创建索引(会自动使用并行)
CREATE INDEX idx_embedding_hnsw ON items
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- 检查并行是否生效
SET client_min_messages = DEBUG1;
CREATE INDEX idx_embedding_hnsw ON items
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- 输出中会显示 "using N parallel workers"

提示:HNSW 索引的构建是 CPU 密集型的,增加 max_parallel_maintenance_workersmaintenance_work_mem 可以显著加快构建速度。IVFFlat 的构建主要耗时在 K-Means 聚类上,增加内存和并行也有帮助。

3.3.4 操作符类(Operator Classes)详解

pgvector 为不同的向量类型和距离度量提供了丰富的操作符类。选择正确的操作符类至关重要——它决定了索引使用哪种距离度量进行计算

完整操作符类列表

向量类型 距离度量 操作符类 运算符
vector L2(欧氏)距离 vector_l2_ops <->
vector 内积 vector_ip_ops <#>
vector 余弦距离 vector_cosine_ops <=>
halfvec L2 距离 halfvec_l2_ops <->
halfvec 内积 halfvec_ip_ops <#>
halfvec 余弦距离 halfvec_cosine_ops <=>
sparsevec L2 距离 sparsevec_l2_ops <->
sparsevec 内积 sparsevec_ip_ops <#>
sparsevec 余弦距离 sparsevec_cosine_ops <=>
bit 汉明距离 bit_hamming_ops <~>
bit 杰卡德距离 bit_jaccard_ops <%>

选择原则

-- 余弦距离(最常用,适合归一化向量,衡量方向相似性)
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);
-- 对应查询:embedding <=> query_vector

-- L2 欧氏距离(适合非归一化向量,同时考虑方向和大小)
CREATE INDEX ON items USING hnsw (embedding vector_l2_ops);
-- 对应查询:embedding <-> query_vector

-- 负内积(适合已经归一化的向量,等价于余弦相似度)
CREATE INDEX ON items USING hnsw (embedding vector_ip_ops);
-- 对应查询:embedding <#> query_vector

距离运算符与操作符类的匹配

这是一个常见的错误来源。创建索引时使用的操作符类必须与查询时使用的运算符匹配。例如:

-- 正确:用 vector_cosine_ops 建索引,用 <=> 查询
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);
SELECT * FROM items ORDER BY embedding <=> '[...]' LIMIT 5;  -- ✅ 走索引

-- 错误:用 vector_cosine_ops 建索引,用 <-> 查询
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);
SELECT * FROM items ORDER BY embedding <-> '[...]' LIMIT 5;  -- ❌ 不走索引,退化为全表扫描

使用 EXPLAIN 验证索引是否被使用:

EXPLAIN ANALYZE
SELECT * FROM items ORDER BY embedding <=> '[...]' LIMIT 5;

-- 如果走索引,会看到 "Index Scan using idx_xxx"
-- 如果没走索引,会看到 "Seq Scan" + "Sort"

半精度向量(halfvec)的操作符类

-- halfvec 使用 16 位浮点数,内存占用减半
-- 适合对精度要求不是极高但需要节省内存的场景
CREATE TABLE items_half (
    id serial PRIMARY KEY,
    embedding halfvec(768)
);

CREATE INDEX ON items_half USING hnsw (embedding halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);

SELECT * FROM items_half ORDER BY embedding <=> '[0.1, 0.2, ...]'::halfvec LIMIT 5;

稀疏向量(sparsevec)的操作符类

-- sparsevec 适合高维稀疏向量(如 BM25 特征)
CREATE TABLE items_sparse (
    id serial PRIMARY KEY,
    embedding sparsevec(30000)
);

CREATE INDEX ON items_sparse USING hnsw (embedding sparsevec_l2_ops);

二进制向量(bit)的操作符类

-- bit 类型用于二进制嵌入,极度压缩存储空间
CREATE TABLE items_bit (
    id serial PRIMARY KEY,
    embedding bit(768)
);

-- 汉明距离索引
CREATE INDEX ON items_bit USING hnsw (embedding bit_hamming_ops);
SELECT * FROM items_bit ORDER BY embedding <~> B'101...' LIMIT 5;

-- 杰卡德距离索引
CREATE INDEX ON items_bit USING hnsw (embedding bit_jaccard_ops);
SELECT * FROM items_bit ORDER BY embedding <%> B'101...' LIMIT 5;

3.4 索引选型决策表

3.4.1 按向量数量选型

向量数量 推荐方案 理由
< 1 万 Brute Force(无索引) 数据量太小,索引维护成本不划算,暴力扫描在 50ms 以内
1 万 - 10 万 HNSW(m=16, ef_c=64) 默认配置即可,索引构建仅需几秒到几十秒
10 万 - 100 万 HNSW(m=16-32, ef_c=64-128) HNSW 的最佳甜区,性能与资源平衡最优
100 万 - 1000 万 HNSW(m=16, ef_c=128)或 IVFFlat(lists=1000-3162) 如果内存充足选 HNSW,内存敏感选 IVFFlat
> 1000 万 IVFFlat(lists=sqrt(n))或 分布式方案 HNSW 内存开销过大;考虑分片或专用向量数据库

3.4.2 按读写比例选型

读写比例 推荐方案 理由
读多写少(读:写 > 10:1) HNSW HNSW 支持增量插入,查询极快,读性能最优
读写均衡(读:写 ≈ 1:1) HNSW HNSW 仍适用;频繁更新时注意定期 REINDEX
写多读少(写:读 > 10:1) IVFFlat IVFFlat 构建更快,批量写入后重建索引成本低

3.4.3 按召回率要求选型

召回率要求 推荐方案 参数建议
> 99%(极高) HNSW + 高参数 m=32, ef_c=256, ef_search=200
> 95%(高) HNSW 默认 m=16, ef_c=64, ef_search=80
> 90%(中等) IVFFlat + 高 probes lists=sqrt(n), probes=30-50
> 80%(可接受) IVFFlat 默认 lists=n/1000, probes=10
100%(精确) Brute Force 无索引,精确搜索

3.4.4 按延迟要求选型

延迟要求 推荐方案 说明
< 10ms HNSW HNSW 的图遍历天然快速,毫秒级响应
< 50ms HNSW 或 IVFFlat(高 probes) 两者都可满足,IVFFlat 需适当提高 probes
< 100ms IVFFlat(低 probes) 低 probes 值可保证速度,但牺牲召回率
> 100ms(可接受) 任意方案 包括 Brute Force(小数据量时)

3.4.5 综合选型决策矩阵

                    ┌──────────────────────────────────────┐
                    │          向量数据量是多少?            │
                    └──────────────┬───────────────────────┘
                                   │
              ┌────────────────────┼────────────────────┐
              ▼                    ▼                     ▼
         < 1 万              1万 - 100万            > 100万
              │                    │                     │
              ▼                    ▼                     ▼
      Brute Force          内存充足吗?            内存充足吗?
      (无索引)            │          │            │          │
                       Yes ▼     No ▼         Yes ▼     No ▼
                       HNSW      HNSW         HNSW    IVFFlat
                    m=16,ef=64  m=8,ef=40   m=16,ef=128  lists=sqrt(n)

最终建议:对于大多数 RAG / 语义搜索场景,HNSW 是默认的最佳选择(m=16, ef_construction=64),它能覆盖 80% 以上的项目需求。只有在向量规模超过千万级、或内存极度受限时,才需要考虑 IVFFlat 或更复杂的方案。


第四阶段:Python 工程化实操与 RAG 端到端案例

本阶段将带你从原生 Python 驱动到主流框架集成,最终完成一个完整的 RAG(检索增强生成)系统。所有代码均为完整可运行的示例。


4.1 原生 Python 对接 pgvector

4.1.1 psycopg 驱动对接(推荐 psycopg3)

psycopg(即 psycopg3,psycopg2 的继任者)是 Python 对接 PostgreSQL 的首选驱动。pgvector 官方提供了对 psycopg 的原生支持。

安装依赖

pip install psycopg[binary] pgvector numpy tenacity

注册向量类型与基本操作

"""
pgvector + psycopg3 基础对接示例
演示:连接数据库、注册向量类型、插入与查询
"""
import psycopg
from pgvector.psycopg import register_vector
import numpy as np


def get_connection():
    """建立数据库连接并注册向量类型"""
    conn = psycopg.connect(
        host="localhost",
        port=5432,
        dbname="vector_db",
        user="postgres",
        password="your_password",
        autocommit=True
    )
    # 注册向量类型,使 psycopg 能自动转换 Python 列表/numpy 数组 <-> pgvector
    register_vector(conn)
    return conn


def create_table(conn):
    """创建向量表"""
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id serial PRIMARY KEY,
                content text NOT NULL,
                embedding vector(768),
                metadata jsonb DEFAULT '{}'::jsonb,
                created_at timestamptz DEFAULT now()
            )
        """)


def insert_document(conn, content: str, embedding: np.ndarray, metadata: dict = None):
    """插入单条文档"""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO documents (content, embedding, metadata)
            VALUES (%s, %s, %s)
            RETURNING id
            """,
            (content, embedding, metadata or {})
        )
        return cur.fetchone()[0]


def search_similar(conn, query_embedding: np.ndarray, top_k: int = 5):
    """余弦相似度搜索"""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, content, embedding <=> %s AS distance
            FROM documents
            ORDER BY distance
            LIMIT %s
            """,
            (query_embedding, top_k)
        )
        results = []
        for row in cur.fetchall():
            results.append({
                "id": row[0],
                "content": row[1],
                "distance": float(row[2])
            })
        return results


# 主流程
if __name__ == "__main__":
    conn = get_connection()
    create_table(conn)

    # 创建 HNSW 索引
    with conn.cursor() as cur:
        cur.execute("""
            CREATE INDEX IF NOT EXISTS idx_documents_embedding
            ON documents USING hnsw (embedding vector_cosine_ops)
            WITH (m = 16, ef_construction = 64)
        """)

    # 插入测试数据
    sample_embedding = np.random.rand(768).astype(np.float32)
    doc_id = insert_document(conn, "这是一条测试文档", sample_embedding)
    print(f"插入文档 ID: {doc_id}")

    # 查询
    query_vec = np.random.rand(768).astype(np.float32)
    results = search_similar(conn, query_vec, top_k=5)
    for r in results:
        print(f"ID: {r['id']}, 距离: {r['distance']:.4f}, 内容: {r['content'][:50]}")

    conn.close()

向量批量写入

"""
高效批量写入向量数据
方式一:executemany(简单可靠)
方式二:COPY(极致性能,适合百万级数据)
"""
import psycopg
from pgvector.psycopg import register_vector
import numpy as np
from io import BytesIO


def batch_insert_executemany(conn, documents: list[dict]):
    """
    使用 executemany 批量写入
    documents: [{"content": "...", "embedding": np.array, "metadata": {}}, ...]
    """
    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO documents (content, embedding, metadata)
            VALUES (%s, %s, %s)
            """,
            [
                (doc["content"], doc["embedding"], doc.get("metadata", {}))
                for doc in documents
            ]
        )
    print(f"通过 executemany 插入了 {len(documents)} 条记录")


def batch_insert_copy(conn, documents: list[dict]):
    """
    使用 COPY 协议批量写入(性能最优)
    适合百万级数据导入,速度比 executemany 快 5-10 倍
    """
    with conn.cursor() as cur:
        # 使用 copy 接口
        with cur.copy(
            "COPY documents (content, embedding, metadata) FROM STDIN"
        ) as copy:
            for doc in documents:
                # 将 embedding 转换为 pgvector 的文本格式
                emb_str = "[" + ",".join(str(x) for x in doc["embedding"]) + "]"
                meta_str = str(doc.get("metadata", {})).replace("'", '"')
                row = f"{doc['content']}\t{emb_str}\t{meta_str}\n"
                copy.write(row.encode("utf-8"))
    print(f"通过 COPY 插入了 {len(documents)} 条记录")


# 使用示例
if __name__ == "__main__":
    conn = psycopg.connect(
        "postgresql://postgres:your_password@localhost:5432/vector_db",
        autocommit=True
    )
    register_vector(conn)

    # 准备批量数据
    batch_data = []
    for i in range(10000):
        batch_data.append({
            "content": f"这是第 {i} 条测试文档的内容",
            "embedding": np.random.rand(768).astype(np.float32),
            "metadata": {"source": "test", "index": i}
        })

    # 方式一:executemany(推荐大多数场景)
    batch_insert_executemany(conn, batch_data[:1000])

    # 方式二:COPY(大数据量场景)
    batch_insert_copy(conn, batch_data[1000:2000])

    conn.close()

分页检索封装

"""
向量检索分页封装
支持:基于偏移量的分页 + 基于游标的分页(推荐)
"""
import psycopg
from pgvector.psycopg import register_vector
import numpy as np
from dataclasses import dataclass


@dataclass
class SearchResult:
    id: int
    content: str
    distance: float
    metadata: dict


@dataclass
class PaginatedResults:
    results: list[SearchResult]
    total_estimated: int
    has_next: bool


def search_paginated(
    conn,
    query_embedding: np.ndarray,
    top_k: int = 10,
    offset: int = 0,
    max_distance: float = None
) -> PaginatedResults:
    """
    基于偏移量的分页向量检索

    参数:
        conn: 数据库连接
        query_embedding: 查询向量
        top_k: 每页返回数量
        offset: 偏移量
        max_distance: 最大距离阈值(可选,过滤掉不相关的结果)
    """
    with conn.cursor() as cur:
        # 构建查询
        query = """
            SELECT id, content, embedding <=> %s AS distance, metadata
            FROM documents
        """
        params = [query_embedding]

        if max_distance is not None:
            query += " WHERE embedding <=> %s <= %s "
            params.extend([query_embedding, max_distance])

        query += " ORDER BY distance LIMIT %s OFFSET %s"
        params.extend([top_k + 1, offset])  # 多取一条判断是否有下一页

        cur.execute(query, params)
        rows = cur.fetchall()

        has_next = len(rows) > top_k
        results = []
        for row in rows[:top_k]:
            results.append(SearchResult(
                id=row[0],
                content=row[1],
                distance=float(row[2]),
                metadata=row[3] if row[3] else {}
            ))

        return PaginatedResults(
            results=results,
            total_estimated=offset + len(results),
            has_next=has_next
        )


def search_cursor_paginated(
    conn,
    query_embedding: np.ndarray,
    top_k: int = 10,
    last_distance: float = None,
    last_id: int = None
) -> PaginatedResults:
    """
    基于游标的分页(推荐,性能更好,结果更稳定)
    使用上一页最后一条记录的距离和 ID 作为游标
    """
    with conn.cursor() as cur:
        if last_distance is not None and last_id is not None:
            cur.execute(
                """
                SELECT id, content, embedding <=> %s AS distance, metadata
                FROM documents
                WHERE (embedding <=> %s, id) > (%s, %s)
                ORDER BY distance, id
                LIMIT %s
                """,
                (query_embedding, query_embedding, last_distance, last_id, top_k + 1)
            )
        else:
            cur.execute(
                """
                SELECT id, content, embedding <=> %s AS distance, metadata
                FROM documents
                ORDER BY distance, id
                LIMIT %s
                """,
                (query_embedding, top_k + 1)
            )

        rows = cur.fetchall()
        has_next = len(rows) > top_k
        results = []
        for row in rows[:top_k]:
            results.append(SearchResult(
                id=row[0],
                content=row[1],
                distance=float(row[2]),
                metadata=row[3] if row[3] else {}
            ))

        return PaginatedResults(
            results=results,
            total_estimated=0,  # 游标分页不便计算总数
            has_next=has_next
        )

异常重试机制

"""
使用 tenacity 库实现数据库操作的重试机制
处理:连接超时、死锁、临时网络故障等瞬时错误
"""
import psycopg
from pgvector.psycopg import register_vector
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
import logging
import numpy as np

logger = logging.getLogger(__name__)


# 定义可重试的异常类型
RETRYABLE_EXCEPTIONS = (
    psycopg.OperationalError,   # 连接失败、超时
    psycopg.errors.DeadlockDetected,  # 死锁
    psycopg.errors.SerializationFailure,  # 序列化失败
    ConnectionError,
    TimeoutError,
)


@retry(
    stop=stop_after_attempt(5),           # 最多重试 5 次
    wait=wait_exponential(                # 指数退避:1s, 2s, 4s, 8s
        multiplier=1,
        min=1,
        max=30
    ),
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True
)
def robust_search(
    conn_str: str,
    query_embedding: np.ndarray,
    top_k: int = 5
) -> list[dict]:
    """
    带重试机制的向量检索

    使用独立连接,确保重试时能重新建立连接
    """
    conn = psycopg.connect(conn_str, autocommit=True)
    try:
        register_vector(conn)
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, content, embedding <=> %s AS distance
                FROM documents
                ORDER BY distance
                LIMIT %s
                """,
                (query_embedding, top_k)
            )
            return [
                {"id": r[0], "content": r[1], "distance": float(r[2])}
                for r in cur.fetchall()
            ]
    finally:
        conn.close()


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=20),
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    reraise=True
)
def robust_insert(
    conn_str: str,
    content: str,
    embedding: np.ndarray,
    metadata: dict = None
) -> int:
    """带重试机制的文档插入"""
    conn = psycopg.connect(conn_str, autocommit=True)
    try:
        register_vector(conn)
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO documents (content, embedding, metadata)
                VALUES (%s, %s, %s) RETURNING id
                """,
                (content, embedding, metadata or {})
            )
            return cur.fetchone()[0]
    finally:
        conn.close()


# 使用示例
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    conn_str = "postgresql://postgres:your_password@localhost:5432/vector_db"

    query = np.random.rand(768).astype(np.float32)
    results = robust_search(conn_str, query, top_k=5)
    print(f"检索到 {len(results)} 条结果")

4.1.2 连接池配置

生产级连接池配置

"""
psycopg 连接池配置
适用于高并发 Web 应用场景
"""
import psycopg
from psycopg_pool import ConnectionPool
from pgvector.psycopg import register_vector
import numpy as np


def create_production_pool(
    conninfo: str,
    min_size: int = 5,
    max_size: int = 20,
    **kwargs
) -> ConnectionPool:
    """
    创建生产级连接池

    参数:
        conninfo: 连接字符串,如 "postgresql://user:pass@host:port/db"
        min_size: 最小连接数(始终保持的空闲连接)
        max_size: 最大连接数(超过后请求会等待)
    """
    pool = ConnectionPool(
        conninfo=conninfo,
        min_size=min_size,
        max_size=max_size,
        # 连接超时(等待可用连接的最大时间,秒)
        timeout=30,
        # 每个连接的最大存活时间(秒),防止长时间持有旧连接
        max_lifetime=3600,
        # 空闲连接的最大存活时间(秒)
        max_idle=300,
        # 连接创建后的初始化回调
        open=True,
        configure=lambda conn: _configure_connection(conn),
        **kwargs
    )
    return pool


def _configure_connection(conn):
    """连接初始化配置"""
    register_vector(conn)
    with conn.cursor() as cur:
        # 设置 HNSW 搜索参数
        cur.execute("SET hnsw.ef_search = 80")
        # 设置 IVFFlat 探测参数
        cur.execute("SET ivfflat.probes = 10")
        # 设置语句超时(防止慢查询拖垮系统)
        cur.execute("SET statement_timeout = '5s'")
    conn.commit()


# 全局连接池(在应用启动时创建)
_pool: ConnectionPool = None


def get_pool() -> ConnectionPool:
    """获取全局连接池(懒加载单例)"""
    global _pool
    if _pool is None:
        _pool = create_production_pool(
            conninfo="postgresql://postgres:your_password@localhost:5432/vector_db",
            min_size=5,    # 启动时创建 5 个连接
            max_size=20,   # 最多 20 个连接
        )
    return _pool


def search_with_pool(query_embedding: np.ndarray, top_k: int = 5) -> list[dict]:
    """使用连接池执行检索"""
    pool = get_pool()
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, content, embedding <=> %s AS distance
                FROM documents
                ORDER BY distance
                LIMIT %s
                """,
                (query_embedding, top_k)
            )
            return [
                {"id": r[0], "content": r[1], "distance": float(r[2])}
                for r in cur.fetchall()
            ]


def close_pool():
    """关闭连接池(应用关闭时调用)"""
    global _pool
    if _pool is not None:
        _pool.close()
        _pool = None


# ---- 高并发场景的连接池参数建议 ----
#
# 小型应用(< 50 并发用户):
#   min_size=3, max_size=10
#
# 中型应用(50-200 并发用户):
#   min_size=5, max_size=20
#
# 大型应用(200-500 并发用户):
#   min_size=10, max_size=50
#   注意:PostgreSQL 默认最大连接数为 100,
#   需要相应调整 postgresql.conf 中的 max_connections
#
# 超大型应用(> 500 并发用户):
#   使用 PgBouncer 作为外部连接池,
#   应用连接到 PgBouncer,PgBouncer 再连接到 PostgreSQL
#   min_size=20, max_size=100(PgBouncer 层面)


if __name__ == "__main__":
    # 测试连接池
    results = search_with_pool(np.random.rand(768).astype(np.float32), top_k=3)
    print(f"检索结果: {results}")
    close_pool()

4.2 主流框架集成 pgvector

4.2.1 LangChain 集成 PGVector

LangChain 提供了 PGVector 向量存储类,封装了与 pgvector 交互的所有底层细节。

安装依赖

pip install langchain-postgresql psycopg[binary] langchain-core langchain-text-splitters

注意:LangChain 的 pgvector 集成已经从 langchain-community 迁移到 langchain-postgresql 包。请使用 langchain-postgresql 而非旧版的 langchain-community

完整代码示例

"""
LangChain + PGVector 完整集成示例
演示:初始化、文档入库、相似度搜索、MMR 检索
"""
from langchain_postgresql import PGVector
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
import numpy as np


# ---- 自定义 Embedding 模型(适配本地模型) ----
class LocalEmbeddings(Embeddings):
    """
    适配本地 sentence-transformers 模型的 Embeddings 实现
    如果使用 OpenAI 等 API,可直接使用 langchain-openai 的 OpenAIEmbeddings
    """

    def __init__(self, model_name: str = "BAAI/bge-m3"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """批量生成文档向量"""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=True,  # 归一化,使余弦距离等价于内积
            batch_size=64,
            show_progress_bar=True
        )
        return embeddings.tolist()

    def embed_query(self, text: str) -> list[float]:
        """生成查询向量"""
        embedding = self.model.encode(
            text,
            normalize_embeddings=True
        )
        return embedding.tolist()


def create_vector_store(
    connection_string: str,
    collection_name: str = "my_documents",
    embeddings: Embeddings = None
) -> PGVector:
    """
    初始化 PGVector 向量存储

    参数:
        connection_string: PostgreSQL 连接字符串
        collection_name: 集合名称(类似表名,LangChain 会自动创建)
        embeddings: Embedding 模型实例
    """
    if embeddings is None:
        embeddings = LocalEmbeddings()

    vector_store = PGVector(
        embeddings=embeddings,
        collection_name=collection_name,
        connection=connection_string,
        use_jsonb=True,  # 使用 JSONB 存储元数据
    )
    return vector_store


def ingest_documents(
    vector_store: PGVector,
    documents: list[Document]
) -> list[str]:
    """
    文档入库:将文档向量化并存入 pgvector

    返回文档 ID 列表
    """
    ids = vector_store.add_documents(documents)
    print(f"成功入库 {len(ids)} 条文档")
    return ids


def similarity_search(
    vector_store: PGVector,
    query: str,
    top_k: int = 5,
    filter: dict = None
) -> list[Document]:
    """
    基础相似度搜索

    参数:
        query: 查询文本
        top_k: 返回结果数
        filter: 元数据过滤条件,如 {"source": "pdf"}
    """
    results = vector_store.similarity_search(
        query=query,
        k=top_k,
        filter=filter
    )
    return results


def similarity_search_with_score(
    vector_store: PGVector,
    query: str,
    top_k: int = 5
) -> list[tuple[Document, float]]:
    """
    带距离分数的相似度搜索
    返回 (文档, 距离) 元组列表
    """
    results = vector_store.similarity_search_with_score(
        query=query,
        k=top_k
    )
    return results


def mmr_search(
    vector_store: PGVector,
    query: str,
    top_k: int = 5,
    fetch_k: int = 20,
    lambda_mult: float = 0.5
) -> list[Document]:
    """
    MMR(最大边际相关性)去重检索

    MMR 在相关性和多样性之间取得平衡:
    - 先检索 fetch_k 个最相关的文档
    - 然后从中选出 top_k 个既相关又多样的文档

    参数:
        query: 查询文本
        top_k: 最终返回的结果数
        fetch_k: 预检索的候选数(越大,多样性选择空间越大)
        lambda_mult: 多样性参数(0=最大多样性,1=最大相关性,默认0.5)
    """
    results = vector_store.max_marginal_relevance_search(
        query=query,
        k=top_k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult
    )
    return results


# ---- 主流程 ----
if __name__ == "__main__":
    CONNECTION_STRING = "postgresql://postgres:your_password@localhost:5432/vector_db"

    # 1. 初始化
    embeddings = LocalEmbeddings("BAAI/bge-m3")
    store = create_vector_store(CONNECTION_STRING, "demo_docs", embeddings)

    # 2. 准备文档
    docs = [
        Document(
            page_content="PostgreSQL 是一个强大的开源关系数据库管理系统",
            metadata={"source": "database_guide", "page": 1}
        ),
        Document(
            page_content="pgvector 是 PostgreSQL 的向量搜索扩展,支持 ANN 检索",
            metadata={"source": "pgvector_docs", "page": 2}
        ),
        Document(
            page_content="HNSW 是一种基于图的近似最近邻搜索算法",
            metadata={"source": "algorithm_paper", "page": 3}
        ),
        Document(
            page_content="RAG 通过检索增强生成,结合了检索系统和大型语言模型",
            metadata={"source": "rag_tutorial", "page": 4}
        ),
        Document(
            page_content="向量嵌入将文本映射到高维空间中的稠密向量表示",
            metadata={"source": "nlp_basics", "page": 5}
        ),
    ]

    # 3. 文档入库
    ids = ingest_documents(store, docs)

    # 4. 相似度搜索
    print("\n--- 相似度搜索 ---")
    results = similarity_search(store, "什么是向量搜索?", top_k=3)
    for doc in results:
        print(f"  [{doc.metadata['source']}] {doc.page_content}")

    # 5. 带分数的搜索
    print("\n--- 带分数的搜索 ---")
    scored_results = similarity_search_with_score(store, "HNSW 算法原理", top_k=3)
    for doc, score in scored_results:
        print(f"  距离: {score:.4f} | {doc.page_content}")

    # 6. MMR 去重搜索
    print("\n--- MMR 去重搜索 ---")
    mmr_results = mmr_search(
        store, "数据库技术", top_k=3, fetch_k=10, lambda_mult=0.7
    )
    for doc in mmr_results:
        print(f"  [{doc.metadata['source']}] {doc.page_content}")

    # 7. 带过滤条件的搜索
    print("\n--- 带过滤条件的搜索 ---")
    filtered = similarity_search(
        store, "向量相关内容", top_k=2,
        filter={"source": "nlp_basics"}
    )
    for doc in filtered:
        print(f"  [{doc.metadata['source']}] {doc.page_content}")

4.2.2 LlamaIndex 集成

安装依赖

pip install llama-index llama-index-vector-stores-postgres psycopg[binary] sentence-transformers

完整代码示例

"""
LlamaIndex + pgvector 完整集成示例
演示:PGVectorStore 配置、本地 Embedding 模型适配、文档索引与检索
"""
from llama_index.core import VectorStoreIndex, StorageContext, Document
from llama_index.core.embeddings import BaseEmbedding
from llama_index.vector_stores.postgres import PGVectorStore
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List


# ---- 自定义本地 Embedding 模型 ----
class BGEEmbedding(BaseEmbedding):
    """
    适配 BGE-M3 模型的 LlamaIndex Embedding 实现
    """

    def __init__(self, model_name: str = "BAAI/bge-m3", **kwargs):
        super().__init__(**kwargs)
        self._model = SentenceTransformer(model_name)

    def _get_query_embedding(self, query: str) -> List[float]:
        """生成查询向量"""
        embedding = self._model.encode(query, normalize_embeddings=True)
        return embedding.tolist()

    def _get_text_embedding(self, text: str) -> List[float]:
        """生成文本向量"""
        embedding = self._model.encode(text, normalize_embeddings=True)
        return embedding.tolist()

    def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
        """批量生成文本向量"""
        embeddings = self._model.encode(
            texts,
            normalize_embeddings=True,
            batch_size=64,
            show_progress_bar=True
        )
        return embeddings.tolist()

    async def _aget_query_embedding(self, query: str) -> List[float]:
        return self._get_query_embedding(query)

    async def _aget_text_embedding(self, text: str) -> List[float]:
        return self._get_text_embedding(text)


def create_llamaindex_vector_store(
    host: str = "localhost",
    port: int = 5432,
    database: str = "vector_db",
    user: str = "postgres",
    password: str = "your_password",
    table_name: str = "llamaindex_docs",
    embed_dim: int = 1024  # BGE-M3 的向量维度
) -> PGVectorStore:
    """
    创建 LlamaIndex PGVectorStore 实例
    """
    vector_store = PGVectorStore.from_params(
        host=host,
        port=str(port),
        database=database,
        user=user,
        password=password,
        table_name=table_name,
        embed_dim=embed_dim,
        # 使用 HNSW 索引
        hnsw_kwargs={
            "m": 16,
            "ef_construction": 64,
        }
    )
    return vector_store


def build_index(
    vector_store: PGVectorStore,
    documents: List[Document],
    embed_model: BaseEmbedding
) -> VectorStoreIndex:
    """
    构建向量索引

    将文档向量化并存入 pgvector,同时创建 HNSW 索引
    """
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        embed_model=embed_model,
        show_progress=True
    )
    return index


def query_index(
    index: VectorStoreIndex,
    query: str,
    top_k: int = 5
) -> list:
    """
    执行语义检索
    """
    retriever = index.as_retriever(similarity_top_k=top_k)
    nodes = retriever.retrieve(query)
    return nodes


# ---- 主流程 ----
if __name__ == "__main__":
    # 1. 初始化 Embedding 模型
    embed_model = BGEEmbedding("BAAI/bge-m3")

    # 2. 创建向量存储
    vector_store = create_llamaindex_vector_store(
        host="localhost",
        database="vector_db",
        table_name="llamaindex_docs",
        embed_dim=1024  # BGE-M3 输出维度
    )

    # 3. 准备文档
    documents = [
        Document(
            text="PostgreSQL 是世界上最先进的开源关系数据库。",
            metadata={"source": "postgres_intro", "category": "database"}
        ),
        Document(
            text="pgvector 扩展让 PostgreSQL 具备了向量搜索能力。",
            metadata={"source": "pgvector_intro", "category": "extension"}
        ),
        Document(
            text="HNSW 索引通过构建分层图结构实现高效的近似最近邻搜索。",
            metadata={"source": "hnsw_paper", "category": "algorithm"}
        ),
        Document(
            text="IVFFlat 索引使用 K-Means 聚类来加速向量检索。",
            metadata={"source": "ivfflat_paper", "category": "algorithm"}
        ),
        Document(
            text="RAG 系统将检索到的相关文档作为上下文注入到 LLM 的提示中。",
            metadata={"source": "rag_paper", "category": "application"}
        ),
    ]

    # 4. 构建索引
    print("正在构建向量索引...")
    index = build_index(vector_store, documents, embed_model)
    print("索引构建完成")

    # 5. 执行查询
    print("\n--- 语义检索 ---")
    query = "pgvector 支持哪些索引类型?"
    nodes = query_index(index, query, top_k=3)

    for i, node in enumerate(nodes):
        print(f"\n结果 {i + 1}:")
        print(f"  文本: {node.get_content()}")
        print(f"  分数: {node.get_score():.4f}")
        print(f"  来源: {node.metadata.get('source')}")

4.3 端到端极简 RAG 实战案例

下面完整实现一个从文档到问答的 RAG 系统。系统架构:

用户提问 → 问题向量化 → pgvector 语义检索 Top-K → 召回原文片段
                                                       ↓
                        LLM 生成回答 ← 构建 Prompt(问题 + 上下文)

4.3.1 完整可运行代码

"""
端到端 RAG(检索增强生成)系统完整实现

功能:
1. PDF/TXT 文档切片
2. 向量化(sentence-transformers + BGE-M3)
3. 存入 pgvector + HNSW 索引
4. 语义检索 + LLM 生成回答

依赖安装:
    pip install psycopg[binary] pgvector numpy sentence-transformers
    pip install tenacity openai PyPDF2 langchain-text-splitters

使用方法:
    python rag_system.py
"""
import os
import re
import numpy as np
import psycopg
from pgvector.psycopg import register_vector
from sentence_transformers import SentenceTransformer
from tenacity import retry, stop_after_attempt, wait_exponential
from dataclasses import dataclass, field
from typing import List, Optional
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


# ============================================================
# 配置
# ============================================================
@dataclass
class RAGConfig:
    """RAG 系统配置"""
    # 数据库配置
    db_host: str = "localhost"
    db_port: int = 5432
    db_name: str = "vector_db"
    db_user: str = "postgres"
    db_password: str = "your_password"
    table_name: str = "rag_documents"

    # Embedding 配置
    embedding_model: str = "BAAI/bge-m3"
    embedding_dim: int = 1024  # BGE-M3 输出维度

    # 切片配置
    chunk_size: int = 500       # 每个切片的最大字符数
    chunk_overlap: int = 50     # 切片之间的重叠字符数

    # 检索配置
    top_k: int = 5              # 召回的文档数量
    ef_search: int = 80         # HNSW 搜索精度

    # LLM 配置(以 OpenAI 兼容 API 为例)
    llm_api_base: str = "https://api.openai.com/v1"  # 可替换为其他兼容 API
    llm_api_key: str = os.environ.get("OPENAI_API_KEY", "your-api-key")
    llm_model: str = "gpt-4o-mini"  # 或其他模型

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"


# ============================================================
# 第一步:数据源准备 - 文档切片
# ============================================================
@dataclass
class TextChunk:
    """文本切片"""
    content: str
    source: str
    chunk_index: int
    metadata: dict = field(default_factory=dict)


def load_text_file(file_path: str) -> str:
    """加载 TXT 文件"""
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()


def load_pdf_file(file_path: str) -> str:
    """加载 PDF 文件(使用 PyPDF2)"""
    try:
        from PyPDF2 import PdfReader
    except ImportError:
        raise ImportError("请安装 PyPDF2: pip install PyPDF2")

    reader = PdfReader(file_path)
    text = ""
    for i, page in enumerate(reader.pages):
        page_text = page.extract_text() or ""
        text += f"\n--- 第 {i + 1} 页 ---\n" + page_text
    return text


def split_text(
    text: str,
    source: str,
    chunk_size: int = 500,
    chunk_overlap: int = 50
) -> List[TextChunk]:
    """
    将长文本分割为重叠的切片

    使用 RecursiveCharacterTextSplitter 的思想:
    1. 先按大段落分割(\n\n)
    2. 再按段落分割(\n)
    3. 再按句子分割(。!?.!?)
    4. 最后按固定长度分割

    切片之间保持 chunk_overlap 个字符的重叠,确保上下文不丢失。
    """
    # 分隔符列表,从大到小
    separators = ["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""]

    chunks = []
    # 使用递归分割
    parts = _recursive_split(text, separators, chunk_size)

    # 合并过小的片段,并添加重叠
    current_chunk = ""
    chunk_index = 0

    for part in parts:
        if len(current_chunk) + len(part) <= chunk_size:
            current_chunk += part
        else:
            if current_chunk:
                chunks.append(TextChunk(
                    content=current_chunk.strip(),
                    source=source,
                    chunk_index=chunk_index
                ))
                chunk_index += 1
                # 保留重叠部分
                if len(current_chunk) > chunk_overlap:
                    current_chunk = current_chunk[-chunk_overlap:] + part
                else:
                    current_chunk = part
            else:
                current_chunk = part

    # 处理最后一块
    if current_chunk.strip():
        chunks.append(TextChunk(
            content=current_chunk.strip(),
            source=source,
            chunk_index=chunk_index
        ))

    logger.info(f"文档 '{source}' 被分割为 {len(chunks)} 个切片")
    return chunks


def _recursive_split(text: str, separators: list, chunk_size: int) -> list:
    """递归分割文本"""
    if len(text) <= chunk_size:
        return [text]

    if not separators:
        # 强制按长度切割
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    sep = separators[0]
    remaining_seps = separators[1:]

    if sep == "":
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    parts = text.split(sep)
    # 把分隔符加回去
    result_parts = []
    for i, part in enumerate(parts):
        if i < len(parts) - 1:
            result_parts.append(part + sep)
        else:
            result_parts.append(part)

    # 递归处理过长的部分
    final_parts = []
    for part in result_parts:
        if len(part) > chunk_size:
            final_parts.extend(_recursive_split(part, remaining_seps, chunk_size))
        else:
            final_parts.append(part)

    return final_parts


# ============================================================
# 第二步:向量化
# ============================================================
class EmbeddingService:
    """Embedding 向量化服务"""

    def __init__(self, model_name: str = "BAAI/bge-m3"):
        logger.info(f"正在加载 Embedding 模型: {model_name}")
        self.model = SentenceTransformer(model_name)
        logger.info("Embedding 模型加载完成")

    def embed_texts(self, texts: List[str], batch_size: int = 64) -> np.ndarray:
        """
        批量生成文本向量

        参数:
            texts: 文本列表
            batch_size: 批处理大小

        返回:
            numpy 数组,形状 (len(texts), embedding_dim)
        """
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=True,  # L2 归一化
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, text: str) -> np.ndarray:
        """生成查询向量"""
        embedding = self.model.encode(
            text,
            normalize_embeddings=True,
            convert_to_numpy=True
        )
        return embedding


# ============================================================
# 第三步:存储 - pgvector 操作
# ============================================================
class VectorStore:
    """pgvector 向量存储层"""

    def __init__(self, config: RAGConfig):
        self.config = config
        self.conn = psycopg.connect(config.connection_string, autocommit=True)
        register_vector(self.conn)
        self._init_table()
        self._ensure_index()

    def _init_table(self):
        """创建向量表"""
        with self.conn.cursor() as cur:
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.config.table_name} (
                    id serial PRIMARY KEY,
                    content text NOT NULL,
                    embedding vector({self.config.embedding_dim}),
                    source text,
                    chunk_index integer,
                    metadata jsonb DEFAULT '{{}}'::jsonb,
                    created_at timestamptz DEFAULT now()
                )
            """)
        logger.info(f"表 {self.config.table_name} 已就绪")

    def _ensure_index(self):
        """确保 HNSW 索引存在"""
        index_name = f"idx_{self.config.table_name}_embedding"
        with self.conn.cursor() as cur:
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS {index_name}
                ON {self.config.table_name}
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)
        logger.info("HNSW 索引已就绪")

    def upsert_chunks(self, chunks: List[TextChunk], embeddings: np.ndarray):
        """
        批量写入切片及其向量

        使用 source + chunk_index 作为唯一键,支持幂等写入
        """
        assert len(chunks) == len(embeddings), "切片数量与向量数量不匹配"

        with self.conn.cursor() as cur:
            for chunk, emb in zip(chunks, embeddings):
                cur.execute(f"""
                    INSERT INTO {self.config.table_name}
                        (content, embedding, source, chunk_index, metadata)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT DO NOTHING
                """, (
                    chunk.content,
                    emb.astype(np.float32),
                    chunk.source,
                    chunk.chunk_index,
                    chunk.metadata
                ))

        logger.info(f"已写入 {len(chunks)} 个切片到 {self.config.table_name}")

    def search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 5,
        filter_source: str = None
    ) -> List[dict]:
        """
        向量相似度检索

        参数:
            query_embedding: 查询向量
            top_k: 返回结果数
            filter_source: 按来源过滤(可选)
        """
        with self.conn.cursor() as cur:
            # 设置搜索精度
            cur.execute(f"SET LOCAL hnsw.ef_search = {self.config.ef_search}")

            if filter_source:
                cur.execute(f"""
                    SELECT id, content, source, chunk_index,
                           embedding <=> %s AS distance
                    FROM {self.config.table_name}
                    WHERE source = %s
                    ORDER BY distance
                    LIMIT %s
                """, (query_embedding, filter_source, top_k))
            else:
                cur.execute(f"""
                    SELECT id, content, source, chunk_index,
                           embedding <=> %s AS distance
                    FROM {self.config.table_name}
                    ORDER BY distance
                    LIMIT %s
                """, (query_embedding, top_k))

            results = []
            for row in cur.fetchall():
                results.append({
                    "id": row[0],
                    "content": row[1],
                    "source": row[2],
                    "chunk_index": row[3],
                    "distance": float(row[4])
                })
            return results

    def count(self) -> int:
        """获取表中文档数量"""
        with self.conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {self.config.table_name}")
            return cur.fetchone()[0]

    def close(self):
        """关闭连接"""
        self.conn.close()


# ============================================================
# 第四步:检索增强生成(RAG)
# ============================================================
class RAGGenerator:
    """RAG 生成器:检索 + LLM 生成"""

    def __init__(self, config: RAGConfig):
        self.config = config

    def build_prompt(self, query: str, contexts: List[dict]) -> str:
        """
        构建 RAG 提示词

        将检索到的上下文和用户问题组装成结构化的 Prompt
        """
        # 拼接上下文
        context_text = ""
        for i, ctx in enumerate(contexts):
            source_info = f"[来源: {ctx['source']}"
            if ctx.get("chunk_index") is not None:
                source_info += f", 片段 {ctx['chunk_index']}"
            source_info += "]"
            context_text += f"\n\n段落 {i + 1} {source_info}:\n{ctx['content']}"

        prompt = f"""你是一个智能问答助手。请根据以下提供的参考文档回答用户的问题。
如果参考文档中没有足够的信息来回答问题,请明确说明"根据已有文档无法完全回答该问题"。
不要编造文档中没有的信息。

---参考文档---
{context_text}
---参考文档结束---

用户问题:{query}

请给出详细、准确的回答:"""

        return prompt

    def generate(self, prompt: str) -> str:
        """
        调用 LLM 生成回答

        使用 OpenAI 兼容 API(可替换为通义千问等任何兼容接口)
        """
        try:
            from openai import OpenAI
        except ImportError:
            raise ImportError("请安装 openai: pip install openai")

        client = OpenAI(
            api_key=self.config.llm_api_key,
            base_url=self.config.llm_api_base
        )

        response = client.chat.completions.create(
            model=self.config.llm_model,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个专业的知识问答助手,基于提供的文档内容回答问题。"
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            temperature=0.1,  # 低温度,提高回答的确定性
            max_tokens=1024
        )

        return response.choices[0].message.content


# ============================================================
# 主流程:完整的 RAG Pipeline
# ============================================================
class RAGPipeline:
    """RAG 系统主入口"""

    def __init__(self, config: RAGConfig = None):
        self.config = config or RAGConfig()
        self.embedding_service = EmbeddingService(self.config.embedding_model)
        self.vector_store = VectorStore(self.config)
        self.generator = RAGGenerator(self.config)

    def ingest_file(self, file_path: str):
        """导入单个文件"""
        logger.info(f"正在处理文件: {file_path}")

        # 1. 加载文件
        if file_path.endswith(".pdf"):
            text = load_pdf_file(file_path)
        elif file_path.endswith(".txt"):
            text = load_text_file(file_path)
        else:
            raise ValueError(f"不支持的文件格式: {file_path}")

        # 2. 文本切片
        chunks = split_text(
            text,
            source=os.path.basename(file_path),
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap
        )

        if not chunks:
            logger.warning(f"文件 {file_path} 切片结果为空,跳过")
            return

        # 3. 向量化
        texts = [c.content for c in chunks]
        embeddings = self.embedding_service.embed_texts(texts)

        # 4. 存入数据库
        self.vector_store.upsert_chunks(chunks, embeddings)
        logger.info(f"文件 {file_path} 处理完成,共 {len(chunks)} 个切片")

    def ingest_directory(self, dir_path: str, extensions: tuple = (".txt", ".pdf")):
        """批量导入目录下的所有文件"""
        for root, dirs, files in os.walk(dir_path):
            for file_name in files:
                if file_name.endswith(extensions):
                    file_path = os.path.join(root, file_name)
                    self.ingest_file(file_path)

    def ask(self, question: str, top_k: int = None) -> dict:
        """
        RAG 问答:检索 + 生成

        参数:
            question: 用户问题
            top_k: 召回的文档数量

        返回:
            {
                "answer": LLM 生成的回答,
                "sources": 引用的文档片段列表,
                "question": 原始问题
            }
        """
        top_k = top_k or self.config.top_k
        logger.info(f"正在处理问题: {question}")

        # 1. 问题向量化
        query_embedding = self.embedding_service.embed_query(question)

        # 2. 语义检索
        contexts = self.vector_store.search(query_embedding, top_k=top_k)
        logger.info(f"检索到 {len(contexts)} 个相关片段")

        if not contexts:
            return {
                "answer": "抱歉,知识库中没有找到与该问题相关的内容。",
                "sources": [],
                "question": question
            }

        # 3. 构建 Prompt
        prompt = self.generator.build_prompt(question, contexts)

        # 4. LLM 生成回答
        answer = self.generator.generate(prompt)

        return {
            "answer": answer,
            "sources": [
                {
                    "source": ctx["source"],
                    "chunk_index": ctx["chunk_index"],
                    "distance": ctx["distance"],
                    "preview": ctx["content"][:100] + "..."
                }
                for ctx in contexts
            ],
            "question": question
        }

    def close(self):
        """关闭资源"""
        self.vector_store.close()


# ============================================================
# 运行示例
# ============================================================
if __name__ == "__main__":
    # 配置(请根据实际环境修改)
    config = RAGConfig(
        db_host="localhost",
        db_port=5432,
        db_name="vector_db",
        db_user="postgres",
        db_password="your_password",
        table_name="rag_documents",
        embedding_model="BAAI/bge-m3",
        embedding_dim=1024,
        chunk_size=500,
        chunk_overlap=50,
        top_k=5,
        ef_search=80,
        # 可替换为通义千问 DashScope 兼容 API
        # llm_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
        # llm_api_key="your-dashscope-key",
        # llm_model="qwen-plus",
    )

    # 初始化 Pipeline
    pipeline = RAGPipeline(config)

    # ---- 方式一:导入本地文件 ----
    # pipeline.ingest_file("./my_document.pdf")
    # pipeline.ingest_file("./notes.txt")

    # ---- 方式二:批量导入目录 ----
    # pipeline.ingest_directory("./documents/")

    # ---- 方式三:直接导入文本(演示用) ----
    sample_texts = [
        "pgvector 是 PostgreSQL 的开源向量搜索扩展,支持 HNSW、IVFFlat 等索引类型。",
        "HNSW(分层可导航小世界图)是目前最高效的近似最近邻搜索算法之一。",
        "RAG(检索增强生成)是一种将信息检索与大型语言模型结合的技术范式。",
        "向量嵌入是将文本、图像等非结构化数据映射为高维稠密向量的过程。",
        "PostgreSQL 支持通过 CREATE INDEX 语句为向量列创建 HNSW 或 IVFFlat 索引。",
        "在 RAG 系统中,文档首先被切分为小的文本片段,然后每个片段被转换为向量嵌入。",
        "余弦相似度通过计算两个向量之间的夹角来衡量它们的语义相似程度。",
        "BGE-M3 是北京智源研究院发布的多语言、多功能嵌入模型,支持 100+ 种语言。",
    ]

    from pathlib import Path

    # 写入临时文件用于演示
    demo_file = Path("demo_knowledge_base.txt")
    demo_file.write_text("\n\n".join(sample_texts), encoding="utf-8")
    pipeline.ingest_file(str(demo_file))
    demo_file.unlink()  # 清理临时文件

    # ---- 执行问答 ----
    print("\n" + "=" * 60)
    print("RAG 系统已就绪,开始问答")
    print("=" * 60)

    questions = [
        "pgvector 支持哪些索引类型?",
        "什么是 RAG?它是怎么工作的?",
        "BGE-M3 模型有什么特点?",
    ]

    for q in questions:
        print(f"\n{'─' * 40}")
        print(f"问题: {q}")
        result = pipeline.ask(q)
        print(f"回答: {result['answer']}")
        print(f"引用来源:")
        for src in result["sources"]:
            print(f"  - [{src['source']}] 片段{src['chunk_index']} "
                  f"(距离: {src['distance']:.4f})")

    pipeline.close()

4.3.2 流程图解

┌──────────────────────────────────────────────────────────────┐
│                    RAG 系统完整流程                           │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  离线阶段(文档入库):                                        │
│                                                              │
│  PDF/TXT ──→ 文本提取 ──→ 切片 ──→ 向量化 ──→ pgvector 存储  │
│             load_pdf()   split()  encode()   upsert_chunks() │
│                         500字/片  BGE-M3     HNSW 索引       │
│                         50字重叠  1024维                     │
│                                                              │
│  在线阶段(问答检索):                                        │
│                                                              │
│  用户问题 ──→ 问题向量化 ──→ pgvector 检索 ──→ Top-K 召回    │
│              embed_query() <=> 余弦距离     距离排序          │
│                                                              │
│  Top-K 上下文 + 用户问题 ──→ 构建 Prompt ──→ LLM ──→ 回答    │
│                              build_prompt()  GPT/Qwen        │
│                                                              │
└──────────────────────────────────────────────────────────────┘

4.4 高级向量能力实操

4.4.1 向量聚合计算

pgvector 支持对向量进行聚合运算,这在某些分析场景中非常有用。

"""
向量聚合计算示例
"""
import psycopg
from pgvector.psycopg import register_vector
import numpy as np


def compute_average_embedding(conn, table: str = "documents") -> np.ndarray:
    """
    计算所有文档的平均向量(即"中心向量")

    用途:
    - 了解文档集合的整体语义方向
    - 作为聚类初始中心
    - 检测异常文档(距离平均向量过远的文档)
    """
    with conn.cursor() as cur:
        # 使用 AVG 聚合计算每个维度的平均值
        # 注意:pgvector 0.7.0+ 支持 avg() 聚合函数
        cur.execute(f"""
            SELECT avg(embedding) FROM {table}
        """)
        result = cur.fetchone()
        if result and result[0] is not None:
            # avg() 返回的是 vector 类型,psycopg 会自动转为 numpy 数组
            avg_vec = np.array(result[0])
            logger.info(f"平均向量维度: {avg_vec.shape}")
            return avg_vec
        return None


def compute_centroid_per_source(conn, table: str = "documents") -> dict:
    """
    按来源分组计算每个来源的中心向量

    用途:
    - 比较不同文档/来源的语义差异
    - 构建来源级别的语义索引
    """
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT source, avg(embedding) AS centroid
            FROM {table}
            GROUP BY source
            ORDER BY source
        """)
        centroids = {}
        for row in cur.fetchall():
            centroids[row[0]] = np.array(row[1])
            logger.info(f"来源 '{row[0]}' 的中心向量已计算")
        return centroids


def vector_arithmetic(conn):
    """
    向量加减法运算

    应用场景:
    - 语义类比:king - man + woman ≈ queen
    - 文档差异:doc_new - doc_old = 变化方向
    - 混合查询:0.7 * query_vec + 0.3 * category_vec
    """
    with conn.cursor() as cur:
        # 获取两个文档的向量
        cur.execute("""
            SELECT id, content, embedding
            FROM documents
            LIMIT 2
        """)
        rows = cur.fetchall()
        if len(rows) < 2:
            print("需要至少 2 条文档")
            return

        vec_a = np.array(rows[0][2])
        vec_b = np.array(rows[1][2])

        # 向量加法
        vec_sum = vec_a + vec_b
        print(f"向量 A + B: 前5维 = {vec_sum[:5]}")

        # 向量减法
        vec_diff = vec_a - vec_b
        print(f"向量 A - B: 前5维 = {vec_diff[:5]}")

        # 加权混合(常用于混合查询)
        alpha = 0.7
        vec_mixed = alpha * vec_a + (1 - alpha) * vec_b
        print(f"0.7*A + 0.3*B: 前5维 = {vec_mixed[:5]}")

        # 使用混合向量进行搜索
        cur.execute("""
            SELECT id, content, embedding <=> %s AS distance
            FROM documents
            ORDER BY distance
            LIMIT 5
        """, (vec_mixed.astype(np.float32),))

        print("\n混合向量搜索结果:")
        for row in cur.fetchall():
            print(f"  ID: {row[0]}, 距离: {float(row[2]):.4f}, 内容: {row[1][:60]}")


# 使用示例
if __name__ == "__main__":
    conn = psycopg.connect(
        "postgresql://postgres:your_password@localhost:5432/vector_db",
        autocommit=True
    )
    register_vector(conn)

    # 计算平均向量
    avg = compute_average_embedding(conn)
    if avg is not None:
        print(f"平均向量 (前5维): {avg[:5]}")

    # 按来源计算中心向量
    centroids = compute_centroid_per_source(conn)
    for source, centroid in centroids.items():
        print(f"来源 '{source}' 中心向量 (前5维): {centroid[:5]}")

    # 向量运算
    vector_arithmetic(conn)

    conn.close()

4.4.2 MMR 多样性检索

原理

MMR(Maximum Marginal Relevance,最大边际相关性)旨在解决检索结果中的冗余问题。当查询某个主题时,最相关的 Top-K 结果可能内容高度重复。MMR 通过迭代选择,在每一步中同时考虑与查询的相关性与已选结果的差异性

MMR 的核心公式:

MMR = argmax_{d_i ∈ R\S} [ λ * Sim(d_i, Q) - (1-λ) * max_{d_j ∈ S} Sim(d_i, d_j) ]

其中:
- R: 全部候选文档集合
- S: 已选中的文档集合
- Q: 查询向量
- λ: 相关性-多样性权衡参数(0=最大多样性,1=最大相关性)
- Sim: 相似度函数
"""
MMR(最大边际相关性)多样性检索的完整实现
不依赖框架,纯 Python + numpy 实现
"""
import numpy as np
import psycopg
from pgvector.psycopg import register_vector
from typing import List


def cosine_similarity_matrix(
    vectors_a: np.ndarray,
    vectors_b: np.ndarray
) -> np.ndarray:
    """
    计算两组向量之间的余弦相似度矩阵

    参数:
        vectors_a: 形状 (m, dim)
        vectors_b: 形状 (n, dim)

    返回:
        相似度矩阵,形状 (m, n)
    """
    # 假设向量已经归一化
    return vectors_a @ vectors_b.T


def mmr_search(
    conn,
    query_embedding: np.ndarray,
    table: str = "documents",
    top_k: int = 5,
    fetch_k: int = 20,
    lambda_mult: float = 0.5
) -> List[dict]:
    """
    MMR 多样性检索

    参数:
        conn: 数据库连接
        query_embedding: 查询向量(已归一化)
        table: 表名
        top_k: 最终返回的结果数
        fetch_k: 预检索的候选数量(fetch_k >= top_k)
        lambda_mult: 多样性参数
            - 1.0: 纯相关性排序(等价于普通 Top-K)
            - 0.0: 纯多样性排序
            - 0.5: 平衡相关性与多样性(推荐默认值)
            - 0.7: 偏向相关性,适度去重(推荐用于 RAG)

    返回:
        MMR 选中的文档列表
    """
    assert fetch_k >= top_k, "fetch_k 必须大于等于 top_k"

    # 第一步:预检索 fetch_k 个最相关的候选
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT id, content, source, embedding,
                   embedding <=> %s AS distance
            FROM {table}
            ORDER BY distance
            LIMIT %s
        """, (query_embedding, fetch_k))

        candidates = []
        candidate_embeddings = []
        for row in cur.fetchall():
            candidates.append({
                "id": row[0],
                "content": row[1],
                "source": row[2],
                "distance": float(row[4])
            })
            candidate_embeddings.append(np.array(row[3]))

    if len(candidates) <= top_k:
        return candidates

    candidate_embeddings = np.array(candidate_embeddings)

    # 第二步:计算查询与所有候选的相似度
    # 余弦相似度 = 1 - 余弦距离
    query_sims = 1.0 - np.array([c["distance"] for c in candidates])

    # 第三步:计算候选之间的相似度矩阵
    sim_matrix = cosine_similarity_matrix(candidate_embeddings, candidate_embeddings)

    # 第四步:MMR 迭代选择
    selected_indices = []
    remaining_indices = list(range(len(candidates)))

    for _ in range(min(top_k, len(candidates))):
        best_score = -float("inf")
        best_idx = None

        for idx in remaining_indices:
            # 与查询的相关性
            relevance = query_sims[idx]

            # 与已选文档的最大相似度(冗余度)
            if selected_indices:
                max_sim_to_selected = max(
                    sim_matrix[idx][sel_idx] for sel_idx in selected_indices
                )
            else:
                max_sim_to_selected = 0.0

            # MMR 分数
            mmr_score = (
                lambda_mult * relevance
                - (1 - lambda_mult) * max_sim_to_selected
            )

            if mmr_score > best_score:
                best_score = mmr_score
                best_idx = idx

        selected_indices.append(best_idx)
        remaining_indices.remove(best_idx)

    # 返回选中的文档(按 MMR 选择顺序排列)
    return [candidates[i] for i in selected_indices]


# ---- 使用示例 ----
if __name__ == "__main__":
    conn = psycopg.connect(
        "postgresql://postgres:your_password@localhost:5432/vector_db",
        autocommit=True
    )
    register_vector(conn)

    # 模拟查询向量
    query_vec = np.random.rand(1024).astype(np.float32)
    query_vec = query_vec / np.linalg.norm(query_vec)  # 归一化

    # 普通 Top-K 检索(可能有重复内容)
    print("=== 普通 Top-K 检索 ===")
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, content, embedding <=> %s AS distance
            FROM documents
            ORDER BY distance
            LIMIT 5
        """, (query_vec,))
        for row in cur.fetchall():
            print(f"  ID={row[0]}, 距离={float(row[2]):.4f}, "
                  f"内容={row[1][:60]}...")

    # MMR 多样性检索(去重后的结果)
    print("\n=== MMR 多样性检索 (λ=0.7) ===")
    mmr_results = mmr_search(
        conn, query_vec,
        table="documents",
        top_k=5,
        fetch_k=20,
        lambda_mult=0.7
    )
    for r in mmr_results:
        print(f"  ID={r['id']}, 距离={r['distance']:.4f}, "
              f"内容={r['content'][:60]}...")

    conn.close()

4.4.3 向量距离归一化

将原始距离转换为 0-1 的相似度分数,使结果更直观、更易于设定阈值。

"""
距离归一化:将向量距离转换为 0-1 的相似度分数
"""
import numpy as np


def normalize_distance(distance: float, distance_type: str = "cosine") -> float:
    """
    将距离转换为 0-1 的相似度分数
    1.0 = 完全相似,0.0 = 完全不相似

    参数:
        distance: 原始距离值
        distance_type: 距离类型
            - "cosine": 余弦距离,范围 [0, 2]
            - "l2": L2 欧氏距离,范围 [0, +inf)
            - "ip": 负内积,范围 [-1, +inf)(归一化向量时为 [-1, 1])
    """
    if distance_type == "cosine":
        # 余弦距离范围 [0, 2]
        # 距离 0 → 相似度 1.0(完全相同方向)
        # 距离 1 → 相似度 0.5(正交)
        # 距离 2 → 相似度 0.0(完全相反方向)
        similarity = 1.0 - (distance / 2.0)
        return max(0.0, min(1.0, similarity))

    elif distance_type == "l2":
        # L2 距离范围 [0, +inf)
        # 使用指数衰减函数映射到 (0, 1]
        # distance=0 → 1.0, distance=1 → ~0.37, distance=5 → ~0.007
        similarity = np.exp(-distance)
        return float(similarity)

    elif distance_type == "ip":
        # 负内积(pgvector 的 <#> 返回的是负内积)
        # 对于归一化向量:内积范围 [-1, 1],负内积范围 [-1, 1]
        # 实际内积 = -distance
        # 相似度 = (实际内积 + 1) / 2
        inner_product = -distance
        similarity = (inner_product + 1.0) / 2.0
        return max(0.0, min(1.0, similarity))

    else:
        raise ValueError(f"不支持的距离类型: {distance_type}")


def batch_normalize_distances(
    distances: list[float],
    distance_type: str = "cosine"
) -> list[float]:
    """批量归一化距离"""
    return [normalize_distance(d, distance_type) for d in distances]


# ---- 使用示例 ----
if __name__ == "__main__":
    # 余弦距离示例
    cosine_distances = [0.0, 0.1, 0.3, 0.5, 1.0, 1.5, 2.0]
    print("余弦距离 → 相似度:")
    for d in cosine_distances:
        s = normalize_distance(d, "cosine")
        print(f"  距离 {d:.2f} → 相似度 {s:.4f}")

    # L2 距离示例
    l2_distances = [0.0, 0.5, 1.0, 2.0, 5.0]
    print("\nL2 距离 → 相似度:")
    for d in l2_distances:
        s = normalize_distance(d, "l2")
        print(f"  距离 {d:.2f} → 相似度 {s:.4f}")

    # 在实际检索中使用
    import psycopg
    from pgvector.psycopg import register_vector

    conn = psycopg.connect(
        "postgresql://postgres:your_password@localhost:5432/vector_db",
        autocommit=True
    )
    register_vector(conn)

    query_vec = np.random.rand(1024).astype(np.float32)
    query_vec = query_vec / np.linalg.norm(query_vec)

    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, content, embedding <=> %s AS distance
            FROM documents
            ORDER BY distance
            LIMIT 5
        """, (query_vec,))

        print("\n带相似度分数的检索结果:")
        for row in cur.fetchall():
            raw_distance = float(row[2])
            similarity = normalize_distance(raw_distance, "cosine")
            print(f"  ID={row[0]}, "
                  f"原始距离={raw_distance:.4f}, "
                  f"相似度={similarity:.4f}, "
                  f"内容={row[1][:50]}...")

    conn.close()

4.4.4 相似度分数转置信度

"""
将相似度分数转换为置信度评级
基于距离分布的阈值设定策略
"""
import numpy as np
from dataclasses import dataclass
from enum import Enum


class Confidence(Enum):
    """置信度等级"""
    HIGH = "高置信度"      # 非常确定答案准确
    MEDIUM = "中等置信度"  # 答案可能准确
    LOW = "低置信度"       # 答案可能不准确
    VERY_LOW = "极低置信度"  # 不建议依赖此答案


@dataclass
class ConfidenceResult:
    """置信度评估结果"""
    confidence: Confidence
    score: float           # 0-1 的置信度分数
    recommendation: str    # 对应用的建议


def compute_confidence(
    similarity: float,
    thresholds: dict = None
) -> ConfidenceResult:
    """
    基于相似度分数计算置信度

    参数:
        similarity: 归一化后的相似度分数 (0-1)
        thresholds: 自定义阈值,默认使用经验值

    默认阈值(基于余弦相似度):
        - ≥ 0.85: 高置信度(文档高度匹配)
        - ≥ 0.70: 中等置信度(文档相关但可能不完全匹配)
        - ≥ 0.50: 低置信度(文档部分相关)
        - < 0.50: 极低置信度(文档不太相关)
    """
    if thresholds is None:
        thresholds = {
            "high": 0.85,
            "medium": 0.70,
            "low": 0.50,
        }

    if similarity >= thresholds["high"]:
        return ConfidenceResult(
            confidence=Confidence.HIGH,
            score=similarity,
            recommendation="可以直接使用检索结果回答用户问题"
        )
    elif similarity >= thresholds["medium"]:
        return ConfidenceResult(
            confidence=Confidence.MEDIUM,
            score=similarity,
            recommendation="可以参考检索结果,但建议提示用户验证"
        )
    elif similarity >= thresholds["low"]:
        return ConfidenceResult(
            confidence=Confidence.LOW,
            score=similarity,
            recommendation="检索结果相关性一般,建议告知用户可能无法准确回答"
        )
    else:
        return ConfidenceResult(
            confidence=Confidence.VERY_LOW,
            score=similarity,
            recommendation="检索结果不相关,建议直接告知用户知识库中没有相关信息"
        )


def adaptive_threshold_calibration(
    conn,
    table: str = "documents",
    sample_queries: list[str] = None,
    embedding_fn=None
) -> dict:
    """
    自适应阈值校准

    通过分析查询向量与库中向量的距离分布,自动确定合理的阈值

    原理:
    - 随机采样一组查询,计算它们与库中所有向量的距离
    - 分析距离分布的统计特征(均值、标准差、分位数)
    - 基于分布特征自动设定阈值
    """
    if sample_queries is None or embedding_fn is None:
        # 使用库中已有向量作为模拟查询
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT embedding FROM {table}
                ORDER BY RANDOM()
                LIMIT 20
            """)
            sample_embeddings = [np.array(row[0]) for row in cur.fetchall()]
    else:
        sample_embeddings = [embedding_fn(q) for q in sample_queries]

    all_distances = []
    with conn.cursor() as cur:
        for emb in sample_embeddings:
            cur.execute(f"""
                SELECT embedding <=> %s AS distance
                FROM {table}
                ORDER BY distance
                LIMIT 10
            """, (emb,))
            for row in cur.fetchall():
                all_distances.append(float(row[0]))

    if not all_distances:
        return {"high": 0.85, "medium": 0.70, "low": 0.50}

    all_distances = np.array(all_distances)

    # 将距离转换为相似度
    similarities = 1.0 - (all_distances / 2.0)

    # 基于统计分布设定阈值
    mean_sim = np.mean(similarities)
    std_sim = np.std(similarities)

    # 高置信度:均值 + 1 个标准差(高于平均水平的匹配)
    high_threshold = float(np.clip(mean_sim + std_sim, 0.7, 0.95))
    # 中等置信度:均值附近
    medium_threshold = float(np.clip(mean_sim, 0.5, 0.8))
    # 低置信度:均值 - 1 个标准差
    low_threshold = float(np.clip(mean_sim - std_sim, 0.3, 0.6))

    calibrated = {
        "high": round(high_threshold, 3),
        "medium": round(medium_threshold, 3),
        "low": round(low_threshold, 3),
    }

    print(f"距离分布统计:")
    print(f"  相似度均值: {mean_sim:.4f}")
    print(f"  相似度标准差: {std_sim:.4f}")
    print(f"  校准后阈值: {calibrated}")

    return calibrated


# ---- 使用示例 ----
if __name__ == "__main__":
    # 基本置信度评估
    test_similarities = [0.95, 0.80, 0.65, 0.45, 0.30]
    for sim in test_similarities:
        result = compute_confidence(sim)
        print(f"相似度 {sim:.2f} → {result.confidence.value} "
              f"(分数: {result.score:.2f}) | 建议: {result.recommendation}")

    # 自适应校准(需要有数据库连接)
    # conn = psycopg.connect(...)
    # thresholds = adaptive_threshold_calibration(conn)
    # 然后使用校准后的阈值:
    # result = compute_confidence(similarity, thresholds=thresholds)

4.4.5 多向量融合检索

"""
多路召回与结果融合排序

场景:
- 同一个问题用不同的 Embedding 模型编码,分别检索后融合
- 混合检索:向量检索 + 全文检索 + 元数据过滤,融合排序
- 多查询扩展:将一个问题扩展为多个子查询,分别检索后合并
"""
import psycopg
from pgvector.psycopg import register_vector
import numpy as np
from typing import List
from collections import defaultdict


def reciprocal_rank_fusion(
    result_lists: List[List[dict]],
    k: int = 60
) -> List[dict]:
    """
    RRF(Reciprocal Rank Fusion)倒数排名融合

    这是信息检索中最经典的结果融合算法。
    不依赖于原始分数,只依赖于排名位置。

    RRF 分数公式:
        score(d) = Σ 1 / (k + rank_i(d))

    其中 k 是平滑常数(默认 60),rank_i(d) 是文档 d 在第 i 个列表中的排名(从 1 开始)。

    参数:
        result_lists: 多路检索结果列表,每路是一个按相关性排序的文档列表
        k: RRF 平滑参数(越大,排名差异的影响越小)

    返回:
        融合排序后的文档列表
    """
    # 用文档 ID 作为唯一键
    doc_scores = defaultdict(float)
    doc_info = {}

    for results in result_lists:
        for rank, doc in enumerate(results):
            doc_id = doc["id"]
            # RRF 分数
            doc_scores[doc_id] += 1.0 / (k + rank + 1)  # rank 从 0 开始,+1 转为从 1 开始
            # 保留最新的文档信息
            if doc_id not in doc_info:
                doc_info[doc_id] = doc

    # 按 RRF 分数降序排序
    sorted_ids = sorted(doc_scores.keys(), key=lambda x: doc_scores[x], reverse=True)

    fused_results = []
    for doc_id in sorted_ids:
        doc = doc_info[doc_id]
        doc["rrf_score"] = doc_scores[doc_id]
        fused_results.append(doc)

    return fused_results


def weighted_score_fusion(
    result_lists: List[List[dict]],
    weights: List[float] = None,
    score_key: str = "distance"
) -> List[dict]:
    """
    加权分数融合

    将多路检索的分数按权重加权后排序。
    注意:需要将距离转换为"越大越好"的分数。

    参数:
        result_lists: 多路检索结果列表
        weights: 每路的权重(默认等权重)
        score_key: 分数字段名
    """
    if weights is None:
        weights = [1.0 / len(result_lists)] * len(result_lists)

    assert len(result_lists) == len(weights), "结果列表数量与权重数量不匹配"

    doc_scores = defaultdict(float)
    doc_info = {}

    for results, weight in zip(result_lists, weights):
        for doc in results:
            doc_id = doc["id"]
            # 将距离转换为分数(距离越小,分数越高)
            # 使用 1 / (1 + distance) 作为分数
            score = 1.0 / (1.0 + doc.get(score_key, 0.0))
            doc_scores[doc_id] += weight * score
            if doc_id not in doc_info:
                doc_info[doc_id] = doc

    sorted_ids = sorted(doc_scores.keys(), key=lambda x: doc_scores[x], reverse=True)

    fused_results = []
    for doc_id in sorted_ids:
        doc = doc_info[doc_id]
        doc["fused_score"] = doc_scores[doc_id]
        fused_results.append(doc)

    return fused_results


def multi_vector_search(
    conn,
    query_embeddings: List[np.ndarray],
    table: str = "documents",
    top_k: int = 5,
    fetch_k: int = 10,
    fusion_method: str = "rrf"
) -> List[dict]:
    """
    多向量融合检索

    使用多个查询向量分别检索,然后融合结果。

    适用场景:
    - 多查询扩展:一个问题生成多个变体查询
    - 多模型检索:使用不同的 Embedding 模型
    - 多语言检索:不同语言的查询向量

    参数:
        conn: 数据库连接
        query_embeddings: 查询向量列表
        table: 表名
        top_k: 最终返回的结果数
        fetch_k: 每路检索的候选数
        fusion_method: 融合方法 ("rrf" 或 "weighted")
    """
    result_lists = []

    with conn.cursor() as cur:
        for emb in query_embeddings:
            cur.execute(f"""
                SELECT id, content, source,
                       embedding <=> %s AS distance
                FROM {table}
                ORDER BY distance
                LIMIT %s
            """, (emb, fetch_k))

            results = []
            for row in cur.fetchall():
                results.append({
                    "id": row[0],
                    "content": row[1],
                    "source": row[2],
                    "distance": float(row[3])
                })
            result_lists.append(results)

    # 融合
    if fusion_method == "rrf":
        fused = reciprocal_rank_fusion(result_lists)
    else:
        fused = weighted_score_fusion(result_lists)

    return fused[:top_k]


def hybrid_search(
    conn,
    query_embedding: np.ndarray,
    query_text: str,
    table: str = "documents",
    top_k: int = 5,
    vector_weight: float = 0.7,
    text_weight: float = 0.3
) -> List[dict]:
    """
    混合检索:向量语义检索 + 全文关键词检索

    将两路检索结果通过 RRF 或加权融合。

    参数:
        query_embedding: 查询向量
        query_text: 查询文本(用于全文检索)
        table: 表名
        top_k: 返回结果数
        vector_weight: 向量检索权重
        text_weight: 全文检索权重
    """
    vector_results = []
    text_results = []

    with conn.cursor() as cur:
        # 向量检索
        cur.execute(f"""
            SELECT id, content, source,
                   embedding <=> %s AS distance
            FROM {table}
            ORDER BY distance
            LIMIT %s
        """, (query_embedding, top_k * 3))

        for row in cur.fetchall():
            vector_results.append({
                "id": row[0],
                "content": row[1],
                "source": row[2],
                "distance": float(row[3])
            })

        # 全文检索(使用 PostgreSQL 的全文搜索)
        cur.execute(f"""
            SELECT id, content, source,
                   ts_rank(
                       to_tsvector('simple', content),
                       plainto_tsquery('simple', %s)
                   ) AS rank
            FROM {table}
            WHERE to_tsvector('simple', content) @@ plainto_tsquery('simple', %s)
            ORDER BY rank DESC
            LIMIT %s
        """, (query_text, query_text, top_k * 3))

        for row in cur.fetchall():
            text_results.append({
                "id": row[0],
                "content": row[1],
                "source": row[2],
                # 将 rank 转换为距离形式(越大越好 → 越小越好)
                "distance": 1.0 / (1.0 + float(row[3])) if row[3] else 1.0
            })

    # 加权融合
    fused = weighted_score_fusion(
        [vector_results, text_results],
        weights=[vector_weight, text_weight],
        score_key="distance"
    )

    return fused[:top_k]


# ---- 使用示例 ----
if __name__ == "__main__":
    conn = psycopg.connect(
        "postgresql://postgres:your_password@localhost:5432/vector_db",
        autocommit=True
    )
    register_vector(conn)

    # 多向量融合检索(模拟查询扩展)
    # 实际场景中,这3个向量可能来自:原始问题、改写后的问题、翻译后的问题
    query_vecs = [
        np.random.rand(1024).astype(np.float32) for _ in range(3)
    ]
    for v in query_vecs:
        v /= np.linalg.norm(v)

    print("=== 多向量 RRF 融合检索 ===")
    fused = multi_vector_search(
        conn, query_vecs,
        table="documents",
        top_k=5,
        fusion_method="rrf"
    )
    for r in fused:
        print(f"  ID={r['id']}, RRF分数={r['rrf_score']:.4f}, "
              f"内容={r['content'][:60]}...")

    # 混合检索(向量 + 全文)
    print("\n=== 混合检索(向量 70% + 全文 30%)===")
    hybrid = hybrid_search(
        conn,
        query_embedding=query_vecs[0],
        query_text="pgvector 向量搜索",
        table="documents",
        top_k=5,
        vector_weight=0.7,
        text_weight=0.3
    )
    for r in hybrid:
        print(f"  ID={r['id']}, 融合分数={r['fused_score']:.4f}, "
              f"内容={r['content'][:60]}...")

    conn.close()

本章小结

通过第三、四阶段的学习,你已经掌握了:

  1. 索引原理:理解了为什么向量检索需要专门的索引结构,以及 HNSW 和 IVFFlat 两种索引的底层工作原理。
  2. 索引实操:能够根据数据规模、读写比例、召回率和延迟要求,做出正确的索引选型和参数配置。
  3. Python 工程化:从原生 psycopg 驱动到 LangChain/LlamaIndex 框架集成,具备了完整的生产级开发能力。
  4. RAG 实战:完成了一个从文档切片到问答生成的端到端 RAG 系统。
  5. 高级能力:掌握了 MMR 多样性检索、距离归一化、置信度评估、多路融合检索等进阶技术。

这些知识构成了使用 pgvector 构建生产级向量搜索系统的核心能力。在后续阶段中,我们将进一步探讨性能优化、分布式部署、监控运维等高级主题。


第五阶段:生产环境性能调优、运维与坑点规避

当你将 pgvector 从开发环境推向生产环境,真正的挑战才刚刚开始。数据量从万级膨胀到千万级,并发从个位数飙升到数百甚至上千,查询延迟的容忍度从秒级收紧到毫秒级——这些问题都需要系统性的调优手段和扎实的运维保障。本阶段将从内核参数、数据分层、运维监控和高频坑点四个维度,带你全面掌握 pgvector 在生产环境中的最佳实践。


5.1 PostgreSQL 内核参数针对向量场景调优

PostgreSQL 的默认配置是为通用 OLTP 场景设计的,对于向量检索这种"大内存 + 大IO + 计算密集"的工作负载,默认参数往往远远不够。下面逐一讲解每个关键参数的含义、推荐值和调整方法。

5.1.1 内存参数调优

(1)shared_buffers —— 共享缓冲区

shared_buffers 是 PostgreSQL 最重要的内存参数,它定义了数据库服务器用于缓存数据页的共享内存区域大小。所有后端进程共享这块内存,用于缓存从磁盘读取的表数据和索引数据。

在向量检索场景中,shared_buffers 的重要性更加突出:HNSW 索引的图结构节点、IVFFlat 的聚类中心、频繁访问的向量数据行,都需要驻留在共享缓冲区中才能保证查询延迟。如果 shared_buffers 太小,每次向量检索都可能触发大量磁盘 IO,延迟会从毫秒级飙升到百毫秒级。

推荐值:系统物理内存的 25%~30%。对于专门的向量检索服务器,可以适当增大到 35%~40%,但不要超过 40%,因为操作系统本身也需要内存来维护文件系统缓存(page cache)。

# postgresql.conf

# 假设服务器有 64GB 内存
shared_buffers = 16GB          # 系统内存的 25%

# 如果是专门的向量检索服务器(128GB 内存)
# shared_buffers = 40GB        # 可适当增大,但不超过 40%

原理补充:为什么不能超过 40%?PostgreSQL 采用双重缓存架构——数据先经过操作系统的 page cache,再进入 shared_buffers。如果把 shared_buffers 设得过大,操作系统的 page cache 被挤压,反而会导致更多的磁盘 IO。此外,过大的 shared_buffers 也会增加 checkpoint 刷盘时的突发 IO 压力。

(2)maintenance_work_mem —— 维护操作工作内存

maintenance_work_mem 控制的是维护性操作(如 CREATE INDEXVACUUMALTER TABLE ADD FOREIGN KEY)可以使用的最大内存。这是 HNSW 索引构建性能的关键参数。

HNSW 索引的构建过程需要在内存中维护一个多层跳表图(multi-layer graph)。每一条向量数据插入图中时,都需要计算与已有节点的相似度、更新邻居连接关系。当数据量达到百万级时,图结构本身可能占用数 GB 内存。如果 maintenance_work_mem 不够,PostgreSQL 不得不将中间数据溢写到磁盘临时文件,索引构建时间会从分钟级膨胀到小时级。

推荐值

# postgresql.conf

# 通用场景
maintenance_work_mem = 512MB

# 创建 HNSW 索引时(推荐临时调大)
# 方法一:全局设置(索引构建完成后建议调回)
maintenance_work_mem = 2GB

# 方法二:会话级设置(推荐,只对当前会话的索引构建生效)
SET maintenance_work_mem = '2GB';
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);
-- 索引创建完成后,该会话设置自动失效

实测参考:在 100 万条 1536 维向量上创建 HNSW 索引(m=16, ef_construction=64),maintenance_work_mem = 64MB(默认值)时需要约 25 分钟,而 maintenance_work_mem = 2GB 时仅需约 3 分钟。差距高达 8 倍。

(3)work_mem —— 查询工作内存

work_mem 控制的是单个查询操作中每个排序或哈希节点可以使用的内存。当执行向量检索的 ORDER BY distance LIMIT k 时,PostgreSQL 需要对候选向量按距离排序,这个排序操作使用的就是 work_mem

对于向量检索场景,work_mem 的影响通常不如 shared_buffersmaintenance_work_mem 显著,但在以下场景中仍然重要:

  • 向量检索后需要大量排序(如 JOIN 操作后对结果排序)
  • 包含哈希聚合的复合查询(如分组统计 + 向量检索)

推荐值

# postgresql.conf

# 通用场景
work_mem = 16MB

# 复杂向量检索场景
work_mem = 64MB

# 会话级动态调整(对复杂查询临时调大)
SET work_mem = '128MB';
SELECT id, embedding <=> query_vector AS distance
FROM documents
ORDER BY distance
LIMIT 100;

注意work_mem 是每个排序/哈希节点独立分配的,一个复杂查询可能包含多个节点,因此总内存消耗可能是 work_mem 的数倍。不建议将全局 work_mem 设置得过大。

(4)effective_cache_size —— 有效缓存大小估计

effective_cache_size 并不实际分配内存,它只是告诉查询优化器"操作系统和 PostgreSQL 一共可以缓存多少数据"。优化器根据这个值来决定是使用索引扫描还是全表扫描。

对于向量检索,如果 effective_cache_size 设置过小,优化器可能会错误地认为索引数据不在缓存中,从而放弃使用 HNSW/IVFFlat 索引,选择全表扫描。

推荐值:系统物理内存的 50%~75%。

# postgresql.conf

# 假设服务器有 64GB 内存
effective_cache_size = 48GB    # 系统内存的 75%

5.1.2 IO 与并发参数调优

(1)max_parallel_maintenance_workers —— 并行索引构建

PostgreSQL 支持使用多个工作进程并行创建索引,这对大数据量的 HNSW 索引构建至关重要。每个并行工作进程可以独立处理一部分向量的图结构构建。

# postgresql.conf

# 推荐设为 CPU 核心数的一半,但不超过 8
max_parallel_maintenance_workers = 4   # 4核以上服务器

# 创建索引时可以会话级临时调大
SET max_parallel_maintenance_workers = 8;
SET maintenance_work_mem = '2GB';
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);

注意:每个并行工作进程都需要独立的 maintenance_work_mem 内存。如果你设置了 max_parallel_maintenance_workers = 4maintenance_work_mem = 2GB,那么索引构建过程最多可能消耗 8GB 内存。请确保服务器内存充足。

(2)max_parallel_workers_per_gather —— 并行查询

控制单个查询可以使用的最大并行工作进程数。对于返回大量结果的向量检索查询,并行扫描可以显著降低延迟。

# postgresql.conf
max_parallel_workers_per_gather = 4
max_parallel_workers = 8        # 系统级并行工作进程上限

(3)WAL 日志参数 —— 适配向量批量写入

向量数据的批量写入(如批量导入 embedding)会产生大量 WAL(Write-Ahead Log)日志。默认的 WAL 参数可能成为写入瓶颈:

# postgresql.conf

wal_buffers = 64MB              # WAL 缓冲区,默认 -1(自动计算)通常够用
                                 # 向量批量写入场景可显式设为 64MB

max_wal_size = 4GB              # WAL 日志最大总量,默认 1GB
                                 # 向量批量写入时建议 4GB~8GB

min_wal_size = 1GB              # WAL 日志最小保留量

checkpoint_completion_target = 0.9  # 检查点完成目标,默认 0.5
                                     # 设为 0.9 可以平滑写入压力
                                     # 让检查点更慢地完成,减少IO突发

原理补充checkpoint_completion_target = 0.9 的含义是让检查点在下次检查点间隔的 90% 时间内完成(而非默认的 50%),这使得脏页刷盘的过程更加平缓,避免了默认设置下每 5 分钟一次的"写入悬崖"(write cliff)现象。对于向量批量写入场景尤为重要。

(4)random_page_cost —— 适配 SSD 存储

PostgreSQL 查询优化器使用 random_page_costseq_page_cost 来估算随机 IO 和顺序 IO 的相对成本。默认值 random_page_cost = 4.0 是基于传统机械硬盘(HDD)的假设。在 SSD 环境下,随机读和顺序读的性能差距远没有 4 倍那么大。

# postgresql.conf

# SSD 环境(推荐)
random_page_cost = 1.1

# NVMe SSD 环境
random_page_cost = 1.0

# 机械硬盘(保持默认)
# random_page_cost = 4.0

为什么这对向量检索重要:HNSW 索引的遍历模式是典型的随机 IO(在图结构中跳转到不同的邻居节点)。如果 random_page_cost 过高,优化器可能认为"随机读索引页太贵了",从而放弃使用 HNSW 索引,转而进行全表顺序扫描。这在 SSD 环境下显然是错误的决策。

5.1.3 向量场景专属运行时参数

pgvector 提供了两个运行时参数,可以在不重启数据库的情况下动态调整检索精度和性能的平衡。

(1)hnsw.ef_search —— HNSW 检索精度控制

hnsw.ef_search 控制 HNSW 索引在查询时的搜索宽度。值越大,搜索越精确(召回率越高),但查询延迟也越高。

-- 会话级设置(仅当前会话有效)
SET hnsw.ef_search = 100;      -- 推荐范围:40~200

-- 在事务内设置(事务结束自动恢复)
BEGIN;
SET LOCAL hnsw.ef_search = 200;  -- 高精度模式
SELECT id, title
FROM documents
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;
COMMIT;
-- COMMIT 后 ef_search 自动恢复为会话级值

-- 全局设置(需要超级用户权限,写入 postgresql.conf)
-- postgresql.conf 中添加:
-- hnsw.ef_search = 100
-- 然后执行:
SELECT pg_reload_conf();

作用域说明

设置方式 作用域 持久性 示例
SET hnsw.ef_search = 100 当前会话 会话结束失效 应用连接后设置
SET LOCAL hnsw.ef_search = 100 当前事务 事务结束失效 单次查询精度调整
postgresql.conf + pg_reload_conf() 全局 持久生效 统一默认策略
ALTER ROLE app_user SET hnsw.ef_search = 100 指定角色 持久生效 按角色定制
ALTER DATABASE mydb SET hnsw.ef_search = 100 指定数据库 持久生效 按数据库定制

(2)ivfflat.probes —— IVFFlat 探测范围控制

ivfflat.probes 控制 IVFFlat 索引在查询时需要搜索多少个聚类(cluster)。值越大,召回率越高,但延迟也越高。

-- 会话级设置
SET ivfflat.probes = 10;       -- 推荐范围:5~50

-- 经验法则:probes ≈ sqrt(lists)
-- 如果创建索引时 lists = 1000,则 probes ≈ 32
-- 如果创建索引时 lists = 100,则 probes ≈ 10

-- 全局设置方式同上
-- postgresql.conf 中添加:
-- ivfflat.probes = 10

调参经验:先用小值(如 ef_search = 40probes = 5)快速得到初步结果,然后根据召回率要求逐步增大。对于在线服务,建议通过实验确定"召回率 vs 延迟"的最佳平衡点,然后在连接池中统一设置。

完整的向量调优配置模板postgresql.conf 追加部分):

# ============================================================
# pgvector 生产环境调优配置模板
# 服务器配置:64GB RAM, 8核 CPU, NVMe SSD
# ============================================================

# --- 内存参数 ---
shared_buffers = 16GB
maintenance_work_mem = 2GB
work_mem = 32MB
effective_cache_size = 48GB

# --- IO 与并发 ---
max_parallel_maintenance_workers = 4
max_parallel_workers_per_gather = 4
max_parallel_workers = 8
random_page_cost = 1.1

# --- WAL 参数 ---
wal_buffers = 64MB
max_wal_size = 4GB
min_wal_size = 1GB
checkpoint_completion_target = 0.9

# --- pgvector 专属参数 ---
hnsw.ef_search = 100
ivfflat.probes = 10

# --- 连接与超时 ---
max_connections = 200
statement_timeout = 5000           # 5秒查询超时保护

5.2 数据分层与冷热存储

当向量数据量达到千万甚至亿级时,单表存储会面临查询性能下降、索引维护成本飙升、备份恢复缓慢等问题。数据分层(Data Tiering)是解决这些问题的核心策略。

5.2.1 千万级向量分表策略

策略一:按时间范围分表

适用于 embedding 数据有明确时间属性的场景(如每日新增的文档向量、用户行为向量等)。

-- 手动分表:按月创建独立表
CREATE TABLE documents_2025_01 (
    id BIGSERIAL PRIMARY KEY,
    title TEXT,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE documents_2025_02 (LIKE documents_2025_01 INCLUDING ALL);
CREATE TABLE documents_2025_03 (LIKE documents_2025_01 INCLUDING ALL);
-- ... 每月一张表

-- 为每张表独立创建向量索引
CREATE INDEX ON documents_2025_01 USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON documents_2025_02 USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON documents_2025_03 USING hnsw (embedding vector_cosine_ops);

-- 查询时通过应用层路由到具体表
SELECT id, title, embedding <=> $1 AS distance
FROM documents_2025_06
ORDER BY distance
LIMIT 10;

策略二:按业务分类分表

适用于不同业务域的 embedding 相互独立、很少跨域查询的场景。

-- 按业务类型分表
CREATE TABLE product_embeddings (
    product_id BIGINT PRIMARY KEY,
    embedding vector(1536),
    category TEXT,
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE article_embeddings (
    article_id BIGINT PRIMARY KEY,
    embedding vector(1536),
    source TEXT,
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE user_embeddings (
    user_id BIGINT PRIMARY KEY,
    embedding vector(768),           -- 用户向量维度可能与商品不同
    segment TEXT,
    updated_at TIMESTAMPTZ DEFAULT now()
);

-- 每张表独立建索引,参数可以按业务需求调整
CREATE INDEX ON product_embeddings USING hnsw (embedding vector_cosine_ops)
  WITH (m = 32, ef_construction = 128);   -- 商品向量精度高,参数调大

CREATE INDEX ON user_embeddings USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);    -- 用户向量精度要求低,参数适中

5.2.2 PostgreSQL 声明式分区表结合向量索引

PostgreSQL 10+ 原生支持声明式分区(Declarative Partitioning),这是比手动分表更优雅的解决方案。分区表对外表现为一张逻辑表,应用层无需关心数据路由。

-- 创建分区主表
CREATE TABLE vectors (
    id BIGSERIAL,
    content TEXT,
    embedding vector(1536),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    category TEXT
) PARTITION BY RANGE (created_at);

-- 创建月度分区
CREATE TABLE vectors_2025_q1 PARTITION OF vectors
    FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

CREATE TABLE vectors_2025_q2 PARTITION OF vectors
    FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');

CREATE TABLE vectors_2025_q3 PARTITION OF vectors
    FOR VALUES FROM ('2025-07-01') TO ('2025-10-01');

CREATE TABLE vectors_2025_q4 PARTITION OF vectors
    FOR VALUES FROM ('2025-10-01') TO ('2026-01-01');

-- 为每个分区独立创建向量索引(分区索引比全局索引更小、更快)
CREATE INDEX ON vectors_2025_q1 USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON vectors_2025_q2 USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON vectors_2025_q3 USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON vectors_2025_q4 USING hnsw (embedding vector_cosine_ops);

-- 查询时自动分区裁剪(Partition Pruning)
-- 如果查询包含分区键条件,PostgreSQL 只扫描相关分区
SELECT id, content, embedding <=> $1 AS distance
FROM vectors
WHERE created_at >= '2025-07-01'    -- 只会扫描 Q3、Q4 分区
ORDER BY distance
LIMIT 10;

分区索引 vs 全局索引:在 PostgreSQL 中,分区表的每个分区维护自己独立的索引。这有两个优势:(1)单个索引更小,B-tree/HNSW 层数更少,查询更快;(2)索引维护(REINDEX、VACUUM)可以按分区独立执行,不影响其他分区。

自动创建新分区(使用 pg_partman 扩展或定时任务):

-- 使用 pg_partman 自动管理分区
CREATE EXTENSION pg_partman;

SELECT partman.create_parent(
    p_parent_table   => 'public.vectors',
    p_control        => 'created_at',
    p_type           => 'range',
    p_interval       => '3 months',
    p_premake        => 2        -- 提前创建未来 2 个分区
);

-- 或者用简单的定时 SQL 手动创建下季度分区
-- 建议在 pg_cron 或外部调度中执行
CREATE TABLE IF NOT EXISTS vectors_2026_q1
    PARTITION OF vectors
    FOR VALUES FROM ('2026-01-01') TO ('2026-04-01');

CREATE INDEX IF NOT EXISTS vectors_2026_q1_hnsw_idx
    ON vectors_2026_q1 USING hnsw (embedding vector_cosine_ops);

5.2.3 过期向量定时清理

向量数据随时间积累,过期数据不仅占用存储空间,还会拖慢全表扫描和索引维护的性能。定时清理是必要的运维操作。

方案一:使用 pg_cron 定时清理

-- 安装 pg_cron 扩展
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- 每天凌晨 2 点清理 6 个月前的向量数据
SELECT cron.schedule(
    'cleanup-old-vectors',          -- 任务名称
    '0 2 * * *',                     -- cron 表达式:每天 02:00
    $$
    DELETE FROM vectors
    WHERE created_at < now() - INTERVAL '6 months'
    $$
);

-- 清理后回收空间(每周日凌晨 3 点)
SELECT cron.schedule(
    'vacuum-vectors',
    '0 3 * * 0',
    $$VACUUM ANALYZE vectors$$
);

-- 查看已注册的定时任务
SELECT * FROM cron.job;

-- 取消定时任务
SELECT cron.unschedule('cleanup-old-vectors');

方案二:分区表快速 DROP(推荐用于大数据量)

对于分区表,直接删除(DETACH + DROP)过期分区比逐行 DELETE 快几个数量级:

-- 快速移除过期分区(O(1) 操作,不产生大量 WAL)
ALTER TABLE vectors DETACH PARTITION vectors_2024_q1;
DROP TABLE vectors_2024_q1;

-- 如果需要先归档再删除
ALTER TABLE vectors DETACH PARTITION vectors_2024_q1;
-- 使用 pg_dump 备份该分区
-- pg_dump -t vectors_2024_q1 mydb > vectors_2024_q1_backup.sql
DROP TABLE vectors_2024_q1;

DELETE vs DETACH+DROP:对于千万级分区,DELETE FROM partition WHERE ... 可能需要数小时,期间产生大量 WAL 日志并导致表膨胀。而 DETACH PARTITION + DROP TABLE 是元数据操作,几乎瞬间完成,不产生额外的 WAL 日志。

方案三:外部调度脚本(适用于不使用 pg_cron 的环境)

#!/usr/bin/env python3
"""vector_cleanup.py - 过期向量清理脚本"""
import psycopg2
from datetime import datetime, timedelta

DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "vectordb",
    "user": "admin",
    "password": "secure_password"
}

RETENTION_DAYS = 180
BATCH_SIZE = 10000  # 分批删除,避免长事务

def cleanup_expired_vectors():
    conn = psycopg2.connect(**DB_CONFIG)
    conn.autocommit = False
    cur = conn.cursor()

    cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)
    total_deleted = 0

    while True:
        cur.execute("""
            DELETE FROM documents
            WHERE id IN (
                SELECT id FROM documents
                WHERE created_at < %s
                LIMIT %s
            )
        """, (cutoff, BATCH_SIZE))
        deleted = cur.rowcount
        conn.commit()
        total_deleted += deleted

        if deleted < BATCH_SIZE:
            break
        print(f"已删除 {total_deleted} 条过期向量...")

    # 更新统计信息
    cur.execute("ANALYZE documents")
    conn.commit()

    print(f"清理完成,共删除 {total_deleted} 条记录")
    cur.close()
    conn.close()

if __name__ == "__main__":
    cleanup_expired_vectors()
# crontab 配置:每天凌晨 2 点执行
0 2 * * * /usr/bin/python3 /opt/scripts/vector_cleanup.py >> /var/log/vector_cleanup.log 2>&1

5.2.4 冷热分离架构

在存储成本敏感的生产环境中,将数据按访问频率分层存储是一种常见策略。

热数据(Hot Tier):近期的高频查询数据,存储在 SSD 上,使用 HNSW 索引保障查询延迟。

冷数据(Cold Tier):历史低频数据,存储在 HDD 上或归档压缩,使用 IVFFlat 索引或完全去除向量索引。

-- 热数据表:最近 3 个月的向量,SSD 存储,HNSW 索引
CREATE TABLE vectors_hot (
    id BIGSERIAL PRIMARY KEY,
    embedding vector(1536) NOT NULL,
    content TEXT,
    created_at TIMESTAMPTZ DEFAULT now(),
    access_count INT DEFAULT 0      -- 访问计数器
);

-- HNSW 索引,高精度参数
CREATE INDEX ON vectors_hot USING hnsw (embedding vector_cosine_ops)
  WITH (m = 32, ef_construction = 128);

-- 放在 SSD 表空间
ALTER TABLE vectors_hot SET TABLESPACE ssd_tablespace;
ALTER INDEX vectors_hot_embedding_idx SET TABLESPACE ssd_tablespace;

-- 冷数据表:3 个月前的向量,HDD 存储,IVFFlat 索引或无索引
CREATE TABLE vectors_cold (
    id BIGINT PRIMARY KEY,
    embedding vector(1536) NOT NULL,
    content TEXT,
    created_at TIMESTAMPTZ,
    archived_at TIMESTAMPTZ DEFAULT now()
);

-- IVFFlat 索引,节省存储和内存
CREATE INDEX ON vectors_cold USING ivfflat (embedding vector_cosine_ops)
  WITH (lists = 500);

-- 放在 HDD 表空间
ALTER TABLE vectors_cold SET TABLESPACE hdd_tablespace;

数据流转脚本(热→冷迁移):

-- 定期将热数据中的过期记录迁移到冷表
BEGIN;

-- 1. 将 3 个月前的热数据插入冷表
INSERT INTO vectors_cold (id, embedding, content, created_at)
SELECT id, embedding, content, created_at
FROM vectors_hot
WHERE created_at < now() - INTERVAL '3 months';

-- 2. 从热表中删除
DELETE FROM vectors_hot
WHERE created_at < now() - INTERVAL '3 months';

-- 3. 回收热表空间
VACUUM ANALYZE vectors_hot;

COMMIT;

创建表空间

-- 创建 SSD 表空间(需要操作系统先挂载 SSD)
CREATE TABLESPACE ssd_tablespace LOCATION '/mnt/ssd/pg_data';

-- 创建 HDD 表空间
CREATE TABLESPACE hdd_tablespace LOCATION '/mnt/hdd/pg_data';

架构建议:对于大多数团队,不需要一开始就实现冷热分离。当数据量超过 1000 万条、存储成本成为明显负担时再引入分层存储。过早优化会增加系统复杂度。


5.3 运维监控与备份恢复

5.3.1 关键监控指标及监控 SQL

(1)检索延迟监控 —— 使用 pg_stat_statements

pg_stat_statements 是 PostgreSQL 内置的查询性能分析扩展,可以统计每条 SQL 的执行次数、平均耗时、最大耗时等。

-- 安装扩展
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- 查看向量检索相关查询的性能统计
SELECT
    query,
    calls AS total_calls,
    round(mean_exec_time::numeric, 2) AS avg_ms,
    round(max_exec_time::numeric, 2) AS max_ms,
    round(min_exec_time::numeric, 2) AS min_ms,
    round(stddev_exec_time::numeric, 2) AS stddev_ms,
    rows AS total_rows
FROM pg_stat_statements
WHERE query LIKE '%<=>%'
   OR query LIKE '%<#>%'
   OR query LIKE '%<+>%'
   OR query LIKE '%vector%'
ORDER BY mean_exec_time DESC
LIMIT 20;

-- 重置统计信息(定期重置,避免历史数据累积)
SELECT pg_stat_statements_reset();

(2)索引命中率监控

-- 检查向量索引的使用情况
SELECT
    schemaname,
    relname AS table_name,
    indexrelname AS index_name,
    idx_scan AS index_scans,         -- 索引扫描次数
    idx_tup_read AS tuples_read,     -- 通过索引读取的行数
    idx_tup_fetch AS tuples_fetched  -- 通过索引实际获取的行数
FROM pg_stat_user_indexes
WHERE indexrelname LIKE '%hnsw%'
   OR indexrelname LIKE '%ivfflat%'
   OR indexrelname LIKE '%vector%'
ORDER BY idx_scan DESC;

-- 检查索引是否被有效使用
-- 如果 idx_scan 长期为 0,说明索引未被使用(可能是查询无法命中索引)

-- 检查表的索引扫描 vs 全表扫描比例
SELECT
    schemaname,
    relname AS table_name,
    seq_scan AS full_table_scans,
    idx_scan AS index_scans,
    CASE WHEN seq_scan + idx_scan > 0
         THEN round(100.0 * idx_scan / (seq_scan + idx_scan), 1)
         ELSE 0
    END AS index_hit_rate_pct
FROM pg_stat_user_tables
WHERE relname LIKE '%vector%'
   OR relname LIKE '%embedding%'
   OR relname LIKE '%document%'
ORDER BY index_hit_rate_pct ASC;

(3)表膨胀率检测 —— 使用 pgstattuple

PostgreSQL 的 MVCC(多版本并发控制)机制意味着 DELETE 和 UPDATE 不会真正删除旧行,而是标记为"死元组"(dead tuples)。大量的死元组会导致表膨胀,向量索引的检索效率也会下降。

-- 安装 pgstattuple 扩展
CREATE EXTENSION IF NOT EXISTS pgstattuple;

-- 检查向量表的膨胀情况
SELECT
    table_len,
    tuple_count,
    tuple_len,
    round(100.0 * tuple_len / NULLIF(table_len, 0), 1) AS live_pct,
    dead_tuple_count,
    dead_tuple_len,
    round(100.0 * dead_tuple_len / NULLIF(table_len, 0), 1) AS dead_pct,
    free_space,
    round(100.0 * free_space / NULLIF(table_len, 0), 1) AS free_pct
FROM pgstattuple('documents');

-- 如果 dead_pct > 20%,说明需要执行 VACUUM FULL 或等待 autovacuum 清理
-- 如果 free_pct 很高但 dead_pct 低,说明空间已被标记为可复用,等待 autovacuum

(4)写入吞吐监控

-- 监控向量表的写入活动
SELECT
    schemaname,
    relname AS table_name,
    n_tup_ins AS inserts,      -- 插入行数
    n_tup_upd AS updates,      -- 更新行数
    n_tup_del AS deletes,      -- 删除行数
    n_live_tup AS live_tuples, -- 当前活跃行数
    n_dead_tup AS dead_tuples, -- 当前死元组数
    last_vacuum,
    last_autovacuum,
    last_analyze,
    last_autoanalyze
FROM pg_stat_user_tables
WHERE relname LIKE '%vector%'
   OR relname LIKE '%embedding%'
ORDER BY n_tup_ins DESC;

(5)内存与连接监控

-- 当前活跃连接及状态
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    state,
    wait_event_type,
    wait_event,
    query_start,
    now() - query_start AS duration,
    left(query, 100) AS query_preview
FROM pg_stat_activity
WHERE state != 'idle'
ORDER BY duration DESC;

-- 连接数统计
SELECT
    state,
    count(*) AS count,
    count(*) FILTER (WHERE query LIKE '%vector%' OR query LIKE '%<=>%')
        AS vector_queries
FROM pg_stat_activity
GROUP BY state;

-- 检查是否有锁等待
SELECT
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_query
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

5.3.2 备份恢复

(1)逻辑备份 —— pg_dump

# 备份整个数据库(包含向量数据和索引定义)
pg_dump -h localhost -U postgres -Fc vectordb > vectordb_backup.dump

# 只备份特定向量表
pg_dump -h localhost -U postgres -t documents -Fc vectordb > documents_backup.dump

# 备份表结构(不包含数据,用于查看索引定义)
pg_dump -h localhost -U postgres -s vectordb > schema_backup.sql

向量数据在 pg_dump 中的格式:pg_dump 导出向量数据时,会以数组字面量格式存储,例如 [0.01234, -0.05678, 0.09012, ...]。这是 pgvector 的标准文本表示格式。逻辑备份的文件可能非常大(1536 维的 float32 向量,每条约占 6KB 文本空间),100 万条向量约产生 6GB 的 SQL dump 文件。

# 恢复逻辑备份
pg_restore -h localhost -U postgres -d vectordb_restored vectordb_backup.dump

# 注意事项:
# 1. 恢复时会自动重建向量索引,大数据量下可能需要较长时间
# 2. 如果只恢复数据不恢复索引(加快恢复速度):
pg_restore --no-indexes -h localhost -U postgres -d vectordb_restored vectordb_backup.dump
# 然后手动创建索引(可以调大 maintenance_work_mem 加速)

(2)物理备份 —— pg_basebackup

物理备份复制的是 PostgreSQL 的底层数据文件,恢复速度比逻辑备份快得多,适合生产环境。

# 执行基础物理备份
pg_basebackup -h localhost -U replicator -D /backup/pg_base_20250612 \
    -Ft -z -P -Xstream

# 参数说明:
# -Ft     : tar 格式
# -z      : gzip 压缩
# -P      : 显示进度
# -Xstream: 流式传输 WAL 日志,保证备份一致性

# 物理备份包含所有向量数据和索引的完整二进制文件
# 恢复时不需要重建索引(索引文件直接复制)

(3)跨实例向量数据迁移

将向量数据从一台 PostgreSQL 实例迁移到另一台(如从开发环境迁移到生产环境,或从旧版本升级):

# 步骤 1:在源实例导出数据(不含索引,加快导出速度)
pg_dump -h source_host -U postgres -t documents --no-indexes -Fc vectordb > docs_data.dump

# 步骤 2:在目标实例创建数据库和扩展
psql -h target_host -U postgres -c "CREATE DATABASE vectordb_new;"
psql -h target_host -U postgres -d vectordb_new -c "CREATE EXTENSION vector;"

# 步骤 3:恢复数据
pg_restore -h target_host -U postgres -d vectordb_new docs_data.dump

# 步骤 4:在目标实例创建索引(利用目标服务器的计算资源)
psql -h target_host -U postgres -d vectordb_new << 'EOF'
SET maintenance_work_mem = '2GB';
SET max_parallel_maintenance_workers = 4;
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);
ANALYZE documents;
EOF

# 步骤 5:验证数据完整性
psql -h target_host -U postgres -d vectordb_new -c "
SELECT count(*) AS total_rows FROM documents;
SELECT avg(vector_dims(embedding)) AS avg_dims FROM documents;
"

5.3.3 索引维护

(1)REINDEX —— 定期重建索引

随着时间的推移,HNSW 图结构可能因为频繁的 INSERT/UPDATE/DELETE 操作而产生"图退化"——节点间的连接关系不再是全局最优的,导致检索精度下降。定期重建索引可以恢复最佳图结构。

-- 重建单个向量索引
REINDEX INDEX CONCURRENTLY documents_embedding_idx;

-- CONCURRENTLY 关键字很重要:
-- 它允许在重建索引期间继续处理查询(不锁定表)
-- 但重建时间会更长

-- 重建整个表的所有索引
REINDEX TABLE CONCURRENTLY documents;

-- 建议:每月或每季度执行一次 REINDEX,或者在大批量数据导入后执行

(2)VACUUM FULL vs VACUUM

特性 VACUUM VACUUM FULL
锁级别 共享锁(不阻塞查询) 排他锁(阻塞所有读写)
功能 标记死元组空间为可复用 重写整个表,回收磁盘空间
磁盘空间 不释放给操作系统 释放给操作系统
耗时 快(分钟级) 慢(可能小时级)
生产可用性 可在线执行 需要停机窗口
-- 日常维护:VACUUM(由 autovacuum 自动执行)
VACUUM ANALYZE documents;

-- 深度清理:VACUUM FULL(需要维护窗口)
-- 注意:VACUUM FULL 会锁定表,期间无法读写!
-- 建议在业务低峰期执行
VACUUM FULL documents;

-- 替代方案:使用 pg_repack 在线重组(无需排他锁)
-- 安装:CREATE EXTENSION pg_repack;
-- 执行:pg_repack -t documents -d vectordb

(3)autovacuum 参数调优

对于频繁写入的向量表,默认的 autovacuum 触发阈值可能太保守(20% 的行变更才触发),导致死元组积累过多。

-- 为特定向量表设置更激进的 autovacuum 参数
ALTER TABLE documents SET (
    autovacuum_vacuum_threshold = 1000,        -- 基础阈值:1000 行
    autovacuum_vacuum_scale_factor = 0.01,     -- 比例因子:1%(默认 20%)
    autovacuum_analyze_threshold = 500,         -- 分析阈值
    autovacuum_analyze_scale_factor = 0.005,   -- 分析比例因子
    autovacuum_vacuum_cost_delay = 2,          -- 降低延迟,加快清理速度
    autovacuum_vacuum_cost_limit = 1000        -- 提高成本限制
);

-- 含义:当 1000 + 1% * 总行数 的行发生变更时,触发 autovacuum
-- 例如 100 万行的表,当 11000 行变更时就会触发(而非默认的 201000 行)

(4)索引碎片清理策略

-- 检查索引的碎片情况(通过 pgstatindex 函数)
CREATE EXTENSION IF NOT EXISTS pgstattuple;

-- 对于 HNSW 索引,主要关注索引大小变化
SELECT
    indexrelname,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
WHERE relname = 'documents';

-- 如果索引大小持续增长但数据量未显著增加,说明存在碎片
-- 此时应执行 REINDEX CONCURRENTLY 重建索引

5.4 生产高频坑点汇总

以下汇总了在生产环境中部署 pgvector 时最常见的坑点,每个坑点都详细说明现象、根因和解决方案。

5.4.1 HNSW 索引写入卡顿/内存溢出

现象

  • INSERT 或 UPDATE 操作超时(超过 statement_timeout
  • 日志中出现 ERROR: out of memoryERROR: index row size exceeds maximum
  • 写入操作在正常速度运行一段时间后突然变慢
  • 系统内存使用率飙升,触发 OOM Killer

根因分析

HNSW 索引是一个多层跳表图结构。每次 INSERT 一条新的向量时,HNSW 算法需要:

  1. 从最高层开始,搜索最近的邻居节点(greedy search)
  2. 逐层向下,在每一层找到最近邻并建立连接
  3. 在最底层(第 0 层)建立 m 个双向连接
  4. 如果某层连接数超过 m_max,需要裁剪(prune)最弱连接

ef_construction 设置较大(如 256 以上)时,步骤 1-2 的搜索范围很大,需要在内存中维护大量的候选节点和距离计算结果。同时,maintenance_work_mem 如果不足以容纳这些中间数据,PostgreSQL 会尝试使用磁盘临时文件,导致性能急剧下降。

解决方案

-- 方案 1:调大 maintenance_work_mem(治标)
SET maintenance_work_mem = '2GB';

-- 方案 2:降低 ef_construction(治本,但会降低索引质量)
-- 重建索引时使用较小的 ef_construction
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 40);  -- 从 64 降到 40

-- 方案 3:分批写入(推荐的生产实践)
-- 不要在已有 HNSW 索引的情况下大批量 INSERT
-- 而是先删除索引,批量导入,再重建索引

-- 步骤 1:删除索引
DROP INDEX documents_embedding_idx;

-- 步骤 2:批量导入(无索引维护,速度快)
\COPY documents (title, embedding) FROM '/data/embeddings.csv' CSV;

-- 步骤 3:重建索引
SET maintenance_work_mem = '2GB';
SET max_parallel_maintenance_workers = 4;
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- 方案 4:增量写入时控制批次大小
-- 每次 INSERT 不超过 1000 行,间隔 100ms
DO $$
DECLARE
    batch_size INT := 1000;
    offset_val INT := 0;
    total INT;
BEGIN
    SELECT count(*) INTO total FROM staging_vectors;
    WHILE offset_val < total LOOP
        INSERT INTO documents (title, embedding)
        SELECT title, embedding
        FROM staging_vectors
        ORDER BY id
        LIMIT batch_size OFFSET offset_val;

        offset_val := offset_val + batch_size;
        PERFORM pg_sleep(0.1);  -- 间隔 100ms,让 autovacuum 有时间工作
    END LOOP;
END $$;

5.4.2 向量索引不生效的 5 种常见原因

坑点 1:操作符类与距离运算符不匹配

这是最常见的坑。pgvector 的索引创建时需要指定操作符类(operator class),每种操作符类只对应一种距离度量。如果查询使用的距离运算符与索引的操作符类不匹配,索引就不会被使用。

-- 错误示例:创建了 L2 距离索引,但查询用余弦距离
CREATE INDEX ON docs USING hnsw (embedding vector_l2_ops);

-- 这条查询不会使用上面的索引!
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;
-- <=> 是余弦距离运算符,但索引是 L2 距离的

-- 正确做法:确保操作符类和运算符匹配
-- 余弦距离:vector_cosine_ops + <=>
CREATE INDEX ON docs USING hnsw (embedding vector_cosine_ops);
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;

-- L2 距离:vector_l2_ops + <->
CREATE INDEX ON docs USING hnsw (embedding vector_l2_ops);
SELECT id FROM docs ORDER BY embedding <-> '[0.1, 0.2]'::vector LIMIT 5;

-- 内积距离:vector_ip_ops + <#>(注意返回负内积)
CREATE INDEX ON docs USING hnsw (embedding vector_ip_ops);
SELECT id FROM docs ORDER BY embedding <#> '[0.1, 0.2]'::vector LIMIT 5;

-- L1 距离:vector_l1_ops + <+>
CREATE INDEX ON docs USING hnsw (embedding vector_l1_ops);
SELECT id FROM docs ORDER BY embedding <+> '[0.1, 0.2]'::vector LIMIT 5;

操作符类完整对照表

距离度量 操作符类 运算符 说明
余弦距离 vector_cosine_ops <=> 1 - cos(a, b),范围 [0, 2]
欧氏距离 vector_l2_ops <-> sqrt(sum((a-b)^2))
负内积 vector_ip_ops <#> 返回负值,越小内积越大
L1 距离 vector_l1_ops <+> sum(abs(a-b))

坑点 2:查询中向量字段经过了函数转换

如果查询对向量列应用了函数(如 CASTnormalize()),PostgreSQL 无法直接使用基于原始列的索引。

-- 错误:对向量列使用了函数转换
CREATE INDEX ON docs USING hnsw (embedding vector_cosine_ops);

-- 不会走索引!因为 embedding 被 l2_normalize() 转换了
SELECT id FROM docs
ORDER BY l2_normalize(embedding) <=> '[0.1, 0.2]'::vector
LIMIT 5;

-- 正确做法 A:存储时就做好归一化,查询时直接用原始列
-- 写入时:
INSERT INTO docs (embedding) VALUES (l2_normalize('[1, 2, 3]'::vector));
-- 查询时:
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;

-- 正确做法 B:创建表达式索引(如果确实需要动态转换)
CREATE INDEX ON docs USING hnsw (l2_normalize(embedding) vector_cosine_ops);
SELECT id FROM docs
ORDER BY l2_normalize(embedding) <=> '[0.1, 0.2]'::vector
LIMIT 5;

坑点 3:probes / ef_search 设置过小

-- IVFFlat 索引,probes 设置过小导致无法命中正确的聚类
SET ivfflat.probes = 1;  -- 只搜索 1 个聚类,极可能错过最近邻

-- HNSW 索引,ef_search 过小导致搜索宽度不够
SET hnsw.ef_search = 1;  -- 几乎等于贪婪搜索,召回率极低

-- 诊断方法:使用 EXPLAIN ANALYZE 检查执行计划
EXPLAIN ANALYZE
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;

-- 如果看到 "Index Scan using ... (never executed)" 或者执行计划中没有
-- 使用索引,检查上述参数设置

坑点 4:IVFFlat 索引在无数据时创建(空聚类问题)

-- 错误做法:先创建 IVFFlat 索引,再导入数据
CREATE TABLE docs (id SERIAL, embedding vector(3));
CREATE INDEX ON docs USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);
-- 此时表为空!K-means 聚类产生空聚类中心,索引质量极差

INSERT INTO docs (embedding) SELECT ... FROM generate_series(1, 100000);
-- 数据导入后,索引基于空数据的聚类中心,效果很差

-- 正确做法:先导入数据,再创建 IVFFlat 索引
CREATE TABLE docs (id SERIAL, embedding vector(3));
INSERT INTO docs (embedding) SELECT ... FROM generate_series(1, 100000);
-- 数据导入完成后再创建索引
CREATE INDEX ON docs USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);
-- K-means 基于实际数据聚类,索引质量好

-- 如果已经犯了空表建索引的错误,重建索引
REINDEX INDEX docs_embedding_idx;

为什么 HNSW 没有这个问题:HNSW 是逐条插入构建图结构的,每条新数据都会更新图的连接关系,因此先建索引后导入数据没有问题(只是导入速度较慢)。而 IVFFlat 的聚类中心是一次性计算的,空表上计算出的聚类中心毫无意义。

坑点 5:数据量太小,优化器选择全表扫描

-- 当表中只有几百条数据时,PostgreSQL 优化器可能认为
-- "全表扫描比走索引更快"(因为索引扫描有额外的随机 IO 开销)

-- 诊断:
EXPLAIN ANALYZE
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;

-- 如果结果是 Seq Scan 而非 Index Scan,可能是数据量太小
-- 对于小表(< 几千行),全表扫描可能确实更快,这不是问题

-- 如果想强制使用索引(仅用于测试):
SET enable_seqscan = off;
EXPLAIN ANALYZE
SELECT id FROM docs ORDER BY embedding <=> '[0.1, 0.2]'::vector LIMIT 5;
-- 注意:测试完成后一定要恢复
SET enable_seqscan = on;

万能诊断流程

-- 当怀疑索引不生效时,按以下步骤排查:

-- 第 1 步:确认索引存在
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'documents';

-- 第 2 步:检查执行计划
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT id, title
FROM documents
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;

-- 第 3 步:检查操作符类是否匹配
-- 查看索引定义中的 ops 类型,对比查询中使用的运算符

-- 第 4 步:检查运行时参数
SHOW hnsw.ef_search;
SHOW ivfflat.probes;

-- 第 5 步:检查索引统计
SELECT idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
WHERE relname = 'documents';

5.4.3 高并发下检索超时与连接风暴

现象

  • 并发超过一定阈值后,查询延迟急剧上升
  • 出现 FATAL: sorry, too many clients already 错误
  • 大量连接处于 idle in transaction 状态
  • 部分查询超过 statement_timeout 被强制取消

根因:每个 PostgreSQL 连接对应一个独立的后端进程(fork 模型),每个进程占用约 5-10MB 内存。200 个并发连接就需要 1-2GB 的进程内存,加上 shared_buffers 等共享内存,很容易耗尽系统资源。

解决方案:PgBouncer 连接池

; pgbouncer.ini

[databases]
vectordb = host=127.0.0.1 port=5432 dbname=vectordb

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt

; 连接池模式
pool_mode = transaction       ; 事务级复用(推荐)
; pool_mode = session         ; 会话级(兼容 SET 命令,但复用率低)

; 连接数控制
max_client_conn = 1000        ; 允许的最大客户端连接数
default_pool_size = 50        ; 每个数据库的默认连接池大小
min_pool_size = 10            ; 最小保留连接数
reserve_pool_size = 10        ; 预留连接数(突发流量)
reserve_pool_timeout = 3      ; 预留连接等待时间(秒)

; 超时设置
server_idle_timeout = 300     ; 服务端空闲连接超时(秒)
client_idle_timeout = 600     ; 客户端空闲连接超时
query_timeout = 5             ; 查询超时(秒)
query_wait_timeout = 120      ; 等待可用连接的超时

; 对于向量检索场景,如果应用使用 SET hnsw.ef_search,
; 需要使用 session 模式或在每个事务开始时重新 SET

应用层超时保护

-- 全局查询超时保护(防止单个慢查询占满连接)
ALTER DATABASE vectordb SET statement_timeout = '5000';  -- 5 秒

-- 特定查询超时
SET LOCAL statement_timeout = '2000';  -- 2 秒(仅当前事务)
SELECT id, title
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 10;

-- 锁等待超时
SET LOCAL lock_timeout = '1000';  -- 1 秒

连接复用策略选择

模式 说明 适用场景 pgvector 兼容性
session 连接在整个会话期间复用 应用使用 SET 命令设置 ef_search/probes 完全兼容
transaction 连接在事务结束后释放回池 高频短事务 SET 命令需在事务内用 SET LOCAL
statement 每条 SQL 执行完就释放 简单查询 不推荐,SET 命令立即失效

pgvector 与连接池的配合建议:由于 hnsw.ef_searchivfflat.probes 是会话级参数,使用 transaction 模式时,需要在每个事务开始时重新设置。推荐的做法是将这些参数设为数据库级别或角色级别的默认值(ALTER DATABASE/ROLE SET ...),这样无论使用哪种连接池模式都能生效。

5.4.4 召回率下降排查

现象

  • 同样的查询,返回的结果与预期不符(缺少了"应该有"的相似结果)
  • 上线一段时间后,检索质量似乎逐渐下降
  • A/B 测试发现推荐/搜索的点击率下降

排查清单

(1)检查索引参数是否合理

-- 查看当前索引的构建参数
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'documents'
  AND indexdef LIKE '%hnsw%' OR indexdef LIKE '%ivfflat%';

-- HNSW 索引:m 和 ef_construction 越大,索引质量越高
-- 如果 m=8, ef_construction=32,对于高维向量(>512维)可能不够
-- 建议:m >= 16, ef_construction >= 64

-- IVFFlat 索引:lists 参数决定聚类数量
-- 如果 lists 过小(如 10),每个聚类包含太多向量,精度低
-- 经验公式:lists = sqrt(N),N 为数据量
-- 100 万条数据:lists ≈ 1000
-- 1000 万条数据:lists ≈ 3162

(2)检查向量是否需要归一化

-- 如果原始向量未归一化,使用余弦距离可能产生不准确的结果
-- 检查向量的 L2 范数
SELECT
    avg(l2_norm(embedding)) AS avg_norm,
    min(l2_norm(embedding)) AS min_norm,
    max(l2_norm(embedding)) AS max_norm
FROM documents
LIMIT 10000;

-- 如果 avg_norm 不接近 1.0,说明向量未归一化
-- 解决方案:存储时归一化
UPDATE documents
SET embedding = l2_normalize(embedding)
WHERE l2_norm(embedding) BETWEEN 0.99 AND 1.01 = false;

-- 或者改用 L2 距离(对未归一化向量更鲁棒)
-- 但需要注意 L2 距离和余弦距离的语义差异

(3)检查 probes / ef_search 是否足够

-- 使用"暴力搜索"(无索引)作为基准,对比索引搜索结果
-- 先关闭索引扫描,强制全表搜索得到"真实"的 Top-K
SET enable_indexscan = off;
SET enable_indexonlyscan = off;

SELECT id, title, embedding <=> query_vec AS distance
FROM documents, (SELECT '[0.1, 0.2, ...]'::vector AS query_vec) q
ORDER BY distance
LIMIT 10;
-- 记录这些结果作为"黄金标准"

-- 重新开启索引扫描
SET enable_indexscan = on;
SET enable_indexonlyscan = on;

-- 使用索引搜索
SET hnsw.ef_search = 40;   -- 先用小值
SELECT id, title, embedding <=> query_vec AS distance
FROM documents, (SELECT '[0.1, 0.2, ...]'::vector AS query_vec) q
ORDER BY distance
LIMIT 10;

-- 逐步增大 ef_search,直到结果与"黄金标准"一致
SET hnsw.ef_search = 100;
SET hnsw.ef_search = 200;
-- 当 ef_search = 200 时结果与暴力搜索一致,说明 200 是足够的

(4)检查数据分布是否发生变化

-- 数据分布漂移可能导致 IVFFlat 的聚类中心不再最优
-- 检查最近新增数据与历史数据的向量分布差异

-- 随机采样检查向量的各维度统计
SELECT
    d AS dim_index,
    avg(val) AS avg_val,
    stddev(val) AS stddev_val,
    min(val) AS min_val,
    max(val) AS max_val
FROM documents,
     LATERAL unnest(embedding::real[]) WITH ORDINALITY AS t(val, d)
WHERE d IN (1, 2, 3, 100, 500)   -- 抽样检查几个维度
GROUP BY d
ORDER BY d;

-- 如果发现分布发生显著变化(如某个维度的均值从 0 变为 1),
-- 说明 IVFFlat 的聚类中心可能已过时,需要 REINDEX 重建

REINDEX INDEX CONCURRENTLY documents_embedding_idx;

第六阶段:高阶拓展与对比选型

在掌握了 pgvector 的核心功能和生产调优之后,本阶段将探索更高阶的使用场景:如何将向量检索与全文检索、标签过滤等能力结合,如何将 pgvector 扩展到分布式架构,以及在什么场景下应该选择其他向量数据库。


6.1 混合检索高阶能力

纯粹的向量检索(语义搜索)在很多场景下并不够。用户可能输入精确的产品型号(关键词搜索更擅长),也可能用自然语言描述需求(语义搜索更擅长)。混合检索(Hybrid Search)将两者结合,实现"1+1>2"的效果。

6.1.1 PostgreSQL 全文检索基础

PostgreSQL 内置了强大的全文检索能力,基于 tsvector(文本向量)和 tsquery(查询表达式)两个核心数据类型。

-- tsvector:将文本分解为词元(lexeme)的有序列表
SELECT to_tsvector('english', 'The quick brown fox jumps over the lazy dog');
-- 结果:'brown':3 'dog':9 'fox':4 'jump':5 'lazi':8 'quick':2

-- tsquery:构造查询表达式
SELECT to_tsquery('english', 'quick & fox');
-- 结果:'quick' & 'fox'

-- 匹配操作 @@
SELECT to_tsvector('english', 'The quick brown fox')
    @@ to_tsquery('english', 'quick & fox') AS match;
-- 结果:true

-- 相关度排序
SELECT
    title,
    ts_rank(
        to_tsvector('english', title || ' ' || content),
        to_tsquery('english', 'database & performance')
    ) AS rank
FROM articles
WHERE to_tsvector('english', title || ' ' || content)
    @@ to_tsquery('english', 'database & performance')
ORDER BY rank DESC
LIMIT 10;

-- 创建 GIN 索引加速全文检索
CREATE INDEX ON articles USING gin (to_tsvector('english', title || ' ' || content));

6.1.2 pg_trgm 扩展实现模糊文本搜索

pg_trgm 扩展基于三元组(trigram)匹配,支持 LIKEILIKE 和正则表达式的索引加速,适合产品名称、SKU 编号等精确匹配场景。

-- 安装 pg_trgm
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- 创建 GIN 三元组索引
CREATE INDEX ON products USING gin (name gin_trgm_ops);

-- 模糊搜索(使用索引)
SELECT name, similarity(name, 'iphnoe 15') AS sim  -- 注意拼写错误
FROM products
WHERE name % 'iphnoe 15'   -- % 运算符:相似度超过阈值则匹配
ORDER BY sim DESC
LIMIT 5;
-- 结果可能包含 'iPhone 15'(自动纠正拼写错误)

-- 结合 LIKE 查询
SELECT name
FROM products
WHERE name ILIKE '%iphon%'    -- 使用 GIN 索引加速
ORDER BY similarity(name, 'iphone') DESC;

6.1.3 双路召回 + RRF 融合排序架构

RRF(Reciprocal Rank Fusion) 是目前最流行的混合检索融合算法,由 Gordon Cormack 等人在 2009 年提出。其核心思想极其简洁:对于每个文档,将其在多个检索通道中的排名取倒数后求和。

RRF 公式

RRF_score(d) = SUM(1 / (k + rank_i(d)))

其中 k 是平滑常数(通常取 60),rank_i(d) 是文档 d 在第 i 个检索通道中的排名(从 1 开始)。

为什么 RRF 有效:排名靠前的文档获得更高的分数,排名靠后的文档分数迅速衰减。相比简单的分数归一化加权,RRF 不需要对不同检索通道的分数进行归一化(因为不同通道的分数量纲完全不同),直接使用排名信息,鲁棒性更强。

完整的混合检索 SQL 实现

-- 混合检索:关键词搜索 + 语义向量搜索 + RRF 融合

-- 假设我们有一个文档表,同时包含全文索引和向量索引
CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT now()
);

-- 全文检索索引
ALTER TABLE documents ADD COLUMN search_vector tsvector
    GENERATED ALWAYS AS (
        to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content, ''))
    ) STORED;
CREATE INDEX ON documents USING gin (search_vector);

-- 向量索引
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- 混合检索查询
WITH
-- 通道 1:关键词检索(BM25 / ts_rank 排序)
keyword_results AS (
    SELECT
        id,
        title,
        ts_rank(search_vector, query) AS keyword_score,
        row_number() OVER (
            ORDER BY ts_rank(search_vector, query) DESC
        ) AS keyword_rank
    FROM documents, to_tsquery('english', 'database & optimization & performance') AS query
    WHERE search_vector @@ query
    ORDER BY keyword_score DESC
    LIMIT 50     -- 每个通道取 Top-50 候选
),
-- 通道 2:语义向量检索
vector_results AS (
    SELECT
        id,
        title,
        1 - (embedding <=> $1::vector) AS semantic_score,  -- 转换为相似度
        row_number() OVER (
            ORDER BY embedding <=> $1::vector
        ) AS vector_rank
    FROM documents
    ORDER BY embedding <=> $1::vector
    LIMIT 50     -- 每个通道取 Top-50 候选
),
-- RRF 融合
rrf_fusion AS (
    SELECT
        COALESCE(k.id, v.id) AS id,
        COALESCE(k.title, v.title) AS title,
        COALESCE(k.keyword_rank, 999) AS keyword_rank,   -- 未出现则排名靠后
        COALESCE(v.vector_rank, 999) AS vector_rank,
        k.keyword_score,
        v.semantic_score,
        -- RRF 分数计算(k=60)
        1.0 / (60 + COALESCE(k.keyword_rank, 999))
        + 1.0 / (60 + COALESCE(v.vector_rank, 999))
        AS rrf_score
    FROM keyword_results k
    FULL OUTER JOIN vector_results v ON k.id = v.id
)
SELECT
    id,
    title,
    keyword_rank,
    vector_rank,
    round(keyword_score::numeric, 4) AS keyword_score,
    round(semantic_score::numeric, 4) AS semantic_score,
    round(rrf_score::numeric, 6) AS rrf_score
FROM rrf_fusion
ORDER BY rrf_score DESC
LIMIT 10;

权重加权融合方案(当关键词和语义的重要性不同时):

-- 加权融合:关键词权重 0.3,语义权重 0.7
-- 适用于语义搜索更重要的场景(如问答系统)
WITH
keyword_results AS (
    SELECT id, row_number() OVER (
        ORDER BY ts_rank(search_vector, query) DESC
    ) AS keyword_rank
    FROM documents, to_tsquery('english', 'machine & learning') AS query
    WHERE search_vector @@ query
    LIMIT 50
),
vector_results AS (
    SELECT id, row_number() OVER (
        ORDER BY embedding <=> $1::vector
    ) AS vector_rank
    FROM documents
    ORDER BY embedding <=> $1::vector
    LIMIT 50
),
weighted_fusion AS (
    SELECT
        COALESCE(k.id, v.id) AS id,
        -- 加权 RRF
        0.3 * (1.0 / (60 + COALESCE(k.keyword_rank, 999)))
        + 0.7 * (1.0 / (60 + COALESCE(v.vector_rank, 999)))
        AS weighted_score
    FROM keyword_results k
    FULL OUTER JOIN vector_results v ON k.id = v.id
)
SELECT * FROM weighted_fusion
ORDER BY weighted_score DESC
LIMIT 10;

6.1.4 多维联合检索

在实际业务中,向量检索往往不是独立存在的,而是与多种过滤条件组合使用。

(1)向量检索 + 标签过滤 + 时间范围 + 全文匹配

-- 综合查询示例:
-- 在"技术类"文档中,搜索 2025 年发布的、包含"性能优化"关键词的、
-- 语义上与"数据库查询加速"最相似的 Top 10 文档

SELECT
    d.id,
    d.title,
    d.category,
    d.created_at,
    d.embedding <=> $1::vector AS semantic_distance,
    ts_rank(d.search_vector, query) AS keyword_rank
FROM documents d,
     to_tsquery('english', 'performance & optimization') AS query
WHERE
    d.category = 'technology'                           -- 标签过滤
    AND d.created_at >= '2025-01-01'                    -- 时间范围
    AND d.search_vector @@ query                        -- 全文匹配
ORDER BY
    d.embedding <=> $1::vector                          -- 向量距离排序
LIMIT 10;

(2)复合索引策略

-- 为过滤条件创建 B-tree 索引
CREATE INDEX ON documents (category);
CREATE INDEX ON documents (created_at);

-- 为向量检索创建向量索引
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);

-- 为全文检索创建 GIN 索引
CREATE INDEX ON documents USING gin (search_vector);

-- PostgreSQL 优化器会自动选择最优的索引组合
-- 通常的策略是:先用 B-tree/GIN 索引过滤,再在过滤后的结果上做向量检索

(3)部分索引(Partial Index)在向量场景的应用

部分索引只对满足特定条件的行建立索引,可以显著减小索引大小,提高查询速度。

-- 只对"活跃"文档建立向量索引(假设只有 20% 的文档是活跃的)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
    WHERE status = 'active';

-- 只对最近 6 个月的文档建立向量索引
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
    WHERE created_at >= '2025-06-01';

-- 查询时如果 WHERE 条件匹配部分索引的谓词,优化器会使用部分索引
SELECT id, title
FROM documents
WHERE status = 'active'                          -- 匹配部分索引谓词
ORDER BY embedding <=> $1::vector
LIMIT 10;
-- 使用部分索引,索引大小只有全量索引的 20%,查询更快

部分索引的存储节省:如果你的表有 1000 万条记录,但只有 200 万条是"活跃"的,部分索引只需要为 200 万条记录构建 HNSW 图。这不仅节省存储空间,还因为索引更小而使查询速度提升 30%~50%。


6.2 分布式 pgvector 方案

当单机 PostgreSQL 无法满足数据量或吞吐量需求时,需要考虑分布式方案。

6.2.1 Citus + pgvector 分布式架构

Citus 是什么:Citus 是 PostgreSQL 的分布式扩展(由 Microsoft 维护),它将 PostgreSQL 转变为一个分布式数据库,支持跨多台节点的水平扩展。Citus 以扩展的形式安装,不需要修改 PostgreSQL 内核,与 pgvector 完全兼容。

架构概述

                    ┌─────────────────┐
                    │  Coordinator    │  ← 接收查询,分发到 Worker 节点
                    │  (Citus 协调器) │
                    └───────┬─────────┘
                            │
              ┌─────────────┼─────────────┐
              │             │             │
        ┌─────┴─────┐ ┌────┴─────┐ ┌─────┴─────┐
        │ Worker 1  │ │ Worker 2 │ │ Worker 3  │
        │ 分片 1,4  │ │ 分片 2,5 │ │ 分片 3,6  │
        │ HNSW 索引 │ │ HNSW索引 │ │ HNSW 索引 │
        └───────────┘ └──────────┘ └───────────┘

安装与配置

-- 在每个节点(Coordinator + 所有 Worker)上安装扩展
CREATE EXTENSION citus;
CREATE EXTENSION vector;

-- 在 Coordinator 上注册 Worker 节点
SELECT citus_add_node('worker1.example.com', 5432);
SELECT citus_add_node('worker2.example.com', 5432);
SELECT citus_add_node('worker3.example.com', 5432);

-- 创建分布式向量表
CREATE TABLE documents (
    id BIGSERIAL,
    tenant_id BIGINT NOT NULL,
    title TEXT,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT now()
);

-- 按 tenant_id 哈希分片(默认 32 个分片)
SELECT create_distributed_table('documents', 'tenant_id');

-- 为每个分片创建 HNSW 索引(自动传播到所有 Worker)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

分片规则设计

-- 方案 A:按 tenant_id 哈希分片(多租户场景)
-- 优点:同一租户的数据在同一分片,支持租户内的向量检索
-- 缺点:如果某些租户数据量远大于其他租户,可能导致热点
SELECT create_distributed_table('documents', 'tenant_id');

-- 方案 B:按 id 哈希分片(通用场景)
-- 优点:数据均匀分布
-- 缺点:跨分片的向量检索需要合并多个分片的结果
SELECT create_distributed_table('documents', 'id');

-- 方案 C:引用表(小维度表)
-- 对于元数据表等小表,使用引用表(复制到每个 Worker)
SELECT create_reference_table('categories');

分布式向量查询的执行原理

-- 查询:在 tenant_id = 42 的文档中搜索最相似的 10 个
SELECT id, title, embedding <=> $1::vector AS distance
FROM documents
WHERE tenant_id = 42
ORDER BY distance
LIMIT 10;

-- 执行过程:
-- 1. Coordinator 根据 tenant_id = 42 的哈希值定位到具体分片(如 Worker 2)
-- 2. 查询被下推到 Worker 2,Worker 2 在自己的分片上执行 HNSW 索引扫描
-- 3. Worker 2 返回 Top-10 结果给 Coordinator
-- 4. Coordinator 直接返回给客户端
-- 注意:因为 tenant_id 过滤后只涉及单个分片,效率接近单节点查询

-- 跨分片查询(无 tenant_id 过滤)
SELECT id, title, embedding <=> $1::vector AS distance
FROM documents
ORDER BY distance
LIMIT 10;

-- 执行过程:
-- 1. Coordinator 将查询下推到所有 Worker
-- 2. 每个 Worker 在自己的分片上执行 HNSW 搜索,返回 Top-10
-- 3. Coordinator 收集所有 Worker 的 Top-10(共 30 条),全局排序取 Top-10
-- 注意:跨分片查询的延迟 = 最慢 Worker 的延迟 + 合并开销
-- 如果分片数量为 N,每个 Worker 返回 Top-K,Coordinator 需要合并 N*K 条记录

亿级向量的架构边界:Citus + pgvector 理论上可以支撑亿级向量存储。以 32 个 Worker 节点为例,每个节点存储约 300 万条向量(1 亿 / 32),每个 Worker 的 HNSW 索引大小约 15GB(300 万 * 1536 维 * 4 字节 + 图结构开销),可以完全驻留在 64GB 内存的服务器中。但跨分片查询的延迟会随着分片数量增加而增加,建议将分片数量控制在 16~64 之间。

6.2.2 其他分布式方案简述

(1)应用层分片(Sharding)

不依赖 Citus,在应用层实现数据路由。适合团队已有分片中间件的场景。

# Python 应用层分片示例
import hashlib
import psycopg2

SHARD_CONFIGS = [
    {"host": "shard1.db.example.com", "port": 5432, "dbname": "vectors_1"},
    {"host": "shard2.db.example.com", "port": 5432, "dbname": "vectors_2"},
    {"host": "shard3.db.example.com", "port": 5432, "dbname": "vectors_3"},
    {"host": "shard4.db.example.com", "port": 5432, "dbname": "vectors_4"},
]

def get_shard(doc_id: int) -> dict:
    """根据文档 ID 确定所在分片"""
    shard_index = doc_id % len(SHARD_CONFIGS)
    return SHARD_CONFIGS[shard_index]

def vector_search(query_vector: list, top_k: int = 10) -> list:
    """跨分片向量检索"""
    all_results = []

    for shard in SHARD_CONFIGS:
        conn = psycopg2.connect(**shard)
        cur = conn.cursor()
        cur.execute("""
            SELECT id, title, embedding <=> %s::vector AS distance
            FROM documents
            ORDER BY distance
            LIMIT %s
        """, (str(query_vector), top_k))
        all_results.extend(cur.fetchall())
        cur.close()
        conn.close()

    # 合并所有分片的结果,全局排序
    all_results.sort(key=lambda x: x[2])  # 按 distance 排序
    return all_results[:top_k]

(2)读写分离 + 流复制

通过 PostgreSQL 的流复制(Streaming Replication)创建只读副本,将向量检索查询分散到多个副本上。

                    ┌─────────────────┐
  写入(INSERT) ──→│  Primary 主节点  │
                    └────────┬────────┘
                             │ WAL 流复制
              ┌──────────────┼──────────────┐
              │              │              │
        ┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
        │ Replica 1 │ │ Replica 2 │ │ Replica 3 │
        │ 只读查询   │ │ 只读查询   │ │ 只读查询   │
        │ HNSW 索引  │ │ HNSW 索引  │ │ HNSW 索引  │
        └───────────┘ └───────────┘ └───────────┘
              ↑              ↑              ↑
              └──────────────┴──────────────┘
                    向量检索查询(负载均衡)
# postgresql.conf(主节点)
wal_level = replica
max_wal_senders = 10
wal_keep_size = 1GB        # 保留足够的 WAL 供副本追赶

# 在副本节点上,可以独立调整向量检索参数
# 例如副本可以设置更大的 hnsw.ef_search(因为不承担写入压力)
hnsw.ef_search = 200

流复制 + pgvector 的注意事项:HNSW 索引会完全复制到每个副本上,副本不需要重建索引。但 WAL 日志中包含了向量数据的完整副本,批量写入 100 万条 1536 维向量会产生约 6GB 的 WAL 流量,需要确保主从之间的网络带宽足够。


6.3 主流向量库横向对比与选型决策

在选择向量数据库时,没有"银弹"——每种方案都有其适用的场景和局限性。以下是 2026 年主流向量数据库的全面对比。

6.3.1 各数据库概览

pgvector:PostgreSQL 的向量搜索扩展。将向量检索能力嵌入到最成熟的关系型数据库中,让用户在已有的 PostgreSQL 基础设施上获得向量搜索能力,无需引入新的技术栈。

Milvus / Zilliz:专为向量检索设计的分布式数据库,由 Zilliz 公司主导开发(CNCF 孵化项目)。架构上采用了存算分离、日志即数据(log-as-data)的设计理念,原生支持十亿级向量。

Qdrant:使用 Rust 语言编写的高性能向量数据库,以出色的查询性能和低延迟著称。支持丰富的过滤条件和负载(payload)索引,适合对延迟敏感的场景。

Chroma:轻量级的开源向量数据库,设计目标是"让 AI 应用开发更简单"。以内嵌式(embeddable)和极简 API 为核心卖点,适合快速原型开发和小规模应用。

Weaviate:基于 Go 语言开发的向量数据库,内置了向量化模块(vectorizer module),可以直接输入原始文本/图像由 Weaviate 自动完成向量化。GraphQL API 是其特色。

Pinecone:全托管的向量数据库云服务,用户无需管理任何基础设施。提供简单的 REST API,按需付费。

6.3.2 多维度对比表

对比维度 pgvector Milvus/Zilliz Qdrant Chroma Weaviate Pinecone
定位 PG 扩展,嵌入式向量检索 专用分布式向量数据库 专用向量数据库 轻量级嵌入式向量库 AI-native 向量数据库 全托管向量云服务
语言 C (PG 扩展) Go + C++ Rust Python Go 闭源 (托管)
架构 单体(依附于 PG) 存算分离分布式 单机/分布式 嵌入式/单机 单机/分布式 全托管云
最大数据量 数千万级(单机)/ 亿级(Citus) 十亿级以上 数亿级 百万级 数亿级 十亿级
索引类型 HNSW, IVFFlat HNSW, IVF_FLAT, IVF_SQ8, IVF_PQ, DiskANN, GPU 系列 HNSW HNSW HNSW 专有索引
查询延迟 1-50ms(千万级内) 1-20ms 1-10ms 1-50ms(小规模) 1-30ms 5-50ms
混合查询 极强(SQL 全能力) 中等(标量过滤) 较强(Payload 过滤) 弱(基础过滤) 较强(GraphQL) 中等(元数据过滤)
事务支持 完整 ACID 最终一致性
运维复杂度 低(已有 PG 运维体系) 高(分布式集群) 中等 极低 中等 极低(托管)
生态集成 极强(PG 生态) 较强(LangChain 等) 较强 极强(Python 生态) 较强 较强
开源 是 (PostgreSQL) 是 (Apache 2.0) 是 (Apache 2.0) 是 (Apache 2.0) 是 (BSD-3) 否(闭源 SaaS)
成本模型 自托管,硬件成本 自托管 / Zilliz Cloud 按量 自托管 / Qdrant Cloud 自托管免费 自托管 / Weaviate Cloud 按用量付费

6.3.3 深度分析:各维度详解

(1)数据量支持

pgvector 的数据量上限主要取决于 PostgreSQL 单机的内存和存储。在 64GB 内存、NVMe SSD 的服务器上,单表可以高效存储约 1000-2000 万条 1536 维向量。配合 Citus 分片,可以扩展到亿级。但如果你的需求是数十亿级向量,Milvus 和 Pinecone 的分布式架构更加原生适配。

(2)查询延迟

在同等数据量(100 万条 1536 维向量)下,各数据库的典型查询延迟:

  • pgvector (HNSW):2-10ms(取决于 ef_search 设置)
  • Milvus (HNSW):1-5ms
  • Qdrant (HNSW):1-5ms
  • Chroma (HNSW):2-20ms
  • Weaviate (HNSW):2-10ms
  • Pinecone:5-20ms(包含网络延迟)

说明:专用向量数据库(Milvus、Qdrant)的延迟通常比 pgvector 低 30%~50%,因为它们的索引实现经过了更多的向量特化优化,且不需要承担 SQL 解析器和事务管理的开销。但在大多数应用中,这个差距在端到端延迟中并不显著(网络延迟和 embedding 计算通常占主导)。

(3)混合查询能力

这是 pgvector 最大的优势领域。作为完整的 SQL 数据库,pgvector 支持:

  • 任意复杂的 WHERE 条件(多表 JOIN、子查询、CTE、窗口函数)
  • 事务一致性(向量插入和业务数据更新在同一事务中完成)
  • 全文检索 + 向量检索 + 结构化过滤的组合查询
  • 丰富的聚合函数和分析函数

其他向量数据库的混合查询能力通常局限于简单的标量过滤(如 color = 'red' AND price < 100),无法支持 JOIN、子查询等复杂操作。

(4)事务支持

pgvector 继承了 PostgreSQL 的完整 ACID 事务支持。这意味着你可以在一个事务中同时操作向量数据和非向量数据,保证一致性。例如:

BEGIN;
  -- 插入新文档
  INSERT INTO documents (title, content) VALUES ('New Doc', '...') RETURNING id;
  -- 调用外部服务生成 embedding 并存储
  UPDATE documents SET embedding = $1 WHERE id = lastval();
  -- 同时更新关联的元数据表
  INSERT INTO document_metadata (doc_id, category, tags) VALUES (lastval(), 'tech', '{ai,ml}');
COMMIT;  -- 所有操作要么全部成功,要么全部回滚

这在 Milvus、Qdrant 等专用向量数据库中是无法实现的——它们不支持跨集合/跨操作的事务。

6.3.4 选型决策框架

你的向量数据量有多大?
│
├── < 100 万条
│   ├── 已有 PostgreSQL? ───────────────→ pgvector(零额外成本)
│   ├── 快速原型/实验? ─────────────────→ Chroma(最简单)
│   └── 需要托管服务? ──────────────────→ Pinecone(免运维)
│
├── 100 万 ~ 1000 万条
│   ├── 需要复杂混合查询/事务? ─────────→ pgvector + 调优
│   ├── 纯向量检索 + 低延迟? ──────────→ Qdrant 或 Milvus
│   └── 不想运维? ──────────────────────→ Pinecone / Zilliz Cloud
│
├── 1000 万 ~ 1 亿条
│   ├── 已有 PG 运维能力 + 混合查询? ──→ pgvector + Citus
│   ├── 纯向量检索为主? ───────────────→ Milvus(原生分布式)
│   └── 低延迟优先? ───────────────────→ Qdrant(分布式模式)
│
└── > 1 亿条
    ├── 预算充足 + 不想运维? ──────────→ Pinecone / Zilliz Cloud
    └── 自建集群? ──────────────────────→ Milvus(经过大规模验证)

明确的选型建议

场景 推荐方案 理由
RAG 应用 + 已有 PostgreSQL pgvector 零额外基础设施,SQL 生态完整
电商推荐系统(多表 JOIN + 向量) pgvector 混合查询能力无可替代
十亿级图片搜索 Milvus 原生分布式,DiskANN 支持超大规模
快速 AI 原型/Demo Chroma 3 行代码接入,内嵌式零部署
低延迟实时搜索(<5ms) Qdrant Rust 实现,性能最优
不想管任何运维 Pinecone 全托管,按量付费
需要完整事务保证 pgvector 唯一的 ACID 选项
多模态搜索(图+文自动向量化) Weaviate 内置 Vectorizer 模块

6.4 pgvector 未来展望(2026+)

pgvector 仍在快速演进中,以下是值得关注的发展方向。

6.4.1 Iterative Scans(迭代扫描)

pgvector 0.8.x 引入了 iterative scans(迭代扫描)特性,这是对 HNSW 索引查询机制的重要增强。

原理:传统的 HNSW 查询在搜索图结构时,一旦遇到"局部最优"(当前节点的所有邻居都比当前节点更远),就会停止搜索。这可能导致错过全局最优解,特别是在图结构因频繁增删而退化的情况下。

迭代扫描机制允许查询在遇到局部最优后"重启"搜索——从未访问过的节点中选择最近的作为新起点,继续搜索。这类似于在 HNSW 图中进行多次"随机重启"(random restart),显著提高了召回率。

-- 启用迭代扫描(通过增加 ef_search 间接控制)
-- 迭代扫描会在 ef_search 的搜索预算内自动进行多轮搜索
SET hnsw.ef_search = 200;

-- 迭代扫描对以下场景特别有效:
-- 1. 图结构因频繁 INSERT/DELETE 而退化(尚未 REINDEX)
-- 2. 高维向量(>1024 维),图结构中"空隙"更多
-- 3. 需要极高召回率(>99%)的关键业务场景

性能影响:迭代扫描会增加少量查询延迟(通常 10%~30%),但带来的召回率提升在退化图上可以达到 5%~15%。对于无法频繁 REINDEX 的生产环境,这是一个非常有价值的特性。

6.4.2 GPU 加速向量检索预览

GPU 加速是向量检索领域的前沿方向。虽然 pgvector 本身尚未原生支持 GPU 加速,但 PostgreSQL 生态中已经出现了相关探索:

  • pg_vectorize 等社区扩展正在探索通过 CUDA/ROCm 调用 GPU 进行批量向量距离计算
  • Milvus 已经提供了 GPU 索引类型(GPU_IVF_FLAT、GPU_CAGRA 等),证明了 GPU 在向量检索中的可行性
  • 对于 pgvector,GPU 加速最可能的实现路径是通过自定义 C 扩展在距离计算阶段卸载到 GPU
-- 概念预览:未来的 GPU 加速向量查询
-- (目前尚未可用,仅为概念展示)

-- 假设未来有 pgvector_gpu 扩展
CREATE EXTENSION pgvector_gpu;

-- 使用 GPU 加速的索引类型
CREATE INDEX ON documents USING hnsw_gpu (embedding vector_cosine_ops)
    WITH (device = 'cuda:0');  -- 指定 GPU 设备

-- 查询语法不变,自动利用 GPU
SELECT id, title
FROM documents
ORDER BY embedding <=> $1::vector
LIMIT 100;
-- GPU 可以同时计算数千个向量的距离,对大批量查询加速显著

适用场景:GPU 加速对于"单次查询需要计算大量距离"的场景效果最好,如 Top-K 值较大的查询(K > 100)或批量查询。对于简单的 Top-10 查询,GPU 的启动开销可能反而导致延迟增加。

6.4.3 VectorChord 等社区扩展

VectorChord(由 Tensorchord 团队开发)是 pgvector 的一个增强扩展,提供了 pgvector 尚未覆盖的高级功能:

  • ScaNN 索引:Google 提出的高效近似最近邻算法,通过各向异性向量量化(Anisotropic Vector Quantization)实现比 HNSW 更高的压缩比和查询精度,特别适合内存受限的场景
  • RaBit 索引:一种基于随机投影的索引方法,在超大规模数据集上表现优异
  • 半精度(FP16)存储:将向量从 float32 压缩为 float16,存储减半,精度损失通常在 1% 以内
-- VectorChord 使用示例(需要安装 vchord 扩展)
CREATE EXTENSION vchord;

-- 使用 ScaNN 索引(比 HNSW 更省内存)
CREATE INDEX ON documents USING scann (embedding vector_cosine_ops)
    WITH (n_lists = 1000, n_probes = 10);

-- ScaNN 索引的优势:
-- 1. 内存占用比 HNSW 低 50%~70%(通过量化压缩)
-- 2. 构建速度比 HNSW 快 3~5 倍
-- 3. 查询延迟与 HNSW 相当
-- 4. 适合 1000 万+ 向量且内存预算有限的场景

其他值得关注的社区扩展

扩展名 功能 状态
pgvectorscale Timescale 团队开发,提供 StreamingDiskANN 索引,支持磁盘上的大规模向量检索 活跃开发中
pgvector-python Python ORM 集成库,支持 SQLAlchemy、Django ORM 等 稳定维护中
pg_embedding Neon 团队开发的替代扩展,提供 HNSW 优化变体 实验性质

6.4.4 PostgreSQL 生态中向量检索的发展趋势

趋势 1:向量检索成为 PostgreSQL 的标准能力

随着 pgvector 的成熟和普及,向量检索正在从"特殊扩展"演变为 PostgreSQL 的标准能力。主流云数据库服务商(AWS RDS、Google Cloud SQL、Azure Database、Supabase、Neon、Timescale)均已支持 pgvector。这意味着新项目的技术选型中,pgvector 将成为"默认选项"而非"冒险选择"。

趋势 2:索引算法多样化

pgvector 目前只支持 HNSW 和 IVFFlat 两种索引。未来预计会引入更多索引类型:

  • DiskANN / Vamana:微软提出的磁盘友好型索引,可以在有限的内存预算下处理超大规模向量(数十亿级),通过将图结构存储在磁盘上、仅将关键导航信息保留在内存中
  • 量化索引:如 IVF_PQ(乘积量化)、IVF_SQ(标量量化),通过压缩向量表示来降低内存占用和加速距离计算
  • 混合索引:将标量过滤条件与向量索引深度整合,避免"先过滤后搜索"或"先搜索后过滤"的效率损失

趋势 3:与 AI 工作流的深度整合

PostgreSQL 生态正在向"AI-native 数据库"方向发展:

  • pgai(Timescale 团队):直接在 PostgreSQL 中调用 OpenAI/Cohere 等 API 生成 embedding,无需应用层中转
  • pgvector + pgvectorscale + pgai 的组合可以构建完整的 RAG 管道:文档存储 → 自动向量化 → 向量检索 → 结果生成,全部在 PostgreSQL 内完成
-- 概念示例:使用 pgai 在数据库内直接生成 embedding(未来方向)
-- 目前需要通过应用层调用 embedding API

-- 目标:
SELECT ai_embedding('text-embedding-3-small', content) AS embedding
FROM documents
WHERE id = 42;

-- 结合触发器实现自动向量化
CREATE OR REPLACE FUNCTION auto_embed()
RETURNS TRIGGER AS $$
BEGIN
    NEW.embedding := ai_embedding('text-embedding-3-small', NEW.content);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER embed_trigger
    BEFORE INSERT OR UPDATE OF content ON documents
    FOR EACH ROW EXECUTE FUNCTION auto_embed();

趋势 4:半结构化向量(Sparse/Dense Hybrid)

随着稀疏向量(如 BM25 词频向量)和稠密向量(如 transformer embedding)在混合检索中的广泛应用,pgvector 未来可能支持稀疏向量类型和混合检索的原生优化,而不是像现在这样需要通过 RRF 等外部算法来融合两路结果。


附录:本阶段关键 SQL 速查表

操作 SQL 命令
查看 HNSW 搜索参数 SHOW hnsw.ef_search;
查看 IVFFlat 探测参数 SHOW ivfflat.probes;
会话级调参 SET hnsw.ef_search = 100;
数据库级调参 ALTER DATABASE mydb SET hnsw.ef_search = 100;
查看索引定义 SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 't';
检查索引使用率 SELECT idx_scan FROM pg_stat_user_indexes WHERE relname = 't';
检查表膨胀 SELECT * FROM pgstattuple('table_name');
强制使用索引 SET enable_seqscan = off;(仅测试)
重建索引 REINDEX INDEX CONCURRENTLY idx_name;
查看向量维度 SELECT vector_dims(embedding) FROM t LIMIT 1;
向量归一化 SELECT l2_normalize(embedding) FROM t;
检查向量范数 SELECT l2_norm(embedding) FROM t;

本阶段总结:第五阶段覆盖了 pgvector 在生产环境中从参数调优、数据分层、运维监控到坑点规避的完整知识体系。第六阶段则拓展了混合检索、分布式架构、技术选型和未来展望等高阶主题。掌握这两个阶段的内容,意味着你已经具备了在真实业务场景中端到端设计、部署和运维 pgvector 向量检索系统的能力。从"会用"到"精通",关键在于实践——建议读者在自己的业务数据上亲手操作一遍本教程中的每个 SQL 示例,感受不同参数配置下的性能差异,积累第一手的调优经验。


本教程基于 pgvector v0.8.2 + PostgreSQL 17 编写,最后更新:2026年6月

1

评论区