📚 本教程基于 Langflow v1.10.x 源码,帮助你从零开始理解这个强大的 AI 工作流平台的核心架构和实现原理。
1. 项目概述与架构总览
1.1 什么是 Langflow?
Langflow 是一个开源的、基于 Python 的可视化 AI 工作流构建平台。它允许开发者通过拖拽组件的方式快速构建 AI 应用,同时提供完整的 API 接口供外部调用。
核心特性:
- 🎨 可视化流程编辑器(基于 React Flow)
- 🔧 丰富的组件库(LLM、向量数据库、工具等)
- 🚀 一键部署为 API 或 MCP Server
- 🤖 多智能体编排能力
- 📊 可观测性集成(LangSmith、LangFuse 等)
1.2 三层架构设计
┌─────────────────────────────────────────────────┐
│ langflow (分发层) │
│ CLI、配置管理、集成打包 │
├─────────────────────────────────────────────────┤
│ langflow-base (平台层) │
│ API、认证、持久化、多用户、数据库 │
├─────────────────────────────────────────────────┤
│ lfx (运行时层) │
│ 组件系统、图引擎、执行器 │
└─────────────────────────────────────────────────┘
依赖方向: langflow → langflow-base → lfx(单向依赖)
1.3 技术栈概览
| 层级 | 技术选型 |
|---|---|
| 后端框架 | FastAPI + Pydantic v2 |
| 数据库 | PostgreSQL / SQLite + SQLModel (SQLAlchemy) |
| 缓存 | Redis / 内存缓存 |
| 任务队列 | Celery / AnyIO |
| 前端框架 | React 18 + TypeScript |
| UI 库 | MUI (Material-UI), React Flow |
| 状态管理 | Zustand |
| 构建工具 | Vite |
| 包管理 | uv (推荐) |
| ORM 迁移 | Alembic |
2. 环境搭建与开发准备
2.1 前置要求
# Python 版本要求
Python >= 3.10, < 3.15
# 安装 uv 包管理器(推荐)
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
# macOS/Linux:
curl -LsSf https://astral.sh/uv/install.sh | sh
# Node.js >= 18 (前端开发需要)
node --version
2.2 从源码安装
# 克隆仓库
git clone https://github.com/langflow-ai/langflow.git
cd langflow
# 使用 uv 创建虚拟环境并安装依赖
uv sync
# 或者使用 make 命令(推荐用于开发)
make run_cli
2.3 开发模式启动
方式一:完整启动(前后端一体)
uv run langflow run
# 访问 http://localhost:7860
方式二:分离模式(推荐开发时使用)
# 终端 1: 启动后端
cd src/backend/base
uv run python -m langflow.__main__ run --backend-only
# 终端 2: 启动前端
cd src/frontend
npm install
npm run dev
# 访问 http://localhost:3000(前端会代理到后端 7860)
2.4 开发工具配置
VSCode 推荐扩展:
- Python (Microsoft)
- Pylance (Microsoft)
- ESLint
- TypeScript Importer
- Thunder Client (API 测试)
.vscode/settings.json 配置示例:
{
"python.defaultInterpreterPath": ".venv\\Scripts\\python.exe",
"python.formatting.provider": "ruff",
"editor.formatOnSave": true,
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
}
}
}
3. 项目目录结构详解
langflow/
├── src/ # 源代码根目录
│ ├── backend/ # 后端代码
│ │ └── base/ # langflow-base 核心包
│ │ └── langflow/
│ │ ├── __main__.py # CLI 入口点
│ │ ├── main.py # FastAPI 应用创建与配置
│ │ ├── api/ # API 路由层
│ │ │ ├── v1/ # V1 版本 API
│ │ │ └── v2/ # V2 版本 API
│ │ ├── agentic/ # Agent 相关功能
│ │ │ ├── services/ # Agent 服务实现
│ │ │ ├── mcp/ # MCP Server 实现
│ │ │ └── flows/ # 预置 Agent 流程
│ │ ├── services/ # 业务服务层 ⭐核心
│ │ │ ├── auth/ # 认证服务
│ │ │ ├── database/ # 数据库服务
│ │ │ ├── cache/ # 缓存服务
│ │ │ ├── storage/ # 存储服务(本地/S3)
│ │ │ ├── chat/ # 聊天服务
│ │ │ ├── flow/ # 流执行服务
│ │ │ ├── store/ # 组件商店
│ │ │ └── ... # 其他服务
│ │ ├── processing/ # 图处理逻辑
│ │ ├── helpers/ # 辅助函数
│ │ └── alembic/ # 数据库迁移
│ │
│ ├── lfx/ # LFX 运行时 ⭐核心
│ │ └── src/lfx/
│ │ ├── base/ # 基础类和接口
│ │ │ ├── agents/ # Agent 基类
│ │ │ ├── models/ # 模型统一接口
│ │ │ ├── tools/ # 工具基类
│ │ │ ├── vectorstores/ # 向量存储基类
│ │ │ └── embeddings/ # 嵌入模型基类
│ │ ├── components/ # 具体组件实现 ⭐
│ │ │ ├── openai/ # OpenAI 组件
│ │ │ ├── anthropic/ # Anthropic 组件
│ │ │ ├── chroma/ # ChromaDB 组件
│ │ │ └── ... # 更多组件
│ │ ├── graph/ # 图引擎 ⭐
│ │ │ ├── graph.py # Graph 类定义
│ │ │ ├── vertex.py # 节点定义
│ │ │ └── edge.py # 边定义
│ │ └── cli/ # CLI 命令
│ │
│ ├── frontend/ # 前端代码
│ │ └── src/
│ │ ├── components/ # React 组件
│ │ ├── pages/ # 页面组件
│ │ ├── hooks/ # 自定义 Hooks
│ │ ├── store/ # Zustand 状态管理
│ │ ├── types/ # TypeScript 类型定义
│ │ ├── utils/ # 工具函数
│ │ └── api/ # API 调用封装
│ │
│ └── sdk/ # Python SDK 客户端
│ └── src/langflow_sdk/
│ ├── client.py # 同步客户端
│ ├── _async_client.py # 异步客户端
│ └── models.py # 数据模型
│
├── docs/ # 文档
├── docker/ # Docker 配置
├── tests/ # 测试代码
├── pyproject.toml # 项目配置
└── Makefile # 常用命令快捷方式
4. 核心架构分层
4.1 分层职责说明
第一层:LFX(运行时 Runtime)
位置: src/lfx/
职责:
- 组件注册与管理
- 图(DAG)数据结构
- 图执行引擎
- 各类 AI 组件的具体实现
核心概念:
# Graph - 有向无环图,代表一个工作流
class Graph:
vertices: List[Vertex] # 节点列表
edges: List[Edge] # 边列表
# Vertex - 图中的节点,对应一个组件实例
class Vertex:
id: str
data: ComponentData
is_input: bool # 是否是输入节点
is_output: bool # 是否是输出节点
# Edge - 连接两个节点的边
class Edge:
source_id: str # 源节点 ID
target_id: str # 目标节点 ID
source_handle: str # 源输出端口
target_handle: str # 目标输入端口
第二层:Langflow-base(平台 Platform)
位置: src/backend/base/langflow/
职责:
- HTTP API 接口(FastAPI)
- 用户认证与授权
- 数据持久化(数据库)
- 文件存储管理
- 会话管理
- 缓存策略
- 多租户支持
第三层:Langflow(分发 Distribution)
位置: 根目录 pyproject.toml
职责:
- CLI 命令行工具
- 配置文件解析
- 依赖整合
- 打包发布
4.2 关键设计原则
根据 PHILOSOPHY.md:
- 视觉优先: 所有功能都应体现在可视化编辑器中
- 单向依赖: 严格遵守
langflow → langflow-base → lfx - 组件即代码: 每个组件都可以查看和修改其 Python 源码
- 配置外置: 不使用隐藏的全局配置标志
5. 启动流程分析
5.1 完整启动链路
用户执行: uv run langflow run
↓
[1] __main__.py 入口
- macOS fork-safety 处理
- 初始化 Typer CLI 应用
- 注册子命令(run, serve 等)
↓
[2] 执行 run 命令
- 加载 .env 环境变量
- 解析命令行参数
- 启动 ProcessManager
↓
[3] setup_app() 在 main.py 中
- 配置日志系统
- 初始化 Sentry(可选)
- initialize_services() ← 关键步骤
- 创建 FastAPI 应用实例
- 注册中间件
- 注册路由
- 挂载静态文件(前端)
↓
[4] Uvicorn 启动服务器
- 监听端口(默认 7860)
- 处理 HTTP 请求
5.2 核心入口代码解读
文件: __main__.py
# 第 1-20 行:macOS 安全处理
if __name__ == "__main__":
if _platform.system() == "Darwin":
# 解决 Gunicorn 在 macOS 上的 fork-safety 问题
_os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
_os.execv(_sys.executable, [_sys.executable, "-m", "langflow.__main__", *_sys.argv[1:]])
# 第 50-80 行:CLI 应用定义
app = typer.Typer(no_args_is_help=True)
# 注册 LFX 子命令
from lfx.cli.commands import serve_command
app.add_typer(lfx_app, name="lfx")
文件: main.py - setup_app() 函数
async def setup_app():
# 1. 配置日志
configure()
# 2. 初始化遥测(可选)
if settings.sentry_dsn:
init_sentry(settings.sentry_dsn)
# 3. 初始化所有服务 ⭐ 最重要
await initialize_services(fix_migration=fix_migration)
# 4. 创建 FastAPI 应用
application = FastAPI(
title="Langflow",
version=get_version(),
lifespan=lifespan,
)
# 5. 注册 CORS 中间件
application.add_middleware(CORSMiddleware, ...)
# 6. 注册 API 路由
include_routers(application)
# 7. 挂载前端静态文件(非 backend-only 模式)
if not settings.backend_only:
mount_static_files(application)
return application
5.3 服务初始化详解
async def initialize_services(fix_migration=False):
"""按顺序初始化所有服务"""
# 1. 设置服务(必须最先初始化)
initialize_settings_service()
# 2. 缓存服务(依赖设置服务)
initialize_cache_service()
# 3. 会话服务(依赖缓存服务)
initialize_session_service()
# 4. 数据库服务(执行迁移)
await initialize_database_service(fix_migration=fix_migration)
# 5. 存储服务
initialize_storage_service()
# 6. 认证服务
initialize_auth_service()
# 7. 其他业务服务...
initialize_chat_service()
initialize_variable_service()
initialize_store_service()
# ...
服务注册机制:
# 使用工厂模式注册服务
def get_service(service_type: ServiceType, factory: ServiceFactory):
"""获取或创建服务实例"""
service_manager.register(service_type, factory)
def get_db_service() -> DatabaseService:
"""便捷方法:获取数据库服务"""
return get_service(ServiceType.DATABASE_SERVICE)
6. 服务层深度解析
6.1 服务架构概览
Langflow 采用服务定位器模式(Service Locator Pattern),所有服务通过 ServiceManager 统一管理。
ServiceManager
├── SettingsService # 配置管理
├── DatabaseService # 数据库操作
├── CacheService # 缓存(Redis/内存)
├── SessionService # 用户会话
├── AuthService # JWT/API Key 认证
├── StorageService # 文件存储(本地/S3)
├── ChatService # 聊天历史管理
├── VariableService # 全局变量
├── StoreService # 组件商店
├── TaskService # 异步任务
├── TelemetryService # 遥测收集
└── TracingService # 分布式追踪
6.2 核心服务详解
6.2.1 数据库服务 (DatabaseService)
文件位置: services/database/
特点:
- 使用 SQLModel(SQLAlchemy + Pydantic)
- 支持 PostgreSQL 和 SQLite
- 自动迁移(Alembic)
核心模型:
# services/database/models/flow/model.py
class Flow(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
name: str
data: dict # JSON 格式的流程数据
user_id: Optional[str]
folder_id: Optional[str]
created_at: datetime
updated_at: datetime
# services/database/models/user/model.py
class User(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
username: str
password: str # 加密存储
is_active: bool = True
is_superuser: bool = False
# services/database/models/api_key/model.py
class ApiKey(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
user_id: UUID
name: str
api_key: str # hash 存储
created_at: datetime
expires_at: Optional[datetime]
使用示例:
async with session_scope() as session:
# 创建流程
flow = Flow(name="My Flow", data={...}, user_id=user.id)
session.add(flow)
await session.commit()
# 查询流程
result = await session.exec(select(Flow).where(Flow.user_id == user.id))
flows = result.all()
6.2.2 认证服务 (AuthService)
文件位置: services/auth/
支持的认证方式:
- 用户名/密码登录 → 返回 JWT Token
- API Key 认证 → 用于 API 调用
- SSO 单点登录(企业版)
认证流程:
# 登录获取 Token
POST /api/v1/login
{
"username": "admin",
"password": "password"
}
→ Response: { "access_token": "jwt_token...", "token_type": "bearer" }
# 使用 Token 访问 API
GET /api/v1/flows
Headers: { "Authorization": "Bearer jwt_token..." }
# 或使用 API Key
GET /api/v1/flows
Headers: { "x-api-key": "sk_..." }
JWT 配置:
# services/auth/constants.py
ACCESS_TOKEN_EXPIRE_MINUTES = 1440 # 24 小时
ALGORITHM = "HS256"
SECRET_KEY = os.getenv("LANGFLOW_SECRET_KEY", "your-secret-key")
6.2.3 存储服务 (StorageService)
文件位置: services/storage/
支持的存储后端:
- LocalStorageService: 本地文件系统
- S3StorageService: AWS S3 / MinIO / 兼容对象存储
接口定义:
class StorageService(ABC):
async def save(self, path: str, data: bytes) -> None: ...
async def load(self, path: str) -> bytes: ...
async def delete(self, path: str) -> None: ...
async def list_files(self, folder: str) -> List[str]: ...
async def exists(self, path: str) -> bool: ...
切换存储后端:
# 环境变量配置
LANGFLOW_STORAGE_TYPE=s3 # 或 local
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
S3_BUCKET_NAME=langflow-files
S3_ENDPOINT_URL=https://minio.example.com
6.2.4 缓存服务 (CacheService)
用途:
- 流程执行状态缓存
- 会话数据缓存
- 组件结果缓存
实现:
class ThreadingInMemoryCache(AsyncBaseCacheService):
"""线程安全的内存缓存(默认)"""
class RedisCache(AsyncBaseCacheService):
"""Redis 缓存(生产环境推荐)"""
6.3 服务依赖注入
Langflow 使用 FastAPI 的依赖注入系统:
# api/v1/flows.py
@router.get("/flows")
async def get_flows(
current_user: User = Depends(get_current_user), # 认证依赖
db_service: DatabaseService = Depends(get_db_service), # 服务依赖
):
flows = await db_service.list_flows(user_id=current_user.id)
return flows
自定义依赖示例:
# services/deps.py
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db_service: DatabaseService = Depends(get_db_service),
) -> User:
"""验证当前用户"""
token = credentials.credentials
payload = jwt_decode(token)
user = await db_service.get_user(payload.sub)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="Invalid user")
return user
7. API 路由层
7.1 API 结构概览
/api/v1/ # V1 API(稳定版)
├── /login # 用户登录
├── /users # 用户管理
├── /flows # 流程 CRUD
├── /flows/{id}/run # 执行流程
├── /flows/{id}/build # 构建流程图
├── /files # 文件上传下载
├── /projects # 项目管理
├── /variables # 全局变量
├── /api-keys # API 密钥管理
├── /store # 组件商店
├── /monitor # 监控端点
├── /mcp # MCP 管理
└── /all # OpenAI 兼容接口
/api/v2/ # V2 API(新版)
└── /files # 新版文件 API
7.2 核心 API 示例
流程 CRUD 操作
# api/v1/flows.py
@router.post("/flows")
async def create_flow(
flow_data: FlowCreateRequest,
current_user: User = Depends(get_current_user),
):
"""创建新流程"""
flow = Flow(
name=flow_data.name,
data=flow_data.data,
user_id=current_user.id,
)
db_service = get_db_service()
await db_service.create_flow(flow)
return {"id": flow.id, "name": flow.name}
@router.get("/flows/{flow_id}")
async def get_flow(flow_id: UUID):
"""获取流程详情"""
db_service = get_db_service()
flow = await db_service.get_flow(flow_id)
if not flow:
raise HTTPException(404, "Flow not found")
return flow
@router.put("/flows/{flow_id}")
async def update_flow(
flow_id: UUID,
flow_update: FlowUpdateRequest,
):
"""更新流程"""
db_service = get_db_service()
await db_service.update_flow(flow_id, flow_update.dict())
return {"status": "updated"}
流程执行 API
# api/v1/chat.py - Playground 执行
@router.post("/build/{flow_id}/flow")
async def build_and_run_flow(
flow_id: UUID,
request: ChatRequest,
):
"""
构建 Flow 并执行
这是 Playground 功能的核心 API
"""
# 1. 从数据库加载流程数据
flow = await get_flow_from_db(flow_id)
# 2. 创建 Graph 对象
graph = Graph.from_payload(flow.data)
# 3. 处理 tweaks(参数覆盖)
process_tweaks(graph, request.tweaks)
# 4. 执行图
result = await run_graph(
graph=graph,
input_value=request.input_value,
input_type=request.input_type,
output_type=request.output_type,
stream=request.stream,
)
return result
OpenAI 兼容接口
# api/v1/openai_responses.py
@router.post("/responses")
async def openai_compatible_chat(request: OpenAIRequest):
"""
OpenAI Responses API 兼容接口
允许使用 OpenAI SDK 直接调用 Langflow
"""
# 将 OpenAI 格式转换为内部格式
flow_request = convert_openai_to_langflow(request)
# 执行流程
response = await execute_flow(flow_request)
# 将响应转换回 OpenAI 格式
return convert_to_openai_response(response)
使用示例(Python):
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:7860/api/v1/all",
api_key="your-langflow-api-key",
)
response = client.responses.create(
model="your-flow-id", # 使用 Flow ID 作为模型名
input="Hello!",
)
print(response.output_text)
7.3 WebSocket 支持
# 实时日志流
@router.websocket("/ws/logs/{flow_id}")
async def websocket_logs(websocket: WebSocket, flow_id: UUID):
"""实时推送流程执行日志"""
await websocket.accept()
async for log in stream_logs(flow_id):
await websocket.send_json(log)
8. 图引擎与流执行
8.1 Graph 数据结构
文件位置: src/lfx/src/lfx/graph/graph/graph.py
class Graph:
"""有向无环图(DAG),表示一个工作流"""
def __init__(self):
self.vertices: List[Vertex] = [] # 节点集合
self.edges: List[Edge] = [] # 边集合
self.session_id: str = "" # 会话 ID
self.flow_id: str = "" # 流程 ID
self.user_id: Optional[str] = None
@classmethod
def from_payload(cls, payload: dict) -> 'Graph':
"""从前端发送的 JSON 数据创建 Graph 对象"""
graph = cls()
# 解析节点
for node_data in payload['data']['nodes']:
vertex = Vertex.from_dict(node_data)
graph.vertices.append(vertex)
# 解析边
for edge_data in payload['data']['edges']:
edge = Edge.from_dict(edge_data)
graph.edges.append(edge)
return graph
def topological_sort(self) -> List[str]:
"""拓扑排序,确定节点执行顺序"""
# Kahn's algorithm 实现
in_degree = {v.id: 0 for v in self.vertices}
adj = defaultdict(list)
for edge in self.edges:
adj[edge.source_id].append(edge.target_id)
in_degree[edge.target_id] += 1
queue = deque([v for v in self.vertices if in_degree[v.id] == 0])
order = []
while queue:
vertex = queue.popleft()
order.append(vertex.id)
for neighbor in adj[vertex.id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return order
8.2 Vertex(节点)结构
class Vertex:
"""图中的顶点,对应一个组件实例"""
def __init__(self):
self.id: str = ""
self.vertex_type: ComponentEnum = ComponentEnum.COMPONENT
self.base_type: BaseTypeEnum = BaseTypeEnum.COMPONENT
self.data: VertexData = VertexData()
self.is_input: bool = False
self.is_output: bool = False
self._built_object: Any = None # 组件实例缓存
self._params: Dict = {} # 参数缓存
async def build(self, *args, **kwargs) -> Any:
"""构建组件实例(懒加载)"""
if self._built_object is None:
component_class = get_component_class(self.data.type)
params = self.get_params()
self._built_object = component_class(**params)
return self._built_object
def get_params(self) -> Dict:
"""获取组件参数,处理字段类型转换"""
param_handler = ParameterHandler(self, get_storage_service())
field_params, load_from_db_fields = param_handler.process_field_parameters()
return {**field_params, **load_from_db_fields}
8.3 Edge(边)结构
class Edge:
"""连接两个节点的边"""
def __init__(self):
self.source_id: str = "" # 源节点 ID
self.target_id: str = "" # 目标节点 ID
self.source_handle: str = "" # 源输出端口名称
self.target_handle: str = "" # 目标输入端口名称
self.edge_type: EdgeTypeEnum = EdgeTypeEnum.ACYCLIC
def validate(self, source_vertex: Vertex, target_vertex: Vertex) -> bool:
"""验证边的合法性(类型检查)"""
output_field = source_vertex.get_output_field(self.source_handle)
input_field = target_vertex.get_input_field(self.target_handle)
# 检查类型兼容性
return is_type_compatible(output_field.type, input_field.type)
8.4 流程执行引擎
文件位置: src/lfx/src/lfx/graph/graph/utils.py 或 services/flow/flow_runner.py
async def run_graph(
graph: Graph,
input_value: str,
input_type: str = "chat",
output_type: str = "chat",
stream: bool = False,
) -> Dict:
"""
执行工作流图的主函数
执行流程:
1. 拓扑排序确定执行顺序
2. 按顺序执行每个节点
3. 通过边传递数据
4. 收集最终输出
"""
execution_order = graph.topological_sort()
results = {}
for vertex_id in execution_order:
vertex = graph.get_vertex(vertex_id)
# 准备输入数据
inputs = collect_inputs_from_edges(vertex, graph.edges, results)
# 如果是输入节点,注入用户输入
if vertex.is_input:
inputs['input_value'] = input_value
inputs['input_type'] = input_type
# 构建并执行组件
try:
built_obj = await vertex.build(**inputs)
result = await execute_component(built_obj, vertex)
results[vertex_id] = result
except Exception as e:
handle_execution_error(e, vertex)
# 收集输出
final_output = collect_output(results, graph, output_type)
return {
"result": final_output,
"session_id": graph.session_id,
"stream": stream,
}
async def execute_component(component: Any, vertex: Vertex) -> Any:
"""
执行单个组件
支持的执行模式:
1. 同步调用:component.run()
2. 异步调用:await component.arun()
3. 生成器:for chunk in component.stream():
"""
if hasattr(component, 'arun'):
return await component.arun(**vertex.params)
elif hasattr(component, 'run'):
return component.run(**vertex.params)
else:
# 默认返回组件本身(用于中间件)
return component
8.5 Tweaks(参数覆盖)机制
def process_tweaks(graph: Graph, tweaks: Dict[str, Dict]) -> None:
"""
处理参数覆盖
tweks 格式示例:
{
"ChatInput-abc123": {
"input_value": "Hello World"
},
"OpenAIModel-def456": {
"temperature": 0.7,
"max_tokens": 1000
}
}
"""
for vertex_id, params in tweaks.items():
vertex = graph.get_vertex(vertex_id)
if vertex:
vertex.update_params(params)
9. 组件系统
9.1 组件架构
Component (抽象基类)
├── InputComponent # 输入组件(ChatInput, TextInput)
├── OutputComponent # 输出组件(ChatOutput, TextOutput)
├── LLMComponent # 大语言模型组件
├── ToolComponent # 工具组件
├── VectorStoreComponent # 向量存储组件
├── EmbeddingComponent # 嵌入组件
├── MemoryComponent # 记忆组件
├── RetrieverComponent # 检索组件
└── CustomComponent # 自定义组件
9.2 组件基类
文件位置: src/lfx/src/lfx/components/base/component.py(假设路径)
class Component(BaseModel):
"""所有组件的基类"""
# 组件元数据
name: str = "Component"
description: str = "Base component"
icon: str = "default"
display_name: Optional[str] = None
# 输入输出定义
inputs: ClassVar[List[Field]] = []
outputs: ClassVar[List[Field]] = []
# 配置选项
_tool_mode_config: ClassVar[Dict] = {}
def build_config(self) -> Dict[str, Any]:
"""构建组件配置 schema(用于前端渲染表单)"""
config = {}
for field in self.inputs:
config[field.name] = {
"display_name": field.display_name or field.name,
"type": field.type,
"default": field.default,
"required": field.required,
"options": field.options,
"show": field.show,
"advanced": field.advanced,
"dynamic": field.dynamic,
"placeholder": field.placeholder,
}
return config
def build(self, **kwargs) -> Any:
"""
构建组件实例
子类必须实现此方法
"""
raise NotImplementedError("Subclasses must implement build()")
async def abuild(self, **kwargs) -> Any:
"""异步构建(可选重写)"""
return self.build(**kwargs)
9.3 实战:创建自定义组件
示例:翻译组件
# my_components/translator.py
from lfx.base import Component
from pydantic import Field
from typing import Optional
class TranslatorComponent(Component):
"""翻译组件 - 将文本从一种语言翻译成另一种语言"""
# 元数据
name = "Translator"
description = "Translate text between languages"
icon = "Language"
# 输入参数
input_text: str = Field(
default="",
display_name="Input Text",
description="Text to translate",
)
source_language: str = Field(
default="en",
display_name="Source Language",
description="Source language code (e.g., 'en', 'zh', 'ja')",
)
target_language: str = Field(
default="zh",
display_name="Target Language",
description="Target language code",
)
llm: Any = Field(
default=None,
display_name="Language Model",
description="LLM to use for translation",
)
# 输出定义
outputs = [
Field(name="translated_text", type="str"),
Field(name="source_language", type="str"),
Field(name="target_language", type="str"),
]
def build_config(self):
config = super().build_config()
config["source_language"]["options"] = [
{"label": "English", "value": "en"},
{"label": "Chinese", "value": "zh"},
{"label": "Japanese", "value": "ja"},
{"label": "Korean", "value": "ko"},
]
config["target_language"]["options"] = config["source_language"]["options"]
return config
def build(self, **kwargs):
"""构建翻译器"""
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = PromptTemplate.from_template(
"Translate the following text from {source} to {target}. "
"Only output the translation, no explanations:\n\n{text}"
)
chain = prompt | self.llm | StrOutputParser()
return chain
async def arun(self, **kwargs):
"""执行翻译"""
chain = self.build(**kwargs)
result = await chain.ainvoke({
"text": self.input_text,
"source": self.source_language,
"target": self.target_language,
})
return {
"translated_text": result,
"source_language": self.source_language,
"target_language": self.target_language,
}
注册自定义组件:
# 在 langflow 配置或初始化时
from my_components.translator import TranslatorComponent
# 方式一:直接注册到组件注册表
from lfx.components import registry
registry.register_component(TranslatorComponent)
# 方式二:通过 CUSTOM_COMPONENTS_DIR 环境变量
# LANGFLOW_CUSTOM_COMPONENTS=/path/to/my_components
9.4 组件分类一览
| 分类 | 目录 | 示例组件 |
|---|---|---|
| 输入/输出 | components/io/ |
ChatInput, ChatOutput, TextInput, TextOutput |
| 大模型 | components/openai/, components/anthic/ |
OpenAI, Anthropic, Ollama, Groq |
| 向量存储 | components/chroma/, components/faiss/ |
Chroma, FAISS, Pinecone, QDrant |
| 嵌入模型 | components/embeddings/ |
OpenAIEmbeddings, HuggingFaceEmbeddings |
| 工具 | components/tools/ |
PythonREPL, Calculator, WebSearch |
| Agent | base/agents/ |
ReActAgent, PlanAndExecuteAgent |
| 记忆 | base/memory/ |
BufferMemory, SummaryMemory, ConversationBuffer |
| 检索 | - | VectorStoreRetriever, ContextualCompressionRetriever |
| 文档处理 | base/document_transformers/ |
TextSplitter, RecursiveCharacterTextSplitter |
10. 前端架构
10.1 技术栈
- React 18 + TypeScript (严格模式)
- Vite 构建工具
- MUI (Material-UI) UI 组件库
- React Flow 流程图编辑器
- Zustand 状态管理
- TanStack Query v5 数据请求
- i18next 国际化
10.2 目录结构
frontend/src/
├── components/ # 公共组件
│ ├── inputs/ # 输入组件(ChatInput, TextInput)
│ ├── outputs/ # 输出组件
│ ├── nodes/ # 节点编辑器组件
│ └── shared/ # 共享组件
├── pages/ # 页面组件
│ ├── FlowCanvas/ # 流程画布页面 ⭐
│ ├── Projects/ # 项目列表页
│ └── Settings/ # 设置页
├── hooks/ # 自定义 Hooks
│ ├── useFlow.ts # 流程操作
│ ├── useGraph.ts # 图操作
│ └── useApi.ts # API 调用
├── store/ # Zustand Store
│ ├── flowStore.ts # 流程状态
│ ├── uiStore.ts # UI 状态
│ └── authStore.ts # 认证状态
├── types/ # 类型定义
│ ├── flow.ts # 流程类型
│ ├── api.ts # API 类型
│ └── components.ts # 组件类型
├── utils/ # 工具函数
│ ├── reactflowUtils.ts # React Flow 工具
│ └── mustacheUtils.ts # 模板处理
├── api/ # API 封装
│ ├── base.ts # Axios 实例
│ ├── flows.ts # 流程 API
│ └── files.ts # 文件 API
└── constants/ # 常量定义
10.3 核心组件:FlowCanvas(流程画布)
这是最复杂也是最重要的组件,负责可视化编辑流程。
主要功能:
- 节点的拖拽与放置
- 节点间的连线
- 参数编辑面板
- Playground 测试区域
- 导入/导出功能
状态管理示例(Zustand):
// store/flowStore.ts
import { create } from 'zustand';
interface FlowState {
// 流程数据
nodes: Node[];
edges: Edge[];
// 当前选中
selectedNodeId: string | null;
// Actions
addNode: (node: Node) => void;
removeNode: (nodeId: string) => void;
updateNodeData: (nodeId: string, data: Partial<NodeData>) => void;
connectNodes: (sourceId: string, targetId: string, sourceHandle?: string, targetHandle?: string) => void;
}
export const useFlowStore = create<FlowState>((set, get) => ({
nodes: [],
edges: [],
selectedNodeId: null,
addNode: (node) => set((state) => ({
nodes: [...state.nodes, node],
})),
removeNode: (nodeId) => set((state) => ({
nodes: state.nodes.filter(n => n.id !== nodeId),
edges: state.edges.filter(e => e.source !== nodeId && e.target !== nodeId),
})),
connectNodes: (sourceId, targetId, sourceHandle, targetHandle) => set((state) => ({
edges: [...state.edges, {
id: `${sourceId}-${targetId}`,
source: sourceId,
target: targetId,
sourceHandle: sourceHandle || 'output',
targetHandle: targetHandle || 'input',
}],
})),
}));
10.4 API 调用封装
// api/flows.ts
import axios from './base';
import type { Flow, FlowCreate, FlowUpdate } from '../types';
export const flowsApi = {
// 获取流程列表
list: () =>
axios.get<Flow[]>('/api/v1/flows'),
// 获取单个流程
get: (id: string) =>
axios.get<Flow>(`/api/v1/flows/${id}`),
// 创建流程
create: (data: FlowCreate) =>
axios.post<Flow>('/api/v1/flows', data),
// 更新流程
update: (id: string, data: FlowUpdate) =>
axios.put<Flow>(`/api/v1/flows/${id}`, data),
// 删除流程
delete: (id: string) =>
axios.delete(`/api/v1/flows/${id}`),
// 执行流程
run: (id: string, data: RunRequest) =>
axios.post(`/api/v1/run/${id}`, data, {
responseType: 'stream', // 支持流式响应
}),
};
10.5 前端开发技巧
热更新开发:
cd src/frontend
npm run dev
# 修改代码后会自动刷新浏览器
组件调试:
// 使用 React DevTools 查看 Zustand 状态
// 安装 Redux DevTools 扩展(Zustand 支持)
// 在代码中临时打印状态
console.log('Current flow:', useFlowStore.getState());
性能优化建议:
- 使用
React.memo包装大型组件 - 对于频繁更新的数据使用
useMemo和useCallback - 图操作使用虚拟化(只渲染可视区域的节点)
- 防抖保存(避免每次修改都触发 API 调用)
11. Agent 与 MCP 支持
11.1 Agent 系统
位置: src/backend/base/langflow/agentic/
Langflow 内置了完整的 Agent 编排系统:
# agentic/services/assistant_service.py
class AssistantService:
"""AI 助手服务 - 管理对话式 Agent"""
async def chat(self, message: str, session_id: str, flow_id: str):
"""
处理聊天消息
流程:
1. 加载历史消息
2. 分类用户意图(Intent Classification)
3. 选择合适的处理流程
4. 执行流程并返回结果
5. 保存对话记录
"""
# 加载会话上下文
context = await self.conversation_buffer.get(session_id)
# 意图识别
intent = await self.classify_intent(message, context)
if intent == "flow_edit":
# 编辑流程
return await self.handle_flow_edit(message, flow_id)
elif intent == "general_question":
# 通用问答
return await self.handle_qa(message)
else:
# 默认:执行当前流程
return await self.execute_flow(flow_id, message)
预置 Agent 流程:
- LangflowAssistant: 帮助用户构建和编辑流程
- TranslationFlow: 翻译助手
- SystemMessageGen: 系统消息生成器
12. MCP (Model Context Protocol) 支持
12.1 MCP Server 模式
将 Langflow 流程暴露为 MCP 工具,供其他 AI 应用调用。
启用方式:
# 方法一:CLI 启动
langflow mcp start --project-id your-project-id
# 方法二:在项目中配置
# 项目设置 → MCP Servers → 启用
MCP Server 实现:
# agentic/mcp/server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
class LangflowMCPServer:
"""Langflow MCP Server 实现"""
def __init__(self):
self.server = Server("langflow-mcp-server")
self.server.list_tools()(self.list_tools)
self.server.call_tool()(self.call_tool)
async def list_tools(self) -> List[Tool]:
"""列出所有可用的工具(流程)"""
flows = await self.get_project_flows()
tools = []
for flow in flows:
tools.append(Tool(
name=flow.name,
description=flow.description,
inputSchema=self.generate_schema(flow),
))
return tools
async def call_tool(self, name: str, arguments: dict) -> List[TextContent]:
"""调用工具(执行流程)"""
flow = await self.get_flow_by_name(name)
result = await self.execute_flow(flow, arguments)
return [TextContent(type="text", text=str(result))]
客户端使用示例(Python):
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
server_params = StdioServerParameters(
command="langflow",
args=["mcp", "start", "--project-id", "your-project"],
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# 列出可用工具
tools = await session.list_tools()
print(tools)
# 调用工具
result = await session.call_tool("my-flow", {"input": "Hello"})
print(result.content)
import asyncio
asyncio.run(main())
12.2 MCP Client 模式
Langflow 也可以作为 MCP Client,调用其他 MCP Server 提供的工具。
配置示例:
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"]
},
"fetch": {
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}
}
12. 数据库设计
12.1 ER 关系图(简化)
User (用户)
├── id (PK)
├── username
├── password_hash
├── is_active
├── is_superuser
└── profile_image
│
├── 1:N ──────── Flow (流程)
│ ├── id (PK)
│ ├── name
│ ├── data (JSON)
│ ├── user_id (FK)
│ ├── folder_id (FK)
│ ├── tags
│ └── created_at
│
├── 1:N ──────── ApiKey (API密钥)
│ ├── id (PK)
│ ├── user_id (FK)
│ ├── name
│ ├── api_key (hash)
│ └── expires_at
│
└── 1:N ──────── Message (消息/聊天记录)
├── id (PK)
├── flow_id (FK)
├── sender
├── content
├── session_id
└── timestamp
Folder (文件夹)
├── id (PK)
├── name
├── parent_id (自引用 FK)
└── user_id (FK)
Deployment (部署)
├── id (PK)
├── flow_id (FK)
├── name
├── status
└── endpoint_url
12.2 核心表结构 SQL
-- 用户表
CREATE TABLE "user" (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
is_superuser BOOLEAN DEFAULT FALSE,
profile_image TEXT,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- 流程表
CREATE TABLE flow (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
data JSONB NOT NULL, -- 流程图的完整 JSON 数据
user_id UUID REFERENCES "user"(id) ON DELETE CASCADE,
folder_id UUID REFERENCES folder(id) ON DELETE SET NULL,
description TEXT,
tags TEXT[],
status VARCHAR(50) DEFAULT 'active',
gradient VARCHAR(100),
icon VARCHAR(100),
icon_bg_color VARCHAR(20),
access_type VARCHAR(20) DEFAULT 'private',
save_path TEXT,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- API 密钥表
CREATE TABLE apikey (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES "user"(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
api_key VARCHAR(512) NOT NULL UNIQUE, -- 存储 hash
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
);
-- 部署表
CREATE TABLE deployment (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
flow_id UUID REFERENCES flow(id) ON DELETE CASCADE,
user_id UUID REFERENCES "user")(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
display_name VARCHAR(255),
status VARCHAR(50) DEFAULT 'inactive',
endpoint_name VARCHAR(255) UNIQUE,
webhook_enabled BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
12.3 数据库迁移
创建新迁移:
cd src/backend/base
alembic revision --autogenerate -m "描述你的变更"
alembic upgrade head
迁移文件示例:
"""add new column to flow table
Revision ID: xxx
Revises: yyy
Create Date: 2024-01-01
"""
import sqlalchemy as sa
from alembic import op
def upgrade():
op.add_column('flow', sa.Column('new_column', sa.String(), nullable=True))
def downgrade():
op.drop_column('flow', 'new_column')
13. 实战:添加新功能
场景:为流程添加标签过滤功能
步骤 1:后端 - 添加数据库字段
# services/database/models/flow/model.py
class Flow(SQLModel, table=True):
# ... 现有字段 ...
tags: List[str] = Field(default=[], sa_column=Column(JSON))
创建迁移:
alembic revision --autogenerate -m "add tags to flow"
alembic upgrade head
步骤 2:后端 - 更新 API Schema
# api/schemas.py
class FlowCreate(BaseModel):
name: str
data: dict
tags: List[str] = []
class FlowRead(BaseModel):
id: UUID
name: str
tags: List[str]
# ...
class FlowUpdate(BaseModel):
name: Optional[str] = None
tags: Optional[List[str]] = None
步骤 3:后端 - 添加过滤查询
# api/v1/flows.py
@router.get("/flows")
async def list_flows(
tag: Optional[str] = Query(None, description="按标签过滤"),
current_user: User = Depends(get_current_user),
):
query = select(Flow).where(Flow.user_id == current_user.id)
if tag:
# PostgreSQL 数组包含查询
query = query.where(Flow.tags.contains([tag]))
result = await session.execute(query)
flows = result.scalars().all()
return flows
步骤 4:前端 - 更新类型定义
// types/flow.ts
export interface Flow {
id: string;
name: string;
tags: string[];
// ...
}
export interface FlowCreate {
name: string;
data: Record<string, any>;
tags?: string[];
}
步骤 5:前端 - 添加 UI 组件
// components/TagFilter.tsx
import React from 'react';
import { Chip } from '@mui/material';
interface TagFilterProps {
tags: string[];
selectedTag: string | null;
onTagSelect: (tag: string | null) => void;
}
export const TagFilter: React.FC<TagFilterProps> = ({
tags,
selectedTag,
onTagSelect
}) => (
<div className="tag-filter">
<Chip
label="All"
color={selectedTag === null ? "primary" : "default"}
onClick={() => onTagSelect(null)}
/>
{tags.map(tag => (
<Chip
key={tag}
label={tag}
color={selectedTag === tag ? "primary" : "default"}
onClick={() => onTagSelect(tag)}
/>
))}
</div>
);
步骤 6:集成测试
# tests/test_tags.py
@pytest.mark.asyncio
async def test_filter_flows_by_tag(async_client, test_user):
# 创建带标签的流程
flow1 = await create_flow(client, user=test_user, tags=["agent"])
flow2 = await create_flow(client, user=test_user, tags=["rag"])
# 测试过滤
response = await async_client.get(
"/api/v1/flows?tag=agent",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert len(response.json()) == 1
assert response.json()[0]["id"] == flow1.id
14. 调试技巧
14.1 后端调试
开启 DEBUG 日志:
# 环境变量
LANGFLOW_LOG_LEVEL=DEBUG
# 或者在代码中
import logging
logging.getLogger("langflow").setLevel(logging.DEBUG)
使用 pdb 断点调试:
def some_function():
import pdb; pdb.set_trace() # 在这里暂停
# 你的代码
VSCode 调试配置(.vscode/launch.json):
{
"version": "0.2.0",
"configurations": [
{
"name": "Langflow Backend",
"type": "debugpy",
"request": "launch",
"program": "-m",
"args": ["langflow", "run", "--backend-only"],
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/src/backend/base",
"envFile": "${workspaceFolder}/.env"
}
]
}
性能分析:
import cProfile
import pstats
def profile_function():
pr = cProfile.Profile()
pr.enable()
# 你想分析的代码
run_graph(...)
pr.disable()
stats = pstats.Stats(pr)
stats.sort_stats('cumulative')
stats.print_stats(20) # 显示前 20 个最耗时的函数
14.2 前端调试
React DevTools:
- 安装浏览器扩展
- 查看 Component 树和 Props
- 分析 Re-renders
Redux DevTools(Zustand):
- Zustand 默认支持 Redux DevTools
- 可以查看 State 变更历史
网络请求调试:
// 添加 Axios 拦截器
axios.interceptors.request.use(config => {
console.log('API Request:', config.url, config.data);
return config;
});
axios.interceptors.response.use(response => {
console.log('API Response:', response.status, response.data);
return response;
});
14.3 图执行调试
启用详细执行日志:
# 环境变量
LANGFLOW_GRAPH_DEBUG=true
# 代码中
graph.debug = True
await run_graph(graph, ...) # 会打印每个节点的执行过程
单步执行:
async def debug_run_graph(graph: Graph):
"""手动逐步执行图中的节点"""
order = graph.topological_sort()
for i, vertex_id in enumerate(order):
vertex = graph.get_vertex(vertex_id)
print(f"[{i+1}/{len(order)}] Executing: {vertex.data.name}")
# 手动检查输入
inputs = collect_inputs(vertex, graph)
print(f" Inputs: {inputs}")
# 执行
result = await vertex.build(**inputs)
print(f" Output: {result}")
# 手动继续?
input("Press Enter to continue...")
15. 常见问题
Q1: 如何解决端口占用问题?
# 查找占用端口的进程
netstat -ano | findstr :7860
# 终止进程
taskkill /PID <PID> /F
# 或指定其他端口启动
langflow run --port 8080
Q2: 数据库迁移失败怎么办?
# 回滚到上一个版本
alembic downgrade -1
# 标记迁移为已完成(谨慎使用)
alembic stamp head
# 查看当前版本
alembic current
# 查看迁移历史
alembic history
Q3: 自定义组件不显示?
检查清单:
- 组件类是否继承正确的基类
- 是否实现了
build()方法 - 是否正确注册到组件注册表
CUSTOM_COMPONENTS_DIR环境变量是否设置正确- 重启服务后是否生效
# 调试:查看已注册组件
from lfx.components.registry import get_all_components
components = get_all_components()
print([c.name for c in components])
Q4: 性能优化建议?
后端优化:
- 使用 Redis 替代内存缓存
- 启用数据库连接池
- 使用 Celery 处理耗时任务
- 添加适当的索引
前端优化:
- 启用 React.memo 和 useMemo
- 使用虚拟滚动(react-window)
- 代码分割(React.lazy)
- 图片懒加载
数据库优化:
-- 为常用查询添加索引
CREATE INDEX idx_flow_user_id ON flow(user_id);
CREATE INDEX idx_flow_folder_id ON flow(folder_id);
CREATE INDEX idx_message_session_id ON message(session_id);
-- 定期清理旧数据
DELETE FROM message WHERE created_at < NOW() - INTERVAL '90 days';
ANALYZE; -- 更新统计信息
Q5: 如何参与贡献?
- Fork 仓库
- 创建特性分支 (
git checkout -b feature/amazing-feature) - 提交更改 (
git commit -m 'Add amazing feature') - 推送到分支 (
git push origin feature/amazing-feature) - 创建 Pull Request
代码规范:
- 使用 ruff 进行代码格式化和 linting
- 运行测试确保不破坏现有功能
- 添加适当的单元测试
- 更新相关文档
# 开发前检查
make lint
make test
make format
📚 学习资源
官方资源
- 官方文档: https://docs.langflow.org
- GitHub 仓库: https://github.com/langflow-ai/langflow
- Discord 社区: https://discord.gg/EqksyE2EXf
- YouTube 频道: https://www.youtube.com/@Langflow
推荐阅读顺序
-
入门阶段:
- 阅读 README.md 了解项目概况
- 完成 Quickstart 教程
- 熟悉基本组件的使用
-
进阶阶段:
- 阅读 ARCHITECTURE.md
- 研究 PHILOSOPHY.md
- 理解服务层的实现
-
高级阶段:
- 深入研究图引擎 (
src/lfx/graph/) - 学习如何创建自定义组件
- 阅读 API 实现了解 RESTful 设计
- 深入研究图引擎 (
-
贡献者阶段:
- 阅读 CONTRIBUTING.md
- 研究现有 PR 和 Issue
- 尝试修复 Bug 或添加小功能
🎯 总结
Langflow 是一个架构清晰、设计精良的开源 AI 工作流平台。通过本教程的学习,你应该已经掌握了:
✅ 整体架构: 三层分离设计(langflow → langflow-base → lfx)
✅ 启动流程: 从 CLI 到 FastAPI 应用的完整链路
✅ 服务层: 服务定位器模式和依赖注入
✅ API 设计: RESTful API 和 OpenAI 兼容接口
✅ 图引擎: DAG 数据结构和执行算法
✅ 组件系统: 如何创建和使用组件
✅ 前端架构: React + Zustand + React Flow
✅ 高级特性: Agent 和 MCP 支持
下一步行动建议:
- 动手实践:尝试创建一个简单的自定义组件
- 阅读源码:选择你感兴趣的部分深入阅读
- 参与社区:加入 Discord 讨论,提交 Issue 或 PR
- 构建应用:用 Langflow 构建一个实际的 AI 应用
祝你学习愉快!🚀
如果本教程对你有帮助,欢迎给 Langflow 项目 ⭐ Star!
评论区