侧边栏壁纸
  • 累计撰写 68 篇文章
  • 累计创建 42 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

极简视界智能体开发平台-详细设计

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Agent实战:极简视界智能体开发平台 — 技术详细设计文档

1. 项目概述

1.1 项目背景与定位

极简视界智能体开发平台是面向企业级客户的低代码大模型应用开发轻量化平台,旨在为企业提供一个开箱即用、灵活可控的 AI 智能体构建与运营环境。平台核心能力涵盖以下四大维度:

  • 可视化智能体编排:基于 LangGraph 状态图引擎,提供拖拽式、节点化的智能体工作流编排能力,支持条件分支、循环、并行执行、人工审批等复杂流程控制,降低 AI 应用开发门槛。
  • RAG 知识库搭建:内置完整的文档解析、智能分块、向量化、混合检索与 Rerank 管线,支持 PDF/Word/Excel/Markdown/HTML 等多格式文档接入,为企业提供高质量的私域知识增强能力。
  • 大模型集成与网关:统一对接 OpenAI、Azure OpenAI、文心一言、通义千问、DeepSeek、Moonshot 等国内外主流大模型,提供统一的 API 抽象层、负载均衡、Fallback 容错与 Token 用量计量。
  • 企业级多租户与权限管理:基于 Schema 级别隔离的多租户架构,配合 RBAC 细粒度权限模型,满足企业客户在数据安全、合规性和组织管理上的严格要求。

1.2 项目来源与技术路线

本项目依托业界成熟的开源智能体平台(如 Dify / FastGPT)进行深度二次开发。我们在开源版本的基础上进行了以下核心改造:

改造维度开源基线极简视界增强
架构模式单体紧耦合模块化单体(Modular Monolith),清晰的领域边界
智能体引擎简单 Chain 调用LangGraph 状态图 + LCEL 声明式链
向量引擎内置 Chroma/WeaviateMilvus 2.6 分布式向量数据库 + 混合检索
多租户基础租户标识字段Schema 级别物理隔离 + 行级安全策略
可观测性基础日志OpenTelemetry 全链路追踪 + Prometheus 监控
异步任务简单后台线程Celery + Redis Broker 分布式任务队列
部署能力Docker ComposeDocker Compose(开发)+ K8s Helm Charts(生产)

选择基于开源项目二次开发而非从零搭建,核心考量如下:

  1. 加速交付周期:复用已验证的基础能力(对话管理、Prompt 模板、基础 RAG),将精力聚焦于差异化竞争力(智能体编排引擎、企业级多租户、混合检索增强)。
  2. 降低技术风险:开源项目已覆盖大量边界场景处理,避免重复踩坑。
  3. 社区生态兼容:保持与上游社区的接口兼容性,便于后续升级和插件生态接入。

1.3 项目周期与里程碑

  • 项目时间:2025 年 1 月 — 2026 年 4 月(共 16 个月)
阶段时间核心交付
P0 — 架构设计与基座搭建2025.01 — 2025.03技术选型确认、基础框架搭建、CI/CD 流水线、多租户基座
P1 — 核心引擎开发2025.04 — 2025.07LangGraph 编排引擎、RAG Pipeline、模型网关、知识库管理
P2 — 平台能力建设2025.08 — 2025.11可视化前端、工作流编辑器、应用市场、API 开放平台
P3 — 企业级增强2025.12 — 2026.02SSO/LDAP 集成、审计日志、性能优化、安全加固
P4 — 测试与交付2026.03 — 2026.04全量测试、压测调优、生产部署、文档交付

1.4 核心技术栈总览

层级技术栈版本
编程语言Python3.12+
Web 框架FastAPI0.135.x
AI 编排框架LangChain + LangGraphv1.2 / 0.4+
向量数据库Milvus2.6
关系数据库PostgreSQL16
缓存/消息中间件Redis7.x
异步任务队列Celery5.4+
对象存储MinIORELEASE.2025-xx
容器编排Docker / Kubernetes25.x / 1.30+
前端框架React + Ant Design ProReact 18 + UmiJS 4
可观测性OpenTelemetry + Prometheus + Grafana最新稳定版

2. 系统总体架构

2.1 分层架构设计

极简视界采用经典的分层架构设计,整体遵循**模块化单体(Modular Monolith)**架构风格——进程内部署、逻辑上独立、边界上清晰。这种架构在保持部署简单性的同时,通过严格的模块边界约束为未来可能的微服务拆分预留了空间。

系统自上而下分为六层:

接入层(Access Layer)

  • Nginx 反向代理:承担 TLS 终止、静态资源服务、请求路由、限流限速(基于 limit_req 模块)、Gzip 压缩等职责。
  • API Gateway 模块:在 FastAPI 内部实现的逻辑网关,负责统一鉴权(JWT / API Key)、请求签名校验、租户识别(从 Header/Subdomain 提取 tenant_id)、全局异常处理、请求日志记录。

应用层(Application Layer)

基于 FastAPI 构建的模块化单体应用,按领域驱动设计(DDD)划分为以下核心模块:

模块职责关键接口
auth认证鉴权、SSO、LDAP/api/v1/auth/*
tenant租户管理、工作空间/api/v1/tenants/*
agent智能体定义、版本管理/api/v1/agents/*
workflow工作流编排、节点管理/api/v1/workflows/*
knowledge知识库管理、文档处理/api/v1/knowledge/*
model_hub模型管理、Provider 配置/api/v1/models/*
conversation对话管理、消息记录/api/v1/conversations/*
app_store应用市场、模板管理/api/v1/apps/*

每个模块内部遵循三层结构:Router(路由层)→ Service(业务逻辑层)→ Repository(数据访问层),模块间通过定义良好的内部接口(Python Protocol / ABC)通信,禁止跨模块直接访问 Repository。

智能体引擎层(Agent Engine Layer)

  • LangGraph 状态图编排引擎:将智能体工作流建模为有向图(DAG / 带环图),每个节点为一个执行单元(LLM 调用、工具调用、条件判断、人工审批),边定义了状态转移逻辑。支持子图嵌套、并行分支、条件路由、循环重试等高级模式。
  • LangChain LCEL 链:在单个节点内部,使用 LangChain v1.2 的 LCEL(LangChain Expression Language)声明式语法构建处理链,如 Prompt 模板 → LLM 调用 → Output Parser → Tool Invocation。
  • 工具注册中心:统一管理内置工具(搜索、计算、代码执行)和自定义工具(OpenAPI Schema 导入),支持工具权限控制和执行沙箱。

知识引擎层(Knowledge Engine Layer)

完整的 RAG(Retrieval-Augmented Generation)管线:

文档输入 → 文档解析(Unstructured/Tika)→ 智能分块(RecursiveCharacterTextSplitter + Semantic Chunking)
    → Embedding 向量化(多模型适配)→ Milvus 向量存储 + PostgreSQL 元数据存储
    → 混合检索(向量相似度 + BM25 全文检索)→ Rerank(Cohere/BGE-Reranker)
    → 上下文组装 → LLM 生成

数据层(Data Layer)

存储引擎用途关键设计
PostgreSQL 16业务数据、多租户 Schema 隔离、全文检索Row-Level Security + Schema-per-Tenant
Milvus 2.6向量索引与检索HNSW 索引、分区键按租户隔离、GPU 加速
Redis 7缓存、会话、限流计数器、Celery BrokerRedis Stream 事件总线、Pub/Sub 实时通知
MinIO文档文件、模型文件、静态资源S3 兼容 API、按租户 Bucket 隔离

基础设施层(Infrastructure Layer)

  • 开发环境:Docker Compose 一键启动全栈服务(含热重载开发服务器)。
  • 生产环境:Kubernetes + Helm Charts 部署,支持 HPA 水平自动扩缩容、滚动更新、健康检查。
  • 异步任务:Celery Worker 集群处理文档解析、向量化、批量导入等耗时任务,通过 Redis Broker 分发任务,支持任务优先级队列、重试策略和 Dead Letter Queue。
  • 可观测性:OpenTelemetry SDK 自动埋点 → Jaeger 链路追踪;Prometheus 采集应用指标 → Grafana 仪表盘;结构化日志(JSON)→ Loki/ELK 集中式日志平台。

2.2 架构全景图

2.3 核心数据流

智能体对话请求流

3. 技术选型详述

3.1 Web 框架:FastAPI 0.135.x

选型理由

评估维度FastAPI 0.135.xDjango 5.xFlask 3.x
性能基于 Starlette + Uvicorn(ASGI),原生异步,QPS 表现优异WSGI 同步模型(需 Daphne 才能异步),中间件链重同步模型,需配合 gevent/eventlet,性能一般
类型安全深度集成 Pydantic v2,请求/响应自动校验与序列化ModelForm 有校验但非类型提示驱动需手动集成 marshmallow/webargs
OpenAPI自动生成 OpenAPI 3.1 文档,零配置需 drf-spectacular(DRF 生态)需 flask-restx 或手动维护
异步支持原生 async def 路由、依赖注入、后台任务部分支持(views 可 async,ORM 层仍同步居多)有限支持
依赖注入内置 Depends() 系统,优雅的 IoC 容器无内置,需第三方库无内置
AI 生态契合度LangChain/LlamaIndex 等 AI 框架示例首选较少 AI 框架集成示例社区较小
学习曲线中等(需理解 async/Pydantic)较高(全家桶概念多)低(极简核心)

核心决策因素

  1. 异步原生性:智能体对话场景大量涉及 LLM API 调用(网络 I/O 密集),FastAPI 的原生 async/await 支持可以在等待 LLM 响应期间释放事件循环,显著提升并发处理能力。实测在相同硬件条件下,FastAPI 的流式对话吞吐量比 Django 高出 2-3 倍。

  2. Pydantic v2 深度集成:智能体配置、工作流节点定义等数据结构复杂(嵌套 JSON Schema、动态字段),Pydantic v2 的 model_validatorDiscriminated UnionJSON Schema 生成能力极大简化了数据校验层。

  3. 依赖注入系统Depends() 机制天然适合实现多租户上下文注入、数据库会话管理、权限校验等横切关注点,代码侵入性极低。

# FastAPI 依赖注入示例:租户上下文 + 数据库会话 + 权限校验
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI(title="极简视界 API", version="2.0.0")

async def get_tenant_context(request: Request) -> TenantContext:
    """从请求头/JWT 中提取租户上下文"""
    tenant_id = request.headers.get("X-Tenant-ID") or extract_from_jwt(request)
    return TenantContext(tenant_id=tenant_id, schema=f"tenant_{tenant_id}")

async def get_db_session(tenant: TenantContext = Depends(get_tenant_context)) -> AsyncSession:
    """基于租户上下文获取隔离的数据库会话"""
    async with async_session_factory() as session:
        await session.execute(text(f"SET search_path TO {tenant.schema}, public"))
        yield session

async def require_permission(
    resource: str, action: str,
    tenant: TenantContext = Depends(get_tenant_context),
    db: AsyncSession = Depends(get_db_session),
) -> UserContext:
    """RBAC 权限校验"""
    user = await get_current_user(tenant, db)
    if not await check_permission(db, user.role_id, resource, action):
        raise HTTPException(status_code=403, detail="权限不足")
    return user

@app.post("/api/v1/agents")
async def create_agent(
    payload: AgentCreateSchema,
    user: UserContext = Depends(require_permission("agent", "create")),
    db: AsyncSession = Depends(get_db_session),
):
    return await agent_service.create(db, payload, user)

潜在劣势与应对:FastAPI 缺乏 Django 那样的内置 Admin、ORM 和 Migration 工具。我们通过 SQLAlchemy 2.0 + Alembic 补齐了 ORM 和数据库迁移能力,并基于 React Admin 构建了独立的管理后台。

3.2 AI 编排框架:LangChain v1.2 + LangGraph 0.4+

选型理由

评估维度LangChain v1.2 + LangGraph 0.4+AutoGen (Microsoft)CrewAI
编排范式状态图(Graph)+ 声明式链(LCEL)多 Agent 对话式协作角色扮演式多 Agent 协作
流程控制细粒度节点/边定义,支持条件路由、循环、并行、子图对话轮次驱动,流程控制粒度粗顺序/层级执行,灵活性有限
状态管理显式状态定义(TypedDict/Pydantic),Checkpoint 持久化对话历史即状态,缺乏结构化状态管理任务结果传递,无持久化状态
人机协作内置 Interrupt/Breakpoint 机制,支持 Human-in-the-Loop需手动实现不原生支持
流式输出原生支持 Token 级流式 + 事件流(astream_events)有限支持有限支持
可观测性LangSmith 集成 / 可自定义 CallbackLangChain 生态外较少
模型兼容广泛的 ChatModel 适配层主要绑定 OpenAI主要绑定 OpenAI
社区成熟度最成熟,文档/示例最丰富快速发展中社区较小

核心决策因素

  1. LangGraph 的状态图模型完美匹配工作流编排需求:企业的智能体工作流往往包含条件分支(如根据意图分类路由到不同处理链路)、循环(如 ReAct 模式中的"思考→行动→观察"循环)、并行执行(如同时检索多个知识库后合并结果),LangGraph 的 StateGraph + add_conditional_edges + Send API 原生支持这些模式。

  2. Checkpoint 机制支撑长时运行对话:LangGraph 的 AsyncPostgresSaver / AsyncRedisSaver 可将图执行状态持久化到数据库,实现对话断点续聊、时间旅行(回溯到历史状态)、Human-in-the-Loop 审批中断后恢复执行。

  3. LCEL 链提供节点级别的声明式组合能力:在单个图节点内部,使用 LCEL 的管道运算符(|)组合 Prompt Template、ChatModel、OutputParser、Tool 等组件,代码简洁且可测试。

# LangGraph 0.4+ 状态图编排示例:ReAct 智能体 + RAG 检索
from typing import Annotated, Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    retrieved_context: str | None
    iteration_count: int

# 初始化 LLM(LangChain v1.2 统一接口)
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0.1,
    streaming=True,
    max_retries=3,
)

# 工具定义
tools = [knowledge_search_tool, web_search_tool, calculator_tool]
llm_with_tools = llm.bind_tools(tools)

# --- 节点函数 ---
async def retrieve_context(state: AgentState) -> dict:
    """RAG 检索节点:从知识库获取相关上下文"""
    last_message = state["messages"][-1]
    if state.get("retrieved_context"):
        return {}  # 已有上下文,跳过检索
    docs = await knowledge_service.hybrid_search(
        query=last_message.content,
        top_k=5,
        rerank=True,
    )
    context = "\n\n".join(doc.page_content for doc in docs)
    return {"retrieved_context": context}

async def agent_reasoning(state: AgentState) -> dict:
    """LLM 推理节点:基于上下文和工具进行推理"""
    system_prompt = SystemMessage(content=f"""你是极简视界智能助手。
    请基于以下检索到的知识上下文回答用户问题:
    <context>{state.get('retrieved_context', '无')}</context>
    如果需要更多信息,可以调用搜索工具。""")

    prompt = ChatPromptTemplate.from_messages([
        system_prompt,
        MessagesPlaceholder("messages"),
    ])
    chain = prompt | llm_with_tools
    response = await chain.ainvoke({"messages": state["messages"]})
    return {"messages": [response], "iteration_count": state.get("iteration_count", 0) + 1}

def should_continue(state: AgentState) -> Literal["tools", "generate", END]:
    """条件路由:判断下一步执行"""
    last_message = state["messages"][-1]
    if state.get("iteration_count", 0) >= 5:
        return "generate"  # 防止无限循环
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    return "generate"

async def generate_response(state: AgentState) -> dict:
    """生成最终响应节点"""
    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="基于对话历史和检索上下文,生成最终的完整回答。"),
        MessagesPlaceholder("messages"),
    ])
    chain = prompt | llm
    response = await chain.ainvoke({"messages": state["messages"]})
    return {"messages": [response]}

# --- 构建状态图 ---
graph = StateGraph(AgentState)

graph.add_node("retrieve", retrieve_context)
graph.add_node("reason", agent_reasoning)
graph.add_node("tools", ToolNode(tools))
graph.add_node("generate", generate_response)

graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "reason")
graph.add_conditional_edges("reason", should_continue, {
    "tools": "tools",
    "generate": "generate",
    END: END,
})
graph.add_edge("tools", "reason")  # 工具执行后回到推理节点
graph.add_edge("generate", END)

# 编译图(带 Checkpoint 持久化)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

checkpointer = AsyncPostgresSaver.from_conn_string(DATABASE_URL)
agent_app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["generate"],  # Human-in-the-Loop:生成前可人工审核
)

3.3 向量数据库:Milvus 2.6

选型理由

评估维度Milvus 2.6Chroma 0.6+Weaviate 1.28+Qdrant 1.12+
架构存算分离,分布式(Proxy/Query/Data/Index Node)嵌入式 / 单机 Server分布式(RAFT 共识)分布式(RAFT 共识)
性能(百万级)HNSW + GPU 加速,毫秒级查询内存模式,十万级后性能下降性能良好性能良好
索引类型HNSW、IVF_FLAT、IVF_SQ8、DiskANN、GPU_IVF 等HNSW(单一)HNSW(单一)HNSW(单一)
混合检索原生支持标量过滤 + 向量搜索 + 全文检索基础元数据过滤GraphQL + 向量过滤 + 向量
多租户支持Partition Key / Database 级别Collection 级别多租户原生Collection 级别
高可用多副本、故障自动转移无(单机)RAFT 复制RAFT 复制
运维复杂度较高(依赖 etcd/MinIO/Pulsar)极低中等中等
Python SDKpymilvus 官方维护chromadbweaviate-clientqdrant-client

核心决策因素

  1. 性能天花板最高:企业客户的知识库规模从数万到数千万文档不等。Milvus 的存算分离架构和 GPU 加速索引能力使其在百万到千万级向量规模下仍能保持亚秒级查询延迟,而其他方案在数据量增长后通常需要架构升级。

  2. 丰富的索引策略:不同租户的数据规模和查询特征差异大。Milvus 支持按 Collection 配置不同的索引类型——小租户使用 IVF_FLAT 节省资源,大租户使用 HNSW 保障性能,冷数据使用 DiskANN 降低成本。

  3. Partition Key 实现高效多租户隔离:通过 tenant_id 作为 Partition Key,在同一个 Collection 内实现租户数据的物理隔离,查询时自动按分区裁剪,避免跨租户数据泄露。

# Milvus 2.6 多租户 Collection 设计
from pymilvus import (
    MilvusClient, DataType, CollectionSchema, FieldSchema,
    Function, FunctionType
)

client = MilvusClient(uri="http://milvus:19530")

# 定义 Schema:tenant_id 作为 Partition Key
schema = client.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="tenant_id", datatype=DataType.VARCHAR, max_length=64, is_partition_key=True)
schema.add_field(field_name="document_id", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="chunk_index", datatype=DataType.INT32)
schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535, enable_analyzer=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=1536)
schema.add_field(field_name="sparse_embedding", datatype=DataType.SPARSE_FLOAT_VECTOR)
schema.add_field(field_name="metadata", datatype=DataType.JSON)

# BM25 全文检索函数(Milvus 2.6 新特性)
bm25_function = Function(
    name="bm25",
    input_field_names=["content"],
    output_field_names=["sparse_embedding"],
    function_type=FunctionType.BM25,
)
schema.add_function(bm25_function)

# 创建 Collection 并建立索引
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="COSINE",
    params={"M": 16, "efConstruction": 256},
)
index_params.add_index(
    field_name="sparse_embedding",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="BM25",
)

client.create_collection(
    collection_name="knowledge_chunks",
    schema=schema,
    index_params=index_params,
    num_partitions=64,  # 分区数,按租户哈希分配
)

3.4 关系数据库:PostgreSQL 16

选型理由

评估维度PostgreSQL 16MySQL 8.4
多租户支持原生 Schema 概念,search_path 切换零成本无 Schema 概念,只能用 Database 隔离或字段标记
JSON 支持JSONB 类型,GIN 索引,丰富的操作符(@>->>jsonb_path_queryJSON 类型,索引能力有限
全文检索tsvector + tsquery,支持中文分词(zhparser),可用于 BM25 混合检索FULLTEXT 索引,功能较弱
高级特性Row-Level Security、LISTEN/NOTIFY 事件、物化视图、CTE 递归查询、Window Function相对较少
扩展性pgvector(向量)、PostGIS(地理)、pg_cron(定时任务)插件生态较小
并发控制MVCC 成熟,行锁/表锁/ Advisory LockMVCC + Next-Key Lock

核心决策因素:PostgreSQL 的 Schema 概念是实现"Shared Database, Separate Schema"多租户方案的基石——每个租户对应一个独立的 PostgreSQL Schema,通过 SET search_path 实现透明的数据隔离,无需修改业务 SQL。JSONB 类型天然适合存储智能体配置、工作流定义、工具参数等半结构化数据。pgvector 扩展可作为向量存储的备选方案(小规模场景)。

3.5 缓存与消息中间件:Redis 7 + Celery 5.4

Redis 7 选型理由

Redis 在平台中承担多重职责:

用途Redis 数据结构关键配置
API 响应缓存String + TTLcache:* 前缀,LRU 淘汰策略
会话存储Hashsession:{session_id}
限流计数器String + Lua 脚本(滑动窗口)ratelimit:{user_id}:{endpoint}
Celery BrokerList / Stream任务优先级队列
实时通知Pub/Sub + Stream对话流式推送、任务状态更新
分布式锁String + SET NX EXlock:document:{doc_id} 防止重复处理

选择 Redis 而非 Memcached 的核心原因:Redis 支持丰富的数据结构(Hash/Sorted Set/Stream)、Lua 脚本原子操作、Pub/Sub 实时通信、持久化(AOF/RDB),这些能力在智能体平台的限流、会话管理、事件通知等场景中不可或缺。

Celery 5.4 选型理由

Celery 是 Python 生态中最成熟的分布式任务队列框架,选择它处理平台中的异步耗时任务:

任务类型队列优先级超时配置
文档解析与分块document_processing30 分钟
向量 Embedding 生成embedding60 分钟
知识库全量重建knowledge_rebuild4 小时
对话日志归档log_archival10 分钟
模型文件下载model_download2 小时
# Celery 任务定义示例
from celery import Celery, chain, group, chord
from celery.signals import task_prerun, task_postrun

celery_app = Celery(
    "kunlun_tasks",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    task_routes={
        "tasks.document.*": {"queue": "document_processing"},
        "tasks.embedding.*": {"queue": "embedding"},
        "tasks.knowledge.*": {"queue": "knowledge_rebuild"},
    },
    task_acks_late=True,            # 任务完成后才确认,防止丢失
    worker_prefetch_multiplier=1,   # 避免单个 Worker 抢占过多任务
    task_reject_on_worker_lost=True,
)

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    time_limit=1800,  # 30 分钟硬超时
)
def process_document(self, tenant_id: str, document_id: str, file_path: str):
    """文档处理流水线:解析 → 分块 → 向量化 → 入库"""
    try:
        # Step 1: 文档解析
        self.update_state(state="PARSING", meta={"progress": 10})
        raw_content = document_parser.parse(file_path)

        # Step 2: 智能分块
        self.update_state(state="CHUNKING", meta={"progress": 30})
        chunks = text_splitter.split(
            raw_content,
            chunk_size=512,
            chunk_overlap=64,
            strategy="semantic",  # 语义分块
        )

        # Step 3: 向量化(批量并发)
        self.update_state(state="EMBEDDING", meta={"progress": 50})
        embeddings = embedding_service.batch_embed(
            texts=[chunk.text for chunk in chunks],
            batch_size=32,
        )

        # Step 4: 写入 Milvus + PostgreSQL
        self.update_state(state="INDEXING", meta={"progress": 80})
        knowledge_service.store_chunks(
            tenant_id=tenant_id,
            document_id=document_id,
            chunks=chunks,
            embeddings=embeddings,
        )

        return {"document_id": document_id, "chunk_count": len(chunks), "status": "completed"}

    except Exception as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)

3.6 文件存储:MinIO

MinIO 是一个高性能的 S3 兼容对象存储系统,选型理由:

  1. S3 API 完全兼容:使用 boto3 SDK 即可无缝对接,未来迁移到 AWS S3 / 阿里云 OSS 零代码改动。
  2. 私有化部署友好:企业客户往往要求数据不出内网,MinIO 支持完全私有化部署,满足数据合规要求。
  3. 按租户隔离:通过 Bucket 策略或 Prefix 命名空间实现租户文件隔离。
  4. 高性能:支持纠删码(Erasure Coding)保护数据可靠性,大文件分片上传/下载。

主要存储内容:用户上传的原始文档、知识库导入导出的数据包、智能体生成的文件产物、对话中的多媒体附件。

3.7 前端:React 18 + Ant Design Pro + UmiJS 4

评估维度React + Ant Design ProVue + Element PlusAngular + NG-ZORRO
生态丰富度最丰富,AI 领域组件/库最多丰富中等
Ant Design Pro开箱即用的中后台模板,含权限路由、CRUD 模板无对标成熟方案NG-ZORRO Pro 较弱
可视化编辑器React Flow / XYFlow(工作流节点编辑器首选)Vue Flow(社区较小)较少选择
TypeScript原生支持,类型推导好原生支持原生 TypeScript

核心决策:采用 React 18 + Ant Design Pro 6 作为中后台管理框架,使用 React Flow(XYFlow)构建可视化的智能体工作流编辑器(节点拖拽、连线、参数配置面板),基于 UmiJS 4 的企业级路由和状态管理方案。

4. 多租户架构设计

4.1 架构方案选型

多租户架构有三种主流方案,我们选择 Shared Database, Separate Schema 方案:

方案隔离级别运维复杂度资源利用率适用场景
Separate Database最高(独立数据库实例)高(N 套数据库运维)低(资源独占)金融/医疗等强合规行业
Shared Database, Separate Schema高(Schema 级物理隔离)中(一个实例多 Schema)中(共享连接池)企业级 SaaS 平台(本方案)
Shared Schema低(字段标记隔离)轻量级 SaaS、C 端产品

选择 Separate Schema 方案的核心考量:

  1. 物理隔离保障数据安全:每个租户的数据在 PostgreSQL Schema 层面完全隔离,即使应用层 Bug 也不会导致跨租户数据泄露(search_path 限制了可见范围)。
  2. 运维成本可控:所有租户共享同一个 PostgreSQL 实例和连接池,无需管理多个数据库实例。
  3. 灵活的数据迁移:支持单租户独立备份/恢复、独立 Schema Migration,不影响其他租户。
  4. 性能隔离:可为不同 Schema 设置不同的资源限制(通过 pg_cgroup 或连接池层面的配额)。

4.2 租户隔离中间件设计

整体隔离链路:HTTP 请求 → 租户识别中间件 → 设置 Schema → 业务处理 → 清理上下文

# ============================================================
# 文件: app/middleware/tenant.py
# 描述: 多租户隔离中间件 — 从请求中提取租户标识并设置数据库 Schema
# ============================================================

import contextvars
from typing import Callable
from uuid import UUID

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp

# 使用 contextvars 实现线程/协程安全的租户上下文传递
_tenant_context: contextvars.ContextVar["TenantContext | None"] = contextvars.ContextVar(
    "tenant_context", default=None
)


class TenantContext:
    """租户上下文对象,贯穿请求生命周期"""
    __slots__ = ("tenant_id", "schema_name", "tenant_name", "plan", "features")

    def __init__(
        self,
        tenant_id: str,
        schema_name: str,
        tenant_name: str,
        plan: str = "standard",
        features: list[str] | None = None,
    ):
        self.tenant_id = tenant_id
        self.schema_name = schema_name  # 例如: "tenant_a1b2c3d4"
        self.tenant_name = tenant_name
        self.plan = plan
        self.features = features or []


def get_current_tenant() -> TenantContext:
    """获取当前请求的租户上下文(可在任意层级调用)"""
    ctx = _tenant_context.get()
    if ctx is None:
        raise RuntimeError("租户上下文未设置,请检查 TenantMiddleware 是否正确挂载")
    return ctx


class TenantMiddleware(BaseHTTPMiddleware):
    """
    多租户中间件 — 核心隔离入口

    处理流程:
    1. 从 JWT Token / X-Tenant-ID Header / Subdomain 提取租户标识
    2. 查询 Redis 缓存 / PostgreSQL 获取租户信息
    3. 设置 contextvars 租户上下文
    4. 将租户信息注入到 Request.state 供下游使用
    """

    # 无需租户上下文的路径白名单
    PUBLIC_PATHS = {
        "/api/v1/auth/login",
        "/api/v1/auth/register",
        "/api/v1/health",
        "/api/v1/docs",
        "/api/v1/openapi.json",
    }

    def __init__(self, app: ASGIApp, redis_client=None):
        super().__init__(app)
        self.redis = redis_client

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        path = request.url.path

        # 白名单路径跳过租户校验
        if any(path.startswith(p) for p in self.PUBLIC_PATHS):
            return await call_next(request)

        # Step 1: 提取租户标识
        tenant_id = await self._extract_tenant_id(request)
        if not tenant_id:
            return Response(
                status_code=401,
                content='{"detail": "未提供租户标识"}',
                media_type="application/json",
            )

        # Step 2: 查询租户信息(优先读 Redis 缓存)
        tenant_info = await self._resolve_tenant(tenant_id)
        if not tenant_info:
            return Response(
                status_code=404,
                content='{"detail": "租户不存在或已停用"}',
                media_type="application/json",
            )

        # Step 3: 设置租户上下文到 contextvars(协程安全)
        tenant_ctx = TenantContext(
            tenant_id=tenant_info["id"],
            schema_name=f"tenant_{tenant_info['id'].replace('-', '_')}",
            tenant_name=tenant_info["name"],
            plan=tenant_info["plan"],
            features=tenant_info.get("features", []),
        )
        token = _tenant_context.set(tenant_ctx)

        try:
            # Step 4: 注入到 request.state 供路由层直接使用
            request.state.tenant = tenant_ctx
            response = await call_next(request)
            return response
        finally:
            # Step 5: 清理上下文,防止请求间泄露
            _tenant_context.reset(token)

    async def _extract_tenant_id(self, request: Request) -> str | None:
        """多策略提取租户 ID,优先级:Header > JWT > Subdomain"""
        # 策略 1: 显式 Header
        if tenant_id := request.headers.get("X-Tenant-ID"):
            return tenant_id

        # 策略 2: JWT Token 中的 tenant_id claim
        if auth_header := request.headers.get("Authorization"):
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]
                payload = decode_jwt(token)  # 内部 JWT 解码函数
                if tenant_id := payload.get("tenant_id"):
                    return tenant_id

        # 策略 3: 子域名(如 acme.kunlun-ai.com → tenant_id=acme)
        host = request.headers.get("Host", "")
        if "." in host:
            subdomain = host.split(".")[0]
            if subdomain and subdomain not in ("www", "api", "app"):
                return await self._resolve_subdomain(subdomain)

        return None

    async def _resolve_tenant(self, tenant_id: str) -> dict | None:
        """查询租户信息,Redis 缓存优先"""
        cache_key = f"tenant:info:{tenant_id}"

        # 尝试从 Redis 缓存读取
        if self.redis:
            cached = await self.redis.hgetall(cache_key)
            if cached:
                return cached

        # 回源到 PostgreSQL 主租户表(public.tenants)
        from app.repositories.tenant import TenantRepository
        tenant = await TenantRepository.get_by_id(tenant_id)

        if tenant and tenant.status == "active":
            # 写入 Redis 缓存,TTL 10 分钟
            if self.redis:
                await self.redis.hset(cache_key, mapping={
                    "id": str(tenant.id),
                    "name": tenant.name,
                    "plan": tenant.plan,
                    "features": ",".join(tenant.features),
                })
                await self.redis.expire(cache_key, 600)
            return {"id": str(tenant.id), "name": tenant.name,
                    "plan": tenant.plan, "features": tenant.features}
        return None

    async def _resolve_subdomain(self, subdomain: str) -> str | None:
        """子域名 → tenant_id 映射"""
        cache_key = f"tenant:subdomain:{subdomain}"
        if self.redis:
            tenant_id = await self.redis.get(cache_key)
            if tenant_id:
                return tenant_id
        # 回源查询...
        return None

4.3 数据库会话层隔离

# ============================================================
# 文件: app/db/session.py
# 描述: 多租户数据库会话管理 — Schema 级隔离
# ============================================================

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from app.core.config import settings
from app.middleware.tenant import get_current_tenant

# 主引擎(连接池配置)
engine = create_async_engine(
    settings.DATABASE_URL,  # postgresql+asyncpg://user:pass@host:5432/kunlun
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,         # 连接健康检查
    pool_recycle=3600,          # 1 小时回收连接
    echo=settings.SQL_DEBUG,
)

# 基础 Session 工厂(不设 Schema,由中间件动态设置)
_async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


@asynccontextmanager
async def get_tenant_session() -> AsyncGenerator[AsyncSession, None]:
    """
    获取租户隔离的数据库会话

    核心原理:
    1. 从 contextvars 获取当前租户上下文
    2. 执行 SET search_path 将当前连接的搜索路径切换到租户 Schema
    3. 业务代码执行的所有 SQL 都自动限定在该 Schema 内
    4. 会话结束后重置 search_path

    安全性保证:
    - 每个连接在使用前都会显式设置 search_path,不依赖连接池的连接复用状态
    - 使用 asyncpg 的连接级别 search_path 设置,避免 SQL 注入
    """
    tenant = get_current_tenant()
    schema_name = tenant.schema_name  # 例如 "tenant_a1b2c3d4"

    # 校验 Schema 名称合法性(防止 SQL 注入)
    if not schema_name.isidentifier():
        raise ValueError(f"非法的 Schema 名称: {schema_name}")

    async with _async_session_factory() as session:
        try:
            # 设置当前连接的 Schema 搜索路径
            # tenant Schema 优先,public 作为后备(共享配置表等)
            await session.execute(
                text("SET search_path TO :schema, public"),
                {"schema": schema_name},
            )
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            # 重置 search_path(连接归还连接池前清理)
            await session.execute(text("SET search_path TO public"))


# FastAPI 依赖注入适配
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI Depends() 注入用的数据库会话依赖"""
    async with get_tenant_session() as session:
        yield session

4.4 租户 Schema 自动初始化

# ============================================================
# 文件: app/db/tenant_manager.py
# 描述: 租户 Schema 生命周期管理 — 创建/迁移/销毁
# ============================================================

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from alembic.config import Config
from alembic import command


class TenantSchemaManager:
    """管理租户 Schema 的创建、迁移和销毁"""

    def __init__(self, master_session: AsyncSession):
        """
        Args:
            master_session: 具有 superuser 权限的数据库会话(可操作 pg_catalog)
        """
        self.session = master_session

    async def create_tenant_schema(self, tenant_id: str, schema_name: str) -> None:
        """
        创建新租户的完整 Schema 环境

        步骤:
        1. 创建 PostgreSQL Schema
        2. 在该 Schema 下执行全部 Migration(创建表结构)
        3. 初始化默认数据(默认角色、默认配置等)
        4. 创建对应的 Milvus Partition
        5. 创建对应的 MinIO Bucket Prefix
        """
        # 1. 创建 Schema
        await self.session.execute(
            text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"')
        )
        await self.session.commit()

        # 2. 在该 Schema 下运行 Alembic Migration
        alembic_cfg = Config("alembic.ini")
        alembic_cfg.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))
        # 自定义 Alembic env.py 使其在指定 Schema 下执行
        alembic_cfg.attributes["target_schema"] = schema_name
        command.upgrade(alembic_cfg, "head")

        # 3. 初始化默认数据
        await self._seed_default_data(schema_name, tenant_id)

        # 4. Milvus 分区创建(通过 Partition Key 自动隔离,此处做元数据记录)
        # Milvus 的 Partition Key 在 Collection 创建时已配置,
        # 此处仅在元数据表记录该租户的 Milvus 配置

        # 5. MinIO 前缀目录创建
        # MinIO 不需要显式创建目录,上传时自动按 Prefix 组织

    async def _seed_default_data(self, schema_name: str, tenant_id: str) -> None:
        """为新租户初始化默认数据"""
        await self.session.execute(
            text("SET search_path TO :schema"), {"schema": schema_name}
        )

        # 创建默认角色
        await self.session.execute(
            text("""
                INSERT INTO roles (id, name, code, description, is_system)
                VALUES
                    (gen_random_uuid(), '超级管理员', 'super_admin', '拥有租户内所有权限', true),
                    (gen_random_uuid(), '管理员', 'admin', '管理应用和知识库', true),
                    (gen_random_uuid(), '开发者', 'developer', '开发智能体和工作流', true),
                    (gen_random_uuid(), '普通用户', 'member', '使用已发布的应用', true)
            """)
        )

        # 创建默认工作空间
        await self.session.execute(
            text("""
                INSERT INTO workspaces (id, name, slug, tenant_id)
                VALUES (gen_random_uuid(), '默认工作空间', 'default', :tenant_id)
            """),
            {"tenant_id": tenant_id},
        )

        # 创建默认模型 Provider 配置
        await self.session.execute(
            text("""
                INSERT INTO model_providers (id, provider_name, is_enabled, tenant_id)
                VALUES (gen_random_uuid(), 'openai', true, :tenant_id)
            """),
            {"tenant_id": tenant_id},
        )

    async def drop_tenant_schema(self, schema_name: str, backup: bool = True) -> None:
        """
        删除租户 Schema(谨慎操作!)

        Args:
            schema_name: 要删除的 Schema 名称
            backup: 是否在删除前进行数据备份
        """
        if backup:
            # 先备份 Schema 数据(pg_dump 指定 Schema)
            await self._backup_schema(schema_name)

        # CASCADE 删除 Schema 及其下所有对象
        await self.session.execute(
            text(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE')
        )
        await self.session.commit()

    async def _backup_schema(self, schema_name: str) -> None:
        """备份指定 Schema 的数据到 MinIO"""
        # 调用 pg_dump 备份并上传到 MinIO
        ...

4.5 RBAC 权限模型设计

权限模型采用 租户 → 工作空间 → 角色 → 权限 → 资源 五级层次结构:

权限校验实现

# ============================================================
# 文件: app/core/permissions.py
# 描述: RBAC 权限校验引擎
# ============================================================

from enum import StrEnum
from functools import wraps
from typing import Annotated

from fastapi import Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import get_db
from app.middleware.tenant import TenantContext, get_current_tenant


class Action(StrEnum):
    """资源操作类型"""
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    EXECUTE = "execute"   # 执行智能体/工作流
    PUBLISH = "publish"   # 发布应用
    EXPORT = "export"     # 导出数据
    ADMIN = "admin"       # 管理操作


class ResourceType(StrEnum):
    """资源类型"""
    AGENT = "agent"
    WORKFLOW = "workflow"
    KNOWLEDGE_BASE = "knowledge_base"
    DOCUMENT = "document"
    MODEL_CONFIG = "model_config"
    WORKSPACE = "workspace"
    TENANT = "tenant"
    API_KEY = "api_key"


class PermissionCheckResult(BaseModel):
    """权限校验结果"""
    allowed: bool
    reason: str | None = None
    user_id: str
    role_code: str
    resource_type: str
    action: str


class PermissionEngine:
    """RBAC 权限校验引擎"""

    def __init__(self, db: AsyncSession):
        self.db = db
        self._cache: dict[str, set[str]] = {}  # 内存缓存:role_id → permission_keys

    async def check(
        self,
        user_id: str,
        workspace_id: str,
        resource_type: ResourceType,
        action: Action,
        resource_id: str | None = None,
    ) -> PermissionCheckResult:
        """
        执行权限校验

        校验逻辑:
        1. 获取用户在指定工作空间内的角色
        2. 检查角色是否拥有对目标资源类型的指定操作权限
        3. 如果有 resource_id,进一步检查资源级别的 ownership 约束
        """
        tenant = get_current_tenant()

        # 获取用户在该工作空间的角色列表
        roles = await self._get_user_roles(user_id, workspace_id)
        if not roles:
            return PermissionCheckResult(
                allowed=False,
                reason="用户不属于该工作空间",
                user_id=user_id,
                role_code="none",
                resource_type=resource_type.value,
                action=action.value,
            )

        # 检查每个角色的权限
        required_permission = f"{resource_type.value}:{action.value}"

        for role in roles:
            role_permissions = await self._get_role_permissions(role["id"])

            # 超级管理员拥有所有权限
            if role["code"] == "super_admin":
                return PermissionCheckResult(
                    allowed=True,
                    user_id=user_id,
                    role_code=role["code"],
                    resource_type=resource_type.value,
                    action=action.value,
                )

            # 检查是否包含所需权限
            if required_permission in role_permissions:
                # 如果有 resource_id,检查资源所有权约束
                if resource_id and action in (Action.UPDATE, Action.DELETE):
                    if not await self._check_resource_ownership(
                        resource_type, resource_id, user_id, role["code"]
                    ):
                        continue
                return PermissionCheckResult(
                    allowed=True,
                    user_id=user_id,
                    role_code=role["code"],
                    resource_type=resource_type.value,
                    action=action.value,
                )

        return PermissionCheckResult(
            allowed=False,
            reason=f"角色 [{', '.join(r['code'] for r in roles)}] 缺少权限: {required_permission}",
            user_id=user_id,
            role_code=roles[0]["code"] if roles else "none",
            resource_type=resource_type.value,
            action=action.value,
        )

    async def _get_user_roles(self, user_id: str, workspace_id: str) -> list[dict]:
        """获取用户在指定工作空间的角色"""
        result = await self.db.execute(
            text("""
                SELECT r.id, r.code, r.name
                FROM user_roles ur
                JOIN roles r ON ur.role_id = r.id
                WHERE ur.user_id = :user_id
                  AND (ur.workspace_id = :workspace_id OR ur.workspace_id IS NULL)
                ORDER BY r.is_system DESC
            """),
            {"user_id": user_id, "workspace_id": workspace_id},
        )
        return [dict(row._mapping) for row in result.fetchall()]

    async def _get_role_permissions(self, role_id: str) -> set[str]:
        """获取角色的权限集合(带内存缓存)"""
        if role_id in self._cache:
            return self._cache[role_id]

        result = await self.db.execute(
            text("""
                SELECT resource_type || ':' || action AS permission_key
                FROM permissions
                WHERE role_id = :role_id
            """),
            {"role_id": role_id},
        )
        permissions = {row[0] for row in result.fetchall()}
        self._cache[role_id] = permissions
        return permissions

    async def _check_resource_ownership(
        self, resource_type: ResourceType, resource_id: str,
        user_id: str, role_code: str
    ) -> bool:
        """检查资源级别的所有权/可见性约束"""
        # admin 及以上角色可操作所有资源
        if role_code in ("super_admin", "admin"):
            return True

        # 普通用户只能操作自己创建的资源或被授权的资源
        table_name = f"{resource_type.value}s"  # agents, workflows, ...
        result = await self.db.execute(
            text(f"""
                SELECT 1 FROM {table_name}
                WHERE id = :resource_id
                  AND (created_by = :user_id
                       OR id IN (
                           SELECT resource_id FROM resource_shares
                           WHERE shared_with_user_id = :user_id
                       ))
            """),
            {"resource_id": resource_id, "user_id": user_id},
        )
        return result.scalar_one_or_none() is not None


# --- FastAPI 权限依赖注入装饰器 ---

def require_permission(
    resource_type: ResourceType,
    action: Action,
    resource_id_param: str | None = None,
):
    """
    权限校验装饰器 — 用于 FastAPI 路由

    用法:
        @app.delete("/api/v1/agents/{agent_id}")
        async def delete_agent(
            agent_id: str,
            _: None = Depends(require_permission(
                ResourceType.AGENT, Action.DELETE, resource_id_param="agent_id"
            )),
        ):
            ...
    """
    async def dependency(
        request: Request,
        db: AsyncSession = Depends(get_db),
    ) -> None:
        tenant = get_current_tenant()
        user = request.state.user  # 由 AuthMiddleware 设置
        workspace_id = request.headers.get("X-Workspace-ID", "default")

        resource_id = None
        if resource_id_param:
            resource_id = request.path_params.get(resource_id_param)

        engine = PermissionEngine(db)
        result = await engine.check(
            user_id=user.id,
            workspace_id=workspace_id,
            resource_type=resource_type,
            action=action,
            resource_id=resource_id,
        )

        if not result.allowed:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=result.reason,
            )

    return dependency

4.6 多租户架构下的关键设计决策

4.6.1 共享表 vs 租户表

并非所有表都需要在每个租户 Schema 下重复创建。我们区分了全局共享表租户隔离表

表类型所在 Schema示例
全局共享表publictenants(租户主表)、system_configs(系统配置)、global_model_providers(全局模型配置)
租户隔离表tenant_*agents、workflows、knowledge_bases、documents、conversations、messages、users、roles、permissions

4.6.2 Milvus 向量数据隔离

Milvus 通过 Partition Key 实现租户隔离:

  • 在 Collection Schema 中将 tenant_id 字段设置为 is_partition_key=True
  • Milvus 自动按 tenant_id 的哈希值将数据分配到不同的物理分区。
  • 查询时通过 expr="tenant_id == 'xxx'" 过滤条件,Milvus 引擎自动进行分区裁剪(Partition Pruning),只扫描对应分区的数据。
  • 性能上,Partition Key 过滤比标量过滤(普通字段过滤)快一个数量级,因为避免了全量扫描。

4.6.3 Redis 命名空间隔离

Redis 通过 Key 命名前缀实现逻辑隔离:

# 缓存 Key 规范:{业务域}:{租户ID}:{资源类型}:{资源ID}
cache:tenant_a1b2:agent:config:{agent_id}     # 智能体配置缓存
session:tenant_a1b2:{session_id}               # 用户会话
ratelimit:tenant_a1b2:{user_id}:agent:invoke   # API 限流
lock:tenant_a1b2:document:{doc_id}:processing  # 文档处理分布式锁

4.6.4 租户配额与限流

基于 Redis 实现租户级别的资源配额控制:

# ============================================================
# 文件: app/core/quota.py
# 描述: 租户资源配额管理
# ============================================================

from dataclasses import dataclass
from enum import StrEnum


class QuotaType(StrEnum):
    AGENT_COUNT = "agent_count"           # 智能体数量上限
    KNOWLEDGE_BASE_COUNT = "kb_count"     # 知识库数量上限
    DOCUMENT_STORAGE_MB = "doc_storage"   # 文档存储上限 (MB)
    API_CALLS_DAILY = "api_calls_daily"   # 每日 API 调用上限
    TOKEN_USAGE_MONTHLY = "token_monthly" # 每月 Token 用量上限
    WORKSPACE_COUNT = "workspace_count"   # 工作空间数量上限


# 各套餐默认配额
PLAN_QUOTAS: dict[str, dict[QuotaType, int]] = {
    "free": {
        QuotaType.AGENT_COUNT: 3,
        QuotaType.KNOWLEDGE_BASE_COUNT: 2,
        QuotaType.DOCUMENT_STORAGE_MB: 500,
        QuotaType.API_CALLS_DAILY: 1000,
        QuotaType.TOKEN_USAGE_MONTHLY: 1_000_000,
        QuotaType.WORKSPACE_COUNT: 1,
    },
    "standard": {
        QuotaType.AGENT_COUNT: 20,
        QuotaType.KNOWLEDGE_BASE_COUNT: 10,
        QuotaType.DOCUMENT_STORAGE_MB: 10_000,
        QuotaType.API_CALLS_DAILY: 50_000,
        QuotaType.TOKEN_USAGE_MONTHLY: 50_000_000,
        QuotaType.WORKSPACE_COUNT: 5,
    },
    "enterprise": {
        QuotaType.AGENT_COUNT: -1,   # -1 表示无限制
        QuotaType.KNOWLEDGE_BASE_COUNT: -1,
        QuotaType.DOCUMENT_STORAGE_MB: -1,
        QuotaType.API_CALLS_DAILY: -1,
        QuotaType.TOKEN_USAGE_MONTHLY: -1,
        QuotaType.WORKSPACE_COUNT: -1,
    },
}


class QuotaChecker:
    """基于 Redis 的配额检查与计数器"""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def check_and_increment(
        self, tenant_id: str, plan: str, quota_type: QuotaType, increment: int = 1
    ) -> bool:
        """
        检查配额并原子性递增计数器

        使用 Redis Lua 脚本保证 check + increment 的原子性
        """
        limit = PLAN_QUOTAS.get(plan, {}).get(quota_type, 0)
        if limit == -1:
            return True  # 无限制

        # Lua 脚本:原子性检查并递增
        lua_script = """
        local current = tonumber(redis.call('GET', KEYS[1]) or '0')
        local limit = tonumber(ARGV[1])
        local increment = tonumber(ARGV[2])
        local ttl = tonumber(ARGV[3])

        if current + increment > limit then
            return -1  -- 超限
        end

        local new_val = redis.call('INCRBY', KEYS[1], increment)
        if ttl > 0 then
            redis.call('EXPIRE', KEYS[1], ttl)
        end
        return new_val
        """

        # 构造 Key(带过期时间策略)
        if quota_type in (QuotaType.API_CALLS_DAILY,):
            key = f"quota:{tenant_id}:{quota_type.value}:daily"
            ttl = 86400  # 24 小时
        elif quota_type in (QuotaType.TOKEN_USAGE_MONTHLY,):
            key = f"quota:{tenant_id}:{quota_type.value}:monthly"
            ttl = 2_592_000  # 30 天
        else:
            key = f"quota:{tenant_id}:{quota_type.value}"
            ttl = 0  # 不过期(需要手动重置)

        result = await self.redis.eval(lua_script, 1, key, limit, increment, ttl)
        return result != -1

4.7 多租户安全加固

4.7.1 PostgreSQL Row-Level Security(RLS)补充

尽管 Schema 级别的隔离已经提供了强有力的数据隔离,我们在关键的共享查询场景中额外启用 RLS 作为纵深防御:

-- 为共享视图/物化视图启用 RLS
ALTER TABLE public.tenant_usage_stats ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON public.tenant_usage_stats
    USING (tenant_id = current_setting('app.current_tenant_id')::text);

-- 在应用层设置会话变量
SET app.current_tenant_id = 'tenant-a1b2c3d4';

4.7.2 审计日志

所有跨租户的敏感操作(租户创建/删除、权限变更、数据导出)均记录到独立的审计日志表(public.audit_logs,位于 public Schema),包含操作者、操作类型、操作目标、变更前后值、IP 地址、时间戳等完整信息,满足企业合规审计要求。

5. 工作流编排引擎

5.1 概述

极简视界智能体开发平台的工作流编排引擎基于 LangGraph StateGraph 构建,提供可视化、可编排、可追溯的智能体工作流管理能力。引擎将复杂的 AI 应用拆解为若干原子化的节点(Node),通过**边(Edge)定义节点间的流转逻辑,并以统一的状态(State)**对象在节点间传递上下文,实现高度灵活的 DAG(有向无环图)或含环图(Cyclic Graph)编排。

5.2 核心概念

概念说明对应 LangGraph 原语
节点(Node)最小处理单元,封装一个独立的计算逻辑(如 LLM 调用、知识库检索、代码执行等)StateGraph.add_node()
边(Edge)节点之间的流转逻辑,分为静态边和条件边add_edge() / add_conditional_edges()
状态(State)贯穿整个工作流的上下文对象,采用 TypedDict 定义 SchemaStateGraph 的第一个参数
检查点(Checkpoint)工作流执行过程中的状态快照,支持断点恢复与时间旅行Checkpointer
中断(Interrupt)在指定节点暂停工作流,等待人工介入后继续执行interrupt()

5.3 引擎架构

graph TB
    subgraph 工作流编排引擎
        direction TB
        subgraph 编排层["编排层 (Orchestration Layer)"]
            SG[StateGraph 构建器]
            CR[条件路由器]
            PR[并行调度器<br/>Send API]
        end

        subgraph 节点层["节点层 (Node Layer)"]
            LLM[LLM 节点]
            KB[知识库检索节点]
            CB[条件分支节点]
            CE[代码执行节点]
            HR[HTTP 请求节点]
            JD[判断器节点]
        end

        subgraph 基础设施层["基础设施层 (Infrastructure)"]
            CP[Checkpoint<br/>PostgreSQL]
            HITL[Human-in-the-Loop<br/>interrupt]
            TRC[Trace 追踪<br/>LangSmith]
            CFG[配置中心]
        end
    end

    SG --> LLM & KB & CB & CE & HR & JD
    CR --> SG
    PR --> SG
    LLM & KB & CB & CE & HR & JD --> CP
    HITL --> SG
    TRC --> SG

    style SG fill:#4A90D9,color:#fff
    style CR fill:#D4A843,color:#fff
    style PR fill:#D4A843,color:#fff
    style CP fill:#7B68EE,color:#fff
    style HITL fill:#E74C3C,color:#fff
    style TRC fill:#2ECC71,color:#fff

5.4 支持的节点类型

5.4.1 LLM 节点

调用大语言模型进行文本生成、摘要、翻译等任务。支持流式输出与多供应商切换。

5.4.2 知识库检索节点

对接 RAG 引擎,根据用户查询从向量数据库中检索相关文档片段。

5.4.3 条件分支节点

基于运行时状态动态决定下游执行路径,对应 conditional_edges

5.4.4 代码执行节点

在沙箱环境中执行用户自定义 Python 代码片段,用于数据转换、计算等。

5.4.5 HTTP 请求节点

向外部 API 发起 HTTP 请求,支持 GET/POST/PUT/DELETE,可配置超时与重试策略。

5.4.6 判断器节点

基于分类模型或规则引擎对输入进行意图分类、情感分析等判断,输出结构化标签。

5.5 条件路由

条件路由基于 LangGraph 的 add_conditional_edges() 实现。在运行时,引擎根据路由函数的返回值动态选择下一个要执行的节点。

from typing import Literal

def route_by_intent(state: WorkflowState) -> Literal["rag_search", "chitchat", "code_gen"]:
    """根据意图分类结果路由到不同处理节点"""
    intent = state.get("intent", "unknown")
    if intent == "knowledge_qa":
        return "rag_search"
    elif intent == "chitchat":
        return "chitchat"
    elif intent == "code_generation":
        return "code_gen"
    else:
        return "chitchat"

# 注册条件边
graph.add_conditional_edges(
    source="intent_classifier",
    path=route_by_intent,
    path_map={
        "rag_search": "rag_search",
        "chitchat": "chitchat",
        "code_gen": "code_gen",
    },
)

5.6 并行执行(Fan-out / Fan-in)

使用 LangGraph 的 Send API 实现扇出/扇入模式。当一个节点需要并行处理多个子任务时,通过 Send 将每个子任务分发到独立的执行分支,最终汇聚结果。

from langgraph.types import Send
from typing import TypedDict, Annotated
import operator

class WorkflowState(TypedDict):
    query: str
    sub_queries: list[str]
    search_results: Annotated[list[str], operator.add]  # 自动合并
    final_answer: str

def fan_out_to_search(state: WorkflowState) -> list[Send]:
    """将多个子查询并行分发到检索节点"""
    return [
        Send("parallel_search", {"query": sub_q, "search_results": []})
        for sub_q in state["sub_queries"]
    ]

def parallel_search_node(state: dict) -> dict:
    """每个并行分支执行检索"""
    from rag_engine import knowledge_search
    results = knowledge_search(state["query"])
    return {"search_results": results}

# 注册并行边
graph.add_conditional_edges(
    source="query_decomposer",
    path=fan_out_to_search,
)
graph.add_edge("parallel_search", "result_aggregator")

5.7 Checkpoint 机制

基于 PostgreSQL 的 AsyncPostgresSaver 实现持久化检查点,支持工作流的断点恢复、重试以及时间旅行调试。

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def create_checkpointer() -> AsyncPostgresSaver:
    """创建基于 PostgreSQL 的异步 Checkpointer"""
    checkpointer = AsyncPostgresSaver.from_conn_string(
        "postgresql+psycopg://langgraph:langgraph@localhost:5432/langgraph_checkpoints",
    )
    await checkpointer.setup()  # 自动建表
    return checkpointer

5.8 Human-in-the-Loop

通过 interrupt() 函数实现人工审批节点。工作流在指定节点暂停并将控制权交还给用户界面,用户确认或修改后恢复执行。

from langgraph.types import interrupt, Command

def human_approval_node(state: WorkflowState) -> WorkflowState:
    """人工审批节点 - 工作流在此暂停等待人工确认"""
    # interrupt() 会暂停工作流并将数据展示给用户
    decision = interrupt(
        {
            "question": "以下回复内容是否可以直接发送给用户?",
            "draft_response": state["draft_response"],
            "confidence": state["confidence"],
        }
    )
    # decision 是用户在前端做出的选择
    if decision["action"] == "approve":
        return {"approved": True, "final_response": state["draft_response"]}
    elif decision["action"] == "edit":
        return {"approved": True, "final_response": decision["edited_response"]}
    else:
        return {"approved": False, "final_response": "抱歉,暂时无法回答您的问题。"}

5.9 完整工作流定义代码示例

以下示例展示一个完整的智能客服工作流:接收用户问题 -> 意图分类 -> 条件路由 -> 知识库检索/闲聊 -> 人工审批 -> 输出结果。

"""
极简视界智能体开发平台 - 工作流编排引擎
完整示例:智能客服工作流(LangGraph 0.4+ StateGraph API)
"""

from __future__ import annotations

import operator
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send, interrupt
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver


# ──────────────────────────────────────────────
# 1. 定义工作流状态 Schema
# ──────────────────────────────────────────────
class WorkflowState(TypedDict):
    """贯穿整个工作流的上下文状态"""
    messages: Annotated[list[BaseMessage], operator.add]
    intent: str                         # 意图分类结果
    sub_queries: list[str]              # 拆解后的子查询
    search_results: Annotated[list[str], operator.add]  # 检索结果(并行合并)
    draft_response: str                 # 草稿回复
    confidence: float                   # 置信度
    approved: bool                      # 人工审批结果
    final_response: str                 # 最终回复


# ──────────────────────────────────────────────
# 2. 定义各类节点
# ──────────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)


async def intent_classifier_node(state: WorkflowState) -> dict:
    """意图分类节点 - 判断器节点"""
    user_message = state["messages"][-1].content
    prompt = (
        "你是一个意图分类器。请将以下用户问题分类为以下类别之一:\n"
        "- knowledge_qa(知识问答)\n"
        "- chitchat(闲聊)\n"
        "- code_generation(代码生成)\n"
        "- complaint(投诉)\n\n"
        f"用户问题:{user_message}\n\n"
        "只输出类别名称,不要输出其他内容。"
    )
    response = await llm.ainvoke([HumanMessage(content=prompt)])
    intent = response.content.strip().lower()
    return {"intent": intent}


async def query_decomposer_node(state: WorkflowState) -> dict:
    """查询拆解节点 - 将复杂问题拆分为多个子查询"""
    user_message = state["messages"][-1].content
    prompt = (
        "请将以下复杂问题拆解为多个独立的子查询,用于知识库检索。\n"
        "每个子查询一行,不要输出序号或其他格式。\n\n"
        f"原始问题:{user_message}"
    )
    response = await llm.ainvoke([HumanMessage(content=prompt)])
    sub_queries = [q.strip() for q in response.content.strip().split("\n") if q.strip()]
    return {"sub_queries": sub_queries}


def fan_out_search(state: WorkflowState) -> list[Send]:
    """扇出函数 - 为每个子查询创建并行检索任务"""
    return [
        Send("knowledge_search", {"query": q, "search_results": []})
        for q in state.get("sub_queries", [])
    ]


async def knowledge_search_node(state: dict) -> dict:
    """知识库检索节点 - 调用 RAG 引擎"""
    # 此处对接第 7 章的 RAG 知识库引擎
    from rag_engine import hybrid_search

    query = state["query"]
    results = await hybrid_search(query=query, top_k=5)
    formatted = [f"[{i+1}] {r['content']}" for i, r in enumerate(results)]
    return {"search_results": formatted}


async def rag_answer_node(state: WorkflowState) -> dict:
    """RAG 生成节点 - 基于检索结果生成回复"""
    context = "\n\n".join(state.get("search_results", []))
    user_message = state["messages"][-1].content
    prompt = (
        "请基于以下参考资料回答用户问题。如果参考资料中没有相关信息,请如实告知。\n\n"
        f"参考资料:\n{context}\n\n"
        f"用户问题:{user_message}"
    )
    response = await llm.ainvoke([HumanMessage(content=prompt)])
    return {
        "draft_response": response.content,
        "confidence": 0.85,
        "messages": [AIMessage(content=response.content)],
    }


async def chitchat_node(state: WorkflowState) -> dict:
    """闲聊节点"""
    user_message = state["messages"][-1].content
    response = await llm.ainvoke(
        [HumanMessage(content=f"请以友好的方式与用户闲聊。用户说:{user_message}")]
    )
    return {
        "draft_response": response.content,
        "confidence": 0.95,
        "messages": [AIMessage(content=response.content)],
    }


async def code_gen_node(state: WorkflowState) -> dict:
    """代码生成节点"""
    user_message = state["messages"][-1].content
    prompt = (
        "你是一个编程助手。请根据以下需求生成 Python 代码,"
        "包含必要的注释和类型提示。\n\n"
        f"需求:{user_message}"
    )
    response = await llm.ainvoke([HumanMessage(content=prompt)])
    return {
        "draft_response": response.content,
        "confidence": 0.80,
        "messages": [AIMessage(content=response.content)],
    }


async def complaint_handler_node(state: WorkflowState) -> dict:
    """投诉处理节点 - 直接转人工"""
    return {
        "draft_response": "非常抱歉给您带来了不好的体验,已为您转接人工客服。",
        "confidence": 1.0,
        "approved": True,
        "messages": [AIMessage(content="已为您转接人工客服,请稍候。")],
    }


def human_approval_node(state: WorkflowState) -> dict:
    """人工审批节点 - Human-in-the-Loop"""
    decision = interrupt(
        {
            "question": "以下回复内容是否可以发送给用户?",
            "draft_response": state["draft_response"],
            "confidence": state.get("confidence", 0),
            "intent": state.get("intent", "unknown"),
        }
    )
    if decision["action"] == "approve":
        return {"approved": True, "final_response": state["draft_response"]}
    elif decision["action"] == "edit":
        return {"approved": True, "final_response": decision["edited_response"]}
    else:
        return {"approved": False, "final_response": "抱歉,暂时无法回答您的问题。"}


async def finalize_node(state: WorkflowState) -> dict:
    """收尾节点 - 记录日志、更新统计"""
    return {
        "messages": [AIMessage(content=state.get("final_response", ""))]
    }


# ──────────────────────────────────────────────
# 3. 条件路由函数
# ──────────────────────────────────────────────
def route_by_intent(state: WorkflowState) -> Literal[
    "query_decomposer", "chitchat", "code_gen", "complaint_handler"
]:
    """基于意图分类结果进行条件路由"""
    intent = state.get("intent", "chitchat")
    mapping = {
        "knowledge_qa": "query_decomposer",
        "chitchat": "chitchat",
        "code_generation": "code_gen",
        "complaint": "complaint_handler",
    }
    return mapping.get(intent, "chitchat")


def route_after_search(state: WorkflowState) -> Literal["rag_answer"]:
    """并行检索完成后汇聚到生成节点"""
    return "rag_answer"


def route_to_end(state: WorkflowState) -> Literal["finalize", END]:
    """审批后决定是输出还是终止"""
    if state.get("approved"):
        return "finalize"
    return END


# ──────────────────────────────────────────────
# 4. 构建工作流图
# ──────────────────────────────────────────────
def build_customer_service_graph() -> StateGraph:
    """构建智能客服工作流"""
    graph = StateGraph(WorkflowState)

    # 注册所有节点
    graph.add_node("intent_classifier", intent_classifier_node)
    graph.add_node("query_decomposer", query_decomposer_node)
    graph.add_node("knowledge_search", knowledge_search_node)
    graph.add_node("rag_answer", rag_answer_node)
    graph.add_node("chitchat", chitchat_node)
    graph.add_node("code_gen", code_gen_node)
    graph.add_node("complaint_handler", complaint_handler_node)
    graph.add_node("human_approval", human_approval_node)
    graph.add_node("finalize", finalize_node)

    # 入口边
    graph.add_edge(START, "intent_classifier")

    # 条件路由:根据意图分发到不同处理分支
    graph.add_conditional_edges(
        source="intent_classifier",
        path=route_by_intent,
        path_map={
            "query_decomposer": "query_decomposer",
            "chitchat": "chitchat",
            "code_gen": "code_gen",
            "complaint_handler": "complaint_handler",
        },
    )

    # 知识问答分支:拆解 -> 并行检索 -> 汇聚生成
    graph.add_edge("query_decomposer", "knowledge_search")  # 触发 fan-out
    graph.add_conditional_edges(
        source="query_decomposer",
        path=fan_out_search,
    )
    graph.add_edge("knowledge_search", "rag_answer")

    # 所有处理节点 -> 人工审批
    graph.add_edge("rag_answer", "human_approval")
    graph.add_edge("chitchat", "human_approval")
    graph.add_edge("code_gen", "human_approval")
    # 投诉处理直接到结束(已自动转人工)
    graph.add_edge("complaint_handler", END)

    # 审批后路由
    graph.add_conditional_edges(
        source="human_approval",
        path=route_to_end,
        path_map={"finalize": "finalize", END: END},
    )

    # 收尾 -> 结束
    graph.add_edge("finalize", END)

    return graph


# ──────────────────────────────────────────────
# 5. 编译并运行
# ──────────────────────────────────────────────
async def run_workflow(user_input: str, thread_id: str = "default"):
    """编译工作流并执行"""
    # 创建 Checkpointer
    checkpointer = AsyncPostgresSaver.from_conn_string(
        "postgresql+psycopg://langgraph:langgraph@localhost:5432/langgraph_checkpoints"
    )
    checkpointer.setup()

    # 构建并编译
    builder = build_customer_service_graph()
    app = builder.compile(checkpointer=checkpointer)

    # 执行配置
    config = {"configurable": {"thread_id": thread_id}}

    # 初始状态
    initial_state: WorkflowState = {
        "messages": [HumanMessage(content=user_input)],
        "intent": "",
        "sub_queries": [],
        "search_results": [],
        "draft_response": "",
        "confidence": 0.0,
        "approved": False,
        "final_response": "",
    }

    # 流式执行
    async for event in app.astream(initial_state, config, stream_mode="updates"):
        for node_name, node_output in event.items():
            print(f"[{node_name}] => {node_output}")

    return app


# ──────────────────────────────────────────────
# 6. 恢复中断的工作流(Human-in-the-Loop)
# ──────────────────────────────────────────────
async def resume_workflow(app, thread_id: str, user_decision: dict):
    """在人工审批后恢复工作流执行"""
    from langgraph.types import Command

    config = {"configurable": {"thread_id": thread_id}}

    # 通过 Command 传递用户决策并恢复执行
    async for event in app.astream(
        Command(resume=user_decision),
        config,
        stream_mode="updates",
    ):
        for node_name, node_output in event.items():
            print(f"[{node_name}] => {node_output}")

5.10 工作流引擎执行流程

sequenceDiagram
    participant U as 用户
    participant API as API 网关
    participant WE as 工作流引擎
    participant SG as StateGraph
    participant CP as Checkpoint<br/>(PostgreSQL)
    participant HITL as 人工审批

    U->>API: 提交请求 (消息 + thread_id)
    API->>WE: 调用 run_workflow()
    WE->>SG: compile() 编译工作流图

    loop 逐节点执行
        SG->>SG: 执行当前节点 (Node)
        SG->>CP: 保存 Checkpoint
        CP-->>SG: 确认持久化

        alt 条件路由
            SG->>SG: route_by_intent() 动态分支
        else 并行执行
            SG->>SG: Send() 扇出多个并行节点
            SG->>SG: fan-in 汇聚结果
        end
    end

    alt 遇到 interrupt()
        SG->>CP: 保存中断状态
        SG-->>WE: 暂停执行
        WE->>HITL: 推送审批请求
        HITL->>WE: 返回审批决策
        WE->>SG: Command(resume=decision)
        SG->>SG: 继续执行后续节点
    end

    SG-->>WE: 工作流完成
    WE-->>API: 返回最终结果
    API-->>U: 响应输出

5.11 工作流拓扑图

flowchart TD
    START((START)) --> IC[意图分类节点<br/>intent_classifier]

    IC -->|knowledge_qa| QD[查询拆解节点<br/>query_decomposer]
    IC -->|chitchat| CC[闲聊节点<br/>chitchat]
    IC -->|code_generation| CG[代码生成节点<br/>code_gen]
    IC -->|complaint| CH[投诉处理节点<br/>complaint_handler]

    QD -->|Send 并行分发| KS1[知识检索 #1]
    QD -->|Send 并行分发| KS2[知识检索 #2]
    QD -->|Send 并行分发| KS3[知识检索 #N]

    KS1 --> RA[RAG 生成节点<br/>rag_answer]
    KS2 --> RA
    KS3 --> RA

    RA --> HA{人工审批节点<br/>human_approval}
    CC --> HA
    CG --> HA

    HA -->|approve| FZ[收尾节点<br/>finalize]
    HA -->|reject| END_REJ((END))

    FZ --> END_OK((END))
    CH --> END_CH((END))

    style START fill:#2ECC71,color:#fff
    style END_OK fill:#E74C3C,color:#fff
    style END_REJ fill:#E74C3C,color:#fff
    style END_CH fill:#E74C3C,color:#fff
    style HA fill:#F39C12,color:#fff
    style IC fill:#3498DB,color:#fff

6. 对话引擎

6.1 概述

对话引擎是极简视界智能体开发平台的核心执行层,负责将用户输入转化为大模型调用并返回结构化响应。引擎基于 LangChain v1.2LangGraph 构建,采用 LCEL(LangChain Expression Language) 实现声明式的链式调用,支持多供应商模型热切换、提示词模板管理、多轮对话上下文管理、流式输出以及 Tool Calling 等关键能力。

6.2 引擎架构

graph TB
    subgraph 对话引擎["对话引擎 (Conversation Engine)"]
        direction TB

        subgraph 接入层["接入层"]
            SSE[SSE 流式网关]
            WS[WebSocket 适配器]
            REST[REST API]
        end

        subgraph 核心层["核心层"]
            LCEL[LCEL 链式管道]
            PM[提示词模板管理器<br/>Jinja2]
            MS[模型选择器<br/>LLMProvider]
            CM[上下文管理器<br/>Sliding Window]
            TC[Tool Calling<br/>调度器]
        end

        subgraph 供应商层["供应商层 (20+ LLM Providers)"]
            P1[OpenAI / GPT-4o]
            P2[Anthropic / Claude]
            P3[百度 / 文心一言]
            P4[阿里 / 通义千问]
            P5[智谱 / GLM-4]
            P6[DeepSeek]
            P7[Moonshot / Kimi]
            PN[...更多供应商]
        end

        subgraph 存储层["存储层"]
            Redis[(Redis<br/>会话存储)]
            PG[(PostgreSQL<br/>持久化)]
        end
    end

    SSE & WS & REST --> LCEL
    LCEL --> PM & MS & CM & TC
    MS --> P1 & P2 & P3 & P4 & P5 & P6 & P7 & PN
    CM --> Redis
    TC --> LCEL

    style LCEL fill:#4A90D9,color:#fff
    style MS fill:#D4A843,color:#fff
    style Redis fill:#E74C3C,color:#fff

6.3 LCEL 链式调用

LCEL(LangChain Expression Language)是 LangChain 的核心编排语法,通过管道符 | 将多个组件串联为一条声明式管道。

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 声明式 LCEL 管道
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是极简视界智能助手,请用专业且友好的语气回答问题。"),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
])

llm = ChatOpenAI(model="gpt-4o", temperature=0.7, streaming=True)

# 管道组装:prompt -> llm -> parser
chain = prompt | llm | StrOutputParser()

6.4 模型选择与切换(LLMProvider)

通过统一的 LLMProvider 抽象接口,平台支持 20+ 供应商的无缝切换。运行时可根据租户配置、负载均衡策略或模型能力动态选择供应商。

from __future__ import annotations

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any

from langchain_core.language_models import BaseChatModel


class ModelProvider(str, Enum):
    """支持的模型供应商枚举"""
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    BAIDU_WENXIN = "baidu_wenxin"
    ALIBABA_QWEN = "alibaba_qwen"
    ZHIPU_GLM = "zhipu_glm"
    DEEPSEEK = "deepseek"
    MOONSHOT = "moonshot"
    BYTEDANCE_DOUBAO = "bytedance_doubao"
    # ... 更多供应商


class LLMProvider(ABC):
    """LLM 供应商抽象接口"""

    @abstractmethod
    def create_chat_model(
        self,
        model_name: str,
        temperature: float = 0.7,
        max_tokens: int | None = None,
        streaming: bool = True,
        **kwargs: Any,
    ) -> BaseChatModel:
        """创建对应供应商的 ChatModel 实例"""
        ...

    @abstractmethod
    def get_available_models(self) -> list[str]:
        """获取该供应商可用的模型列表"""
        ...


class OpenAIProvider(LLMProvider):
    def create_chat_model(self, model_name: str, temperature: float = 0.7,
                          max_tokens: int | None = None, streaming: bool = True,
                          **kwargs) -> BaseChatModel:
        from langchain_openai import ChatOpenAI
        return ChatOpenAI(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            streaming=streaming,
            **kwargs,
        )

    def get_available_models(self) -> list[str]:
        return ["gpt-4o", "gpt-4o-mini", "gpt-4.1", "o3-mini"]


class AnthropicProvider(LLMProvider):
    def create_chat_model(self, model_name: str, temperature: float = 0.7,
                          max_tokens: int | None = None, streaming: bool = True,
                          **kwargs) -> BaseChatModel:
        from langchain_anthropic import ChatAnthropic
        return ChatAnthropic(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens or 4096,
            streaming=streaming,
            **kwargs,
        )

    def get_available_models(self) -> list[str]:
        return ["claude-sonnet-4-20250514", "claude-3-5-sonnet-20241022", "claude-3-haiku-20240307"]


class ZhipuGLMProvider(LLMProvider):
    def create_chat_model(self, model_name: str, temperature: float = 0.7,
                          max_tokens: int | None = None, streaming: bool = True,
                          **kwargs) -> BaseChatModel:
        from langchain_community.chat_models import ChatZhipuAI
        return ChatZhipuAI(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            streaming=streaming,
            **kwargs,
        )

    def get_available_models(self) -> list[str]:
        return ["glm-4-plus", "glm-4-air", "glm-4-flash"]


# ──────────────────────────────────────────────
# 模型工厂 - 统一管理所有供应商
# ──────────────────────────────────────────────
class LLMFactory:
    """LLM 模型工厂 - 统一管理和创建模型实例"""

    _providers: dict[ModelProvider, LLMProvider] = {}

    @classmethod
    def register_provider(cls, provider: ModelProvider, impl: LLMProvider) -> None:
        cls._providers[provider] = impl

    @classmethod
    def create_model(
        cls,
        provider: ModelProvider | str,
        model_name: str,
        **kwargs,
    ) -> BaseChatModel:
        """根据供应商和模型名称创建 ChatModel"""
        if isinstance(provider, str):
            provider = ModelProvider(provider)
        prov = cls._providers.get(provider)
        if not prov:
            raise ValueError(f"未注册的模型供应商: {provider}")
        return prov.create_chat_model(model_name=model_name, **kwargs)


# 初始化注册
LLMFactory.register_provider(ModelProvider.OPENAI, OpenAIProvider())
LLMFactory.register_provider(ModelProvider.ANTHROPIC, AnthropicProvider())
LLMFactory.register_provider(ModelProvider.ZHIPU_GLM, ZhipuGLMProvider())

6.5 提示词模板管理

使用 Jinja2 模板引擎实现提示词的参数化管理,支持变量注入、条件渲染、循环、模板继承等高级特性。

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from jinja2 import Environment, BaseLoader, StrictUndefined


@dataclass
class PromptTemplate:
    """提示词模板定义"""
    name: str
    template: str
    description: str = ""
    variables: list[str] = field(default_factory=list)
    version: str = "1.0"


class PromptManager:
    """提示词模板管理器"""

    def __init__(self):
        self._env = Environment(loader=BaseLoader(), undefined=StrictUndefined)
        self._templates: dict[str, PromptTemplate] = {}

    def register(self, tmpl: PromptTemplate) -> None:
        """注册模板"""
        self._templates[tmpl.name] = tmpl

    def render(self, name: str, variables: dict[str, Any]) -> str:
        """渲染模板"""
        tmpl = self._templates.get(name)
        if not tmpl:
            raise KeyError(f"未找到提示词模板: {name}")
        jinja_tmpl = self._env.from_string(tmpl.template)
        return jinja_tmpl.render(**variables)

    def get_template(self, name: str) -> PromptTemplate | None:
        return self._templates.get(name)

    def list_templates(self) -> list[str]:
        return list(self._templates.keys())


# ──────────────────────────────────────────────
# 预置模板示例
# ──────────────────────────────────────────────
RAG_QA_TEMPLATE = PromptTemplate(
    name="rag_qa",
    template="""你是极简视界智能助手。请基于以下参考资料回答用户问题。

## 参考资料
{% for doc in context_documents %}
### 文档 {{ loop.index }}(相关度: {{ doc.score }})
{{ doc.content }}

{% endfor %}

## 要求
1. 优先使用参考资料中的信息
2. 如果参考资料不足以回答,请明确告知用户
3. 在回答中标注引用来源(使用 [文档N] 格式)
{% if answer_style %}
4. 回答风格:{{ answer_style }}
{% endif %}

## 用户问题
{{ user_query }}""",
    description="RAG 问答模板 - 支持引用标注",
    variables=["context_documents", "user_query", "answer_style"],
)

INTENT_CLASSIFY_TEMPLATE = PromptTemplate(
    name="intent_classify",
    template="""你是一个意图分类器。请将用户消息分类为以下类别之一:
{% for intent in intent_labels %}
- {{ intent.name }}:{{ intent.description }}
{% endfor %}

用户消息:{{ user_message }}

请只输出类别名称({{ intent_labels | map(attribute='name') | join(' / ') }}),不要输出其他内容。""",
    description="意图分类模板",
    variables=["intent_labels", "user_message"],
)

6.6 多轮对话上下文管理

基于 Redis 的会话存储,采用 Sliding Window(滑动窗口) 策略管理多轮对话上下文,兼顾上下文连贯性和 Token 成本控制。

from __future__ import annotations

import json
import time
from typing import Any

import redis.asyncio as aioredis
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage


class ConversationManager:
    """多轮对话上下文管理器

    策略:
    1. Sliding Window - 保留最近 N 轮对话
    2. Token Budget - 在 Token 预算内尽量保留更多历史
    3. Summary Fallback - 超出窗口的历史由 LLM 摘要替代
    """

    def __init__(
        self,
        redis_client: aioredis.Redis,
        max_turns: int = 20,
        max_tokens: int = 8000,
        summary_threshold: int = 30,
    ):
        self.redis = redis_client
        self.max_turns = max_turns
        self.max_tokens = max_tokens
        self.summary_threshold = summary_threshold

    def _session_key(self, session_id: str) -> str:
        return f"conversation:{session_id}:messages"

    def _summary_key(self, session_id: str) -> str:
        return f"conversation:{session_id}:summary"

    def _meta_key(self, session_id: str) -> str:
        return f"conversation:{session_id}:meta"

    async def add_message(self, session_id: str, message: BaseMessage) -> None:
        """添加消息到会话"""
        key = self._session_key(session_id)
        msg_data = {
            "role": message.type,
            "content": message.content,
            "timestamp": time.time(),
            "additional_kwargs": message.additional_kwargs,
        }
        await self.redis.rpush(key, json.dumps(msg_data, ensure_ascii=False))
        # 设置 24 小时过期
        await self.redis.expire(key, 86400)
        # 更新元信息
        await self.redis.hincrby(self._meta_key(session_id), "turn_count", 1)

    async def get_history(
        self, session_id: str, strategy: str = "sliding_window"
    ) -> list[BaseMessage]:
        """获取对话历史,应用上下文管理策略"""
        key = self._session_key(session_id)
        total = await self.redis.llen(key)

        if strategy == "sliding_window":
            # 滑动窗口:取最近 max_turns * 2 条消息(human + ai 交替)
            start = max(0, total - self.max_turns * 2)
            raw_messages = await self.redis.lrange(key, start, -1)
        else:
            raw_messages = await self.redis.lrange(key, 0, -1)

        messages = [json.loads(m) for m in raw_messages]
        result: list[BaseMessage] = []

        # 如果有历史摘要,先加入摘要
        summary = await self.redis.get(self._summary_key(session_id))
        if summary:
            result.append(SystemMessage(
                content=f"[历史对话摘要] {summary.decode('utf-8')}"
            ))

        # 转换为 LangChain Message 对象
        for msg in messages:
            if msg["role"] == "human":
                result.append(HumanMessage(content=msg["content"]))
            elif msg["role"] == "ai":
                result.append(AIMessage(
                    content=msg["content"],
                    additional_kwargs=msg.get("additional_kwargs", {}),
                ))
            elif msg["role"] == "system":
                result.append(SystemMessage(content=msg["content"]))

        return result

    async def maybe_summarize(self, session_id: str, llm: Any) -> None:
        """当消息数量超过阈值时,对较早的消息进行摘要"""
        key = self._session_key(session_id)
        total = await self.redis.llen(key)

        if total <= self.summary_threshold * 2:
            return

        # 取出需要摘要的早期消息(保留最近 max_turns 轮)
        keep_start = total - self.max_turns * 2
        old_messages = await self.redis.lrange(key, 0, keep_start - 1)

        if not old_messages:
            return

        # 拼接并调用摘要
        text = "\n".join(
            json.loads(m)["content"] for m in old_messages
        )
        summary_prompt = (
            f"请将以下对话历史压缩为一段简洁的摘要(不超过 500 字),"
            f"保留关键信息和上下文脉络:\n\n{text}"
        )
        from langchain_core.messages import HumanMessage as HM
        summary_response = await llm.ainvoke([HM(content=summary_prompt)])

        # 保存摘要并裁剪旧消息
        await self.redis.set(self._summary_key(session_id), summary_response.content)
        await self.redis.ltrim(key, keep_start, -1)

    async def clear_session(self, session_id: str) -> None:
        """清空会话"""
        for suffix in ["messages", "summary", "meta"]:
            await self.redis.delete(f"conversation:{session_id}:{suffix}")

6.7 流式输出(SSE + Async Generator)

基于 Server-Sent Events 协议实现实时流式输出,后端使用 async generator 逐 token 推送。

from __future__ import annotations

import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from pydantic import BaseModel

app = FastAPI()


class ChatRequest(BaseModel):
    session_id: str
    message: str
    model_provider: str = "openai"
    model_name: str = "gpt-4o"
    temperature: float = 0.7
    max_tokens: int | None = None


async def stream_chat_response(
    request: ChatRequest,
    conversation_manager: ConversationManager,
    llm_factory: LLMFactory,
) -> AsyncGenerator[str, None]:
    """异步流式生成器 - 逐 token 推送 SSE 事件"""

    # 创建模型实例
    llm = llm_factory.create_model(
        provider=request.model_provider,
        model_name=request.model_name,
        temperature=request.temperature,
        max_tokens=request.max_tokens,
        streaming=True,
    )

    # 获取对话历史
    history = await conversation_manager.get_history(request.session_id)

    # 添加当前用户消息
    user_msg = HumanMessage(content=request.message)
    await conversation_manager.add_message(request.session_id, user_msg)
    history.append(user_msg)

    # 流式调用 LLM
    full_response = ""
    async for chunk in llm.astream(history):
        token = chunk.content
        if token:
            full_response += token
            # 构造 SSE 事件
            sse_data = json.dumps(
                {"type": "token", "content": token},
                ensure_ascii=False,
            )
            yield f"data: {sse_data}\n\n"

    # 发送结束事件
    yield f"data: {json.dumps({'type': 'done', 'content': ''})}\n\n"

    # 保存完整响应到会话历史
    from langchain_core.messages import AIMessage
    ai_msg = AIMessage(content=full_response)
    await conversation_manager.add_message(request.session_id, ai_msg)

    # 检查是否需要摘要
    await conversation_manager.maybe_summarize(request.session_id, llm)


@app.post("/api/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE 流式对话接口"""
    return StreamingResponse(
        stream_chat_response(request, conversation_manager, llm_factory),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx 不缓冲
        },
    )

6.8 Tool Calling

基于 LangChain 的 bind_tools 机制实现工具自动注册与调度,支持 LLM 自主决定何时调用工具、调用哪个工具。

from __future__ import annotations

from typing import Any

from langchain_core.tools import tool
from langchain_core.messages import ToolMessage
from langchain_core.language_models import BaseChatModel


# ──────────────────────────────────────────────
# 1. 定义工具(使用 @tool 装饰器自动注册)
# ──────────────────────────────────────────────
@tool
def search_knowledge_base(query: str, top_k: int = 5) -> str:
    """搜索知识库,返回与查询最相关的文档片段。
    适用于需要查询事实性信息、产品文档、FAQ 等场景。

    Args:
        query: 搜索查询关键词
        top_k: 返回结果数量,默认 5
    """
    # 对接 RAG 引擎
    from rag_engine import hybrid_search
    results = hybrid_search(query=query, top_k=top_k)
    return "\n\n".join(f"[{i+1}] {r['content']}" for i, r in enumerate(results))


@tool
def get_current_weather(city: str) -> str:
    """获取指定城市的实时天气信息。

    Args:
        city: 城市名称,如 "北京"、"上海"
    """
    import httpx
    resp = httpx.get(f"https://api.weather.example.com/v1/current?city={city}")
    return resp.json()["description"]


@tool
def execute_python_code(code: str) -> str:
    """在安全沙箱中执行 Python 代码并返回输出结果。

    Args:
        code: 要执行的 Python 代码字符串
    """
    from sandbox_executor import SafeExecutor
    executor = SafeExecutor(timeout=10, memory_limit_mb=256)
    result = executor.run(code)
    return result.output


# ──────────────────────────────────────────────
# 2. 工具注册表
# ──────────────────────────────────────────────
class ToolRegistry:
    """工具注册表 - 统一管理所有可用工具"""

    _tools: dict[str, Any] = {}

    @classmethod
    def register(cls, tool_fn: Any) -> None:
        cls._tools[tool_fn.name] = tool_fn

    @classmethod
    def get_tools(cls) -> list[Any]:
        return list(cls._tools.values())

    @classmethod
    def get_tool(cls, name: str) -> Any | None:
        return cls._tools.get(name)


# 批量注册
for t in [search_knowledge_base, get_current_weather, execute_python_code]:
    ToolRegistry.register(t)


# ──────────────────────────────────────────────
# 3. Tool Calling 执行循环
# ──────────────────────────────────────────────
async def run_with_tools(
    llm: BaseChatModel,
    messages: list,
    tools: list | None = None,
    max_iterations: int = 5,
) -> str:
    """带 Tool Calling 的对话执行循环

    LLM 可多轮调用工具,直到给出最终文本回复。
    """
    if tools is None:
        tools = ToolRegistry.get_tools()

    # 绑定工具到模型
    llm_with_tools = llm.bind_tools(tools)

    for iteration in range(max_iterations):
        response = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        # 如果没有工具调用,说明 LLM 已给出最终回复
        if not response.tool_calls:
            return response.content

        # 执行所有工具调用
        for tool_call in response.tool_calls:
            tool_fn = ToolRegistry.get_tool(tool_call["name"])
            if tool_fn:
                try:
                    result = await tool_fn.ainvoke(tool_call["args"])
                except Exception as e:
                    result = f"工具执行出错: {str(e)}"
            else:
                result = f"未找到工具: {tool_call['name']}"

            messages.append(ToolMessage(
                content=str(result),
                tool_call_id=tool_call["id"],
            ))

    return "达到最大工具调用次数,请简化问题后重试。"

6.9 对话引擎核心流程

sequenceDiagram
    participant U as 用户
    participant API as SSE 网关
    participant CE as 对话引擎
    participant PM as 提示词管理器
    participant CM as 上下文管理器
    participant MS as 模型选择器
    participant LLM as 大模型
    participant TR as 工具注册表
    participant Redis as Redis

    U->>API: POST /chat/stream
    API->>CE: 接收请求

    CE->>CM: get_history(session_id)
    CM->>Redis: LRANGE 获取历史消息
    Redis-->>CM: 历史消息列表
    CM-->>CE: List[BaseMessage]

    CE->>PM: render("rag_qa", variables)
    PM-->>CE: 渲染后的系统提示词

    CE->>MS: create_model(provider, model_name)
    MS-->>CE: ChatModel 实例

    CE->>LLM: bind_tools(tools)
    CE->>LLM: astream(messages)

    loop Tool Calling 循环
        LLM-->>CE: AIMessage(tool_calls=[...])
        CE->>TR: 执行工具调用
        TR-->>CE: 工具返回结果
        CE->>LLM: astream(messages + ToolMessage)
    end

    LLM-->>CE: 最终文本回复(流式 tokens)

    loop SSE 推送
        CE-->>API: data: {"type":"token","content":"..."}
        API-->>U: SSE 事件
    end

    CE->>CM: add_message(ai_response)
    CM->>Redis: RPUSH 保存消息
    CE-->>API: data: {"type":"done"}
    API-->>U: 流结束

6.10 对话引擎完整编排代码

"""
极简视界智能体开发平台 - 对话引擎
完整编排示例:整合 LCEL + 上下文管理 + Tool Calling + 流式输出
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, AsyncGenerator

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage,
)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder


@dataclass
class ConversationEngine:
    """对话引擎 - 核心编排类"""

    llm_factory: LLMFactory
    prompt_manager: PromptManager
    conversation_manager: ConversationManager
    tool_registry: ToolRegistry

    async def chat_stream(
        self,
        session_id: str,
        user_input: str,
        provider: str = "openai",
        model_name: str = "gpt-4o",
        temperature: float = 0.7,
        max_tokens: int | None = None,
        system_prompt_template: str | None = None,
        enable_tools: bool = True,
    ) -> AsyncGenerator[str, None]:
        """完整的对话流式接口"""

        # 1. 创建模型
        llm = self.llm_factory.create_model(
            provider=provider,
            model_name=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            streaming=True,
        )

        # 2. 获取对话历史
        history = await self.conversation_manager.get_history(session_id)

        # 3. 保存用户消息
        user_msg = HumanMessage(content=user_input)
        await self.conversation_manager.add_message(session_id, user_msg)

        # 4. 构建消息列表
        messages: list[BaseMessage] = []

        # 系统提示词
        if system_prompt_template:
            rendered = self.prompt_manager.render(system_prompt_template, {
                "user_name": "用户",
                "platform_name": "极简视界",
            })
            messages.append(SystemMessage(content=rendered))

        messages.extend(history)
        messages.append(user_msg)

        # 5. Tool Calling 循环
        tools = self.tool_registry.get_tools() if enable_tools else []
        if tools:
            llm = llm.bind_tools(tools)

        max_iterations = 5
        final_content = ""

        for _ in range(max_iterations):
            response = await llm.ainvoke(messages)
            messages.append(response)

            if not response.tool_calls:
                # 没有工具调用 → 最终回复
                final_content = response.content
                break

            # 执行工具
            for tc in response.tool_calls:
                tool_fn = self.tool_registry.get_tool(tc["name"])
                try:
                    result = await tool_fn.ainvoke(tc["args"]) if tool_fn else "工具不存在"
                except Exception as e:
                    result = f"执行失败: {e}"
                messages.append(ToolMessage(content=str(result), tool_call_id=tc["id"]))
        else:
            final_content = "处理超时,请简化问题后重试。"

        # 6. 流式输出最终回复(二次流式化)
        # 在实际生产环境中,步骤 5 可直接使用 astream 实现逐 token 输出
        # 此处为演示 Tool Calling 循环,先完整获取再流式推送
        for char in final_content:
            import json
            yield f"data: {json.dumps({'type': 'token', 'content': char}, ensure_ascii=False)}\n\n"

        yield f"data: {json.dumps({'type': 'done'})}\n\n"

        # 7. 保存 AI 回复
        ai_msg = AIMessage(content=final_content)
        await self.conversation_manager.add_message(session_id, ai_msg)

        # 8. 检查是否需要摘要压缩
        await self.conversation_manager.maybe_summarize(session_id, llm)

7. RAG 知识库引擎

7.1 概述

RAG(Retrieval-Augmented Generation)知识库引擎是极简视界智能体开发平台的"知识底座",为智能体的回答提供事实性支撑。引擎覆盖从文档解析、文本分块、向量化存储到混合检索、重排序的完整 Pipeline,支持多种文档格式、多种 Embedding 模型和向量数据库,并针对多租户场景做了严格的隔离设计。

7.2 RAG Pipeline 架构

flowchart LR
    subgraph 文档解析层["文档解析层"]
        PDF[PDF]
        DOCX[Word/DOCX]
        MD[Markdown]
        HTML[HTML]
        CSV[CSV/Excel]
    end

    subgraph 分块层["文本分块层"]
        RC[RecursiveCharacter<br/>TextSplitter]
        SC[语义分块<br/>Semantic Chunking]
        OW[重叠窗口<br/>Overlap]
    end

    subgraph 向量化层["向量化层"]
        BGE[BGE-M3]
        M3E[M3E]
        TE3[text-embedding-3]
        CUSTOM[自定义模型]
    end

    subgraph 存储层["向量存储层"]
        MILVUS[(Milvus 2.6<br/>多租户隔离)]
        META[(PostgreSQL<br/>元数据)]
    end

    subgraph 检索层["检索层"]
        VR[向量检索<br/>ANN]
        BM[BM25 关键词检索]
        RRF[RRF 融合排序]
        RK[Rerank<br/>BGE-Reranker]
        MF[元数据过滤]
    end

    PDF & DOCX & MD & HTML & CSV --> RC & SC
    RC & SC --> OW
    OW --> BGE & M3E & TE3 & CUSTOM
    BGE & M3E & TE3 & CUSTOM --> MILVUS
    RC & SC --> META

    VR & BM --> RRF
    RRF --> RK
    RK --> MF
    MILVUS --> VR
    MILVUS --> BM
    META --> MF

7.3 文档解析层

使用 unstructured 库统一处理多种文档格式,提供标准化的文档对象。

from __future__ import annotations

import os
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any


class DocumentFormat(str, Enum):
    PDF = "pdf"
    DOCX = "docx"
    MARKDOWN = "markdown"
    HTML = "html"
    CSV = "csv"
    TXT = "txt"


@dataclass
class Document:
    """统一文档对象"""
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)
    format: DocumentFormat = DocumentFormat.TXT
    source: str = ""


class DocumentParser:
    """文档解析器 - 支持多格式统一解析"""

    def parse(self, file_path: str | Path) -> list[Document]:
        """解析文档,返回标准化的 Document 列表"""
        path = Path(file_path)
        suffix = path.suffix.lower().lstrip(".")

        parsers = {
            "pdf": self._parse_pdf,
            "docx": self._parse_docx,
            "md": self._parse_markdown,
            "html": self._parse_html,
            "htm": self._parse_html,
            "csv": self._parse_csv,
            "txt": self._parse_txt,
        }

        parser_fn = parsers.get(suffix)
        if not parser_fn:
            raise ValueError(f"不支持的文档格式: {suffix}")

        return parser_fn(path)

    def _parse_pdf(self, path: Path) -> list[Document]:
        """解析 PDF 文档"""
        from unstructured.partition.pdf import partition_pdf

        elements = partition_pdf(
            filename=str(path),
            strategy="hi_res",           # 高精度模式
            infer_table_structure=True,  # 识别表格结构
            extract_images_in_pdf=True,  # 提取图片
        )

        documents = []
        current_content = []
        current_page = 1

        for elem in elements:
            if hasattr(elem.metadata, "page_number"):
                page = elem.metadata.page_number or current_page
            else:
                page = current_page

            if page != current_page and current_content:
                documents.append(Document(
                    content="\n".join(current_content),
                    metadata={"page": current_page, "source": str(path)},
                    format=DocumentFormat.PDF,
                    source=str(path),
                ))
                current_content = []
                current_page = page

            text = str(elem).strip()
            if text:
                current_content.append(text)

        if current_content:
            documents.append(Document(
                content="\n".join(current_content),
                metadata={"page": current_page, "source": str(path)},
                format=DocumentFormat.PDF,
                source=str(path),
            ))

        return documents

    def _parse_docx(self, path: Path) -> list[Document]:
        from unstructured.partition.docx import partition_docx
        elements = partition_docx(filename=str(path))
        content = "\n".join(str(e) for e in elements if str(e).strip())
        return [Document(content=content, metadata={"source": str(path)},
                         format=DocumentFormat.DOCX, source=str(path))]

    def _parse_markdown(self, path: Path) -> list[Document]:
        from unstructured.partition.md import partition_md
        elements = partition_md(filename=str(path))
        content = "\n".join(str(e) for e in elements if str(e).strip())
        return [Document(content=content, metadata={"source": str(path)},
                         format=DocumentFormat.MARKDOWN, source=str(path))]

    def _parse_html(self, path: Path) -> list[Document]:
        from unstructured.partition.html import partition_html
        elements = partition_html(filename=str(path))
        content = "\n".join(str(e) for e in elements if str(e).strip())
        return [Document(content=content, metadata={"source": str(path)},
                         format=DocumentFormat.HTML, source=str(path))]

    def _parse_csv(self, path: Path) -> list[Document]:
        import csv
        documents = []
        with open(path, encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for i, row in enumerate(reader):
                content = "\n".join(f"{k}: {v}" for k, v in row.items() if v)
                documents.append(Document(
                    content=content,
                    metadata={"row": i + 1, "source": str(path)},
                    format=DocumentFormat.CSV,
                    source=str(path),
                ))
        return documents

    def _parse_txt(self, path: Path) -> list[Document]:
        content = path.read_text(encoding="utf-8")
        return [Document(content=content, metadata={"source": str(path)},
                         format=DocumentFormat.TXT, source=str(path))]

7.4 文本分块策略

综合使用递归字符分块、语义分块和重叠窗口三种策略,确保分块质量。

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from langchain_text_splitters import RecursiveCharacterTextSplitter


@dataclass
class TextChunk:
    """文本块对象"""
    content: str
    index: int
    metadata: dict[str, Any]
    doc_source: str = ""


class ChunkingStrategy:
    """文本分块策略管理器"""

    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 64,
        separators: list[str] | None = None,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n",   # 段落分隔
            "\n",     # 换行
            "。",     # 中文句号
            "!",     # 中文感叹号
            "?",     # 中文问号
            ". ",     # 英文句号
            ";",      # 分号
            ",",      # 逗号
            " ",      # 空格
            "",       # 字符级
        ]

    def recursive_split(self, documents: list[Document]) -> list[TextChunk]:
        """递归字符分块 - 优先按语义边界分割"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=self.separators,
            length_function=len,
        )

        chunks: list[TextChunk] = []
        global_index = 0

        for doc in documents:
            splits = splitter.split_text(doc.content)
            for i, text in enumerate(splits):
                chunks.append(TextChunk(
                    content=text,
                    index=global_index,
                    metadata={
                        **doc.metadata,
                        "chunk_index": i,
                        "total_chunks": len(splits),
                    },
                    doc_source=doc.source,
                ))
                global_index += 1

        return chunks

    def semantic_split(
        self,
        documents: list[Document],
        embedding_model: Any,
        similarity_threshold: float = 0.5,
    ) -> list[TextChunk]:
        """语义分块 - 基于相邻句子的语义相似度进行分割

        当相邻句子的余弦相似度低于阈值时,视为语义边界,在此处切分。
        """
        import numpy as np

        chunks: list[TextChunk] = []
        global_index = 0

        for doc in documents:
            # 按句子分割
            sentences = self._split_sentences(doc.content)
            if not sentences:
                continue

            # 计算句子向量
            embeddings = embedding_model.embed_documents(sentences)
            embeddings_np = np.array(embeddings)

            # 计算相邻句子的余弦相似度
            current_chunk: list[str] = [sentences[0]]

            for i in range(1, len(sentences)):
                sim = float(np.dot(embeddings_np[i - 1], embeddings_np[i]) / (
                    np.linalg.norm(embeddings_np[i - 1]) * np.linalg.norm(embeddings_np[i]) + 1e-8
                ))

                if sim < similarity_threshold and current_chunk:
                    # 语义边界,输出当前块
                    chunks.append(TextChunk(
                        content="".join(current_chunk),
                        index=global_index,
                        metadata={**doc.metadata, "strategy": "semantic"},
                        doc_source=doc.source,
                    ))
                    global_index += 1
                    current_chunk = []

                current_chunk.append(sentences[i])

            if current_chunk:
                chunks.append(TextChunk(
                    content="".join(current_chunk),
                    index=global_index,
                    metadata={**doc.metadata, "strategy": "semantic"},
                    doc_source=doc.source,
                ))
                global_index += 1

        return chunks

    @staticmethod
    def _split_sentences(text: str) -> list[str]:
        """中英文混合句子分割"""
        import re
        # 按中英文句号、问号、感叹号、换行分割
        sentences = re.split(r'(?<=[。!?.!?\n])', text)
        return [s.strip() for s in sentences if s.strip()]

7.5 向量化(Embedding)

统一抽象多种 Embedding 模型,支持 BGE-M3、M3E、OpenAI text-embedding-3 等。

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class BaseEmbedding(ABC):
    """Embedding 模型抽象基类"""

    @abstractmethod
    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """批量文本向量化"""
        ...

    @abstractmethod
    def embed_query(self, text: str) -> list[float]:
        """单条查询向量化"""
        ...

    @property
    @abstractmethod
    def dimension(self) -> int:
        """向量维度"""
        ...


class BGEEmbedding(BaseEmbedding):
    """BGE-M3 Embedding - 支持中英文"""

    def __init__(self, model_name: str = "BAAI/bge-m3", device: str = "cuda"):
        from FlagEmbedding import BGEM3FlagModel
        self.model = BGEM3FlagModel(model_name, use_fp16=True, device=device)
        self._dimension = 1024

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        output = self.model.encode(texts, batch_size=64)
        return output["dense_vecs"].tolist()

    def embed_query(self, text: str) -> list[float]:
        output = self.model.encode([text])
        return output["dense_vecs"][0].tolist()

    @property
    def dimension(self) -> int:
        return self._dimension


class OpenAIEmbedding(BaseEmbedding):
    """OpenAI text-embedding-3 Embedding"""

    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str | None = None):
        from langchain_openai import OpenAIEmbeddings
        self.model = OpenAIEmbeddings(model=model_name, api_key=api_key)
        self._dimension = 1536 if "small" in model_name else 3072

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return self.model.embed_documents(texts)

    def embed_query(self, text: str) -> list[float]:
        return self.model.embed_query(text)

    @property
    def dimension(self) -> int:
        return self._dimension


class EmbeddingFactory:
    """Embedding 模型工厂"""

    _registry: dict[str, type[BaseEmbedding]] = {
        "bge-m3": BGEEmbedding,
        "text-embedding-3-small": OpenAIEmbedding,
        "text-embedding-3-large": OpenAIEmbedding,
    }

    @classmethod
    def create(cls, model_name: str, **kwargs) -> BaseEmbedding:
        model_cls = cls._registry.get(model_name)
        if not model_cls:
            raise ValueError(f"未知的 Embedding 模型: {model_name}")
        return model_cls(model_name=model_name, **kwargs)

7.6 向量存储(Milvus 2.6 多租户设计)

采用 Milvus 2.6 作为向量数据库,通过 Collection + Partition 实现多租户数据隔离。

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from pymilvus import (
    Collection, CollectionSchema, DataType, FieldSchema,
    MilvusClient, utility,
)


@dataclass
class MilvusConfig:
    """Milvus 连接配置"""
    uri: str = "http://localhost:19530"
    token: str = ""
    db_name: str = "kunlun_lingjing"


class MilvusVectorStore:
    """Milvus 向量存储 - 多租户隔离设计

    隔离策略:
    - 每个租户一个 Collection(命名规则: kb_{tenant_id})
    - Collection 内按知识库 ID 分区(Partition Key: kb_id)
    - 元数据字段支持 Scalar Filtering
    """

    def __init__(self, config: MilvusConfig, embedding_dim: int = 1024):
        self.client = MilvusClient(uri=config.uri, token=config.token)
        self.embedding_dim = embedding_dim

    def _collection_name(self, tenant_id: str) -> str:
        """租户级 Collection 命名"""
        return f"kb_{tenant_id}".replace("-", "_")

    def create_collection(self, tenant_id: str) -> str:
        """为租户创建 Collection"""
        collection_name = self._collection_name(tenant_id)

        if self.client.has_collection(collection_name):
            return collection_name

        schema = self.client.create_schema(auto_id=True, enable_dynamic_field=True)

        # 主键
        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        # 向量字段
        schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=self.embedding_dim)
        # 分区键 - 知识库 ID
        schema.add_field(field_name="kb_id", datatype=DataType.VARCHAR, max_length=64, is_partition_key=True)
        # 元数据字段 - 支持 Scalar Filtering
        schema.add_field(field_name="doc_source", datatype=DataType.VARCHAR, max_length=512)
        schema.add_field(field_name="chunk_index", datatype=DataType.INT64)
        schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
        schema.add_field(field_name="format", datatype=DataType.VARCHAR, max_length=32)
        schema.add_field(field_name="tags", datatype=DataType.JSON)        # 自定义标签
        schema.add_field(field_name="created_at", datatype=DataType.INT64) # 时间戳

        # 索引配置 - IVF_FLAT 适合中小规模,HNSW 适合大规模
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="embedding",
            index_type="HNSW",
            metric_type="COSINE",
            params={"M": 16, "efConstruction": 256},
        )

        self.client.create_collection(
            collection_name=collection_name,
            schema=schema,
            index_params=index_params,
        )

        return collection_name

    def insert_chunks(
        self,
        tenant_id: str,
        kb_id: str,
        chunks: list[TextChunk],
        embeddings: list[list[float]],
    ) -> int:
        """批量插入文本块"""
        import time
        collection_name = self._collection_name(tenant_id)

        data = []
        for chunk, embedding in zip(chunks, embeddings):
            data.append({
                "embedding": embedding,
                "kb_id": kb_id,
                "doc_source": chunk.doc_source,
                "chunk_index": chunk.index,
                "content": chunk.content,
                "format": chunk.metadata.get("format", ""),
                "tags": chunk.metadata.get("tags", {}),
                "created_at": int(time.time()),
            })

        result = self.client.insert(collection_name=collection_name, data=data)
        return result["insert_count"]

    def vector_search(
        self,
        tenant_id: str,
        query_embedding: list[float],
        kb_id: str | None = None,
        top_k: int = 10,
        filter_expr: str | None = None,
    ) -> list[dict[str, Any]]:
        """向量近似最近邻搜索(ANN)"""
        collection_name = self._collection_name(tenant_id)

        # 构建过滤表达式
        expr_parts = []
        if kb_id:
            expr_parts.append(f'kb_id == "{kb_id}"')
        if filter_expr:
            expr_parts.append(filter_expr)
        expr = " and ".join(expr_parts) if expr_parts else None

        results = self.client.search(
            collection_name=collection_name,
            data=[query_embedding],
            limit=top_k,
            filter=expr,
            output_fields=["content", "doc_source", "chunk_index", "kb_id", "tags"],
            search_params={"metric_type": "COSINE", "params": {"ef": 128}},
        )

        return [
            {
                "id": hit["id"],
                "score": hit["distance"],
                "content": hit["entity"]["content"],
                "doc_source": hit["entity"]["doc_source"],
                "chunk_index": hit["entity"]["chunk_index"],
                "metadata": {
                    "kb_id": hit["entity"]["kb_id"],
                    "tags": hit["entity"].get("tags", {}),
                },
            }
            for hit in results[0]
        ]

7.7 混合检索 + RRF 融合排序 + Rerank

综合向量检索与 BM25 关键词检索,通过 Reciprocal Rank Fusion (RRF) 算法融合排序,再使用 Cross-Encoder 进行精排。

from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Any


@dataclass
class SearchResult:
    """检索结果"""
    content: str
    score: float
    doc_source: str
    chunk_index: int
    metadata: dict[str, Any]
    rank_source: str = ""  # "vector" | "bm25" | "rrf" | "rerank"


class BM25Retriever:
    """BM25 关键词检索器

    基于 rank_bm25 库实现,适用于中小规模语料。
    大规模场景建议使用 Elasticsearch。
    """

    def __init__(self):
        self.index = None
        self.documents: list[dict] = []

    def build_index(self, documents: list[dict]) -> None:
        """构建 BM25 索引"""
        from rank_bm25 import BM25Okapi

        self.documents = documents
        tokenized = [self._tokenize(doc["content"]) for doc in documents]
        self.index = BM25Okapi(tokenized)

    def search(self, query: str, top_k: int = 10) -> list[SearchResult]:
        """BM25 检索"""
        if not self.index:
            return []

        tokens = self._tokenize(query)
        scores = self.index.get_scores(tokens)

        # 取 top_k
        top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]

        return [
            SearchResult(
                content=self.documents[i]["content"],
                score=float(scores[i]),
                doc_source=self.documents[i].get("doc_source", ""),
                chunk_index=self.documents[i].get("chunk_index", 0),
                metadata=self.documents[i].get("metadata", {}),
                rank_source="bm25",
            )
            for i in top_indices
            if scores[i] > 0
        ]

    @staticmethod
    def _tokenize(text: str) -> list[str]:
        """中英文分词"""
        import jieba
        return list(jieba.cut_for_search(text))


class RRFusionRanker:
    """Reciprocal Rank Fusion (RRF) 融合排序

    公式:RRF_score(d) = Σ 1 / (k + rank_i(d))
    其中 k 为常数(通常取 60),rank_i(d) 为文档 d 在第 i 个检索器中的排名。
    """

    def __init__(self, k: int = 60):
        self.k = k

    def fuse(
        self,
        *result_lists: list[SearchResult],
        top_k: int = 10,
    ) -> list[SearchResult]:
        """融合多路检索结果"""
        score_map: dict[str, float] = {}
        doc_map: dict[str, SearchResult] = {}

        for results in result_lists:
            for rank, result in enumerate(results, start=1):
                doc_key = f"{result.doc_source}:{result.chunk_index}"
                rrf_score = 1.0 / (self.k + rank)
                score_map[doc_key] = score_map.get(doc_key, 0) + rrf_score

                if doc_key not in doc_map:
                    doc_map[doc_key] = result

        # 按 RRF 分数排序
        sorted_keys = sorted(score_map, key=score_map.get, reverse=True)[:top_k]

        return [
            SearchResult(
                content=doc_map[k].content,
                score=score_map[k],
                doc_source=doc_map[k].doc_source,
                chunk_index=doc_map[k].chunk_index,
                metadata=doc_map[k].metadata,
                rank_source="rrf",
            )
            for k in sorted_keys
        ]


class CrossEncoderReranker:
    """基于 Cross-Encoder 的重排序器

    使用 BGE-Reranker 对候选文档进行精排。
    Cross-Encoder 将 query 和 document 拼接后联合编码,
    输出相关性分数,精度远高于 Bi-Encoder。
    """

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3", device: str = "cuda"):
        from FlagEmbedding import FlagReranker
        self.reranker = FlagReranker(model_name, use_fp16=True, device=device)

    def rerank(
        self,
        query: str,
        candidates: list[SearchResult],
        top_k: int = 5,
    ) -> list[SearchResult]:
        """重排序候选文档"""
        if not candidates:
            return []

        pairs = [(query, c.content) for c in candidates]
        scores = self.reranker.compute_score(pairs, normalize=True)

        # 确保 scores 是列表
        if isinstance(scores, float):
            scores = [scores]

        # 按重排分数排序
        scored = list(zip(candidates, scores))
        scored.sort(key=lambda x: x[1], reverse=True)

        return [
            SearchResult(
                content=c.content,
                score=float(s),
                doc_source=c.doc_source,
                chunk_index=c.chunk_index,
                metadata=c.metadata,
                rank_source="rerank",
            )
            for c, s in scored[:top_k]
        ]

7.8 完整 RAG Pipeline 编排

"""
极简视界智能体开发平台 - RAG 知识库引擎
完整 Pipeline:文档解析 → 分块 → 向量化 → 存储 → 混合检索 → Rerank
"""

from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class RAGConfig:
    """RAG Pipeline 配置"""
    # 分块参数
    chunk_size: int = 512
    chunk_overlap: int = 64
    chunking_strategy: str = "recursive"  # "recursive" | "semantic"

    # Embedding 参数
    embedding_model: str = "bge-m3"

    # 检索参数
    vector_top_k: int = 20
    bm25_top_k: int = 20
    rrf_k: int = 60
    rerank_top_k: int = 5
    rerank_model: str = "BAAI/bge-reranker-v2-m3"

    # 过滤参数
    min_score_threshold: float = 0.3


class RAGPipeline:
    """RAG 知识库引擎 - 完整 Pipeline"""

    def __init__(
        self,
        config: RAGConfig,
        vector_store: MilvusVectorStore,
        embedding: BaseEmbedding,
    ):
        self.config = config
        self.vector_store = vector_store
        self.embedding = embedding
        self.parser = DocumentParser()
        self.chunker = ChunkingStrategy(
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap,
        )
        self.reranker = CrossEncoderReranker(model_name=config.rerank_model)
        self.rrf = RRFusionRanker(k=config.rrf_k)

        # BM25 索引缓存(按知识库)
        self._bm25_cache: dict[str, BM25Retriever] = {}

    # ──────────────────────────────────────────
    # 索引阶段(Ingestion)
    # ──────────────────────────────────────────
    async def ingest_documents(
        self,
        tenant_id: str,
        kb_id: str,
        file_paths: list[str | Path],
    ) -> dict[str, Any]:
        """文档入库:解析 → 分块 → 向量化 → 存储"""

        # 1. 确保 Collection 存在
        self.vector_store.create_collection(tenant_id)

        # 2. 解析文档
        all_documents: list[Document] = []
        for fp in file_paths:
            docs = self.parser.parse(fp)
            all_documents.extend(docs)

        # 3. 文本分块
        if self.config.chunking_strategy == "semantic":
            chunks = self.chunker.semantic_split(all_documents, self.embedding)
        else:
            chunks = self.chunker.recursive_split(all_documents)

        # 4. 批量向量化
        texts = [c.content for c in chunks]
        embeddings = self.embedding.embed_documents(texts)

        # 5. 写入 Milvus
        insert_count = self.vector_store.insert_chunks(
            tenant_id=tenant_id,
            kb_id=kb_id,
            chunks=chunks,
            embeddings=embeddings,
        )

        # 6. 构建 BM25 索引
        bm25 = BM25Retriever()
        bm25_docs = [
            {"content": c.content, "doc_source": c.doc_source,
             "chunk_index": c.index, "metadata": c.metadata}
            for c in chunks
        ]
        bm25.build_index(bm25_docs)
        self._bm25_cache[f"{tenant_id}:{kb_id}"] = bm25

        return {
            "total_documents": len(all_documents),
            "total_chunks": len(chunks),
            "inserted": insert_count,
            "embedding_model": self.config.embedding_model,
            "embedding_dim": self.embedding.dimension,
        }

    # ──────────────────────────────────────────
    # 检索阶段(Retrieval)
    # ──────────────────────────────────────────
    async def hybrid_search(
        self,
        tenant_id: str,
        kb_id: str,
        query: str,
        top_k: int | None = None,
        filter_expr: str | None = None,
    ) -> list[SearchResult]:
        """混合检索:向量检索 + BM25 + RRF 融合 + Rerank"""

        top_k = top_k or self.config.rerank_top_k

        # 1. 向量检索
        query_embedding = self.embedding.embed_query(query)
        vector_results = self.vector_store.vector_search(
            tenant_id=tenant_id,
            query_embedding=query_embedding,
            kb_id=kb_id,
            top_k=self.config.vector_top_k,
            filter_expr=filter_expr,
        )
        vector_search_results = [
            SearchResult(
                content=r["content"],
                score=r["score"],
                doc_source=r["doc_source"],
                chunk_index=r["chunk_index"],
                metadata=r.get("metadata", {}),
                rank_source="vector",
            )
            for r in vector_results
        ]

        # 2. BM25 关键词检索
        bm25_key = f"{tenant_id}:{kb_id}"
        bm25 = self._bm25_cache.get(bm25_key)
        bm25_results = []
        if bm25:
            bm25_results = bm25.search(query, top_k=self.config.bm25_top_k)

        # 3. RRF 融合排序
        fused_results = self.rrf.fuse(
            vector_search_results,
            bm25_results,
            top_k=max(top_k * 3, 15),  # RRF 输出较多候选给 Rerank
        )

        # 4. Cross-Encoder 重排序
        reranked = self.reranker.rerank(query, fused_results, top_k=top_k)

        # 5. 分数阈值过滤
        filtered = [r for r in reranked if r.score >= self.config.min_score_threshold]

        return filtered

    # ──────────────────────────────────────────
    # 便捷方法
    # ──────────────────────────────────────────
    async def search_and_format(
        self,
        tenant_id: str,
        kb_id: str,
        query: str,
        top_k: int = 5,
    ) -> str:
        """检索并格式化为上下文(供 LLM 使用)"""
        results = await self.hybrid_search(tenant_id, kb_id, query, top_k)

        formatted_parts = []
        for i, r in enumerate(results, 1):
            source = Path(r.doc_source).name if r.doc_source else "未知来源"
            formatted_parts.append(
                f"[文档{i}] (相关度: {r.score:.3f}, 来源: {source})\n{r.content}"
            )

        return "\n\n---\n\n".join(formatted_parts)

7.9 RAG Pipeline 完整流程图

flowchart TD
    subgraph 索引阶段["索引阶段 (Ingestion)"]
        direction TB
        UP[上传文档] --> PARSE[文档解析<br/>unstructured]
        PARSE --> CHUNK[文本分块<br/>Recursive / Semantic]
        CHUNK --> EMB[向量化<br/>BGE-M3 / OpenAI]
        EMB --> STORE[(Milvus 2.6<br/>向量存储)]
        CHUNK --> BM25_IDX[BM25 索引构建]
    end

    subgraph 检索阶段["检索阶段 (Retrieval)"]
        direction TB
        Q[用户查询] --> QE[查询向量化]
        Q --> QT[查询分词]

        QE --> VS[向量检索<br/>ANN Search]
        QT --> BS[BM25 检索<br/>关键词匹配]

        VS --> RRF[RRF 融合排序<br/>1/k+rank]
        BS --> RRF

        RRF --> RK[Rerank 重排序<br/>BGE-Reranker]
        RK --> MF[元数据过滤<br/>Score Threshold]
        MF --> CTX[格式化上下文]
    end

    subgraph 生成阶段["生成阶段 (Generation)"]
        direction TB
        CTX2[上下文注入] --> PROMPT[提示词组装]
        PROMPT --> LLM[LLM 生成回答]
        LLM --> RESP[返回结果<br/>含引用标注]
    end

    STORE --> VS
    BM25_IDX --> BS
    CTX --> CTX2

    style STORE fill:#7B68EE,color:#fff
    style RRF fill:#D4A843,color:#fff
    style RK fill:#E74C3C,color:#fff
    style LLM fill:#2ECC71,color:#fff

8. 智能体模块设计

8.1 概述

智能体(Agent)模块是极简视界智能体开发平台的核心决策层,负责根据用户意图自主规划执行步骤、调用工具、与环境交互。平台支持三种主流智能体架构:ReAct Agent(推理-行动循环)、Plan-and-Execute Agent(先规划后执行)和 Multi-Agent 协作(多智能体协同),并通过丰富的功能组件(判断器、问题优化器、函数库、内置标签等)增强智能体能力。

8.2 智能体类型

8.2.1 ReAct Agent

ReAct(Reasoning + Acting)模式是最经典的 Agent 架构。智能体在每一步先进行思考(Thought),然后决定行动(Action),观察行动结果后继续推理,直到给出最终回答。

8.2.2 Plan-and-Execute Agent

先由规划器(Planner)将复杂任务拆解为有序步骤列表,然后由执行器(Executor)逐步执行,执行过程中可触发重规划(Re-plan)

8.2.3 Multi-Agent 协作

多个专业化的智能体组成团队,由**协调器(Supervisor)**分配任务,各智能体独立执行后汇总结果。适用于复杂的多领域任务。

8.3 智能体架构图

graph TB
    subgraph 智能体模块["智能体模块 (Agent Module)"]
        direction TB

        subgraph AgentTypes["智能体类型"]
            RA[ReAct Agent<br/>推理-行动循环]
            PE[Plan-and-Execute<br/>Agent]
            MA[Multi-Agent<br/>协作]
        end

        subgraph Components["功能组件"]
            IC[判断器<br/>意图分类]
            QR[问题优化器<br/>Query Rewrite]
            FL[函数库<br/>自定义工具]
            BT[内置标签<br/>敏感词/情感]
        end

        subgraph Infrastructure["基础设施"]
            SM[Agent 状态机<br/>LangGraph]
            TR2[Tool 注册与调度]
            MEM[记忆管理<br/>Short/Long-term]
            OBS[可观测性<br/>LangSmith]
        end
    end

    RA & PE & MA --> SM
    IC & QR & FL & BT --> TR2
    SM --> TR2
    SM --> MEM
    SM --> OBS

    style RA fill:#3498DB,color:#fff
    style PE fill:#2ECC71,color:#fff
    style MA fill:#E67E22,color:#fff
    style SM fill:#9B59B6,color:#fff

8.4 功能组件

8.4.1 判断器(意图分类器)

对用户输入进行意图分类,输出结构化标签,用于路由和决策。

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage


class IntentLabel(str, Enum):
    """意图标签枚举"""
    KNOWLEDGE_QA = "knowledge_qa"       # 知识问答
    CHITCHAT = "chitchat"               # 闲聊
    CODE_GENERATION = "code_generation" # 代码生成
    DATA_ANALYSIS = "data_analysis"     # 数据分析
    TASK_EXECUTION = "task_execution"   # 任务执行
    COMPLAINT = "complaint"             # 投诉
    CLARIFICATION = "clarification"     # 需要澄清
    UNKNOWN = "unknown"                 # 未知


@dataclass
class IntentResult:
    """意图分类结果"""
    label: IntentLabel
    confidence: float
    reasoning: str = ""
    sub_intents: list[IntentLabel] | None = None  # 多意图场景


class IntentClassifier:
    """判断器 - 基于 LLM 的意图分类器"""

    SYSTEM_PROMPT = """你是极简视界智能体的意图分类器。
请分析用户消息,输出以下 JSON 格式:

{
    "label": "意图标签",
    "confidence": 0.0-1.0的置信度,
    "reasoning": "分类理由(简短)"
}

可用的意图标签:
- knowledge_qa: 需要查询知识库或搜索信息的问题
- chitchat: 日常闲聊、问候、非任务导向的对话
- code_generation: 需要编写、修改或调试代码
- data_analysis: 需要数据分析、统计、可视化
- task_execution: 需要执行具体操作(发邮件、创建文件等)
- complaint: 表达不满或投诉
- clarification: 问题模糊,需要进一步澄清
- unknown: 无法确定意图

只输出 JSON,不要输出其他内容。"""

    def __init__(self, llm: BaseChatModel):
        self.llm = llm

    async def classify(self, user_message: str, context: str = "") -> IntentResult:
        """分类用户意图"""
        messages = [
            SystemMessage(content=self.SYSTEM_PROMPT),
        ]
        if context:
            messages.append(HumanMessage(content=f"[对话上下文] {context}"))
        messages.append(HumanMessage(content=f"[用户消息] {user_message}"))

        response = await self.llm.ainvoke(messages)

        import json
        try:
            data = json.loads(response.content.strip())
            return IntentResult(
                label=IntentLabel(data.get("label", "unknown")),
                confidence=float(data.get("confidence", 0.5)),
                reasoning=data.get("reasoning", ""),
            )
        except (json.JSONDecodeError, ValueError):
            return IntentResult(label=IntentLabel.UNKNOWN, confidence=0.0, reasoning="解析失败")

8.4.2 问题优化器(Query Rewrite)

对用户原始查询进行改写优化,提升检索效果。

class QueryRewriter:
    """问题优化器 - 对用户查询进行改写以提升检索质量

    策略:
    1. Query Expansion - 扩展查询,添加同义词/相关概念
    2. Query Decomposition - 将复杂问题拆解为多个子查询
    3. HyDE - 生成假设性答案用于检索
    4. Step-back Prompting - 抽象化查询以获得更广泛的检索结果
    """

    def __init__(self, llm: BaseChatModel):
        self.llm = llm

    async def rewrite(
        self,
        query: str,
        strategy: str = "expansion",
        chat_history: list | None = None,
    ) -> list[str]:
        """改写查询"""
        strategies = {
            "expansion": self._expansion,
            "decomposition": self._decomposition,
            "hyde": self._hyde,
            "step_back": self._step_back,
        }
        fn = strategies.get(strategy, self._expansion)
        return await fn(query, chat_history)

    async def _expansion(self, query: str, history: list | None) -> list[str]:
        """查询扩展 - 生成多个语义等价的查询变体"""
        prompt = f"""请对以下查询生成 3 个语义等价但表述不同的查询变体,用于提升检索召回率。
每个变体一行,不要序号。

原始查询:{query}"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        variants = [line.strip() for line in response.content.strip().split("\n") if line.strip()]
        return [query] + variants

    async def _decomposition(self, query: str, history: list | None) -> list[str]:
        """查询拆解 - 将复杂问题拆解为独立的子查询"""
        prompt = f"""请将以下复杂问题拆解为 2-4 个独立的子查询,每个子查询可以独立检索回答。
每个子查询一行,不要序号。

复杂问题:{query}"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return [line.strip() for line in response.content.strip().split("\n") if line.strip()]

    async def _hyde(self, query: str, history: list | None) -> list[str]:
        """HyDE - 先生成假设性答案,用答案去检索"""
        prompt = f"""请针对以下问题生成一个专业、详细的假设性答案(即使你不确定准确性)。
这个答案将用于在知识库中检索相关文档。

问题:{query}

请直接输出假设性答案:"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return [response.content.strip()]

    async def _step_back(self, query: str, history: list | None) -> list[str]:
        """Step-back - 抽象化查询"""
        prompt = f"""请将以下具体问题抽象为一个更宽泛、更通用的问题。
通用问题应涵盖原始问题的核心概念。

具体问题:{query}

请输出通用问题:"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return [query, response.content.strip()]

8.4.3 内置标签系统

提供敏感词检测、情感分析等内置标签组件,为智能体增加安全护栏。

from __future__ import annotations

import re
from dataclasses import dataclass
from enum import Enum


class SensitivityLevel(str, Enum):
    SAFE = "safe"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    BLOCKED = "blocked"


class SentimentLabel(str, Enum):
    POSITIVE = "positive"
    NEUTRAL = "neutral"
    NEGATIVE = "negative"


@dataclass
class TagResult:
    """标签结果"""
    sensitivity: SensitivityLevel = SensitivityLevel.SAFE
    sentiment: SentimentLabel = SentimentLabel.NEUTRAL
    flagged_words: list[str] | None = None
    language: str = "zh"


class BuiltInTagProcessor:
    """内置标签处理器 - 敏感词检测 + 情感分析 + 语言检测"""

    def __init__(self, sensitive_words_path: str | None = None):
        self._sensitive_words: set[str] = set()
        self._blocked_patterns: list[re.Pattern] = []

        if sensitive_words_path:
            self._load_sensitive_words(sensitive_words_path)

    def _load_sensitive_words(self, path: str) -> None:
        with open(path, encoding="utf-8") as f:
            for line in f:
                word = line.strip()
                if word and not word.startswith("#"):
                    self._sensitive_words.add(word)

    def add_blocked_patterns(self, patterns: list[str]) -> None:
        """添加正则匹配模式"""
        self._blocked_patterns.extend(re.compile(p) for p in patterns)

    def process(self, text: str) -> TagResult:
        """处理文本,返回标签结果"""
        result = TagResult()

        # 1. 敏感词检测
        result.sensitivity, result.flagged_words = self._check_sensitivity(text)

        # 2. 情感分析(基于规则)
        result.sentiment = self._analyze_sentiment(text)

        # 3. 语言检测
        result.language = self._detect_language(text)

        return result

    def _check_sensitivity(self, text: str) -> tuple[SensitivityLevel, list[str]]:
        """敏感词检测"""
        flagged: list[str] = []
        text_lower = text.lower()

        # 精确匹配
        for word in self._sensitive_words:
            if word.lower() in text_lower:
                flagged.append(word)

        # 正则匹配
        for pattern in self._blocked_patterns:
            matches = pattern.findall(text)
            flagged.extend(matches)

        if not flagged:
            return SensitivityLevel.SAFE, []
        elif len(flagged) <= 1:
            return SensitivityLevel.LOW, flagged
        elif len(flagged) <= 3:
            return SensitivityLevel.MEDIUM, flagged
        else:
            return SensitivityLevel.HIGH, flagged

    @staticmethod
    def _analyze_sentiment(text: str) -> SentimentLabel:
        """基于关键词的简单情感分析"""
        positive_words = {"感谢", "谢谢", "太好了", "满意", "喜欢", "赞", "优秀", "棒"}
        negative_words = {"不满", "投诉", "差评", "垃圾", "失望", "愤怒", "退款", "问题"}

        pos_count = sum(1 for w in positive_words if w in text)
        neg_count = sum(1 for w in negative_words if w in text)

        if pos_count > neg_count:
            return SentimentLabel.POSITIVE
        elif neg_count > pos_count:
            return SentimentLabel.NEGATIVE
        return SentimentLabel.NEUTRAL

    @staticmethod
    def _detect_language(text: str) -> str:
        """简单语言检测"""
        chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        total = len(text)
        if total == 0:
            return "unknown"
        return "zh" if chinese_chars / total > 0.3 else "en"

8.5 基于 LangGraph 的 Agent 状态机设计

使用 LangGraph StateGraph 构建智能体的状态机,支持 ReAct 循环、Plan-and-Execute 和多 Agent 协作。

8.5.1 ReAct Agent 状态机

stateDiagram-v2
    [*] --> ReceiveInput: 用户输入
    ReceiveInput --> IntentClassify: 预处理
    IntentClassify --> QueryRewrite: 知识问答
    IntentClassify --> DirectResponse: 闲聊
    IntentClassify --> ToolSelection: 任务执行

    QueryRewrite --> RAGSearch: 检索
    RAGSearch --> EvaluateResult: 评估结果

    ToolSelection --> ExecuteTool: 选择工具
    ExecuteTool --> EvaluateResult: 观察结果

    EvaluateResult --> Thought: 需要更多信息
    EvaluateResult --> FinalAnswer: 信息充足
    Thought --> ToolSelection: 继续推理
    Thought --> RAGSearch: 需要检索

    DirectResponse --> [*]
    FinalAnswer --> SafetyCheck: 安全检查
    SafetyCheck --> [*]: 通过
    SafetyCheck --> Regenerate: 不通过
    Regenerate --> FinalAnswer

8.5.2 ReAct Agent 完整实现

"""
极简视界智能体开发平台 - 智能体模块
ReAct Agent 完整实现(基于 LangGraph StateGraph)
"""

from __future__ import annotations

import json
import operator
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage,
)
from langchain_core.tools import BaseTool
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import ToolNode


# ──────────────────────────────────────────────
# 1. Agent 状态定义
# ──────────────────────────────────────────────
class AgentState(TypedDict):
    """ReAct Agent 状态"""
    messages: Annotated[list[BaseMessage], operator.add]
    intent: str
    rewritten_queries: list[str]
    tool_results: list[str]
    iteration_count: int
    max_iterations: int
    final_answer: str
    tags: dict[str, Any]


# ──────────────────────────────────────────────
# 2. Agent 配置
# ──────────────────────────────────────────────
AGENT_SYSTEM_PROMPT = """你是极简视界智能助手,一个功能强大的 AI Agent。

你可以使用以下工具来帮助用户解决问题:
{tool_descriptions}

## 工作原则
1. 先思考(Thought),再行动(Action)
2. 每次只调用一个最合适的工具
3. 观察工具返回结果后决定下一步行动
4. 当信息足够时,给出最终回答
5. 如果不确定,诚实地告知用户

## 输出格式
思考:[你的分析过程]
行动:[调用工具或给出最终回答]
"""


# ──────────────────────────────────────────────
# 3. 构建 ReAct Agent
# ──────────────────────────────────────────────
class ReactAgentBuilder:
    """ReAct Agent 构建器"""

    def __init__(
        self,
        llm: BaseChatModel,
        tools: list[BaseTool],
        intent_classifier: IntentClassifier | None = None,
        query_rewriter: QueryRewriter | None = None,
        tag_processor: BuiltInTagProcessor | None = None,
    ):
        self.llm = llm
        self.tools = tools
        self.intent_classifier = intent_classifier
        self.query_rewriter = query_rewriter
        self.tag_processor = tag_processor
        self.llm_with_tools = llm.bind_tools(tools)

    def _build_system_prompt(self) -> str:
        tool_descs = "\n".join(
            f"- {t.name}: {t.description}" for t in self.tools
        )
        return AGENT_SYSTEM_PROMPT.format(tool_descriptions=tool_descs)

    async def agent_node(self, state: AgentState) -> dict:
        """Agent 推理节点 - 决定下一步行动"""
        messages = [SystemMessage(content=self._build_system_prompt())]
        messages.extend(state["messages"])

        response = await self.llm_with_tools.ainvoke(messages)

        return {
            "messages": [response],
            "iteration_count": state.get("iteration_count", 0) + 1,
        }

    async def intent_node(self, state: AgentState) -> dict:
        """意图分类节点"""
        if not self.intent_classifier:
            return {"intent": "unknown"}

        user_msg = ""
        for msg in reversed(state["messages"]):
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break

        result = await self.intent_classifier.classify(user_msg)
        return {"intent": result.label.value, "tags": {"intent_confidence": result.confidence}}

    async def rewrite_node(self, state: AgentState) -> dict:
        """查询改写节点"""
        if not self.query_rewriter:
            return {"rewritten_queries": []}

        user_msg = ""
        for msg in reversed(state["messages"]):
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break

        queries = await self.query_rewriter.rewrite(user_msg, strategy="expansion")
        return {"rewritten_queries": queries}

    async def safety_check_node(self, state: AgentState) -> dict:
        """安全检查节点"""
        if not self.tag_processor:
            return {}

        last_ai = ""
        for msg in reversed(state["messages"]):
            if isinstance(msg, AIMessage) and msg.content:
                last_ai = msg.content
                break

        if last_ai:
            tag_result = self.tag_processor.process(last_ai)
            return {"tags": {
                "sensitivity": tag_result.sensitivity.value,
                "sentiment": tag_result.sentiment.value,
            }}
        return {}

    def should_continue(self, state: AgentState) -> Literal["tools", "safety_check", "end"]:
        """决定是否继续工具调用循环"""
        last_message = state["messages"][-1]
        iteration = state.get("iteration_count", 0)
        max_iter = state.get("max_iterations", 10)

        # 超出最大迭代次数
        if iteration >= max_iter:
            return "end"

        # 如果有工具调用,执行工具
        if isinstance(last_message, AIMessage) and last_message.tool_calls:
            return "tools"

        # 否则进行安全检查后结束
        return "safety_check"

    def build_graph(self) -> StateGraph:
        """构建 ReAct Agent 状态图"""
        graph = StateGraph(AgentState)

        # 注册节点
        graph.add_node("intent_classify", self.intent_node)
        graph.add_node("query_rewrite", self.rewrite_node)
        graph.add_node("agent", self.agent_node)
        graph.add_node("tools", ToolNode(self.tools))
        graph.add_node("safety_check", self.safety_check_node)

        # 边
        graph.add_edge(START, "intent_classify")
        graph.add_edge("intent_classify", "query_rewrite")
        graph.add_edge("query_rewrite", "agent")

        # 条件边:决定是否继续循环
        graph.add_conditional_edges(
            source="agent",
            path=self.should_continue,
            path_map={
                "tools": "tools",
                "safety_check": "safety_check",
                "end": END,
            },
        )

        # 工具执行后回到 agent 继续推理
        graph.add_edge("tools", "agent")

        # 安全检查后结束
        graph.add_edge("safety_check", END)

        return graph

8.6 Plan-and-Execute Agent

"""Plan-and-Execute Agent - 先规划后执行"""

from __future__ import annotations

import operator
from typing import Annotated, Any, Literal, TypedDict

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import BaseTool
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field


class PlanStep(BaseModel):
    """计划步骤"""
    index: int = Field(description="步骤序号")
    description: str = Field(description="步骤描述")
    tool_needed: str | None = Field(default=None, description="需要的工具名称")
    status: str = Field(default="pending", description="状态: pending/running/done/failed")
    result: str = Field(default="", description="执行结果")


class PlanState(TypedDict):
    """Plan-and-Execute Agent 状态"""
    messages: Annotated[list[BaseMessage], operator.add]
    task: str                    # 原始任务描述
    plan: list[dict]             # 计划步骤列表
    current_step: int            # 当前执行步骤
    step_results: list[str]      # 各步骤结果
    final_answer: str            # 最终回答
    should_replan: bool          # 是否需要重规划


class PlanAndExecuteAgent:
    """Plan-and-Execute Agent"""

    def __init__(self, llm: BaseChatModel, tools: list[BaseTool]):
        self.llm = llm
        self.tools = tools
        self.tool_map = {t.name: t for t in tools}
        self.llm_with_tools = llm.bind_tools(tools)

    async def planner_node(self, state: PlanState) -> dict:
        """规划器 - 将任务拆解为有序步骤"""
        tool_names = ", ".join(t.name for t in self.tools)
        prompt = f"""你是一个任务规划器。请将以下任务拆解为有序的执行步骤。

可用工具:{tool_names}

任务:{state['task']}

请以 JSON 数组格式输出步骤:
[
    {{"index": 1, "description": "步骤描述", "tool_needed": "工具名或null"}},
    ...
]"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])

        import json
        try:
            plan = json.loads(response.content.strip())
        except json.JSONDecodeError:
            plan = [{"index": 1, "description": "直接回答", "tool_needed": None}]

        return {"plan": plan, "current_step": 0, "should_replan": False}

    async def executor_node(self, state: PlanState) -> dict:
        """执行器 - 执行当前步骤"""
        plan = state["plan"]
        step_idx = state["current_step"]

        if step_idx >= len(plan):
            return {"should_replan": False}

        step = plan[step_idx]
        step["status"] = "running"

        tool_name = step.get("tool_needed")
        if tool_name and tool_name in self.tool_map:
            # 使用工具执行
            tool = self.tool_map[tool_name]
            try:
                result = await tool.ainvoke({"query": step["description"]})
                step["result"] = str(result)
                step["status"] = "done"
            except Exception as e:
                step["result"] = f"执行失败: {e}"
                step["status"] = "failed"
        else:
            # 使用 LLM 直接回答
            context = "\n".join(state.get("step_results", []))
            prompt = f"基于以下已有信息,完成步骤:{step['description']}\n\n已有信息:\n{context}"
            response = await self.llm.ainvoke([HumanMessage(content=prompt)])
            step["result"] = response.content
            step["status"] = "done"

        results = state.get("step_results", []) + [f"步骤{step_idx + 1}: {step['result']}"]
        return {
            "plan": plan,
            "step_results": results,
            "current_step": step_idx + 1,
        }

    def should_replan(self, state: PlanState) -> Literal["planner", "executor", "synthesizer"]:
        """判断是否需要重规划"""
        if state.get("should_replan"):
            return "planner"

        current = state.get("current_step", 0)
        plan = state.get("plan", [])

        if current >= len(plan):
            return "synthesizer"

        return "executor"

    async def synthesizer_node(self, state: PlanState) -> dict:
        """综合器 - 汇总所有步骤结果生成最终回答"""
        results = "\n\n".join(state.get("step_results", []))
        prompt = f"""请基于以下步骤的执行结果,生成完整的最终回答。

原始任务:{state['task']}

各步骤结果:
{results}

请给出综合、完整、结构化的最终回答:"""
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        return {
            "final_answer": response.content,
            "messages": [AIMessage(content=response.content)],
        }

    def build_graph(self) -> StateGraph:
        """构建 Plan-and-Execute 状态图"""
        graph = StateGraph(PlanState)

        graph.add_node("planner", self.planner_node)
        graph.add_node("executor", self.executor_node)
        graph.add_node("synthesizer", self.synthesizer_node)

        graph.add_edge(START, "planner")
        graph.add_conditional_edges(
            source="planner",
            path=lambda s: "executor",
        )
        graph.add_conditional_edges(
            source="executor",
            path=self.should_replan,
            path_map={
                "planner": "planner",
                "executor": "executor",
                "synthesizer": "synthesizer",
            },
        )
        graph.add_edge("synthesizer", END)

        return graph

8.7 Multi-Agent 协作

"""Multi-Agent 协作 - Supervisor 模式"""

from __future__ import annotations

import operator
from typing import Annotated, Literal, TypedDict

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import END, START, StateGraph


class MultiAgentState(TypedDict):
    """Multi-Agent 协作状态"""
    messages: Annotated[list[BaseMessage], operator.add]
    task: str
    next_agent: str               # 下一个要执行的 Agent
    agent_results: dict[str, str] # 各 Agent 的执行结果
    completed_agents: list[str]   # 已完成的 Agent 列表
    final_answer: str


AGENT_ROLES = {
    "researcher": "研究员:负责信息检索和知识整理",
    "analyst": "分析师:负责数据分析和逻辑推理",
    "writer": "撰写员:负责内容创作和文本润色",
    "reviewer": "审核员:负责质量检查和改进建议",
}


class MultiAgentOrchestrator:
    """Multi-Agent 协调器 - Supervisor 模式"""

    def __init__(
        self,
        supervisor_llm: BaseChatModel,
        agent_llms: dict[str, BaseChatModel],
    ):
        self.supervisor_llm = supervisor_llm
        self.agent_llms = agent_llms

    async def supervisor_node(self, state: MultiAgentState) -> dict:
        """协调器节点 - 决定下一个执行的 Agent"""
        agent_names = list(AGENT_ROLES.keys())
        completed = state.get("completed_agents", [])
        results_summary = "\n".join(
            f"[{k}]: {v[:200]}..." for k, v in state.get("agent_results", {}).items()
        )

        prompt = f"""你是一个任务协调器,负责管理以下专业 Agent 团队:
{chr(10).join(f'- {name}: {role}' for name, role in AGENT_ROLES.items())}

当前任务:{state['task']}

已完成的 Agent:{', '.join(completed) if completed else '无'}
已有结果摘要:
{results_summary if results_summary else '无'}

请决定下一步行动:
1. 选择一个 Agent 执行(输出 Agent 名称)
2. 如果任务已完成,输出 "FINISH"

只输出 Agent 名称或 "FINISH":"""

        response = await self.supervisor_llm.ainvoke([HumanMessage(content=prompt)])
        decision = response.content.strip().lower()

        if decision == "finish" or decision not in AGENT_ROLES:
            return {"next_agent": "FINISH"}
        return {"next_agent": decision}

    def _create_agent_node(self, agent_name: str, role: str):
        """为每个 Agent 创建节点函数"""

        async def agent_node(state: MultiAgentState) -> dict:
            llm = self.agent_llms.get(agent_name, self.supervisor_llm)
            other_results = "\n".join(
                f"[{k}]: {v}" for k, v in state.get("agent_results", {}).items()
                if k != agent_name
            )

            prompt = f"""你是{role}。

你的任务:基于团队目标完成你负责的部分。

团队目标:{state['task']}
其他成员的成果:
{other_results if other_results else '暂无'}

请输出你的工作成果:"""

            response = await llm.ainvoke([HumanMessage(content=prompt)])

            results = dict(state.get("agent_results", {}))
            results[agent_name] = response.content

            completed = list(state.get("completed_agents", [])) + [agent_name]

            return {
                "agent_results": results,
                "completed_agents": completed,
                "messages": [AIMessage(content=f"[{agent_name}] {response.content[:500]}")],
            }

        return agent_node

    def route_to_agent(self, state: MultiAgentState) -> str:
        """路由到指定的 Agent"""
        next_agent = state.get("next_agent", "FINISH")
        if next_agent == "FINISH":
            return "final_synthesis"
        return next_agent

    async def final_synthesis_node(self, state: MultiAgentState) -> dict:
        """最终综合 - 将所有 Agent 的成果整合为完整回答"""
        results = "\n\n".join(
            f"## {AGENT_ROLES.get(name, name)}的成果\n{content}"
            for name, content in state.get("agent_results", {}).items()
        )

        prompt = f"""请将以下各 Agent 的工作成果整合为一份完整、结构化的最终报告。

原始任务:{state['task']}

各 Agent 成果:
{results}

请输出最终报告:"""

        response = await self.supervisor_llm.ainvoke([HumanMessage(content=prompt)])
        return {
            "final_answer": response.content,
            "messages": [AIMessage(content=response.content)],
        }

    def build_graph(self) -> StateGraph:
        """构建 Multi-Agent 协作状态图"""
        graph = StateGraph(MultiAgentState)

        # 协调器节点
        graph.add_node("supervisor", self.supervisor_node)

        # 各 Agent 节点
        for name, role in AGENT_ROLES.items():
            graph.add_node(name, self._create_agent_node(name, role))

        # 最终综合节点
        graph.add_node("final_synthesis", self.final_synthesis_node)

        # 边
        graph.add_edge(START, "supervisor")

        # 从 Supervisor 路由到各 Agent 或结束
        graph.add_conditional_edges(
            source="supervisor",
            path=self.route_to_agent,
            path_map={
                **{name: name for name in AGENT_ROLES},
                "final_synthesis": "final_synthesis",
            },
        )

        # 每个 Agent 完成后回到 Supervisor
        for name in AGENT_ROLES:
            graph.add_edge(name, "supervisor")

        graph.add_edge("final_synthesis", END)

        return graph

8.8 Tool 注册与调度机制

"""
极简视界智能体开发平台 - Tool 注册与调度
统一工具管理框架
"""

from __future__ import annotations

import inspect
import time
from dataclasses import dataclass, field
from typing import Any, Callable

from langchain_core.tools import BaseTool, StructuredTool


@dataclass
class ToolMetadata:
    """工具元数据"""
    name: str
    description: str
    category: str = "general"         # 工具分类
    tags: list[str] = field(default_factory=list)
    requires_auth: bool = False       # 是否需要鉴权
    rate_limit: int = 100             # 每分钟调用限制
    timeout_seconds: int = 30         # 超时时间
    retry_count: int = 3             # 重试次数
    version: str = "1.0.0"


@dataclass
class ToolCallRecord:
    """工具调用记录"""
    tool_name: str
    args: dict[str, Any]
    result: Any
    duration_ms: float
    success: bool
    error: str | None = None
    timestamp: float = field(default_factory=time.time)


class ToolDispatcher:
    """工具调度器 - 统一管理工具注册、鉴权、执行、监控"""

    def __init__(self):
        self._tools: dict[str, BaseTool] = {}
        self._metadata: dict[str, ToolMetadata] = {}
        self._call_history: list[ToolCallRecord] = []
        self._rate_counters: dict[str, list[float]] = {}

    def register_tool(
        self,
        tool: BaseTool,
        metadata: ToolMetadata | None = None,
    ) -> None:
        """注册工具"""
        self._tools[tool.name] = tool
        self._metadata[tool.name] = metadata or ToolMetadata(
            name=tool.name,
            description=tool.description or "",
        )

    def register_function(
        self,
        func: Callable,
        name: str | None = None,
        description: str | None = None,
        metadata: ToolMetadata | None = None,
    ) -> None:
        """从普通函数注册为工具"""
        tool_name = name or func.__name__
        tool_desc = description or func.__doc__ or ""

        # 推断参数 schema
        sig = inspect.signature(func)
        args_schema = {}
        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue
            param_type = "string"
            if param.annotation == int:
                param_type = "integer"
            elif param.annotation == float:
                param_type = "number"
            elif param.annotation == bool:
                param_type = "boolean"
            args_schema[param_name] = {"type": param_type}

        tool = StructuredTool.from_function(
            coroutine=func if inspect.iscoroutinefunction(func) else None,
            func=func if not inspect.iscoroutinefunction(func) else None,
            name=tool_name,
            description=tool_desc,
        )

        self.register_tool(tool, metadata)

    def get_tools(self) -> list[BaseTool]:
        """获取所有已注册的工具"""
        return list(self._tools.values())

    def get_tool(self, name: str) -> BaseTool | None:
        return self._tools.get(name)

    async def dispatch(
        self,
        tool_name: str,
        args: dict[str, Any],
        user_id: str | None = None,
    ) -> ToolCallRecord:
        """调度执行工具"""
        tool = self._tools.get(tool_name)
        meta = self._metadata.get(tool_name)

        if not tool:
            return ToolCallRecord(
                tool_name=tool_name, args=args, result=None,
                duration_ms=0, success=False, error=f"工具不存在: {tool_name}",
            )

        # 速率限制检查
        if meta and not self._check_rate_limit(tool_name, meta.rate_limit):
            return ToolCallRecord(
                tool_name=tool_name, args=args, result=None,
                duration_ms=0, success=False, error="超出调用频率限制",
            )

        # 执行工具(带重试)
        max_retries = meta.retry_count if meta else 3
        last_error = None

        for attempt in range(max_retries):
            start_time = time.time()
            try:
                result = await tool.ainvoke(args)
                duration = (time.time() - start_time) * 1000

                record = ToolCallRecord(
                    tool_name=tool_name, args=args, result=result,
                    duration_ms=duration, success=True,
                )
                self._call_history.append(record)
                return record

            except Exception as e:
                last_error = str(e)
                duration = (time.time() - start_time) * 1000
                if attempt < max_retries - 1:
                    await _async_sleep(2 ** attempt)  # 指数退避

        record = ToolCallRecord(
            tool_name=tool_name, args=args, result=None,
            duration_ms=duration, success=False, error=last_error,
        )
        self._call_history.append(record)
        return record

    def _check_rate_limit(self, tool_name: str, limit: int) -> bool:
        """检查速率限制"""
        now = time.time()
        counter = self._rate_counters.setdefault(tool_name, [])
        # 清理 60 秒前的记录
        counter[:] = [t for t in counter if now - t < 60]
        if len(counter) >= limit:
            return False
        counter.append(now)
        return True

    def get_call_history(self, tool_name: str | None = None, limit: int = 50) -> list[ToolCallRecord]:
        """获取调用历史"""
        records = self._call_history
        if tool_name:
            records = [r for r in records if r.tool_name == tool_name]
        return records[-limit:]


async def _async_sleep(seconds: float) -> None:
    import asyncio
    await asyncio.sleep(seconds)

8.9 智能体编排完整示例

"""
极简视界智能体开发平台 - 智能体编排
完整示例:构建并运行一个 Multi-Agent 协作系统
"""

from __future__ import annotations

import asyncio

from langchain_openai import ChatOpenAI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver


async def main():
    # ──────────────────────────────────────────
    # 1. 初始化组件
    # ──────────────────────────────────────────
    supervisor_llm = ChatOpenAI(model="gpt-4o", temperature=0)
    agent_llms = {
        "researcher": ChatOpenAI(model="gpt-4o", temperature=0.3),
        "analyst": ChatOpenAI(model="gpt-4o", temperature=0.1),
        "writer": ChatOpenAI(model="gpt-4o", temperature=0.7),
        "reviewer": ChatOpenAI(model="gpt-4o", temperature=0.2),
    }

    intent_classifier = IntentClassifier(llm=supervisor_llm)
    query_rewriter = QueryRewriter(llm=supervisor_llm)
    tag_processor = BuiltInTagProcessor(sensitive_words_path="config/sensitive_words.txt")

    # ──────────────────────────────────────────
    # 2. 注册工具
    # ──────────────────────────────────────────
    dispatcher = ToolDispatcher()

    from langchain_core.tools import tool

    @tool
    def web_search(query: str) -> str:
        """在网络上搜索相关信息"""
        return f"搜索结果:关于'{query}'的最新信息..."

    @tool
    def calculate(expression: str) -> str:
        """计算数学表达式"""
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"计算错误: {e}"

    dispatcher.register_tool(web_search, ToolMetadata(
        name="web_search", description="网络搜索",
        category="retrieval", rate_limit=60,
    ))
    dispatcher.register_tool(calculate, ToolMetadata(
        name="calculate", description="数学计算",
        category="computation", rate_limit=200,
    ))

    # ──────────────────────────────────────────
    # 3. 构建 ReAct Agent
    # ──────────────────────────────────────────
    react_builder = ReactAgentBuilder(
        llm=supervisor_llm,
        tools=dispatcher.get_tools(),
        intent_classifier=intent_classifier,
        query_rewriter=query_rewriter,
        tag_processor=tag_processor,
    )
    react_graph = react_builder.build_graph()

    # ──────────────────────────────────────────
    # 4. 构建 Multi-Agent 协作
    # ──────────────────────────────────────────
    orchestrator = MultiAgentOrchestrator(
        supervisor_llm=supervisor_llm,
        agent_llms=agent_llms,
    )
    multi_agent_graph = orchestrator.build_graph()

    # ──────────────────────────────────────────
    # 5. 编译并运行
    # ──────────────────────────────────────────
    checkpointer = AsyncPostgresSaver.from_conn_string(
        "postgresql+psycopg://langgraph:langgraph@localhost:5432/langgraph_agents"
    )
    checkpointer.setup()

    # 运行 ReAct Agent
    react_app = react_graph.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "react-session-001"}}
    initial_state: AgentState = {
        "messages": [HumanMessage(content="请帮我分析一下 Python 中 asyncio 和 threading 的区别")],
        "intent": "",
        "rewritten_queries": [],
        "tool_results": [],
        "iteration_count": 0,
        "max_iterations": 10,
        "final_answer": "",
        "tags": {},
    }

    print("=" * 60)
    print("ReAct Agent 执行")
    print("=" * 60)
    async for event in react_app.astream(initial_state, config, stream_mode="updates"):
        for node_name, output in event.items():
            print(f"\n[{node_name}]")
            if "messages" in output:
                for msg in output["messages"]:
                    if isinstance(msg, AIMessage):
                        print(f"  AI: {msg.content[:200]}")

    # 运行 Multi-Agent 协作
    multi_app = multi_agent_graph.compile(checkpointer=checkpointer)

    config2 = {"configurable": {"thread_id": "multi-agent-001"}}
    multi_state: MultiAgentState = {
        "messages": [],
        "task": "请撰写一份关于 2025 年 AI Agent 发展趋势的分析报告",
        "next_agent": "",
        "agent_results": {},
        "completed_agents": [],
        "final_answer": "",
    }

    print("\n" + "=" * 60)
    print("Multi-Agent 协作执行")
    print("=" * 60)
    async for event in multi_app.astream(multi_state, config2, stream_mode="updates"):
        for node_name, output in event.items():
            print(f"\n[{node_name}]")
            if "messages" in output:
                for msg in output["messages"]:
                    if isinstance(msg, AIMessage):
                        print(f"  {msg.content[:200]}")


if __name__ == "__main__":
    asyncio.run(main())

8.10 智能体执行流程总览

flowchart TD
    subgraph Input["输入层"]
        U[用户输入]
    end

    subgraph Preprocess["预处理层"]
        IC[判断器<br/>意图分类]
        ST[内置标签<br/>敏感词/情感]
        QR[问题优化器<br/>Query Rewrite]
    end

    subgraph AgentCore["智能体核心"]
        direction TB
        subgraph ReAct["ReAct 循环"]
            T[Thought<br/>思考] --> A[Action<br/>行动]
            A --> O[Observation<br/>观察]
            O --> T
        end

        subgraph PlanExec["Plan-and-Execute"]
            PL[Planner<br/>规划器] --> EX[Executor<br/>执行器]
            EX --> RP{Re-plan?}
            RP -->|是| PL
            RP -->|否| SYN[Synthesizer<br/>综合器]
        end

        subgraph MultiA["Multi-Agent"]
            SUP[Supervisor<br/>协调器] --> AG1[研究员]
            SUP --> AG2[分析师]
            SUP --> AG3[撰写员]
            SUP --> AG4[审核员]
            AG1 & AG2 & AG3 & AG4 --> SUP
        end
    end

    subgraph ToolLayer["工具层"]
        TD[Tool Dispatcher<br/>调度器]
        FL2[函数库]
        API[外部 API]
        KB2[知识库]
    end

    subgraph Output["输出层"]
        SC[安全检查]
        FMT[格式化输出]
        SSE2[SSE 流式推送]
    end

    U --> IC & ST
    IC --> QR
    QR --> ReAct & PlanExec & MultiA
    ReAct & PlanExec & MultiA --> TD
    TD --> FL2 & API & KB2
    ReAct & PlanExec & MultiA --> SC
    SC --> FMT --> SSE2

    style IC fill:#3498DB,color:#fff
    style TD fill:#E67E22,color:#fff
    style SC fill:#E74C3C,color:#fff
    style SUP fill:#9B59B6,color:#fff

9. 数据库设计

9.1 总体设计原则

本平台采用 PostgreSQL 16 作为主数据库,遵循以下设计原则:

  • 多租户隔离:所有业务表均携带 tenant_id 字段,通过行级安全策略(Row-Level Security, RLS)实现逻辑隔离。
  • 软删除机制:核心业务表使用 deleted_at 字段标记删除,保留审计追溯能力。
  • JSONB 灵活配置:对智能体配置、模型参数、工作流定义等易变结构使用 JSONB 存储,兼顾灵活性与查询性能。
  • UUID 主键:全部采用 uuid_generate_v4() 生成主键,避免分布式环境下的 ID 冲突。
  • 时间戳规范:统一使用 TIMESTAMPTZ 类型,确保时区一致性。

9.2 核心表结构设计

9.2.1 租户表 (tenants)

CREATE TABLE tenants (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            VARCHAR(128) NOT NULL,
    plan            VARCHAR(32) NOT NULL DEFAULT 'free'
                        CHECK (plan IN ('free', 'starter', 'professional', 'enterprise')),
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'suspended', 'cancelled')),
    config          JSONB NOT NULL DEFAULT '{}',
    max_users       INT NOT NULL DEFAULT 5,
    max_agents      INT NOT NULL DEFAULT 3,
    max_storage_mb  INT NOT NULL DEFAULT 1024,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at      TIMESTAMPTZ
);

CREATE INDEX idx_tenants_status ON tenants(status) WHERE deleted_at IS NULL;

COMMENT ON TABLE tenants IS '租户表 - 平台多租户管理的核心实体';
COMMENT ON COLUMN tenants.config IS '租户级配置项,如默认模型、语言偏好等';

9.2.2 用户表 (users)

CREATE TABLE users (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    username        VARCHAR(64) NOT NULL,
    email           VARCHAR(256) NOT NULL,
    password_hash   VARCHAR(256) NOT NULL,
    avatar_url      VARCHAR(512),
    phone           VARCHAR(32),
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'disabled', 'pending_activation')),
    last_login_at   TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at      TIMESTAMPTZ,

    CONSTRAINT uq_users_tenant_email UNIQUE (tenant_id, email)
);

CREATE INDEX idx_users_tenant_id ON users(tenant_id);
CREATE INDEX idx_users_email ON users(email);

-- 启用行级安全策略
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant_id')::UUID);

9.2.3 角色权限表 (RBAC)

CREATE TABLE roles (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    name            VARCHAR(64) NOT NULL,
    description     TEXT,
    is_system       BOOLEAN NOT NULL DEFAULT FALSE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

    CONSTRAINT uq_roles_tenant_name UNIQUE (tenant_id, name)
);

CREATE TABLE permissions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    module          VARCHAR(64) NOT NULL,
    action          VARCHAR(32) NOT NULL,
    resource        VARCHAR(64) NOT NULL,
    description     TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

    CONSTRAINT uq_permissions UNIQUE (module, action, resource)
);

CREATE TABLE role_permissions (
    role_id         UUID NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
    permission_id   UUID NOT NULL REFERENCES permissions(id) ON DELETE CASCADE,
    granted_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

    PRIMARY KEY (role_id, permission_id)
);

CREATE TABLE user_roles (
    user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role_id         UUID NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
    assigned_at     TIMESTAMPTZ NOT NULL DEFAULT now(),

    PRIMARY KEY (user_id, role_id)
);

-- 预置系统权限
INSERT INTO permissions (module, action, resource, description) VALUES
    ('agent',    'create',  '*',         '创建智能体'),
    ('agent',    'read',    '*',         '查看智能体'),
    ('agent',    'update',  '*',         '编辑智能体'),
    ('agent',    'delete',  '*',         '删除智能体'),
    ('agent',    'chat',    '*',         '与智能体对话'),
    ('workflow', 'create',  '*',         '创建工作流'),
    ('workflow', 'execute', '*',         '执行工作流'),
    ('kb',       'create',  '*',         '创建知识库'),
    ('kb',       'upload',  '*',         '上传文档'),
    ('kb',       'search',  '*',         '检索知识库'),
    ('settings', 'manage',  'tenant',    '管理租户设置'),
    ('settings', 'manage',  'llm',       '管理LLM供应商'),
    ('user',     'manage',  '*',         '管理用户'),
    ('analytics','read',    '*',         '查看运营分析');

-- 预置系统角色
INSERT INTO roles (name, description, is_system) VALUES
    ('super_admin',  '超级管理员 - 拥有所有权限', TRUE),
    ('admin',        '管理员 - 管理租户内所有资源', TRUE),
    ('developer',    '开发者 - 创建和管理智能体、工作流、知识库', TRUE),
    ('viewer',       '观察者 - 只读访问和对话', TRUE);

9.2.4 工作空间表 (workspaces)

CREATE TABLE workspaces (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    name            VARCHAR(128) NOT NULL,
    description     TEXT,
    avatar_url      VARCHAR(512),
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'archived')),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at      TIMESTAMPTZ
);

CREATE INDEX idx_workspaces_tenant_id ON workspaces(tenant_id);

9.2.5 智能体表 (agents)

CREATE TABLE agents (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id      UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    name              VARCHAR(128) NOT NULL,
    description       TEXT,
    type              VARCHAR(32) NOT NULL DEFAULT 'chatbot'
                          CHECK (type IN ('chatbot', 'assistant', 'task_bot', 'rag_bot', 'workflow_bot')),
    config            JSONB NOT NULL DEFAULT '{}',
    prompt_template   TEXT,
    model_config      JSONB NOT NULL DEFAULT '{
        "provider": "deepseek",
        "model": "deepseek-chat",
        "temperature": 0.7,
        "max_tokens": 4096,
        "top_p": 0.9
    }',
    tools_config      JSONB NOT NULL DEFAULT '[]',
    avatar_url        VARCHAR(512),
    status            VARCHAR(16) NOT NULL DEFAULT 'draft'
                          CHECK (status IN ('draft', 'published', 'disabled')),
    version           INT NOT NULL DEFAULT 1,
    created_by        UUID REFERENCES users(id),
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at        TIMESTAMPTZ
);

CREATE INDEX idx_agents_workspace_id ON agents(workspace_id);
CREATE INDEX idx_agents_type ON agents(type);
CREATE INDEX idx_agents_status ON agents(status);
CREATE INDEX idx_agents_config_gin ON agents USING GIN (config);

COMMENT ON COLUMN agents.config IS '智能体高级配置:welcome_message, guardrails, memory_policy 等';
COMMENT ON COLUMN agents.model_config IS '模型调用参数:provider, model, temperature, max_tokens 等';
COMMENT ON COLUMN agents.tools_config IS '关联工具列表及参数映射';

9.2.6 工作流表 (workflows)

CREATE TABLE workflows (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    agent_id            UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    name                VARCHAR(128) NOT NULL,
    description         TEXT,
    graph_definition    JSONB NOT NULL,
    version             INT NOT NULL DEFAULT 1,
    status              VARCHAR(16) NOT NULL DEFAULT 'draft'
                            CHECK (status IN ('draft', 'published', 'deprecated')),
    entry_node_id       VARCHAR(64),
    created_by          UUID REFERENCES users(id),
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at          TIMESTAMPTZ
);

CREATE INDEX idx_workflows_agent_id ON workflows(agent_id);
CREATE INDEX idx_workflows_version ON workflows(agent_id, version DESC);

-- 工作流执行实例表
CREATE TABLE workflow_executions (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id         UUID NOT NULL REFERENCES workflows(id),
    conversation_id     UUID REFERENCES conversations(id),
    status              VARCHAR(16) NOT NULL DEFAULT 'running'
                            CHECK (status IN ('running', 'completed', 'failed', 'paused', 'cancelled')),
    current_node_id     VARCHAR(64),
    node_states         JSONB NOT NULL DEFAULT '{}',
    global_variables    JSONB NOT NULL DEFAULT '{}',
    error_info          JSONB,
    started_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    completed_at        TIMESTAMPTZ,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_wf_exec_workflow_id ON workflow_executions(workflow_id);
CREATE INDEX idx_wf_exec_status ON workflow_executions(status);

9.2.7 知识库表 (knowledge_bases)

CREATE TABLE knowledge_bases (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id        UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    name                VARCHAR(128) NOT NULL,
    description         TEXT,
    embedding_model     VARCHAR(64) NOT NULL DEFAULT 'text-embedding-v3',
    embedding_dimension INT NOT NULL DEFAULT 1024,
    milvus_collection   VARCHAR(128) NOT NULL,
    chunk_config        JSONB NOT NULL DEFAULT '{
        "chunk_size": 512,
        "chunk_overlap": 64,
        "separator": "\n\n"
    }',
    document_count      INT NOT NULL DEFAULT 0,
    total_chunks        INT NOT NULL DEFAULT 0,
    status              VARCHAR(16) NOT NULL DEFAULT 'active'
                            CHECK (status IN ('active', 'building', 'error', 'archived')),
    created_by          UUID REFERENCES users(id),
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at          TIMESTAMPTZ
);

CREATE INDEX idx_kb_workspace_id ON knowledge_bases(workspace_id);
CREATE UNIQUE INDEX idx_kb_milvus_collection ON knowledge_bases(milvus_collection);

9.2.8 文档表 (documents)

CREATE TABLE documents (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    knowledge_base_id   UUID NOT NULL REFERENCES knowledge_bases(id) ON DELETE CASCADE,
    filename            VARCHAR(256) NOT NULL,
    file_type           VARCHAR(16) NOT NULL
                            CHECK (file_type IN ('pdf', 'docx', 'txt', 'md', 'csv', 'html', 'xlsx')),
    file_size           BIGINT NOT NULL DEFAULT 0,
    storage_path        VARCHAR(512) NOT NULL,
    status              VARCHAR(16) NOT NULL DEFAULT 'pending'
                            CHECK (status IN ('pending', 'parsing', 'chunking', 'embedding', 'completed', 'failed')),
    chunk_count         INT NOT NULL DEFAULT 0,
    error_message       TEXT,
    metadata            JSONB NOT NULL DEFAULT '{}',
    parse_config        JSONB NOT NULL DEFAULT '{}',
    created_by          UUID REFERENCES users(id),
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_documents_kb_id ON documents(knowledge_base_id);
CREATE INDEX idx_documents_status ON documents(status);

9.2.9 对话表 (conversations)

CREATE TABLE conversations (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    agent_id        UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    user_id         UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    title           VARCHAR(256),
    metadata        JSONB NOT NULL DEFAULT '{}',
    message_count   INT NOT NULL DEFAULT 0,
    total_tokens    INT NOT NULL DEFAULT 0,
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'archived')),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_conv_agent_id ON conversations(agent_id);
CREATE INDEX idx_conv_user_id ON users(user_id);
CREATE INDEX idx_conv_created ON conversations(created_at DESC);
CREATE INDEX idx_conv_metadata ON conversations USING GIN (metadata);

9.2.10 消息表 (messages)

CREATE TABLE messages (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id     UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role                VARCHAR(16) NOT NULL
                            CHECK (role IN ('system', 'user', 'assistant', 'tool', 'function')),
    content             TEXT NOT NULL,
    token_count         INT NOT NULL DEFAULT 0,
    metadata            JSONB NOT NULL DEFAULT '{}',
    tool_calls          JSONB,
    tool_call_id        VARCHAR(64),
    latency_ms          INT,
    model_used          VARCHAR(64),
    finish_reason       VARCHAR(32),
    sequence            INT NOT NULL DEFAULT 0,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_messages_conv_id ON messages(conversation_id);
CREATE INDEX idx_messages_sequence ON messages(conversation_id, sequence);
CREATE INDEX idx_messages_role ON messages(role);
CREATE INDEX idx_messages_created ON messages(created_at);

-- 按月分区以提升大表查询性能
-- CREATE TABLE messages PARTITION BY RANGE (created_at);

9.2.11 插件/工具表 (tools)

CREATE TABLE tools (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id    UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    name            VARCHAR(128) NOT NULL,
    display_name    VARCHAR(128),
    description     TEXT,
    type            VARCHAR(32) NOT NULL DEFAULT 'http'
                        CHECK (type IN ('http', 'function', 'mcp', 'builtin')),
    schema          JSONB NOT NULL DEFAULT '{}',
    endpoint        VARCHAR(512),
    auth_config     JSONB NOT NULL DEFAULT '{}',
    headers         JSONB NOT NULL DEFAULT '{}',
    timeout_ms      INT NOT NULL DEFAULT 30000,
    retry_policy    JSONB NOT NULL DEFAULT '{"max_retries": 3, "backoff": "exponential"}',
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'disabled')),
    created_by      UUID REFERENCES users(id),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at      TIMESTAMPTZ
);

CREATE INDEX idx_tools_workspace_id ON tools(workspace_id);
CREATE INDEX idx_tools_type ON tools(type);

COMMENT ON COLUMN tools.schema IS 'OpenAPI/JSON Schema 格式的工具参数定义';
COMMENT ON COLUMN tools.auth_config IS '工具鉴权配置:api_key, oauth2, bearer 等';

9.2.12 LLM 供应商表 (llm_providers)

CREATE TABLE llm_providers (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    provider_name   VARCHAR(64) NOT NULL
                        CHECK (provider_name IN ('deepseek', 'qwen', 'doubao', 'chatglm', 'openai', 'anthropic', 'custom')),
    model_name      VARCHAR(128) NOT NULL,
    display_name    VARCHAR(128),
    api_key         BYTEA NOT NULL,
    api_base_url    VARCHAR(512),
    config          JSONB NOT NULL DEFAULT '{}',
    capabilities    JSONB NOT NULL DEFAULT '[]',
    is_default      BOOLEAN NOT NULL DEFAULT FALSE,
    status          VARCHAR(16) NOT NULL DEFAULT 'active'
                        CHECK (status IN ('active', 'disabled', 'error')),
    last_tested_at  TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_llm_tenant_id ON llm_providers(tenant_id);
CREATE INDEX idx_llm_provider_name ON llm_providers(provider_name);

COMMENT ON COLUMN llm_providers.api_key IS '使用 pgcrypto 的 pgp_sym_encrypt 加密存储';
COMMENT ON COLUMN llm_providers.capabilities IS '模型能力列表:["chat", "embedding", "vision", "function_calling"]';
COMMENT ON COLUMN llm_providers.config IS '供应商级别配置:rate_limit, timeout, fallback_model 等';

9.2.13 对话日志表 (conversation_logs)

CREATE TABLE conversation_logs (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id     UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    agent_id            UUID NOT NULL REFERENCES agents(id),
    tenant_id           UUID NOT NULL REFERENCES tenants(id),
    request_payload     JSONB NOT NULL,
    response_payload    JSONB NOT NULL,
    model_used          VARCHAR(64) NOT NULL,
    latency_ms          INT NOT NULL,
    prompt_tokens       INT NOT NULL DEFAULT 0,
    completion_tokens   INT NOT NULL DEFAULT 0,
    total_tokens        INT NOT NULL DEFAULT 0,
    estimated_cost      DECIMAL(10, 6) NOT NULL DEFAULT 0,
    finish_reason       VARCHAR(32),
    error_code          VARCHAR(32),
    error_message       TEXT,
    user_rating         SMALLINT CHECK (user_rating BETWEEN 1 AND 5),
    user_feedback       TEXT,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_logs_conversation_id ON conversation_logs(conversation_id);
CREATE INDEX idx_logs_agent_id ON conversation_logs(agent_id);
CREATE INDEX idx_logs_tenant_id ON conversation_logs(tenant_id);
CREATE INDEX idx_logs_created_at ON conversation_logs(created_at DESC);
CREATE INDEX idx_logs_model_used ON conversation_logs(model_used);

-- 按月分区
CREATE TABLE conversation_logs_2026_01 PARTITION OF conversation_logs
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE conversation_logs_2026_02 PARTITION OF conversation_logs
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

9.3 ER 关系图

erDiagram
    tenants ||--o{ users : "拥有"
    tenants ||--o{ workspaces : "拥有"
    tenants ||--o{ llm_providers : "配置"
    tenants ||--o{ roles : "定义"
    tenants ||--o{ conversation_logs : "产生"

    users ||--o{ user_roles : "分配"
    roles ||--o{ user_roles : "包含"
    roles ||--o{ role_permissions : "关联"
    permissions ||--o{ role_permissions : "被分配"

    workspaces ||--o{ agents : "包含"
    workspaces ||--o{ knowledge_bases : "包含"
    workspaces ||--o{ tools : "包含"

    agents ||--o{ workflows : "绑定"
    agents ||--o{ conversations : "参与"

    workflows ||--o{ workflow_executions : "执行产生"

    knowledge_bases ||--o{ documents : "包含"

    conversations ||--o{ messages : "包含"
    conversations ||--o{ conversation_logs : "记录"
    conversations ||--o{ workflow_executions : "触发"

    users ||--o{ conversations : "发起"

    tenants {
        UUID id PK
        VARCHAR name
        VARCHAR plan
        VARCHAR status
        JSONB config
        TIMESTAMPTZ created_at
    }

    users {
        UUID id PK
        UUID tenant_id FK
        VARCHAR username
        VARCHAR email
        VARCHAR password_hash
        VARCHAR status
        TIMESTAMPTZ created_at
    }

    roles {
        UUID id PK
        UUID tenant_id FK
        VARCHAR name
        BOOLEAN is_system
    }

    permissions {
        UUID id PK
        VARCHAR module
        VARCHAR action
        VARCHAR resource
    }

    role_permissions {
        UUID role_id FK
        UUID permission_id FK
    }

    user_roles {
        UUID user_id FK
        UUID role_id FK
    }

    workspaces {
        UUID id PK
        UUID tenant_id FK
        VARCHAR name
        TEXT description
        VARCHAR status
    }

    agents {
        UUID id PK
        UUID workspace_id FK
        VARCHAR name
        VARCHAR type
        JSONB config
        TEXT prompt_template
        JSONB model_config
        VARCHAR status
    }

    workflows {
        UUID id PK
        UUID agent_id FK
        VARCHAR name
        JSONB graph_definition
        INT version
        VARCHAR status
    }

    workflow_executions {
        UUID id PK
        UUID workflow_id FK
        UUID conversation_id FK
        VARCHAR status
        VARCHAR current_node_id
        JSONB node_states
    }

    knowledge_bases {
        UUID id PK
        UUID workspace_id FK
        VARCHAR name
        VARCHAR embedding_model
        VARCHAR milvus_collection
        JSONB chunk_config
    }

    documents {
        UUID id PK
        UUID knowledge_base_id FK
        VARCHAR filename
        VARCHAR file_type
        VARCHAR status
        INT chunk_count
        JSONB metadata
    }

    conversations {
        UUID id PK
        UUID agent_id FK
        UUID user_id FK
        VARCHAR title
        JSONB metadata
        INT message_count
    }

    messages {
        UUID id PK
        UUID conversation_id FK
        VARCHAR role
        TEXT content
        INT token_count
        JSONB tool_calls
        INT sequence
    }

    tools {
        UUID id PK
        UUID workspace_id FK
        VARCHAR name
        VARCHAR type
        JSONB schema
        VARCHAR endpoint
    }

    llm_providers {
        UUID id PK
        UUID tenant_id FK
        VARCHAR provider_name
        VARCHAR model_name
        BYTEA api_key
        JSONB config
        JSONB capabilities
    }

    conversation_logs {
        UUID id PK
        UUID conversation_id FK
        UUID agent_id FK
        UUID tenant_id FK
        JSONB request_payload
        JSONB response_payload
        INT latency_ms
        INT total_tokens
        DECIMAL estimated_cost
    }

10. API 接口设计

10.1 设计规范

  • RESTful 风格:遵循 REST 架构约束,使用 HTTP 方法语义化操作资源。
  • 版本管理:所有接口统一使用 /api/v1 前缀。
  • 认证方式:Bearer Token(JWT),通过 Authorization: Bearer <token> 请求头传递。
  • 分页规范:列表接口统一使用 page(页码,从 1 开始)+ page_size(每页数量,默认 20,最大 100)。
  • 响应格式:统一 JSON 响应体,包含 codemessagedata 三个字段。
  • 错误码体系:采用 6 位数字错误码,前两位表示模块(10=认证,20=租户,30=智能体...)。
  • 限流策略:默认 100 次/分钟,通过 X-RateLimit-* 响应头告知客户端。

10.2 统一响应格式

{
    "code": 0,
    "message": "success",
    "data": {},
    "request_id": "req_a1b2c3d4e5f6"
}

错误响应:

{
    "code": 300404,
    "message": "智能体不存在",
    "data": null,
    "request_id": "req_x9y8z7w6"
}

10.3 核心接口列表

10.3.1 认证模块 /api/v1/auth

方法路径说明认证
POST/auth/register用户注册(创建租户+管理员)
POST/auth/login用户登录
POST/auth/refresh-token刷新访问令牌
POST/auth/logout退出登录
GET/auth/me获取当前用户信息
PUT/auth/password修改密码

10.3.2 租户管理 /api/v1/tenants

方法路径说明认证
GET/tenants获取租户列表(超管)
GET/tenants/{id}获取租户详情
PUT/tenants/{id}更新租户信息
DELETE/tenants/{id}删除租户
GET/tenants/{id}/usage获取租户用量统计
PUT/tenants/{id}/plan升级/降级套餐

10.3.3 用户管理 /api/v1/users

方法路径说明认证
GET/users获取用户列表
POST/users创建用户
GET/users/{id}获取用户详情
PUT/users/{id}更新用户信息
DELETE/users/{id}删除用户
PUT/users/{id}/status启用/禁用用户
PUT/users/{id}/roles分配角色

10.3.4 工作空间 /api/v1/workspaces

方法路径说明认证
GET/workspaces获取工作空间列表
POST/workspaces创建工作空间
GET/workspaces/{id}获取工作空间详情
PUT/workspaces/{id}更新工作空间
DELETE/workspaces/{id}删除工作空间

10.3.5 智能体 /api/v1/agents

方法路径说明认证
GET/agents获取智能体列表
POST/agents创建智能体
GET/agents/{id}获取智能体详情
PUT/agents/{id}更新智能体
DELETE/agents/{id}删除智能体
POST/agents/{id}/publish发布智能体
POST/agents/{id}/chat发送对话消息
GET/agents/{id}/chat/streamSSE 流式对话
GET/agents/{id}/conversations获取智能体会话列表

10.3.6 工作流 /api/v1/workflows

方法路径说明认证
GET/workflows获取工作流列表
POST/workflows创建工作流
GET/workflows/{id}获取工作流详情
PUT/workflows/{id}更新工作流
DELETE/workflows/{id}删除工作流
POST/workflows/{id}/execute执行工作流
GET/workflows/{id}/executions/{exec_id}/status查询执行状态
POST/workflows/{id}/executions/{exec_id}/pause暂停执行
POST/workflows/{id}/executions/{exec_id}/resume恢复执行
GET/workflows/{id}/versions获取版本历史

10.3.7 知识库 /api/v1/knowledge-bases

方法路径说明认证
GET/knowledge-bases获取知识库列表
POST/knowledge-bases创建知识库
GET/knowledge-bases/{id}获取知识库详情
PUT/knowledge-bases/{id}更新知识库
DELETE/knowledge-bases/{id}删除知识库
POST/knowledge-bases/{id}/documents/upload上传文档
GET/knowledge-bases/{id}/documents获取文档列表
DELETE/knowledge-bases/{id}/documents/{doc_id}删除文档
POST/knowledge-bases/{id}/search语义检索
POST/knowledge-bases/{id}/rebuild重建索引

10.3.8 LLM 供应商 /api/v1/llm-providers

方法路径说明认证
GET/llm-providers获取供应商列表
POST/llm-providers添加供应商配置
GET/llm-providers/{id}获取供应商详情
PUT/llm-providers/{id}更新供应商配置
DELETE/llm-providers/{id}删除供应商配置
POST/llm-providers/{id}/test连通性测试
GET/llm-providers/models获取可用模型列表

10.3.9 对话管理 /api/v1/conversations

方法路径说明认证
GET/conversations获取对话列表
GET/conversations/{id}获取对话详情
PUT/conversations/{id}更新对话(标题等)
DELETE/conversations/{id}删除对话
GET/conversations/{id}/messages获取消息列表
POST/conversations/{id}/export导出对话记录

10.3.10 运营分析 /api/v1/analytics

方法路径说明认证
GET/analytics/overview总览数据面板
GET/analytics/usage用量趋势(按天/周/月)
GET/analytics/conversations对话分析(热度、时长)
GET/analytics/agents/{id}单个智能体分析
GET/analytics/models模型调用分析
GET/analytics/costs成本分析

10.4 关键接口详细示例

10.4.1 用户登录

请求

POST /api/v1/auth/login
Content-Type: application/json
{
    "email": "admin@kunlun-ai.com",
    "password": "SecureP@ss2026",
    "captcha_id": "cap_8f3a2b1c",
    "captcha_code": "x7Km"
}

成功响应

{
    "code": 0,
    "message": "success",
    "data": {
        "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzAxMjMiLCJ0ZW5hbnRfaWQiOiJ0X2FiYyIsInJvbGUiOiJhZG1pbiIsImlhdCI6MTcxODU2MDAwMCwiZXhwIjoxNzE4NTYzNjAwfQ.xxx",
        "refresh_token": "rt_9a8b7c6d5e4f3g2h1i",
        "token_type": "Bearer",
        "expires_in": 3600,
        "user": {
            "id": "usr_01234567-89ab-cdef-0123-456789abcdef",
            "tenant_id": "t_abcdef01-2345-6789-abcd-ef0123456789",
            "username": "管理员",
            "email": "admin@kunlun-ai.com",
            "role": "admin",
            "avatar_url": "https://cdn.kunlun-ai.com/avatars/default.png"
        }
    },
    "request_id": "req_login_2026061701"
}

10.4.2 智能体对话(SSE 流式)

请求

POST /api/v1/agents/agt_001/chat
Content-Type: application/json
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
{
    "conversation_id": "conv_abc123",
    "message": "请帮我分析一下最近一周的销售数据趋势",
    "stream": true,
    "tools_enabled": true,
    "metadata": {
        "source": "web_widget",
        "session_id": "sess_xyz789"
    }
}

SSE 流式响应

data: {"type": "start", "conversation_id": "conv_abc123", "message_id": "msg_001"}

data: {"type": "delta", "content": "好的", "token_count": 1}

data: {"type": "delta", "content": ",让我查询", "token_count": 3}

data: {"type": "delta", "content": "一下最近一周的销售数据。", "token_count": 8}

data: {"type": "tool_call", "tool": "sales_query", "arguments": {"date_range": "last_7_days", "metrics": ["revenue", "orders"]}}

data: {"type": "tool_result", "tool": "sales_query", "result": {"revenue": 1258000, "orders": 3456, "trend": "up_12%"}}

data: {"type": "delta", "content": "根据查询结果,最近一周的销售数据如下:\n\n", "token_count": 18}

data: {"type": "delta", "content": "| 指标 | 数值 | 环比 |\n|------|------|------|\n| 总收入 | ¥1,258,000 | +12% |\n| 订单数 | 3,456 | +8% |\n\n", "token_count": 42}

data: {"type": "delta", "content": "整体呈上升趋势,收入环比增长12%,订单量增长8%。", "token_count": 58}

data: {"type": "done", "message_id": "msg_001", "usage": {"prompt_tokens": 156, "completion_tokens": 58, "total_tokens": 214}, "latency_ms": 2340}

data: [DONE]

10.4.3 知识库语义检索

请求

POST /api/v1/knowledge-bases/kb_001/search
Content-Type: application/json
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
{
    "query": "极简视界平台支持哪些大语言模型?",
    "top_k": 5,
    "score_threshold": 0.7,
    "filters": {
        "file_type": ["pdf", "md"],
        "created_after": "2026-01-01"
    },
    "hybrid_search": true,
    "rerank": true,
    "rerank_model": "bge-reranker-v2-m3"
}

响应

{
    "code": 0,
    "message": "success",
    "data": {
        "query": "极简视界平台支持哪些大语言模型?",
        "results": [
            {
                "chunk_id": "chk_001",
                "document_id": "doc_abc",
                "document_name": "平台技术白皮书 v2.0.pdf",
                "content": "极简视界智能体开发平台原生支持主流大语言模型,包括:DeepSeek-V3/DeepSeek-R1、通义千问 Qwen-Max/Qwen-Plus、字节跳动豆包大模型、智谱 ChatGLM-4 系列,以及 OpenAI GPT-4o 系列。平台采用统一的 LLMProvider 抽象层,支持零代码切换模型供应商...",
                "score": 0.94,
                "rerank_score": 0.96,
                "metadata": {
                    "page": 12,
                    "section": "3.2 模型支持"
                }
            },
            {
                "chunk_id": "chk_002",
                "document_id": "doc_def",
                "document_name": "API参考手册.md",
                "content": "在 llm_providers 配置中,provider_name 字段支持以下取值:deepseek、qwen、doubao、chatglm、openai、anthropic。开发者也可通过 custom 类型接入任意兼容 OpenAI API 格式的模型服务...",
                "score": 0.88,
                "rerank_score": 0.91,
                "metadata": {
                    "section": "LLM供应商配置"
                }
            }
        ],
        "total": 2,
        "search_latency_ms": 186,
        "rerank_latency_ms": 45
    },
    "request_id": "req_search_2026061701"
}

10.4.4 工作流执行

请求

POST /api/v1/workflows/wf_001/execute
Content-Type: application/json
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
{
    "input": {
        "user_query": "帮我生成一份本周的项目周报并发送到团队邮箱",
        "context": {
            "project_name": "极简视界 v2.0",
            "week_start": "2026-06-10",
            "week_end": "2026-06-16"
        }
    },
    "conversation_id": "conv_weekly_report",
    "async": true,
    "callback_url": "https://hooks.kunlun-ai.com/workflow/callback"
}

响应

{
    "code": 0,
    "message": "success",
    "data": {
        "execution_id": "exec_20260617_001",
        "workflow_id": "wf_001",
        "status": "running",
        "started_at": "2026-06-17T09:00:00Z",
        "estimated_duration_ms": 15000,
        "nodes": [
            {"id": "start", "status": "completed"},
            {"id": "gather_data", "status": "running"},
            {"id": "llm_generate", "status": "pending"},
            {"id": "format_report", "status": "pending"},
            {"id": "send_email", "status": "pending"}
        ],
        "polling_url": "/api/v1/workflows/wf_001/executions/exec_20260617_001/status"
    },
    "request_id": "req_wf_exec_2026061701"
}

11. 插件化模型管理

11.1 设计思想

平台采用 策略模式 + 插件注册 架构实现 LLM 供应商的统一管理。核心目标:

  • 统一接口:所有供应商实现相同的抽象接口,业务层无需感知底层差异。
  • 零代码扩展:新增供应商只需实现适配器并注册,无需修改核心代码。
  • 运行时切换:支持在对话级别动态切换模型,满足 A/B 测试和降级需求。
  • 流式兼容:所有供应商统一支持流式输出,内部处理协议差异。

11.2 抽象基类定义

from abc import ABC, abstractmethod
from typing import AsyncIterator, Optional
from dataclasses import dataclass, field
from enum import Enum


class ModelCapability(str, Enum):
    CHAT = "chat"
    EMBEDDING = "embedding"
    VISION = "vision"
    FUNCTION_CALLING = "function_calling"
    REASONING = "reasoning"


@dataclass
class ChatMessage:
    role: str  # "system" | "user" | "assistant" | "tool"
    content: str
    name: Optional[str] = None
    tool_calls: Optional[list[dict]] = None
    tool_call_id: Optional[str] = None


@dataclass
class ChatRequest:
    messages: list[ChatMessage]
    model: str
    temperature: float = 0.7
    max_tokens: int = 4096
    top_p: float = 0.9
    tools: Optional[list[dict]] = None
    tool_choice: Optional[str] = None
    stop: Optional[list[str]] = None
    extra_params: dict = field(default_factory=dict)


@dataclass
class ChatResponse:
    content: str
    tool_calls: Optional[list[dict]] = None
    finish_reason: str = "stop"
    usage: Optional[dict] = None
    model: str = ""
    raw_response: Optional[dict] = None


@dataclass
class StreamChunk:
    delta: str = ""
    tool_calls_delta: Optional[list[dict]] = None
    finish_reason: Optional[str] = None
    usage: Optional[dict] = None


@dataclass
class EmbeddingRequest:
    texts: list[str]
    model: str
    dimensions: Optional[int] = None


@dataclass
class EmbeddingResponse:
    embeddings: list[list[float]]
    model: str = ""
    usage: Optional[dict] = None


class AbstractLLMProvider(ABC):
    """LLM 供应商抽象基类 - 所有适配器必须继承此类"""

    def __init__(self, api_key: str, base_url: Optional[str] = None, **kwargs):
        self.api_key = api_key
        self.base_url = base_url
        self.config = kwargs

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """供应商标识名称"""
        ...

    @property
    @abstractmethod
    def capabilities(self) -> list[ModelCapability]:
        """供应商支持的能力列表"""
        ...

    @abstractmethod
    async def chat(self, request: ChatRequest) -> ChatResponse:
        """同步对话接口 - 返回完整响应"""
        ...

    @abstractmethod
    async def chat_stream(self, request: ChatRequest) -> AsyncIterator[StreamChunk]:
        """流式对话接口 - 逐步返回内容块"""
        ...

    @abstractmethod
    async def embedding(self, request: EmbeddingRequest) -> EmbeddingResponse:
        """文本向量化接口"""
        ...

    async def health_check(self) -> bool:
        """连通性检测,默认发送简短请求验证"""
        try:
            req = ChatRequest(
                messages=[ChatMessage(role="user", content="ping")],
                model=self.config.get("default_model", ""),
                max_tokens=5,
            )
            await self.chat(req)
            return True
        except Exception:
            return False

11.3 供应商适配器实现

DeepSeek 适配器

import httpx
import json
from typing import AsyncIterator


class DeepSeekProvider(AbstractLLMProvider):
    """DeepSeek 大模型适配器"""

    DEFAULT_BASE_URL = "https://api.deepseek.com/v1"

    def __init__(self, api_key: str, base_url: Optional[str] = None, **kwargs):
        super().__init__(api_key, base_url or self.DEFAULT_BASE_URL, **kwargs)
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=httpx.Timeout(120.0, connect=10.0),
        )

    @property
    def provider_name(self) -> str:
        return "deepseek"

    @property
    def capabilities(self) -> list[ModelCapability]:
        return [
            ModelCapability.CHAT,
            ModelCapability.FUNCTION_CALLING,
            ModelCapability.REASONING,
        ]

    async def chat(self, request: ChatRequest) -> ChatResponse:
        payload = self._build_payload(request, stream=False)
        resp = await self._client.post("/chat/completions", json=payload)
        resp.raise_for_status()
        data = resp.json()

        choice = data["choices"][0]
        return ChatResponse(
            content=choice["message"]["content"] or "",
            tool_calls=choice["message"].get("tool_calls"),
            finish_reason=choice.get("finish_reason", "stop"),
            usage=data.get("usage"),
            model=data.get("model", request.model),
        )

    async def chat_stream(self, request: ChatRequest) -> AsyncIterator[StreamChunk]:
        payload = self._build_payload(request, stream=True)
        async with self._client.stream(
            "POST", "/chat/completions", json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    break
                chunk_data = json.loads(data_str)
                delta = chunk_data["choices"][0].get("delta", {})
                yield StreamChunk(
                    delta=delta.get("content", ""),
                    tool_calls_delta=delta.get("tool_calls"),
                    finish_reason=chunk_data["choices"][0].get("finish_reason"),
                    usage=chunk_data.get("usage"),
                )

    async def embedding(self, request: EmbeddingRequest) -> EmbeddingResponse:
        payload = {
            "model": request.model,
            "input": request.texts,
        }
        if request.dimensions:
            payload["dimensions"] = request.dimensions

        resp = await self._client.post("/embeddings", json=payload)
        resp.raise_for_status()
        data = resp.json()

        return EmbeddingResponse(
            embeddings=[item["embedding"] for item in data["data"]],
            model=data.get("model", request.model),
            usage=data.get("usage"),
        )

    def _build_payload(self, request: ChatRequest, stream: bool) -> dict:
        payload = {
            "model": request.model,
            "messages": [
                {"role": m.role, "content": m.content}
                for m in request.messages
            ],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
            "top_p": request.top_p,
            "stream": stream,
        }
        if request.tools:
            payload["tools"] = request.tools
        if request.tool_choice:
            payload["tool_choice"] = request.tool_choice
        if request.stop:
            payload["stop"] = request.stop
        return payload

通义千问适配器

class QwenProvider(AbstractLLMProvider):
    """通义千问(阿里云百炼平台)适配器"""

    DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

    def __init__(self, api_key: str, base_url: Optional[str] = None, **kwargs):
        super().__init__(api_key, base_url or self.DEFAULT_BASE_URL, **kwargs)
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=httpx.Timeout(120.0, connect=10.0),
        )

    @property
    def provider_name(self) -> str:
        return "qwen"

    @property
    def capabilities(self) -> list[ModelCapability]:
        return [
            ModelCapability.CHAT,
            ModelCapability.EMBEDDING,
            ModelCapability.VISION,
            ModelCapability.FUNCTION_CALLING,
        ]

    async def chat(self, request: ChatRequest) -> ChatResponse:
        # 通义千问兼容 OpenAI 接口格式,处理逻辑类似
        payload = self._build_payload(request, stream=False)
        resp = await self._client.post("/chat/completions", json=payload)
        resp.raise_for_status()
        data = resp.json()
        choice = data["choices"][0]
        return ChatResponse(
            content=choice["message"]["content"] or "",
            tool_calls=choice["message"].get("tool_calls"),
            finish_reason=choice.get("finish_reason", "stop"),
            usage=data.get("usage"),
            model=data.get("model", request.model),
        )

    async def chat_stream(self, request: ChatRequest) -> AsyncIterator[StreamChunk]:
        payload = self._build_payload(request, stream=True)
        async with self._client.stream(
            "POST", "/chat/completions", json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    break
                chunk_data = json.loads(data_str)
                delta = chunk_data["choices"][0].get("delta", {})
                yield StreamChunk(
                    delta=delta.get("content", ""),
                    tool_calls_delta=delta.get("tool_calls"),
                    finish_reason=chunk_data["choices"][0].get("finish_reason"),
                    usage=chunk_data.get("usage"),
                )

    async def embedding(self, request: EmbeddingRequest) -> EmbeddingResponse:
        payload = {
            "model": request.model or "text-embedding-v3",
            "input": request.texts,
        }
        if request.dimensions:
            payload["dimensions"] = request.dimensions
        resp = await self._client.post("/embeddings", json=payload)
        resp.raise_for_status()
        data = resp.json()
        return EmbeddingResponse(
            embeddings=[item["embedding"] for item in data["data"]],
            model=data.get("model", request.model),
            usage=data.get("usage"),
        )

    def _build_payload(self, request: ChatRequest, stream: bool) -> dict:
        payload = {
            "model": request.model,
            "messages": [
                {"role": m.role, "content": m.content}
                for m in request.messages
            ],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
            "top_p": request.top_p,
            "stream": stream,
        }
        if request.tools:
            payload["tools"] = request.tools
        # 通义千问特有参数
        if request.extra_params.get("enable_search"):
            payload["enable_search"] = True
        return payload

11.4 模型注册与发现机制

import importlib
import inspect
from typing import Type
import logging

logger = logging.getLogger(__name__)


class ProviderRegistry:
    """LLM 供应商注册中心 - 管理所有适配器的注册、发现与实例化"""

    _providers: dict[str, Type[AbstractLLMProvider]] = {}
    _instances: dict[str, AbstractLLMProvider] = {}

    @classmethod
    def register(cls, provider_class: Type[AbstractLLMProvider]) -> Type[AbstractLLMProvider]:
        """注册供应商适配器(可作为装饰器使用)"""
        instance = provider_class.__new__(provider_class)
        name = instance.provider_name
        if name in cls._providers:
            logger.warning(f"覆盖已注册的供应商: {name}")
        cls._providers[name] = provider_class
        logger.info(f"注册供应商: {name}, 能力: {instance.capabilities}")
        return provider_class

    @classmethod
    def get_provider(
        cls,
        provider_name: str,
        api_key: str,
        base_url: Optional[str] = None,
        **kwargs,
    ) -> AbstractLLMProvider:
        """获取或创建供应商实例(单例模式)"""
        cache_key = f"{provider_name}:{api_key[:8]}"
        if cache_key not in cls._instances:
            if provider_name not in cls._providers:
                raise ValueError(f"未注册的供应商: {provider_name}")
            provider_cls = cls._providers[provider_name]
            cls._instances[cache_key] = provider_cls(
                api_key=api_key, base_url=base_url, **kwargs
            )
        return cls._instances[cache_key]

    @classmethod
    def list_providers(cls) -> list[dict]:
        """列出所有已注册的供应商信息"""
        result = []
        for name, provider_cls in cls._providers.items():
            instance = provider_cls.__new__(provider_cls)
            result.append({
                "name": name,
                "class": provider_cls.__name__,
                "capabilities": [c.value for c in instance.capabilities],
            })
        return result

    @classmethod
    def auto_discover(cls, package_path: str = "providers"):
        """自动扫描并注册指定包下的所有适配器"""
        pkg = importlib.import_module(package_path)
        for _, module_name, _ in inspect.getmembers(pkg):
            if not isinstance(module_name, str):
                continue
            try:
                module = importlib.import_module(f"{package_path}.{module_name}")
                for _, obj in inspect.getmembers(module, inspect.isclass):
                    if (
                        issubclass(obj, AbstractLLMProvider)
                        and obj is not AbstractLLMProvider
                    ):
                        cls.register(obj)
            except ImportError as e:
                logger.debug(f"跳过模块 {module_name}: {e}")


# ===== 使用装饰器注册 =====

@ProviderRegistry.register
class DoubaoProvider(AbstractLLMProvider):
    """字节跳动豆包大模型适配器"""
    # ... 实现略,结构与 QwenProvider 类似

    @property
    def provider_name(self) -> str:
        return "doubao"

    @property
    def capabilities(self) -> list[ModelCapability]:
        return [ModelCapability.CHAT, ModelCapability.FUNCTION_CALLING]


@ProviderRegistry.register
class ChatGLMProvider(AbstractLLMProvider):
    """智谱 ChatGLM 适配器"""

    @property
    def provider_name(self) -> str:
        return "chatglm"

    @property
    def capabilities(self) -> list[ModelCapability]:
        return [ModelCapability.CHAT, ModelCapability.EMBEDDING, ModelCapability.VISION]


@ProviderRegistry.register
class OpenAIProvider(AbstractLLMProvider):
    """OpenAI GPT 系列适配器"""

    @property
    def provider_name(self) -> str:
        return "openai"

    @property
    def capabilities(self) -> list[ModelCapability]:
        return [
            ModelCapability.CHAT,
            ModelCapability.EMBEDDING,
            ModelCapability.VISION,
            ModelCapability.FUNCTION_CALLING,
        ]


# ===== 应用启动时自动发现 =====
# ProviderRegistry.auto_discover("app.providers")

11.5 统一调用层(ChatService)

from typing import AsyncIterator
import structlog

logger = structlog.get_logger()


class ChatService:
    """统一对话服务层 - 屏蔽供应商差异,提供一致的调用接口"""

    def __init__(self, provider_registry: ProviderRegistry):
        self.registry = provider_registry

    async def chat(
        self,
        provider_name: str,
        api_key: str,
        request: ChatRequest,
        base_url: Optional[str] = None,
    ) -> ChatResponse:
        provider = self.registry.get_provider(
            provider_name, api_key, base_url
        )
        logger.info(
            "chat_request",
            provider=provider_name,
            model=request.model,
            msg_count=len(request.messages),
        )
        response = await provider.chat(request)
        logger.info(
            "chat_response",
            provider=provider_name,
            model=response.model,
            tokens=response.usage,
        )
        return response

    async def chat_stream(
        self,
        provider_name: str,
        api_key: str,
        request: ChatRequest,
        base_url: Optional[str] = None,
    ) -> AsyncIterator[StreamChunk]:
        provider = self.registry.get_provider(
            provider_name, api_key, base_url
        )
        async for chunk in provider.chat_stream(request):
            yield chunk

    async def embedding(
        self,
        provider_name: str,
        api_key: str,
        request: EmbeddingRequest,
        base_url: Optional[str] = None,
    ) -> EmbeddingResponse:
        provider = self.registry.get_provider(
            provider_name, api_key, base_url
        )
        return await provider.embedding(request)

12. ASR/TTS 多模态模块

12.1 模块概述

ASR(Automatic Speech Recognition)和 TTS(Text-to-Speech)模块为平台提供语音交互能力,使智能体支持语音输入和语音输出。同时集成多语言翻译能力,构建完整的多模态交互链路。

12.2 架构设计

graph TB
    subgraph 客户端
        A[语音输入] --> B[WebSocket 实时传输]
        C[文本输入]
    end

    subgraph API网关
        B --> D[ASR Gateway]
        C --> E[Chat Gateway]
    end

    subgraph ASR服务层
        D --> F{ASR Router}
        F --> G[Whisper Provider]
        F --> H[阿里云 ASR Provider]
        F --> I[讯飞 ASR Provider]
    end

    subgraph 核心处理
        G --> J[识别文本]
        H --> J
        I --> J
        J --> K[Agent Engine]
        E --> K
        K --> L[回复文本]
    end

    subgraph TTS服务层
        L --> M{TTS Router}
        M --> N[Edge TTS Provider]
        M --> O[阿里云 TTS Provider]
        M --> P[讯飞 TTS Provider]
        M --> Q[Fish Speech Provider]
    end

    subgraph 翻译服务层
        J -.-> R[翻译引擎]
        L -.-> R
        R --> S[DeepL]
        R --> T[Google Translate]
        R --> U[LLM Translation]
    end

    subgraph 输出
        N --> V[音频流输出]
        O --> V
        P --> V
        Q --> V
    end

12.3 ASR 语音识别

12.3.1 ASR 抽象接口

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator
from enum import Enum


class ASRProvider(str, Enum):
    WHISPER = "whisper"
    ALIYUN = "aliyun"
    IFLYTEK = "iflytek"


@dataclass
class ASRResult:
    text: str
    language: str
    confidence: float
    duration_seconds: float
    segments: list[dict]  # [{"start": 0.0, "end": 2.5, "text": "..."}]
    provider: str


class AbstractASRProvider(ABC):
    """ASR 供应商抽象基类"""

    @abstractmethod
    async def transcribe(self, audio_data: bytes, language: str = "auto") -> ASRResult:
        """整段音频转文字"""
        ...

    @abstractmethod
    async def transcribe_stream(
        self, audio_stream: AsyncIterator[bytes], language: str = "auto"
    ) -> AsyncIterator[ASRResult]:
        """实时流式语音识别"""
        ...


class WhisperASRProvider(AbstractASRProvider):
    """OpenAI Whisper / 本地部署 whisper 适配器"""

    def __init__(self, api_key: str, model: str = "whisper-1",
                 base_url: str = "https://api.openai.com/v1"):
        self.api_key = api_key
        self.model = model
        self.base_url = base_url

    async def transcribe(self, audio_data: bytes, language: str = "auto") -> ASRResult:
        import httpx

        files = {"file": ("audio.wav", audio_data, "audio/wav")}
        data = {"model": self.model, "response_format": "verbose_json"}
        if language != "auto":
            data["language"] = language

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.base_url}/audio/transcriptions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                files=files,
                data=data,
                timeout=60.0,
            )
            resp.raise_for_status()
            result = resp.json()

        return ASRResult(
            text=result["text"],
            language=result.get("language", "unknown"),
            confidence=result.get("confidence", 0.0),
            duration_seconds=result.get("duration", 0.0),
            segments=result.get("segments", []),
            provider="whisper",
        )

    async def transcribe_stream(
        self, audio_stream: AsyncIterator[bytes], language: str = "auto"
    ) -> AsyncIterator[ASRResult]:
        # Whisper API 不支持原生流式,采用分段缓冲策略
        buffer = bytearray()
        chunk_threshold = 32000 * 3  # 约 3 秒的 16kHz 16bit 音频

        async for chunk in audio_stream:
            buffer.extend(chunk)
            if len(buffer) >= chunk_threshold:
                result = await self.transcribe(bytes(buffer), language)
                buffer.clear()
                yield result

        # 处理剩余缓冲
        if buffer:
            result = await self.transcribe(bytes(buffer), language)
            yield result


class AliyunASRProvider(AbstractASRProvider):
    """阿里云智能语音交互(实时语音识别)适配器"""

    def __init__(self, access_key_id: str, access_key_secret: str,
                 app_key: str, region: str = "cn-shanghai"):
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.app_key = app_key
        self.region = region

    async def transcribe(self, audio_data: bytes, language: str = "auto") -> ASRResult:
        # 使用阿里云录音文件识别 API
        # 1. 提交识别任务
        # 2. 轮询获取结果
        # 实现略(涉及阿里云 SDK 调用)
        ...

    async def transcribe_stream(
        self, audio_stream: AsyncIterator[bytes], language: str = "auto"
    ) -> AsyncIterator[ASRResult]:
        # 使用阿里云实时语音识别 WebSocket API
        # 基于 nls (Natural Language Service) SDK
        # 实现略
        ...

12.4 TTS 语音合成

12.4.1 TTS 抽象接口

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class TTSRequest:
    text: str
    voice: str = "default"
    speed: float = 1.0
    pitch: float = 1.0
    audio_format: str = "mp3"  # "mp3" | "wav" | "pcm" | "opus"
    sample_rate: int = 24000
    language: str = "zh-CN"


@dataclass
class TTSResult:
    audio_data: bytes
    duration_seconds: float
    format: str
    sample_rate: int
    provider: str


class AbstractTTSProvider(ABC):
    """TTS 供应商抽象基类"""

    @abstractmethod
    async def synthesize(self, request: TTSRequest) -> TTSResult:
        """完整文本合成"""
        ...

    @abstractmethod
    async def synthesize_stream(
        self, request: TTSRequest
    ) -> AsyncIterator[bytes]:
        """流式语音合成 - 返回音频数据块"""
        ...


class EdgeTTSProvider(AbstractTTSProvider):
    """Microsoft Edge TTS(免费、高质量)适配器"""

    VOICE_MAP = {
        "zh-CN-female": "zh-CN-XiaoxiaoNeural",
        "zh-CN-male": "zh-CN-YunxiNeural",
        "en-US-female": "en-US-JennyNeural",
        "en-US-male": "en-US-GuyNeural",
        "ja-JP-female": "ja-JP-NanamiNeural",
    }

    async def synthesize(self, request: TTSRequest) -> TTSResult:
        import edge_tts
        import io

        voice = self.VOICE_MAP.get(request.voice, request.voice)
        communicate = edge_tts.Communicate(
            text=request.text,
            voice=voice,
            rate=f"{int((request.speed - 1) * 100):+d}%",
            pitch=f"{int((request.pitch - 1) * 100):+d}Hz",
        )

        audio_buffer = io.BytesIO()
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                audio_buffer.write(chunk["data"])

        audio_data = audio_buffer.getvalue()
        return TTSResult(
            audio_data=audio_data,
            duration_seconds=len(audio_data) / (request.sample_rate * 2),
            format="mp3",
            sample_rate=request.sample_rate,
            provider="edge_tts",
        )

    async def synthesize_stream(
        self, request: TTSRequest
    ) -> AsyncIterator[bytes]:
        import edge_tts

        voice = self.VOICE_MAP.get(request.voice, request.voice)
        communicate = edge_tts.Communicate(
            text=request.text,
            voice=voice,
            rate=f"{int((request.speed - 1) * 100):+d}%",
        )
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                yield chunk["data"]


class FishSpeechProvider(AbstractTTSProvider):
    """Fish Speech(开源、高保真克隆)适配器"""

    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url

    async def synthesize(self, request: TTSRequest) -> TTSResult:
        import httpx

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self.base_url}/v1/tts",
                json={
                    "text": request.text,
                    "reference_id": request.voice,
                    "format": request.audio_format,
                    "mp3_bitrate": 128,
                    "normalize": True,
                    "latency": "normal",
                },
                timeout=30.0,
            )
            resp.raise_for_status()
            audio_data = resp.content

        return TTSResult(
            audio_data=audio_data,
            duration_seconds=len(audio_data) / (request.sample_rate * 2),
            format=request.audio_format,
            sample_rate=request.sample_rate,
            provider="fish_speech",
        )

    async def synthesize_stream(
        self, request: TTSRequest
    ) -> AsyncIterator[bytes]:
        import httpx

        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/v1/tts",
                json={
                    "text": request.text,
                    "reference_id": request.voice,
                    "format": "pcm",
                    "streaming": True,
                },
                timeout=60.0,
            ) as resp:
                resp.raise_for_status()
                async for chunk in resp.aiter_bytes(chunk_size=4096):
                    yield chunk

12.5 多语言翻译

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class TranslationResult:
    translated_text: str
    source_language: str
    target_language: str
    provider: str


class AbstractTranslator(ABC):
    """翻译引擎抽象基类"""

    @abstractmethod
    async def translate(
        self, text: str, source_lang: str = "auto", target_lang: str = "en"
    ) -> TranslationResult:
        ...


class LLMTranslator(AbstractTranslator):
    """基于大语言模型的翻译(复用现有 LLM 基础设施)"""

    def __init__(self, chat_service: ChatService, provider_name: str, api_key: str):
        self.chat_service = chat_service
        self.provider_name = provider_name
        self.api_key = api_key

    async def translate(
        self, text: str, source_lang: str = "auto", target_lang: str = "en"
    ) -> TranslationResult:
        request = ChatRequest(
            model="deepseek-chat",
            messages=[
                ChatMessage(
                    role="system",
                    content=(
                        f"You are a professional translator. "
                        f"Translate the following text to {target_lang}. "
                        f"Output ONLY the translation, nothing else."
                    ),
                ),
                ChatMessage(role="user", content=text),
            ],
            temperature=0.3,
            max_tokens=2048,
        )
        response = await self.chat_service.chat(
            self.provider_name, self.api_key, request
        )
        return TranslationResult(
            translated_text=response.content.strip(),
            source_language=source_lang,
            target_language=target_lang,
            provider="llm",
        )

12.6 多模态网关服务

from fastapi import APIRouter, WebSocket, UploadFile, File
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/api/v1/multimodal", tags=["multimodal"])


@router.post("/asr/transcribe")
async def transcribe_audio(
    file: UploadFile = File(...),
    language: str = "auto",
    provider: str = "whisper",
):
    """上传音频文件进行语音识别"""
    audio_data = await file.read()
    asr_provider = ASRFactory.create(provider)
    result = await asr_provider.transcribe(audio_data, language)
    return {"code": 0, "data": result.__dict__}


@router.websocket("/asr/stream")
async def stream_asr(websocket: WebSocket, provider: str = "aliyun"):
    """WebSocket 实时语音识别"""
    await websocket.accept()
    asr_provider = ASRFactory.create(provider)

    async def audio_generator():
        while True:
            data = await websocket.receive_bytes()
            if data == b"__END__":
                break
            yield data

    async for result in asr_provider.transcribe_stream(audio_generator()):
        await websocket.send_json({"type": "partial", "text": result.text})

    await websocket.send_json({"type": "final"})
    await websocket.close()


@router.post("/tts/synthesize")
async def synthesize_speech(request: TTSRequest, provider: str = "edge_tts"):
    """文本转语音"""
    tts_provider = TTSFactory.create(provider)
    result = await tts_provider.synthesize(request)
    return StreamingResponse(
        iter([result.audio_data]),
        media_type=f"audio/{result.format}",
        headers={"X-Duration-Seconds": str(result.duration_seconds)},
    )


@router.post("/tts/stream")
async def stream_tts(request: TTSRequest, provider: str = "edge_tts"):
    """流式语音合成"""
    tts_provider = TTSFactory.create(provider)

    async def audio_stream():
        async for chunk in tts_provider.synthesize_stream(request):
            yield chunk

    return StreamingResponse(audio_stream(), media_type="audio/mpeg")

13. 部署与运维架构

13.1 部署架构总览

graph TB
    subgraph 用户接入层
        U[用户/浏览器] --> CDN[CDN 加速]
        CDN --> LB[负载均衡 Nginx Ingress]
    end

    subgraph API网关层
        LB --> GW1[API Gateway Pod 1]
        LB --> GW2[API Gateway Pod 2]
        LB --> GW3[API Gateway Pod 3]
    end

    subgraph 应用服务层[K8s Cluster]
        GW1 --> APP1[Agent Service]
        GW2 --> APP2[Workflow Service]
        GW3 --> APP3[Knowledge Service]

        APP1 --> WS[WebSocket Service]
        APP2 --> WQ[Celery Workers]
        APP3 --> DP[Document Parser]
    end

    subgraph 中间件层
        APP1 --> Redis[(Redis Cluster)]
        APP2 --> Redis
        APP3 --> Redis
        WQ --> Redis
        WS --> Redis

        APP1 --> PG[(PostgreSQL 16)]
        APP2 --> PG
        APP3 --> PG

        APP1 --> MQ[RabbitMQ]
        WQ --> MQ
        DP --> MQ
    end

    subgraph AI服务层
        APP1 --> Milvus[(Milvus 向量数据库)]
        APP3 --> Milvus
        DP --> MinIO[(MinIO 对象存储)]
        APP3 --> MinIO

        APP1 --> LLM[LLM API 集群]
        LLM --> DS[DeepSeek]
        LLM --> QW[通义千问]
        LLM --> DB[豆包]
        LLM --> GLM[ChatGLM]
    end

    subgraph 监控运维层
        Prometheus[Prometheus] --> Grafana[Grafana]
        ELK[ELK Stack] --> Kibana[Kibana]
        Jaeger[Jaeger Tracing] --> JaegerUI[Jaeger UI]
    end

    APP1 -.-> Prometheus
    APP2 -.-> Prometheus
    PG -.-> Prometheus
    Redis -.-> Prometheus

13.2 Docker Compose 开发环境

# docker-compose.dev.yml
version: "3.9"

services:
  # ============ 应用服务 ============
  api:
    build:
      context: ./backend
      dockerfile: Dockerfile.dev
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://kluser:klpass@postgres:5432/kunlun_dev
      - REDIS_URL=redis://redis:6379/0
      - MILVUS_HOST=milvus-standalone
      - MILVUS_PORT=19530
      - MINIO_ENDPOINT=minio:9000
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
      - ENV=development
      - LOG_LEVEL=DEBUG
    volumes:
      - ./backend:/app
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      milvus-standalone:
        condition: service_healthy
    networks:
      - kunlun-net

  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile.dev
    ports:
      - "3000:3000"
    volumes:
      - ./frontend:/app
      - /app/node_modules
    environment:
      - VITE_API_BASE_URL=http://localhost:8000
    depends_on:
      - api
    networks:
      - kunlun-net

  worker:
    build:
      context: ./backend
      dockerfile: Dockerfile.dev
    command: celery -A app.celery_app worker -l info -c 4
    environment:
      - DATABASE_URL=postgresql+asyncpg://kluser:klpass@postgres:5432/kunlun_dev
      - REDIS_URL=redis://redis:6379/0
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
    volumes:
      - ./backend:/app
    depends_on:
      - postgres
      - redis
      - rabbitmq
    networks:
      - kunlun-net

  # ============ 数据服务 ============
  postgres:
    image: postgres:16-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: kluser
      POSTGRES_PASSWORD: klpass
      POSTGRES_DB: kunlun_dev
    volumes:
      - pg_data:/var/lib/postgresql/data
      - ./migrations/postgres:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U kluser"]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - kunlun-net

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5
    networks:
      - kunlun-net

  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - kunlun-net

  # ============ 向量数据库 ============
  etcd:
    image: quay.io/coreos/etcd:v3.5.14
    environment:
      ETCD_AUTO_COMPACTION_MODE: revision
      ETCD_AUTO_COMPACTION_RETENTION: "1000"
      ETCD_QUOTA_BACKEND_BYTES: "4294967296"
    command: etcd --advertise-client-urls=http://127.0.0.1:2379 --listen-client-urls=http://0.0.0.0:2379 --data-dir=/etcd
    volumes:
      - etcd_data:/etcd
    networks:
      - kunlun-net

  milvus-standalone:
    image: milvusdb/milvus:v2.5.4
    ports:
      - "19530:19530"
      - "9091:9091"
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    depends_on:
      - etcd
      - minio
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - milvus_data:/var/lib/milvus
    networks:
      - kunlun-net

  # ============ 对象存储 ============
  minio:
    image: minio/minio:latest
    ports:
      - "9100:9000"
      - "9101:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data
    networks:
      - kunlun-net

  # ============ 监控(可选) ============
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - kunlun-net

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3001:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin123
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
    depends_on:
      - prometheus
    networks:
      - kunlun-net

volumes:
  pg_data:
  redis_data:
  rabbitmq_data:
  etcd_data:
  milvus_data:
  minio_data:
  prometheus_data:
  grafana_data:

networks:
  kunlun-net:
    driver: bridge

13.3 K8s Helm Chart 生产部署

values.yaml(核心配置)

# helm/kunlun-platform/values.yaml

replicaCount: 3

image:
  repository: registry.kunlun-ai.com/kunlun-platform
  tag: "2.1.0"
  pullPolicy: IfNotPresent

service:
  type: ClusterIP
  port: 8000

ingress:
  enabled: true
  className: nginx
  annotations:
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
    cert-manager.io/cluster-issuer: letsencrypt-prod
  hosts:
    - host: api.kunlun-ai.com
      paths:
        - path: /
          pathType: Prefix
  tls:
    - secretName: kunlun-tls
      hosts:
        - api.kunlun-ai.com

resources:
  requests:
    cpu: 500m
    memory: 512Mi
  limits:
    cpu: 2000m
    memory: 2Gi

autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 20
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80

postgresql:
  enabled: true
  auth:
    username: kluser
    database: kunlun_prod
    existingSecret: kunlun-db-secret
  primary:
    persistence:
      size: 100Gi
      storageClass: gp3
    resources:
      requests:
        cpu: 1000m
        memory: 4Gi
      limits:
        cpu: 4000m
        memory: 8Gi

redis:
  enabled: true
  architecture: replication
  auth:
    existingSecret: kunlun-redis-secret
  replica:
    replicaCount: 3

milvus:
  enabled: true
  cluster:
    enabled: true
  proxy:
    replicas: 2
  queryNode:
    replicas: 3
  dataNode:
    replicas: 2

minio:
  enabled: true
  mode: distributed
  replicas: 4

rabbitmq:
  enabled: true
  replicaCount: 3
  auth:
    existingPasswordSecret: kunlun-rabbitmq-secret

Deployment 模板(核心片段)

# helm/kunlun-platform/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-api
  labels:
    app: kunlun-api
spec:
  replicas: {{ .Values.replicaCount }}
  selector:
    matchLabels:
      app: kunlun-api
  template:
    metadata:
      labels:
        app: kunlun-api
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: api
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: kunlun-db-secret
                  key: url
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: kunlun-redis-secret
                  key: url
            - name: ENV
              value: "production"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            {{- toYaml .Values.resources | nindent 12 }}

13.4 CI/CD 流水线

# .github/workflows/deploy.yml
name: Build & Deploy Kunlun Platform

on:
  push:
    branches: [main, release/*]
    tags: ["v*"]
  pull_request:
    branches: [main]

env:
  REGISTRY: registry.kunlun-ai.com
  IMAGE_NAME: kunlun-platform

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports: ["5432:5432"]
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install uv
          uv sync --frozen

      - name: Run linting
        run: |
          uv run ruff check .
          uv run mypy app/

      - name: Run tests
        env:
          DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/test_db
        run: |
          uv run pytest tests/ -v --cov=app --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    outputs:
      image_tag: ${{ steps.meta.outputs.tags }}
    steps:
      - uses: actions/checkout@v4

      - name: Docker metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=semver,pattern={{version}}
            type=sha,prefix=sha-

      - name: Login to Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ secrets.REGISTRY_USERNAME }}
          password: ${{ secrets.REGISTRY_PASSWORD }}

      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: ./backend
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          platforms: linux/amd64

  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: staging
    steps:
      - uses: actions/checkout@v4

      - name: Configure kubectl
        uses: azure/k8s-set-context@v4
        with:
          kubeconfig: ${{ secrets.KUBE_CONFIG_STAGING }}

      - name: Deploy to staging
        run: |
          helm upgrade --install kunlun-staging ./helm/kunlun-platform \
            --namespace staging \
            --set image.tag=${{ github.sha }} \
            --values ./helm/kunlun-platform/values-staging.yaml \
            --wait --timeout 5m

      - name: Run smoke tests
        run: |
          kubectl -n staging run smoke-test --rm -i --restart=Never \
            --image=curlimages/curl -- \
            curl -sf http://kunlun-staging-api/health

  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: startsWith(github.ref, 'refs/tags/v')
    environment: production
    steps:
      - uses: actions/checkout@v4

      - name: Configure kubectl
        uses: azure/k8s-set-context@v4
        with:
          kubeconfig: ${{ secrets.KUBE_CONFIG_PROD }}

      - name: Deploy to production
        run: |
          VERSION=${GITHUB_REF#refs/tags/v}
          helm upgrade --install kunlun-prod ./helm/kunlun-platform \
            --namespace production \
            --set image.tag=${VERSION} \
            --values ./helm/kunlun-platform/values-production.yaml \
            --wait --timeout 10m

      - name: Verify rollout
        run: |
          kubectl -n production rollout status deployment/kunlun-prod-api --timeout=300s

      - name: Notify deployment
        if: always()
        run: |
          curl -X POST "${{ secrets.DINGTALK_WEBHOOK }}" \
            -H "Content-Type: application/json" \
            -d '{"msgtype": "markdown", "markdown": {"title": "部署通知", "text": "极简视界 v${VERSION} 部署${{ job.status }}"}}'

13.5 监控体系

13.5.1 Prometheus 指标

# app/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest

# 请求计数
http_requests_total = Counter(
    "kunlun_http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
)

# 请求延迟
http_request_duration_seconds = Histogram(
    "kunlun_http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

# LLM 调用指标
llm_requests_total = Counter(
    "kunlun_llm_requests_total",
    "Total LLM API calls",
    ["provider", "model", "status"],
)

llm_request_duration_seconds = Histogram(
    "kunlun_llm_request_duration_seconds",
    "LLM request duration",
    ["provider", "model"],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0],
)

llm_tokens_total = Counter(
    "kunlun_llm_tokens_total",
    "Total tokens consumed",
    ["provider", "model", "type"],  # type: prompt/completion
)

# 对话指标
conversations_active = Gauge(
    "kunlun_conversations_active",
    "Currently active conversations",
)

messages_processed_total = Counter(
    "kunlun_messages_processed_total",
    "Total messages processed",
    ["agent_type", "role"],
)

# 知识库指标
embedding_duration_seconds = Histogram(
    "kunlun_embedding_duration_seconds",
    "Embedding operation duration",
    ["provider", "operation"],  # operation: encode/search
)

knowledge_base_chunks = Gauge(
    "kunlun_knowledge_base_chunks",
    "Total chunks in knowledge bases",
    ["knowledge_base_id"],
)

# RAG 检索指标
rag_search_duration_seconds = Histogram(
    "kunlun_rag_search_duration_seconds",
    "RAG search duration",
    ["search_type"],  # vector/hybrid/keyword
)

rag_relevance_score = Histogram(
    "kunlun_rag_relevance_score",
    "RAG search relevance scores",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)

13.5.2 Grafana Dashboard 配置

{
  "dashboard": {
    "title": "极简视界 - 平台运营监控",
    "panels": [
      {
        "title": "QPS 实时趋势",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(kunlun_http_requests_total[1m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "LLM 调用延迟(P95)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(kunlun_llm_request_duration_seconds_bucket[5m]))",
            "legendFormat": "{{provider}}/{{model}}"
          }
        ]
      },
      {
        "title": "Token 消耗趋势",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(kunlun_llm_tokens_total[5m]) * 300",
            "legendFormat": "{{provider}} {{type}}"
          }
        ]
      },
      {
        "title": "活跃对话数",
        "type": "stat",
        "targets": [
          {
            "expr": "kunlun_conversations_active"
          }
        ]
      },
      {
        "title": "RAG 检索延迟(P50/P95)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(kunlun_rag_search_duration_seconds_bucket[5m]))",
            "legendFormat": "P50"
          },
          {
            "expr": "histogram_quantile(0.95, rate(kunlun_rag_search_duration_seconds_bucket[5m]))",
            "legendFormat": "P95"
          }
        ]
      }
    ]
  }
}

13.6 日志体系

13.6.1 结构化日志配置

# app/logging_config.py
import structlog
import logging
import sys


def setup_logging(env: str = "production"):
    """配置结构化日志"""

    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if env == "development":
        renderer = structlog.dev.ConsoleRenderer(colors=True)
    else:
        renderer = structlog.processors.JSONRenderer()

    structlog.configure(
        processors=[
            *shared_processors,
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    formatter = structlog.stdlib.ProcessorFormatter(
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            renderer,
        ],
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(handler)
    root_logger.setLevel(logging.DEBUG if env == "development" else logging.INFO)


# 使用示例
logger = structlog.get_logger()

async def chat_handler(agent_id: str, user_id: str):
    log = logger.bind(agent_id=agent_id, user_id=user_id)
    log.info("chat_started")

    # ... 处理对话 ...

    log.info(
        "chat_completed",
        model="deepseek-chat",
        prompt_tokens=156,
        completion_tokens=89,
        latency_ms=1234,
        tool_calls=2,
    )

13.6.2 对话日志分析管道

# app/services/conversation_analytics.py
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass
from sqlalchemy import select, func


@dataclass
class DailyAnalytics:
    date: str
    total_conversations: int
    total_messages: int
    total_tokens: int
    avg_latency_ms: float
    avg_user_rating: float
    estimated_cost: float
    top_agents: list[dict]
    top_questions: list[str]


class ConversationAnalyticsService:
    """对话日志分析与运营报表服务"""

    def __init__(self, db_session, redis_client):
        self.db = db_session
        self.redis = redis_client

    async def get_daily_overview(self, date: datetime) -> DailyAnalytics:
        """获取每日运营概览"""
        # 从 Redis 缓存尝试读取
        cache_key = f"analytics:daily:{date.strftime('%Y-%m-%d')}"
        cached = await self.redis.get(cache_key)
        if cached:
            return DailyAnalytics(**json.loads(cached))

        # 从 conversation_logs 聚合查询
        query = select(
            func.count(func.distinct(ConversationLog.conversation_id))
                .label("total_conversations"),
            func.count(ConversationLog.id).label("total_messages"),
            func.sum(ConversationLog.total_tokens).label("total_tokens"),
            func.avg(ConversationLog.latency_ms).label("avg_latency"),
            func.avg(ConversationLog.user_rating).label("avg_rating"),
            func.sum(ConversationLog.estimated_cost).label("total_cost"),
        ).where(
            func.date(ConversationLog.created_at) == date.date()
        )

        result = await self.db.execute(query)
        row = result.one()

        overview = DailyAnalytics(
            date=date.strftime("%Y-%m-%d"),
            total_conversations=row.total_conversations,
            total_messages=row.total_messages,
            total_tokens=row.total_tokens or 0,
            avg_latency_ms=float(row.avg_latency or 0),
            avg_user_rating=float(row.avg_rating or 0),
            estimated_cost=float(row.total_cost or 0),
            top_agents=await self._get_top_agents(date),
            top_questions=await self._get_top_questions(date),
        )

        # 缓存 24 小时
        await self.redis.setex(
            cache_key, 86400, json.dumps(overview.__dict__, default=str)
        )
        return overview

    async def _get_top_agents(self, date: datetime, limit: int = 10) -> list[dict]:
        """获取当日最热门智能体"""
        query = (
            select(
                ConversationLog.agent_id,
                Agent.name,
                func.count(ConversationLog.id).label("msg_count"),
            )
            .join(Agent, Agent.id == ConversationLog.agent_id)
            .where(func.date(ConversationLog.created_at) == date.date())
            .group_by(ConversationLog.agent_id, Agent.name)
            .order_by(func.count(ConversationLog.id).desc())
            .limit(limit)
        )
        result = await self.db.execute(query)
        return [
            {"agent_id": str(row.agent_id), "name": row.name, "message_count": row.msg_count}
            for row in result.all()
        ]

    async def _get_top_questions(self, date: datetime, limit: int = 20) -> list[str]:
        """获取当日高频问题(基于用户消息聚类)"""
        # 简化实现:提取高频用户消息
        query = (
            select(Message.content, func.count(Message.id).label("cnt"))
            .where(
                Message.role == "user",
                func.date(Message.created_at) == date.date(),
            )
            .group_by(Message.content)
            .order_by(func.count(Message.id).desc())
            .limit(limit)
        )
        result = await self.db.execute(query)
        return [row.content for row in result.all()]

13.7 限流与熔断

13.7.1 Redis 令牌桶限流

# app/middleware/rate_limiter.py
import time
import redis.asyncio as redis
from fastapi import Request, HTTPException


class TokenBucketRateLimiter:
    """基于 Redis + Lua 的分布式令牌桶限流器"""

    # Lua 脚本:原子化执行令牌桶算法
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(bucket[1])
    local last_refill = tonumber(bucket[2])

    if tokens == nil then
        tokens = capacity
        last_refill = now
    end

    -- 补充令牌
    local elapsed = now - last_refill
    local new_tokens = math.min(capacity, tokens + elapsed * rate)

    if new_tokens >= requested then
        new_tokens = new_tokens - requested
        redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
        redis.call('EXPIRE', key, math.ceil(capacity / rate) + 10)
        return {1, math.floor(new_tokens), 0}
    else
        -- 计算等待时间
        local wait_time = (requested - new_tokens) / rate
        redis.call('EXPIRE', key, math.ceil(capacity / rate) + 10)
        return {0, math.floor(new_tokens), math.ceil(wait_time * 1000)}
    end
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self._script = None

    async def _get_script(self):
        if self._script is None:
            self._script = self.redis.register_script(self.LUA_SCRIPT)
        return self._script

    async def acquire(
        self,
        key: str,
        capacity: int = 100,
        rate: float = 10.0,
        tokens: int = 1,
    ) -> tuple[bool, int, int]:
        """
        尝试获取令牌
        Returns: (allowed, remaining_tokens, wait_ms)
        """
        script = await self._get_script()
        result = await script(
            keys=[key],
            args=[capacity, rate, time.time(), tokens],
        )
        allowed = result[0] == 1
        remaining = result[1]
        wait_ms = result[2]
        return allowed, remaining, wait_ms


class RateLimitMiddleware:
    """FastAPI 限流中间件"""

    def __init__(self, redis_client: redis.Redis):
        self.limiter = TokenBucketRateLimiter(redis_client)

    async def __call__(self, request: Request, call_next):
        # 获取限流配置(可按租户/用户/API 粒度)
        tenant_id = request.headers.get("X-Tenant-ID", "anonymous")
        user_id = request.headers.get("X-User-ID", "anonymous")

        # 租户级限流
        tenant_key = f"ratelimit:tenant:{tenant_id}"
        allowed, remaining, wait_ms = await self.limiter.acquire(
            key=tenant_key,
            capacity=1000,   # 桶容量
            rate=100.0 / 60, # 每分钟 100 个请求 => 每秒补充 ~1.67
        )

        if not allowed:
            raise HTTPException(
                status_code=429,
                detail={
                    "code": 100429,
                    "message": "请求过于频繁,请稍后重试",
                    "retry_after_ms": wait_ms,
                },
                headers={
                    "X-RateLimit-Limit": "1000",
                    "X-RateLimit-Remaining": str(remaining),
                    "Retry-After": str(wait_ms // 1000 + 1),
                },
            )

        response = await call_next(request)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        return response

13.7.2 熔断器

# app/utils/circuit_breaker.py
import time
import asyncio
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any


class CircuitState(str, Enum):
    CLOSED = "closed"        # 正常状态
    OPEN = "open"            # 熔断状态
    HALF_OPEN = "half_open"  # 半开状态


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5      # 连续失败次数阈值
    recovery_timeout: float = 30.0  # 熔断恢复超时(秒)
    half_open_max_calls: int = 3    # 半开状态最大探测次数
    success_threshold: int = 2      # 半开成功次数阈值


class CircuitBreaker:
    """熔断器 - 防止级联故障"""

    def __init__(self, name: str, config: CircuitBreakerConfig = CircuitBreakerConfig()):
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0
        self.last_failure_time = 0.0
        self._lock = asyncio.Lock()

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time >= self.config.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                    self.success_count = 0
                else:
                    raise CircuitBreakerOpenError(
                        f"熔断器 {self.name} 处于打开状态,请稍后重试"
                    )

            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.config.half_open_max_calls:
                    raise CircuitBreakerOpenError(
                        f"熔断器 {self.name} 半开探测次数已满"
                    )
                self.half_open_calls += 1

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception as e:
            await self._on_failure()
            raise

    async def _on_success(self):
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    async def _on_failure(self):
        async with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
            elif self.failure_count >= self.config.failure_threshold:
                self.state = CircuitState.OPEN


class CircuitBreakerOpenError(Exception):
    pass

14. 项目亮点与技术难点

14.1 多租户数据隔离

问题描述

平台面向 B 端客户,需在同一套基础设施上为数千家企业提供服务。租户间数据必须严格隔离,同时兼顾资源利用率和运维效率。传统的独立数据库方案成本过高,共享数据库又存在数据泄露风险。

解决方案

采用 共享数据库 + 行级安全(RLS)+ 中间件自动注入 的三层隔离方案:

  1. 数据库层:所有业务表携带 tenant_id,通过 PostgreSQL RLS 策略在数据库层面强制过滤。
  2. 中间件层:FastAPI 中间件自动从 JWT 中提取 tenant_id,通过 SET app.current_tenant_id 注入数据库连接。
  3. 缓存层:Redis Key 统一使用 tenant:{tenant_id}: 前缀,防止跨租户缓存穿透。

技术实现

# 中间件自动设置租户上下文
class TenantMiddleware:
    async def __call__(self, request: Request, call_next):
        user = await get_current_user(request)
        tenant_id = str(user.tenant_id)
        # 在数据库连接池层面设置租户隔离
        async with db_pool.acquire() as conn:
            await conn.execute(f"SET app.current_tenant_id = '{tenant_id}'")
            request.state.db = conn
            response = await call_next(request)
        return response

效果数据

  • 单集群支撑 2000+ 租户,数据零泄露
  • 相比独立数据库方案,基础设施成本降低 75%
  • RLS 策略引入的查询性能损耗 < 3%(通过复合索引优化)

14.2 工作流引擎的状态持久化与断点恢复

问题描述

复杂智能体工作流可能包含数十个节点(LLM 调用、工具调用、条件分支、人工审批),单次执行耗时可达数分钟甚至数小时。网络抖动、服务重启、节点宕机都可能导致执行中断。需要保证工作流可以从断点精确恢复,避免重复执行和状态不一致。

解决方案

设计了 事件溯源 + 快照 的双保险持久化机制:

  1. 事件日志:每个节点的输入、输出、状态变更均作为不可变事件追加写入 workflow_executions 表。
  2. 状态快照:每执行完一个节点,将完整的节点状态图(node_states)序列化为 JSONB 快照。
  3. 断点恢复:服务重启后,加载最新的 node_states 快照,从未完成的节点继续执行。

技术实现

class WorkflowEngine:
    async def execute_node(self, execution_id: str, node: WorkflowNode):
        # 1. 执行节点
        result = await node.execute(self.context)

        # 2. 原子化更新状态(事务保护)
        async with self.db.transaction():
            await self.db.execute(
                """UPDATE workflow_executions
                   SET current_node_id = $1,
                       node_states = jsonb_set(node_states, $2, $3),
                       updated_at = now()
                   WHERE id = $4""",
                node.next_id, f'{{{node.id}}}', json.dumps(result), execution_id
            )

        # 3. 如果中断标记存在,支持从 current_node_id 恢复
        # recovery 逻辑会读取 node_states 中已完成节点的结果,
        # 跳过它们直接执行 current_node_id 对应的节点

效果数据

  • 工作流执行中断后恢复成功率 99.7%
  • 支持最长 4 小时 的长时工作流
  • 断点恢复平均耗时 < 200ms(仅需加载 JSONB 快照)

14.3 RAG 混合检索的召回率优化

问题描述

纯向量检索(Semantic Search)在处理精确关键词匹配(如产品编号、专有名词)时表现不佳;纯关键词检索(BM25)又缺乏语义理解能力。单一检索策略导致知识库问答的召回率不稳定,Top-5 召回率仅约 65%。

解决方案

构建 三路检索 + 交叉重排 的混合检索管线:

  1. 向量检索:基于 Milvus 的 HNSW 索引,使用 text-embedding-v3 模型生成 1024 维向量。
  2. 关键词检索:PostgreSQL 全文检索(tsvector + zhparser 中文分词),BM25 排序。
  3. 稀疏向量检索:使用 BGE-M3 的稀疏向量进行补充召回。
  4. 交叉重排:使用 bge-reranker-v2-m3 对三路结果合并去重后重排,输出最终 Top-K。

技术实现

class HybridRetriever:
    async def search(self, query: str, top_k: int = 5) -> list[RetrievalResult]:
        # 并行执行三路检索
        vector_task = asyncio.create_task(self._vector_search(query, top_k * 3))
        keyword_task = asyncio.create_task(self._keyword_search(query, top_k * 3))
        sparse_task = asyncio.create_task(self._sparse_search(query, top_k * 3))

        vector_results, keyword_results, sparse_results = await asyncio.gather(
            vector_task, keyword_task, sparse_task
        )

        # RRF (Reciprocal Rank Fusion) 合并
        candidates = self._rrf_merge(
            vector_results, keyword_results, sparse_results, k=60
        )

        # 交叉编码器重排
        reranked = await self.reranker.rerank(query, candidates[:top_k * 2])
        return reranked[:top_k]

    def _rrf_merge(self, *result_lists, k: int = 60) -> list:
        """Reciprocal Rank Fusion 算法"""
        scores = {}
        for results in result_lists:
            for rank, result in enumerate(results):
                doc_id = result.chunk_id
                scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (k + rank + 1)
        # 按融合得分降序排列
        sorted_ids = sorted(scores, key=scores.get, reverse=True)
        return [self._id_to_result[did] for did in sorted_ids]

效果数据

  • Top-5 召回率从 65% 提升至 92%
  • 重排后 Top-1 精确率从 58% 提升至 87%
  • 混合检索端到端延迟 < 300ms(三路并行 + 异步 I/O)

14.4 插件化模型管理的零代码扩展

问题描述

国内大模型生态碎片化严重(DeepSeek、通义千问、豆包、ChatGLM 等),各厂商 API 格式存在细微差异。业务方频繁要求接入新模型,每次都需要修改核心代码、重新部署,开发效率低且风险大。

解决方案

设计 抽象基类 + 装饰器注册 + 自动发现 的插件化架构:

  1. 定义 AbstractLLMProvider 抽象基类,规范 chat / chat_stream / embedding 三个核心接口。
  2. 各供应商适配器独立为 Python 模块,通过 @ProviderRegistry.register 装饰器自注册。
  3. 应用启动时自动扫描 providers/ 目录,零配置加载所有适配器。
  4. 管理后台支持在线配置 API Key 和模型参数,热生效无需重启。

技术实现

新增一个供应商仅需创建一个文件:

# app/providers/new_provider.py
@ProviderRegistry.register
class NewModelProvider(AbstractLLMProvider):
    @property
    def provider_name(self) -> str:
        return "new_model"

    @property
    def capabilities(self):
        return [ModelCapability.CHAT, ModelCapability.FUNCTION_CALLING]

    async def chat(self, request: ChatRequest) -> ChatResponse:
        # 实现适配逻辑
        ...

    async def chat_stream(self, request: ChatRequest) -> AsyncIterator[StreamChunk]:
        # 实现流式适配
        ...

    async def embedding(self, request: EmbeddingRequest) -> EmbeddingResponse:
        # 实现向量化适配
        ...

放入 providers/ 目录后自动生效,无需修改任何现有代码。

效果数据

  • 新增供应商适配开发时间从 3 天 缩短至 2 小时
  • 平台已预置 6 家 供应商适配器,覆盖国内外主流大模型
  • 零事故热切换:运行时切换模型不影响进行中的对话

14.5 流式对话的 SSE 架构

问题描述

智能体对话涉及 LLM 推理(1-30 秒)、工具调用(1-10 秒)、知识库检索(0.5-3 秒)等多阶段异步操作。传统 HTTP 请求-响应模式导致用户长时间白屏等待,体验极差。WebSocket 方案实现复杂,且不支持 HTTP/2 多路复用和 CDN 缓存。

解决方案

采用 Server-Sent Events (SSE) 实现细粒度的流式推送:

  1. 统一事件协议:定义 start / delta / tool_call / tool_result / knowledge_hit / done 六种事件类型,前端可精确渲染每个阶段。
  2. 背压控制:基于 asyncio.Queue 实现生产者-消费者模式,防止快速生成导致内存溢出。
  3. 断线重连:客户端维护 Last-Event-ID,服务端基于 Redis Stream 缓存最近 N 条事件,支持无缝续传。
  4. 优雅终止:客户端断开时通过 CancellationToken 取消 LLM 请求,避免资源浪费。

技术实现

from fastapi.responses import StreamingResponse
import asyncio
import json


async def stream_chat(agent_id: str, request: ChatRequest, cancel_token: CancellationToken):
    """SSE 流式对话核心实现"""
    event_queue: asyncio.Queue = asyncio.Queue(maxsize=256)

    # 后台执行对话管道
    async def produce_events():
        try:
            await event_queue.put({"type": "start", "message_id": msg_id})

            async for chunk in agent_engine.stream_chat(request, cancel_token):
                if cancel_token.is_cancelled:
                    break
                if chunk.type == "text_delta":
                    await event_queue.put({"type": "delta", "content": chunk.text})
                elif chunk.type == "tool_call":
                    await event_queue.put({"type": "tool_call", "tool": chunk.name, "arguments": chunk.args})
                elif chunk.type == "tool_result":
                    await event_queue.put({"type": "tool_result", "tool": chunk.name, "result": chunk.result})

            await event_queue.put({"type": "done", "usage": usage_stats})
        finally:
            await event_queue.put(None)  # 终止信号

    task = asyncio.create_task(produce_events())

    async def event_stream():
        while True:
            event = await event_queue.get()
            if event is None:
                break
            yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx 不缓冲
            "Connection": "keep-alive",
        },
    )

效果数据

  • 首 Token 延迟从平均 8 秒降至 < 1.2 秒(用户 1.2 秒内看到第一个字)
  • 对话中断后恢复成功率 99.5%(基于 Redis Stream 续传)
  • 相比 WebSocket 方案,服务端内存占用降低 40%(无长连接开销)

14.6 对话日志与运营分析

问题描述

平台上线后,运营团队需要了解各租户的使用情况、智能体质量、Token 消耗和成本分布。海量对话日志(日均 500 万+ 条)的存储、查询和分析对数据库造成巨大压力,复杂聚合查询响应时间超过 30 秒。

解决方案

构建 分层存储 + 预聚合 + 实时看板 的分析体系:

  1. 写入优化:对话日志通过 RabbitMQ 异步写入 PostgreSQL 分区表(按月分区),写入吞吐量 > 5000 TPS
  2. 预聚合层:Celery 定时任务每小时将原始日志聚合为 hourly_metrics 汇总表,覆盖日/周/月维度的核心指标。
  3. 缓存层:Redis 缓存热点查询结果(日活、日消耗),TTL 按数据新鲜度动态调整。
  4. 可视化:Grafana 接入 Prometheus + PostgreSQL,提供实时运营看板。

技术实现

# 分区表 + 预聚合
class AnalyticsAggregator:
    """定时聚合任务 - 每小时执行"""

    async def aggregate_hourly(self, hour: datetime):
        await db.execute("""
            INSERT INTO hourly_metrics (
                tenant_id, agent_id, hour, conversation_count,
                message_count, total_tokens, total_cost,
                avg_latency_ms, p95_latency_ms
            )
            SELECT
                tenant_id, agent_id,
                date_trunc('hour', created_at) AS hour,
                COUNT(DISTINCT conversation_id),
                COUNT(*),
                SUM(total_tokens),
                SUM(estimated_cost),
                AVG(latency_ms),
                percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms)
            FROM conversation_logs
            WHERE created_at >= $1 AND created_at < $2
            GROUP BY tenant_id, agent_id, date_trunc('hour', created_at)
            ON CONFLICT (tenant_id, agent_id, hour)
            DO UPDATE SET
                conversation_count = EXCLUDED.conversation_count,
                message_count = EXCLUDED.message_count,
                total_tokens = EXCLUDED.total_tokens,
                total_cost = EXCLUDED.total_cost,
                avg_latency_ms = EXCLUDED.avg_latency_ms,
                p95_latency_ms = EXCLUDED.p95_latency_ms
        """, hour, hour + timedelta(hours=1))

效果数据

  • 运营看板核心指标查询延迟从 30 秒降至 < 200ms(预聚合 + 缓存)
  • 日志存储成本降低 60%(分区表 + 冷热分离,90 天以上数据归档至对象存储)
  • 支持日均 500 万+ 条日志的实时写入与分析,系统零故障运行
0

评论区