侧边栏壁纸
  • 累计撰写 75 篇文章
  • 累计创建 62 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Langflow 源码深度学习教程

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

📚 本教程基于 Langflow v1.10.x 源码,帮助你从零开始理解这个强大的 AI 工作流平台的核心架构和实现原理。

1. 项目概述与架构总览

1.1 什么是 Langflow?

Langflow 是一个开源的、基于 Python 的可视化 AI 工作流构建平台。它允许开发者通过拖拽组件的方式快速构建 AI 应用,同时提供完整的 API 接口供外部调用。

核心特性:

  • 🎨 可视化流程编辑器(基于 React Flow)
  • 🔧 丰富的组件库(LLM、向量数据库、工具等)
  • 🚀 一键部署为 API 或 MCP Server
  • 🤖 多智能体编排能力
  • 📊 可观测性集成(LangSmith、LangFuse 等)

1.2 三层架构设计

┌─────────────────────────────────────────────────┐
│              langflow (分发层)                    │
│         CLI、配置管理、集成打包                   │
├─────────────────────────────────────────────────┤
│           langflow-base (平台层)                 │
│    API、认证、持久化、多用户、数据库               │
├─────────────────────────────────────────────────┤
│              lfx (运行时层)                      │
│      组件系统、图引擎、执行器                     │
└─────────────────────────────────────────────────┘

依赖方向: langflowlangflow-baselfx(单向依赖)

1.3 技术栈概览

层级 技术选型
后端框架 FastAPI + Pydantic v2
数据库 PostgreSQL / SQLite + SQLModel (SQLAlchemy)
缓存 Redis / 内存缓存
任务队列 Celery / AnyIO
前端框架 React 18 + TypeScript
UI 库 MUI (Material-UI), React Flow
状态管理 Zustand
构建工具 Vite
包管理 uv (推荐)
ORM 迁移 Alembic

2. 环境搭建与开发准备

2.1 前置要求

# Python 版本要求
Python >= 3.10, < 3.15

# 安装 uv 包管理器(推荐)
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

# macOS/Linux:
curl -LsSf https://astral.sh/uv/install.sh | sh

# Node.js >= 18 (前端开发需要)
node --version

2.2 从源码安装

# 克隆仓库
git clone https://github.com/langflow-ai/langflow.git
cd langflow

# 使用 uv 创建虚拟环境并安装依赖
uv sync

# 或者使用 make 命令(推荐用于开发)
make run_cli

2.3 开发模式启动

方式一:完整启动(前后端一体)

uv run langflow run
# 访问 http://localhost:7860

方式二:分离模式(推荐开发时使用)

# 终端 1: 启动后端
cd src/backend/base
uv run python -m langflow.__main__ run --backend-only

# 终端 2: 启动前端
cd src/frontend
npm install
npm run dev
# 访问 http://localhost:3000(前端会代理到后端 7860)

2.4 开发工具配置

VSCode 推荐扩展:

  • Python (Microsoft)
  • Pylance (Microsoft)
  • ESLint
  • TypeScript Importer
  • Thunder Client (API 测试)

.vscode/settings.json 配置示例:

{
    "python.defaultInterpreterPath": ".venv\\Scripts\\python.exe",
    "python.formatting.provider": "ruff",
    "editor.formatOnSave": true,
    "[python]": {
        "editor.codeActionsOnSave": {
            "source.organizeImports": "explicit"
        }
    }
}

3. 项目目录结构详解

langflow/
├── src/                          # 源代码根目录
│   ├── backend/                  # 后端代码
│   │   └── base/                 # langflow-base 核心包
│   │       └── langflow/
│   │           ├── __main__.py   # CLI 入口点
│   │           ├── main.py       # FastAPI 应用创建与配置
│   │           ├── api/          # API 路由层
│   │           │   ├── v1/       # V1 版本 API
│   │           │   └── v2/       # V2 版本 API
│   │           ├── agentic/      # Agent 相关功能
│   │           │   ├── services/ # Agent 服务实现
│   │           │   ├── mcp/      # MCP Server 实现
│   │           │   └── flows/    # 预置 Agent 流程
│   │           ├── services/     # 业务服务层 ⭐核心
│   │           │   ├── auth/     # 认证服务
│   │           │   ├── database/ # 数据库服务
│   │           │   ├── cache/    # 缓存服务
│   │           │   ├── storage/  # 存储服务(本地/S3)
│   │           │   ├── chat/     # 聊天服务
│   │           │   ├── flow/     # 流执行服务
│   │           │   ├── store/    # 组件商店
│   │           │   └── ...       # 其他服务
│   │           ├── processing/   # 图处理逻辑
│   │           ├── helpers/      # 辅助函数
│   │           └── alembic/      # 数据库迁移
│   │
│   ├── lfx/                      # LFX 运行时 ⭐核心
│   │   └── src/lfx/
│   │       ├── base/             # 基础类和接口
│   │       │   ├── agents/       # Agent 基类
│   │       │   ├── models/       # 模型统一接口
│   │       │   ├── tools/        # 工具基类
│   │       │   ├── vectorstores/ # 向量存储基类
│   │       │   └── embeddings/   # 嵌入模型基类
│   │       ├── components/       # 具体组件实现 ⭐
│   │       │   ├── openai/       # OpenAI 组件
│   │       │   ├── anthropic/    # Anthropic 组件
│   │       │   ├── chroma/       # ChromaDB 组件
│   │       │   └── ...           # 更多组件
│   │       ├── graph/            # 图引擎 ⭐
│   │       │   ├── graph.py      # Graph 类定义
│   │       │   ├── vertex.py    # 节点定义
│   │       │   └── edge.py      # 边定义
│   │       └── cli/              # CLI 命令
│   │
│   ├── frontend/                 # 前端代码
│   │   └── src/
│   │       ├── components/       # React 组件
│   │       ├── pages/           # 页面组件
│   │       ├── hooks/           # 自定义 Hooks
│   │       ├── store/           # Zustand 状态管理
│   │       ├── types/           # TypeScript 类型定义
│   │       ├── utils/           # 工具函数
│   │       └── api/             # API 调用封装
│   │
│   └── sdk/                      # Python SDK 客户端
│       └── src/langflow_sdk/
│           ├── client.py        # 同步客户端
│           ├── _async_client.py # 异步客户端
│           └── models.py        # 数据模型
│
├── docs/                        # 文档
├── docker/                      # Docker 配置
├── tests/                       # 测试代码
├── pyproject.toml               # 项目配置
└── Makefile                     # 常用命令快捷方式

4. 核心架构分层

4.1 分层职责说明

第一层:LFX(运行时 Runtime)

位置: src/lfx/

职责:

  • 组件注册与管理
  • 图(DAG)数据结构
  • 图执行引擎
  • 各类 AI 组件的具体实现

核心概念:

# Graph - 有向无环图,代表一个工作流
class Graph:
    vertices: List[Vertex]  # 节点列表
    edges: List[Edge]       # 边列表
    
# Vertex - 图中的节点,对应一个组件实例
class Vertex:
    id: str
    data: ComponentData
    is_input: bool          # 是否是输入节点
    is_output: bool         # 是否是输出节点
    
# Edge - 连接两个节点的边
class Edge:
    source_id: str          # 源节点 ID
    target_id: str          # 目标节点 ID
    source_handle: str      # 源输出端口
    target_handle: str      # 目标输入端口

第二层:Langflow-base(平台 Platform)

位置: src/backend/base/langflow/

职责:

  • HTTP API 接口(FastAPI)
  • 用户认证与授权
  • 数据持久化(数据库)
  • 文件存储管理
  • 会话管理
  • 缓存策略
  • 多租户支持

第三层:Langflow(分发 Distribution)

位置: 根目录 pyproject.toml

职责:

  • CLI 命令行工具
  • 配置文件解析
  • 依赖整合
  • 打包发布

4.2 关键设计原则

根据 PHILOSOPHY.md

  1. 视觉优先: 所有功能都应体现在可视化编辑器中
  2. 单向依赖: 严格遵守 langflow → langflow-base → lfx
  3. 组件即代码: 每个组件都可以查看和修改其 Python 源码
  4. 配置外置: 不使用隐藏的全局配置标志

5. 启动流程分析

5.1 完整启动链路

用户执行: uv run langflow run
         ↓
[1] __main__.py 入口
    - macOS fork-safety 处理
    - 初始化 Typer CLI 应用
    - 注册子命令(run, serve 等)
         ↓
[2] 执行 run 命令
    - 加载 .env 环境变量
    - 解析命令行参数
    - 启动 ProcessManager
         ↓
[3] setup_app() 在 main.py 中
    - 配置日志系统
    - 初始化 Sentry(可选)
    - initialize_services() ← 关键步骤
    - 创建 FastAPI 应用实例
    - 注册中间件
    - 注册路由
    - 挂载静态文件(前端)
         ↓
[4] Uvicorn 启动服务器
    - 监听端口(默认 7860)
    - 处理 HTTP 请求

5.2 核心入口代码解读

文件: __main__.py

# 第 1-20 行:macOS 安全处理
if __name__ == "__main__":
    if _platform.system() == "Darwin":
        # 解决 Gunicorn 在 macOS 上的 fork-safety 问题
        _os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
        _os.execv(_sys.executable, [_sys.executable, "-m", "langflow.__main__", *_sys.argv[1:]])

# 第 50-80 行:CLI 应用定义
app = typer.Typer(no_args_is_help=True)

# 注册 LFX 子命令
from lfx.cli.commands import serve_command
app.add_typer(lfx_app, name="lfx")

文件: main.py - setup_app() 函数

async def setup_app():
    # 1. 配置日志
    configure()
    
    # 2. 初始化遥测(可选)
    if settings.sentry_dsn:
        init_sentry(settings.sentry_dsn)
    
    # 3. 初始化所有服务 ⭐ 最重要
    await initialize_services(fix_migration=fix_migration)
    
    # 4. 创建 FastAPI 应用
    application = FastAPI(
        title="Langflow",
        version=get_version(),
        lifespan=lifespan,
    )
    
    # 5. 注册 CORS 中间件
    application.add_middleware(CORSMiddleware, ...)
    
    # 6. 注册 API 路由
    include_routers(application)
    
    # 7. 挂载前端静态文件(非 backend-only 模式)
    if not settings.backend_only:
        mount_static_files(application)
    
    return application

5.3 服务初始化详解

文件: services/utils.py

async def initialize_services(fix_migration=False):
    """按顺序初始化所有服务"""
    
    # 1. 设置服务(必须最先初始化)
    initialize_settings_service()
    
    # 2. 缓存服务(依赖设置服务)
    initialize_cache_service()
    
    # 3. 会话服务(依赖缓存服务)
    initialize_session_service()
    
    # 4. 数据库服务(执行迁移)
    await initialize_database_service(fix_migration=fix_migration)
    
    # 5. 存储服务
    initialize_storage_service()
    
    # 6. 认证服务
    initialize_auth_service()
    
    # 7. 其他业务服务...
    initialize_chat_service()
    initialize_variable_service()
    initialize_store_service()
    # ...

服务注册机制:

# 使用工厂模式注册服务
def get_service(service_type: ServiceType, factory: ServiceFactory):
    """获取或创建服务实例"""
    service_manager.register(service_type, factory)
    
def get_db_service() -> DatabaseService:
    """便捷方法:获取数据库服务"""
    return get_service(ServiceType.DATABASE_SERVICE)

6. 服务层深度解析

6.1 服务架构概览

Langflow 采用服务定位器模式(Service Locator Pattern),所有服务通过 ServiceManager 统一管理。

ServiceManager
├── SettingsService      # 配置管理
├── DatabaseService      # 数据库操作
├── CacheService         # 缓存(Redis/内存)
├── SessionService       # 用户会话
├── AuthService          # JWT/API Key 认证
├── StorageService       # 文件存储(本地/S3)
├── ChatService          # 聊天历史管理
├── VariableService      # 全局变量
├── StoreService         # 组件商店
├── TaskService          # 异步任务
├── TelemetryService     # 遥测收集
└── TracingService       # 分布式追踪

6.2 核心服务详解

6.2.1 数据库服务 (DatabaseService)

文件位置: services/database/

特点:

  • 使用 SQLModel(SQLAlchemy + Pydantic)
  • 支持 PostgreSQL 和 SQLite
  • 自动迁移(Alembic)

核心模型:

# services/database/models/flow/model.py
class Flow(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    name: str
    data: dict  # JSON 格式的流程数据
    user_id: Optional[str]
    folder_id: Optional[str]
    created_at: datetime
    updated_at: datetime

# services/database/models/user/model.py  
class User(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    username: str
    password: str  # 加密存储
    is_active: bool = True
    is_superuser: bool = False

# services/database/models/api_key/model.py
class ApiKey(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    user_id: UUID
    name: str
    api_key: str  # hash 存储
    created_at: datetime
    expires_at: Optional[datetime]

使用示例:

async with session_scope() as session:
    # 创建流程
    flow = Flow(name="My Flow", data={...}, user_id=user.id)
    session.add(flow)
    await session.commit()
    
    # 查询流程
    result = await session.exec(select(Flow).where(Flow.user_id == user.id))
    flows = result.all()

6.2.2 认证服务 (AuthService)

文件位置: services/auth/

支持的认证方式:

  1. 用户名/密码登录 → 返回 JWT Token
  2. API Key 认证 → 用于 API 调用
  3. SSO 单点登录(企业版)

认证流程:

# 登录获取 Token
POST /api/v1/login
{
    "username": "admin",
    "password": "password"
}
→ Response: { "access_token": "jwt_token...", "token_type": "bearer" }

# 使用 Token 访问 API
GET /api/v1/flows
Headers: { "Authorization": "Bearer jwt_token..." }

# 或使用 API Key
GET /api/v1/flows
Headers: { "x-api-key": "sk_..." }

JWT 配置:

# services/auth/constants.py
ACCESS_TOKEN_EXPIRE_MINUTES = 1440  # 24 小时
ALGORITHM = "HS256"
SECRET_KEY = os.getenv("LANGFLOW_SECRET_KEY", "your-secret-key")

6.2.3 存储服务 (StorageService)

文件位置: services/storage/

支持的存储后端:

  • LocalStorageService: 本地文件系统
  • S3StorageService: AWS S3 / MinIO / 兼容对象存储

接口定义:

class StorageService(ABC):
    async def save(self, path: str, data: bytes) -> None: ...
    async def load(self, path: str) -> bytes: ...
    async def delete(self, path: str) -> None: ...
    async def list_files(self, folder: str) -> List[str]: ...
    async def exists(self, path: str) -> bool: ...

切换存储后端:

# 环境变量配置
LANGFLOW_STORAGE_TYPE=s3  # 或 local
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
S3_BUCKET_NAME=langflow-files
S3_ENDPOINT_URL=https://minio.example.com

6.2.4 缓存服务 (CacheService)

用途:

  • 流程执行状态缓存
  • 会话数据缓存
  • 组件结果缓存

实现:

class ThreadingInMemoryCache(AsyncBaseCacheService):
    """线程安全的内存缓存(默认)"""

class RedisCache(AsyncBaseCacheService):
    """Redis 缓存(生产环境推荐)"""

6.3 服务依赖注入

Langflow 使用 FastAPI 的依赖注入系统:

# api/v1/flows.py
@router.get("/flows")
async def get_flows(
    current_user: User = Depends(get_current_user),  # 认证依赖
    db_service: DatabaseService = Depends(get_db_service),  # 服务依赖
):
    flows = await db_service.list_flows(user_id=current_user.id)
    return flows

自定义依赖示例:

# services/deps.py
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db_service: DatabaseService = Depends(get_db_service),
) -> User:
    """验证当前用户"""
    token = credentials.credentials
    payload = jwt_decode(token)
    user = await db_service.get_user(payload.sub)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="Invalid user")
    return user

7. API 路由层

7.1 API 结构概览

/api/v1/                    # V1 API(稳定版)
├── /login                  # 用户登录
├── /users                  # 用户管理
├── /flows                  # 流程 CRUD
├── /flows/{id}/run         # 执行流程
├── /flows/{id}/build       # 构建流程图
├── /files                  # 文件上传下载
├── /projects               # 项目管理
├── /variables              # 全局变量
├── /api-keys               # API 密钥管理
├── /store                  # 组件商店
├── /monitor                # 监控端点
├── /mcp                    # MCP 管理
└── /all                    # OpenAI 兼容接口

/api/v2/                    # V2 API(新版)
└── /files                  # 新版文件 API

7.2 核心 API 示例

流程 CRUD 操作

# api/v1/flows.py

@router.post("/flows")
async def create_flow(
    flow_data: FlowCreateRequest,
    current_user: User = Depends(get_current_user),
):
    """创建新流程"""
    flow = Flow(
        name=flow_data.name,
        data=flow_data.data,
        user_id=current_user.id,
    )
    db_service = get_db_service()
    await db_service.create_flow(flow)
    return {"id": flow.id, "name": flow.name}

@router.get("/flows/{flow_id}")
async def get_flow(flow_id: UUID):
    """获取流程详情"""
    db_service = get_db_service()
    flow = await db_service.get_flow(flow_id)
    if not flow:
        raise HTTPException(404, "Flow not found")
    return flow

@router.put("/flows/{flow_id}")
async def update_flow(
    flow_id: UUID,
    flow_update: FlowUpdateRequest,
):
    """更新流程"""
    db_service = get_db_service()
    await db_service.update_flow(flow_id, flow_update.dict())
    return {"status": "updated"}

流程执行 API

# api/v1/chat.py - Playground 执行

@router.post("/build/{flow_id}/flow")
async def build_and_run_flow(
    flow_id: UUID,
    request: ChatRequest,
):
    """
    构建 Flow 并执行
    这是 Playground 功能的核心 API
    """
    # 1. 从数据库加载流程数据
    flow = await get_flow_from_db(flow_id)
    
    # 2. 创建 Graph 对象
    graph = Graph.from_payload(flow.data)
    
    # 3. 处理 tweaks(参数覆盖)
    process_tweaks(graph, request.tweaks)
    
    # 4. 执行图
    result = await run_graph(
        graph=graph,
        input_value=request.input_value,
        input_type=request.input_type,
        output_type=request.output_type,
        stream=request.stream,
    )
    
    return result

OpenAI 兼容接口

# api/v1/openai_responses.py

@router.post("/responses")
async def openai_compatible_chat(request: OpenAIRequest):
    """
    OpenAI Responses API 兼容接口
    允许使用 OpenAI SDK 直接调用 Langflow
    """
    # 将 OpenAI 格式转换为内部格式
    flow_request = convert_openai_to_langflow(request)
    
    # 执行流程
    response = await execute_flow(flow_request)
    
    # 将响应转换回 OpenAI 格式
    return convert_to_openai_response(response)

使用示例(Python):

from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:7860/api/v1/all",
    api_key="your-langflow-api-key",
)

response = client.responses.create(
    model="your-flow-id",  # 使用 Flow ID 作为模型名
    input="Hello!",
)
print(response.output_text)

7.3 WebSocket 支持

# 实时日志流
@router.websocket("/ws/logs/{flow_id}")
async def websocket_logs(websocket: WebSocket, flow_id: UUID):
    """实时推送流程执行日志"""
    await websocket.accept()
    
    async for log in stream_logs(flow_id):
        await websocket.send_json(log)

8. 图引擎与流执行

8.1 Graph 数据结构

文件位置: src/lfx/src/lfx/graph/graph/graph.py

class Graph:
    """有向无环图(DAG),表示一个工作流"""
    
    def __init__(self):
        self.vertices: List[Vertex] = []  # 节点集合
        self.edges: List[Edge] = []       # 边集合
        self.session_id: str = ""         # 会话 ID
        self.flow_id: str = ""            # 流程 ID
        self.user_id: Optional[str] = None
        
    @classmethod
    def from_payload(cls, payload: dict) -> 'Graph':
        """从前端发送的 JSON 数据创建 Graph 对象"""
        graph = cls()
        
        # 解析节点
        for node_data in payload['data']['nodes']:
            vertex = Vertex.from_dict(node_data)
            graph.vertices.append(vertex)
            
        # 解析边
        for edge_data in payload['data']['edges']:
            edge = Edge.from_dict(edge_data)
            graph.edges.append(edge)
            
        return graph
    
    def topological_sort(self) -> List[str]:
        """拓扑排序,确定节点执行顺序"""
        # Kahn's algorithm 实现
        in_degree = {v.id: 0 for v in self.vertices}
        adj = defaultdict(list)
        
        for edge in self.edges:
            adj[edge.source_id].append(edge.target_id)
            in_degree[edge.target_id] += 1
            
        queue = deque([v for v in self.vertices if in_degree[v.id] == 0])
        order = []
        
        while queue:
            vertex = queue.popleft()
            order.append(vertex.id)
            for neighbor in adj[vertex.id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
                    
        return order

8.2 Vertex(节点)结构

class Vertex:
    """图中的顶点,对应一个组件实例"""
    
    def __init__(self):
        self.id: str = ""
        self.vertex_type: ComponentEnum = ComponentEnum.COMPONENT
        self.base_type: BaseTypeEnum = BaseTypeEnum.COMPONENT
        self.data: VertexData = VertexData()
        self.is_input: bool = False
        self.is_output: bool = False
        self._built_object: Any = None  # 组件实例缓存
        self._params: Dict = {}         # 参数缓存
        
    async def build(self, *args, **kwargs) -> Any:
        """构建组件实例(懒加载)"""
        if self._built_object is None:
            component_class = get_component_class(self.data.type)
            params = self.get_params()
            self._built_object = component_class(**params)
        return self._built_object
    
    def get_params(self) -> Dict:
        """获取组件参数,处理字段类型转换"""
        param_handler = ParameterHandler(self, get_storage_service())
        field_params, load_from_db_fields = param_handler.process_field_parameters()
        return {**field_params, **load_from_db_fields}

8.3 Edge(边)结构

class Edge:
    """连接两个节点的边"""
    
    def __init__(self):
        self.source_id: str = ""      # 源节点 ID
        self.target_id: str = ""      # 目标节点 ID
        self.source_handle: str = ""  # 源输出端口名称
        self.target_handle: str = ""  # 目标输入端口名称
        self.edge_type: EdgeTypeEnum = EdgeTypeEnum.ACYCLIC
        
    def validate(self, source_vertex: Vertex, target_vertex: Vertex) -> bool:
        """验证边的合法性(类型检查)"""
        output_field = source_vertex.get_output_field(self.source_handle)
        input_field = target_vertex.get_input_field(self.target_handle)
        
        # 检查类型兼容性
        return is_type_compatible(output_field.type, input_field.type)

8.4 流程执行引擎

文件位置: src/lfx/src/lfx/graph/graph/utils.pyservices/flow/flow_runner.py

async def run_graph(
    graph: Graph,
    input_value: str,
    input_type: str = "chat",
    output_type: str = "chat",
    stream: bool = False,
) -> Dict:
    """
    执行工作流图的主函数
    
    执行流程:
    1. 拓扑排序确定执行顺序
    2. 按顺序执行每个节点
    3. 通过边传递数据
    4. 收集最终输出
    """
    execution_order = graph.topological_sort()
    results = {}
    
    for vertex_id in execution_order:
        vertex = graph.get_vertex(vertex_id)
        
        # 准备输入数据
        inputs = collect_inputs_from_edges(vertex, graph.edges, results)
        
        # 如果是输入节点,注入用户输入
        if vertex.is_input:
            inputs['input_value'] = input_value
            inputs['input_type'] = input_type
            
        # 构建并执行组件
        try:
            built_obj = await vertex.build(**inputs)
            result = await execute_component(built_obj, vertex)
            results[vertex_id] = result
        except Exception as e:
            handle_execution_error(e, vertex)
            
    # 收集输出
    final_output = collect_output(results, graph, output_type)
    
    return {
        "result": final_output,
        "session_id": graph.session_id,
        "stream": stream,
    }

async def execute_component(component: Any, vertex: Vertex) -> Any:
    """
    执行单个组件
    
    支持的执行模式:
    1. 同步调用:component.run()
    2. 异步调用:await component.arun()
    3. 生成器:for chunk in component.stream():
    """
    if hasattr(component, 'arun'):
        return await component.arun(**vertex.params)
    elif hasattr(component, 'run'):
        return component.run(**vertex.params)
    else:
        # 默认返回组件本身(用于中间件)
        return component

8.5 Tweaks(参数覆盖)机制

def process_tweaks(graph: Graph, tweaks: Dict[str, Dict]) -> None:
    """
    处理参数覆盖
    
    tweks 格式示例:
    {
        "ChatInput-abc123": {
            "input_value": "Hello World"
        },
        "OpenAIModel-def456": {
            "temperature": 0.7,
            "max_tokens": 1000
        }
    }
    """
    for vertex_id, params in tweaks.items():
        vertex = graph.get_vertex(vertex_id)
        if vertex:
            vertex.update_params(params)

9. 组件系统

9.1 组件架构

Component (抽象基类)
├── InputComponent        # 输入组件(ChatInput, TextInput)
├── OutputComponent       # 输出组件(ChatOutput, TextOutput)
├── LLMComponent          # 大语言模型组件
├── ToolComponent         # 工具组件
├── VectorStoreComponent  # 向量存储组件
├── EmbeddingComponent    # 嵌入组件
├── MemoryComponent       # 记忆组件
├── RetrieverComponent    # 检索组件
└── CustomComponent       # 自定义组件

9.2 组件基类

文件位置: src/lfx/src/lfx/components/base/component.py(假设路径)

class Component(BaseModel):
    """所有组件的基类"""
    
    # 组件元数据
    name: str = "Component"
    description: str = "Base component"
    icon: str = "default"
    display_name: Optional[str] = None
    
    # 输入输出定义
    inputs: ClassVar[List[Field]] = []
    outputs: ClassVar[List[Field]] = []
    
    # 配置选项
    _tool_mode_config: ClassVar[Dict] = {}
    
    def build_config(self) -> Dict[str, Any]:
        """构建组件配置 schema(用于前端渲染表单)"""
        config = {}
        for field in self.inputs:
            config[field.name] = {
                "display_name": field.display_name or field.name,
                "type": field.type,
                "default": field.default,
                "required": field.required,
                "options": field.options,
                "show": field.show,
                "advanced": field.advanced,
                "dynamic": field.dynamic,
                "placeholder": field.placeholder,
            }
        return config
    
    def build(self, **kwargs) -> Any:
        """
        构建组件实例
        子类必须实现此方法
        """
        raise NotImplementedError("Subclasses must implement build()")
    
    async def abuild(self, **kwargs) -> Any:
        """异步构建(可选重写)"""
        return self.build(**kwargs)

9.3 实战:创建自定义组件

示例:翻译组件

# my_components/translator.py
from lfx.base import Component
from pydantic import Field
from typing import Optional

class TranslatorComponent(Component):
    """翻译组件 - 将文本从一种语言翻译成另一种语言"""
    
    # 元数据
    name = "Translator"
    description = "Translate text between languages"
    icon = "Language"
    
    # 输入参数
    input_text: str = Field(
        default="",
        display_name="Input Text",
        description="Text to translate",
    )
    source_language: str = Field(
        default="en",
        display_name="Source Language",
        description="Source language code (e.g., 'en', 'zh', 'ja')",
    )
    target_language: str = Field(
        default="zh",
        display_name="Target Language",
        description="Target language code",
    )
    llm: Any = Field(
        default=None,
        display_name="Language Model",
        description="LLM to use for translation",
    )
    
    # 输出定义
    outputs = [
        Field(name="translated_text", type="str"),
        Field(name="source_language", type="str"),
        Field(name="target_language", type="str"),
    ]
    
    def build_config(self):
        config = super().build_config()
        config["source_language"]["options"] = [
            {"label": "English", "value": "en"},
            {"label": "Chinese", "value": "zh"},
            {"label": "Japanese", "value": "ja"},
            {"label": "Korean", "value": "ko"},
        ]
        config["target_language"]["options"] = config["source_language"]["options"]
        return config
    
    def build(self, **kwargs):
        """构建翻译器"""
        from langchain_core.prompts import PromptTemplate
        from langchain_core.output_parsers import StrOutputParser
        
        prompt = PromptTemplate.from_template(
            "Translate the following text from {source} to {target}. "
            "Only output the translation, no explanations:\n\n{text}"
        )
        
        chain = prompt | self.llm | StrOutputParser()
        
        return chain
    
    async def arun(self, **kwargs):
        """执行翻译"""
        chain = self.build(**kwargs)
        
        result = await chain.ainvoke({
            "text": self.input_text,
            "source": self.source_language,
            "target": self.target_language,
        })
        
        return {
            "translated_text": result,
            "source_language": self.source_language,
            "target_language": self.target_language,
        }

注册自定义组件:

# 在 langflow 配置或初始化时
from my_components.translator import TranslatorComponent

# 方式一:直接注册到组件注册表
from lfx.components import registry
registry.register_component(TranslatorComponent)

# 方式二:通过 CUSTOM_COMPONENTS_DIR 环境变量
# LANGFLOW_CUSTOM_COMPONENTS=/path/to/my_components

9.4 组件分类一览

分类 目录 示例组件
输入/输出 components/io/ ChatInput, ChatOutput, TextInput, TextOutput
大模型 components/openai/, components/anthic/ OpenAI, Anthropic, Ollama, Groq
向量存储 components/chroma/, components/faiss/ Chroma, FAISS, Pinecone, QDrant
嵌入模型 components/embeddings/ OpenAIEmbeddings, HuggingFaceEmbeddings
工具 components/tools/ PythonREPL, Calculator, WebSearch
Agent base/agents/ ReActAgent, PlanAndExecuteAgent
记忆 base/memory/ BufferMemory, SummaryMemory, ConversationBuffer
检索 - VectorStoreRetriever, ContextualCompressionRetriever
文档处理 base/document_transformers/ TextSplitter, RecursiveCharacterTextSplitter

10. 前端架构

10.1 技术栈

  • React 18 + TypeScript (严格模式)
  • Vite 构建工具
  • MUI (Material-UI) UI 组件库
  • React Flow 流程图编辑器
  • Zustand 状态管理
  • TanStack Query v5 数据请求
  • i18next 国际化

10.2 目录结构

frontend/src/
├── components/           # 公共组件
│   ├── inputs/          # 输入组件(ChatInput, TextInput)
│   ├── outputs/         # 输出组件
│   ├── nodes/           # 节点编辑器组件
│   └── shared/          # 共享组件
├── pages/               # 页面组件
│   ├── FlowCanvas/      # 流程画布页面 ⭐
│   ├── Projects/        # 项目列表页
│   └── Settings/        # 设置页
├── hooks/               # 自定义 Hooks
│   ├── useFlow.ts       # 流程操作
│   ├── useGraph.ts      # 图操作
│   └── useApi.ts        # API 调用
├── store/               # Zustand Store
│   ├── flowStore.ts     # 流程状态
│   ├── uiStore.ts       # UI 状态
│   └── authStore.ts     # 认证状态
├── types/               # 类型定义
│   ├── flow.ts          # 流程类型
│   ├── api.ts           # API 类型
│   └── components.ts    # 组件类型
├── utils/               # 工具函数
│   ├── reactflowUtils.ts # React Flow 工具
│   └── mustacheUtils.ts  # 模板处理
├── api/                 # API 封装
│   ├── base.ts          # Axios 实例
│   ├── flows.ts         # 流程 API
│   └── files.ts         # 文件 API
└── constants/           # 常量定义

10.3 核心组件:FlowCanvas(流程画布)

这是最复杂也是最重要的组件,负责可视化编辑流程。

主要功能:

  1. 节点的拖拽与放置
  2. 节点间的连线
  3. 参数编辑面板
  4. Playground 测试区域
  5. 导入/导出功能

状态管理示例(Zustand):

// store/flowStore.ts
import { create } from 'zustand';

interface FlowState {
  // 流程数据
  nodes: Node[];
  edges: Edge[];
  
  // 当前选中
  selectedNodeId: string | null;
  
  // Actions
  addNode: (node: Node) => void;
  removeNode: (nodeId: string) => void;
  updateNodeData: (nodeId: string, data: Partial<NodeData>) => void;
  connectNodes: (sourceId: string, targetId: string, sourceHandle?: string, targetHandle?: string) => void;
}

export const useFlowStore = create<FlowState>((set, get) => ({
  nodes: [],
  edges: [],
  selectedNodeId: null,
  
  addNode: (node) => set((state) => ({
    nodes: [...state.nodes, node],
  })),
  
  removeNode: (nodeId) => set((state) => ({
    nodes: state.nodes.filter(n => n.id !== nodeId),
    edges: state.edges.filter(e => e.source !== nodeId && e.target !== nodeId),
  })),
  
  connectNodes: (sourceId, targetId, sourceHandle, targetHandle) => set((state) => ({
    edges: [...state.edges, {
      id: `${sourceId}-${targetId}`,
      source: sourceId,
      target: targetId,
      sourceHandle: sourceHandle || 'output',
      targetHandle: targetHandle || 'input',
    }],
  })),
}));

10.4 API 调用封装

// api/flows.ts
import axios from './base';
import type { Flow, FlowCreate, FlowUpdate } from '../types';

export const flowsApi = {
  // 获取流程列表
  list: () => 
    axios.get<Flow[]>('/api/v1/flows'),
  
  // 获取单个流程
  get: (id: string) =>
    axios.get<Flow>(`/api/v1/flows/${id}`),
  
  // 创建流程
  create: (data: FlowCreate) =>
    axios.post<Flow>('/api/v1/flows', data),
  
  // 更新流程
  update: (id: string, data: FlowUpdate) =>
    axios.put<Flow>(`/api/v1/flows/${id}`, data),
  
  // 删除流程
  delete: (id: string) =>
    axios.delete(`/api/v1/flows/${id}`),
  
  // 执行流程
  run: (id: string, data: RunRequest) =>
    axios.post(`/api/v1/run/${id}`, data, {
      responseType: 'stream',  // 支持流式响应
    }),
};

10.5 前端开发技巧

热更新开发:

cd src/frontend
npm run dev
# 修改代码后会自动刷新浏览器

组件调试:

// 使用 React DevTools 查看 Zustand 状态
// 安装 Redux DevTools 扩展(Zustand 支持)

// 在代码中临时打印状态
console.log('Current flow:', useFlowStore.getState());

性能优化建议:

  1. 使用 React.memo 包装大型组件
  2. 对于频繁更新的数据使用 useMemouseCallback
  3. 图操作使用虚拟化(只渲染可视区域的节点)
  4. 防抖保存(避免每次修改都触发 API 调用)

11. Agent 与 MCP 支持

11.1 Agent 系统

位置: src/backend/base/langflow/agentic/

Langflow 内置了完整的 Agent 编排系统:

# agentic/services/assistant_service.py
class AssistantService:
    """AI 助手服务 - 管理对话式 Agent"""
    
    async def chat(self, message: str, session_id: str, flow_id: str):
        """
        处理聊天消息
        
        流程:
        1. 加载历史消息
        2. 分类用户意图(Intent Classification)
        3. 选择合适的处理流程
        4. 执行流程并返回结果
        5. 保存对话记录
        """
        # 加载会话上下文
        context = await self.conversation_buffer.get(session_id)
        
        # 意图识别
        intent = await self.classify_intent(message, context)
        
        if intent == "flow_edit":
            # 编辑流程
            return await self.handle_flow_edit(message, flow_id)
        elif intent == "general_question":
            # 通用问答
            return await self.handle_qa(message)
        else:
            # 默认:执行当前流程
            return await self.execute_flow(flow_id, message)

预置 Agent 流程:

  • LangflowAssistant: 帮助用户构建和编辑流程
  • TranslationFlow: 翻译助手
  • SystemMessageGen: 系统消息生成器

12. MCP (Model Context Protocol) 支持

12.1 MCP Server 模式

将 Langflow 流程暴露为 MCP 工具,供其他 AI 应用调用。

启用方式:

# 方法一:CLI 启动
langflow mcp start --project-id your-project-id

# 方法二:在项目中配置
# 项目设置 → MCP Servers → 启用

MCP Server 实现:

# agentic/mcp/server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

class LangflowMCPServer:
    """Langflow MCP Server 实现"""
    
    def __init__(self):
        self.server = Server("langflow-mcp-server")
        self.server.list_tools()(self.list_tools)
        self.server.call_tool()(self.call_tool)
        
    async def list_tools(self) -> List[Tool]:
        """列出所有可用的工具(流程)"""
        flows = await self.get_project_flows()
        tools = []
        for flow in flows:
            tools.append(Tool(
                name=flow.name,
                description=flow.description,
                inputSchema=self.generate_schema(flow),
            ))
        return tools
    
    async def call_tool(self, name: str, arguments: dict) -> List[TextContent]:
        """调用工具(执行流程)"""
        flow = await self.get_flow_by_name(name)
        result = await self.execute_flow(flow, arguments)
        return [TextContent(type="text", text=str(result))]

客户端使用示例(Python):

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    server_params = StdioServerParameters(
        command="langflow",
        args=["mcp", "start", "--project-id", "your-project"],
    )
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            
            # 列出可用工具
            tools = await session.list_tools()
            print(tools)
            
            # 调用工具
            result = await session.call_tool("my-flow", {"input": "Hello"})
            print(result.content)

import asyncio
asyncio.run(main())

12.2 MCP Client 模式

Langflow 也可以作为 MCP Client,调用其他 MCP Server 提供的工具。

配置示例:

{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"]
    },
    "fetch": {
      "command": "uvx",
      "args": ["mcp-server-fetch"]
    }
  }
}

12. 数据库设计

12.1 ER 关系图(简化)

User (用户)
├── id (PK)
├── username
├── password_hash
├── is_active
├── is_superuser
└── profile_image
    │
    ├── 1:N ──────── Flow (流程)
    │               ├── id (PK)
    │               ├── name
    │               ├── data (JSON)
    │               ├── user_id (FK)
    │               ├── folder_id (FK)
    │               ├── tags
    │               └── created_at
    │
    ├── 1:N ──────── ApiKey (API密钥)
    │               ├── id (PK)
    │               ├── user_id (FK)
    │               ├── name
    │               ├── api_key (hash)
    │               └── expires_at
    │
    └── 1:N ──────── Message (消息/聊天记录)
                    ├── id (PK)
                    ├── flow_id (FK)
                    ├── sender
                    ├── content
                    ├── session_id
                    └── timestamp

Folder (文件夹)
├── id (PK)
├── name
├── parent_id (自引用 FK)
└── user_id (FK)

Deployment (部署)
├── id (PK)
├── flow_id (FK)
├── name
├── status
└── endpoint_url

12.2 核心表结构 SQL

-- 用户表
CREATE TABLE "user" (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    username VARCHAR(255) UNIQUE NOT NULL,
    password VARCHAR(255) NOT NULL,
    is_active BOOLEAN DEFAULT TRUE,
    is_superuser BOOLEAN DEFAULT FALSE,
    profile_image TEXT,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- 流程表
CREATE TABLE flow (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    data JSONB NOT NULL,  -- 流程图的完整 JSON 数据
    user_id UUID REFERENCES "user"(id) ON DELETE CASCADE,
    folder_id UUID REFERENCES folder(id) ON DELETE SET NULL,
    description TEXT,
    tags TEXT[],
    status VARCHAR(50) DEFAULT 'active',
    gradient VARCHAR(100),
    icon VARCHAR(100),
    icon_bg_color VARCHAR(20),
    access_type VARCHAR(20) DEFAULT 'private',
    save_path TEXT,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- API 密钥表
CREATE TABLE apikey (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES "user"(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    api_key VARCHAR(512) NOT NULL UNIQUE,  -- 存储 hash
    created_at TIMESTAMP DEFAULT NOW(),
    expires_at TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
);

-- 部署表
CREATE TABLE deployment (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    flow_id UUID REFERENCES flow(id) ON DELETE CASCADE,
    user_id UUID REFERENCES "user")(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    display_name VARCHAR(255),
    status VARCHAR(50) DEFAULT 'inactive',
    endpoint_name VARCHAR(255) UNIQUE,
    webhook_enabled BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

12.3 数据库迁移

创建新迁移:

cd src/backend/base
alembic revision --autogenerate -m "描述你的变更"
alembic upgrade head

迁移文件示例:

"""add new column to flow table

Revision ID: xxx
Revises: yyy
Create Date: 2024-01-01
"""

import sqlalchemy as sa
from alembic import op

def upgrade():
    op.add_column('flow', sa.Column('new_column', sa.String(), nullable=True))

def downgrade():
    op.drop_column('flow', 'new_column')

13. 实战:添加新功能

场景:为流程添加标签过滤功能

步骤 1:后端 - 添加数据库字段

# services/database/models/flow/model.py
class Flow(SQLModel, table=True):
    # ... 现有字段 ...
    tags: List[str] = Field(default=[], sa_column=Column(JSON))

创建迁移:

alembic revision --autogenerate -m "add tags to flow"
alembic upgrade head

步骤 2:后端 - 更新 API Schema

# api/schemas.py
class FlowCreate(BaseModel):
    name: str
    data: dict
    tags: List[str] = []

class FlowRead(BaseModel):
    id: UUID
    name: str
    tags: List[str]
    # ...

class FlowUpdate(BaseModel):
    name: Optional[str] = None
    tags: Optional[List[str]] = None

步骤 3:后端 - 添加过滤查询

# api/v1/flows.py
@router.get("/flows")
async def list_flows(
    tag: Optional[str] = Query(None, description="按标签过滤"),
    current_user: User = Depends(get_current_user),
):
    query = select(Flow).where(Flow.user_id == current_user.id)
    
    if tag:
        # PostgreSQL 数组包含查询
        query = query.where(Flow.tags.contains([tag]))
    
    result = await session.execute(query)
    flows = result.scalars().all()
    return flows

步骤 4:前端 - 更新类型定义

// types/flow.ts
export interface Flow {
  id: string;
  name: string;
  tags: string[];
  // ...
}

export interface FlowCreate {
  name: string;
  data: Record<string, any>;
  tags?: string[];
}

步骤 5:前端 - 添加 UI 组件

// components/TagFilter.tsx
import React from 'react';
import { Chip } from '@mui/material';

interface TagFilterProps {
  tags: string[];
  selectedTag: string | null;
  onTagSelect: (tag: string | null) => void;
}

export const TagFilter: React.FC<TagFilterProps> = ({ 
  tags, 
  selectedTag, 
  onTagSelect 
}) => (
  <div className="tag-filter">
    <Chip
      label="All"
      color={selectedTag === null ? "primary" : "default"}
      onClick={() => onTagSelect(null)}
    />
    {tags.map(tag => (
      <Chip
        key={tag}
        label={tag}
        color={selectedTag === tag ? "primary" : "default"}
        onClick={() => onTagSelect(tag)}
      />
    ))}
  </div>
);

步骤 6:集成测试

# tests/test_tags.py
@pytest.mark.asyncio
async def test_filter_flows_by_tag(async_client, test_user):
    # 创建带标签的流程
    flow1 = await create_flow(client, user=test_user, tags=["agent"])
    flow2 = await create_flow(client, user=test_user, tags=["rag"])
    
    # 测试过滤
    response = await async_client.get(
        "/api/v1/flows?tag=agent",
        headers={"Authorization": f"Bearer {token}"}
    )
    
    assert response.status_code == 200
    assert len(response.json()) == 1
    assert response.json()[0]["id"] == flow1.id

14. 调试技巧

14.1 后端调试

开启 DEBUG 日志:

# 环境变量
LANGFLOW_LOG_LEVEL=DEBUG

# 或者在代码中
import logging
logging.getLogger("langflow").setLevel(logging.DEBUG)

使用 pdb 断点调试:

def some_function():
    import pdb; pdb.set_trace()  # 在这里暂停
    # 你的代码

VSCode 调试配置(.vscode/launch.json):

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Langflow Backend",
      "type": "debugpy",
      "request": "launch",
      "program": "-m",
      "args": ["langflow", "run", "--backend-only"],
      "console": "integratedTerminal",
      "cwd": "${workspaceFolder}/src/backend/base",
      "envFile": "${workspaceFolder}/.env"
    }
  ]
}

性能分析:

import cProfile
import pstats

def profile_function():
    pr = cProfile.Profile()
    pr.enable()
    
    # 你想分析的代码
    run_graph(...)
    
    pr.disable()
    stats = pstats.Stats(pr)
    stats.sort_stats('cumulative')
    stats.print_stats(20)  # 显示前 20 个最耗时的函数

14.2 前端调试

React DevTools:

  • 安装浏览器扩展
  • 查看 Component 树和 Props
  • 分析 Re-renders

Redux DevTools(Zustand):

  • Zustand 默认支持 Redux DevTools
  • 可以查看 State 变更历史

网络请求调试:

// 添加 Axios 拦截器
axios.interceptors.request.use(config => {
  console.log('API Request:', config.url, config.data);
  return config;
});

axios.interceptors.response.use(response => {
  console.log('API Response:', response.status, response.data);
  return response;
});

14.3 图执行调试

启用详细执行日志:

# 环境变量
LANGFLOW_GRAPH_DEBUG=true

# 代码中
graph.debug = True
await run_graph(graph, ...)  # 会打印每个节点的执行过程

单步执行:

async def debug_run_graph(graph: Graph):
    """手动逐步执行图中的节点"""
    order = graph.topological_sort()
    
    for i, vertex_id in enumerate(order):
        vertex = graph.get_vertex(vertex_id)
        print(f"[{i+1}/{len(order)}] Executing: {vertex.data.name}")
        
        # 手动检查输入
        inputs = collect_inputs(vertex, graph)
        print(f"  Inputs: {inputs}")
        
        # 执行
        result = await vertex.build(**inputs)
        print(f"  Output: {result}")
        
        # 手动继续?
        input("Press Enter to continue...")

15. 常见问题

Q1: 如何解决端口占用问题?

# 查找占用端口的进程
netstat -ano | findstr :7860

# 终止进程
taskkill /PID <PID> /F

# 或指定其他端口启动
langflow run --port 8080

Q2: 数据库迁移失败怎么办?

# 回滚到上一个版本
alembic downgrade -1

# 标记迁移为已完成(谨慎使用)
alembic stamp head

# 查看当前版本
alembic current

# 查看迁移历史
alembic history

Q3: 自定义组件不显示?

检查清单:

  1. 组件类是否继承正确的基类
  2. 是否实现了 build() 方法
  3. 是否正确注册到组件注册表
  4. CUSTOM_COMPONENTS_DIR 环境变量是否设置正确
  5. 重启服务后是否生效
# 调试:查看已注册组件
from lfx.components.registry import get_all_components
components = get_all_components()
print([c.name for c in components])

Q4: 性能优化建议?

后端优化:

  1. 使用 Redis 替代内存缓存
  2. 启用数据库连接池
  3. 使用 Celery 处理耗时任务
  4. 添加适当的索引

前端优化:

  1. 启用 React.memo 和 useMemo
  2. 使用虚拟滚动(react-window)
  3. 代码分割(React.lazy)
  4. 图片懒加载

数据库优化:

-- 为常用查询添加索引
CREATE INDEX idx_flow_user_id ON flow(user_id);
CREATE INDEX idx_flow_folder_id ON flow(folder_id);
CREATE INDEX idx_message_session_id ON message(session_id);

-- 定期清理旧数据
DELETE FROM message WHERE created_at < NOW() - INTERVAL '90 days';
ANALYZE;  -- 更新统计信息

Q5: 如何参与贡献?

  1. Fork 仓库
  2. 创建特性分支 (git checkout -b feature/amazing-feature)
  3. 提交更改 (git commit -m 'Add amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 创建 Pull Request

代码规范:

  • 使用 ruff 进行代码格式化和 linting
  • 运行测试确保不破坏现有功能
  • 添加适当的单元测试
  • 更新相关文档
# 开发前检查
make lint
make test
make format

📚 学习资源

官方资源

推荐阅读顺序

  1. 入门阶段:

    • 阅读 README.md 了解项目概况
    • 完成 Quickstart 教程
    • 熟悉基本组件的使用
  2. 进阶阶段:

  3. 高级阶段:

    • 深入研究图引擎 (src/lfx/graph/)
    • 学习如何创建自定义组件
    • 阅读 API 实现了解 RESTful 设计
  4. 贡献者阶段:

    • 阅读 CONTRIBUTING.md
    • 研究现有 PR 和 Issue
    • 尝试修复 Bug 或添加小功能

🎯 总结

Langflow 是一个架构清晰、设计精良的开源 AI 工作流平台。通过本教程的学习,你应该已经掌握了:

整体架构: 三层分离设计(langflow → langflow-base → lfx)
启动流程: 从 CLI 到 FastAPI 应用的完整链路
服务层: 服务定位器模式和依赖注入
API 设计: RESTful API 和 OpenAI 兼容接口
图引擎: DAG 数据结构和执行算法
组件系统: 如何创建和使用组件
前端架构: React + Zustand + React Flow
高级特性: Agent 和 MCP 支持

下一步行动建议:

  1. 动手实践:尝试创建一个简单的自定义组件
  2. 阅读源码:选择你感兴趣的部分深入阅读
  3. 参与社区:加入 Discord 讨论,提交 Issue 或 PR
  4. 构建应用:用 Langflow 构建一个实际的 AI 应用

祝你学习愉快!🚀

如果本教程对你有帮助,欢迎给 Langflow 项目 ⭐ Star!

0

评论区