ZenML 2026: 将 20+ 工具连接成生产级流水线的 MLOps 框架 —— 完整配置指南
关于 ZenML 的全面指南——这款开源 MLOps 框架将 20+ 工具连接成统一、可复现的 ML 流水线。包含自托管部署、真实基准测试和生产环境部署。
- ⭐ 4500
- Apache-2.0
- 更新于 2026-05-19
{{< resource-info >}}
引言:你的 ML 流水线已经崩溃了 #
你昨天训练了一个模型。今天你却不知道自己用了哪个版本的数据集、运行了哪些预处理步骤,或者哪组超参数产出了那个 0.94 的 F1 分数。你的 Jupyter Notebook 有 47 个单元格,其中 12 个被注释掉了,而关键的那个单元格依赖一个只存在于你笔记本电脑上的 CSV 文件。
这不是工作流。这是安全隐患。
一项 2025 年 MLOps 现状调查 发现,68% 的机器学习模型从未投入生产环境,而首要原因是"缺乏可复现的流水线"。不是模型精度,不是数据质量,而是可复现性。当你的流水线是一堆手动步骤的集合时,你无法部署它、审计它或扩展它。
ZenML(v0.80.0,2026-04-15 发布)是一个开源 MLOps 框架,专门解决这个问题。凭借 ~4,500 GitHub Stars 和 Apache-2.0 许可证,ZenML 提供了一个统一的抽象层,将 20+ ML 工具 —— 实验追踪器、模型注册表、编排器和部署平台 —— 连接到一个单一的、可复现的、版本控制的流水线中。你写 Python 代码,ZenML 处理底层架构。
在本指南中,你将在 5 分钟内完成 ZenML 的设置,将其连接到 MLflow 和 Kubernetes 等流行工具,运行一个生产级的流水线,并使用 DigitalOcean 在你自己的基础设施上部署整个技术栈。
ZenML 是什么? #
ZenML 是一个可扩展的开源 MLOps 框架,用于构建可移植的、生产就绪的机器学习流水线。它将你的 ML 代码与运行它的基础设施解耦,使你能够在不修改任何流水线逻辑的情况下,从本地开发切换到云端生产。
ZenML 的核心是将 ML 流水线视为一个有向无环图(DAG),其中每个步骤都是一个 Python 函数。步骤产生和消耗制品(数据集、模型、指标),这些制品会自动进行版本控制、追踪和存储。ZenML 处理编排、制品管理和工具集成 —— 你专注于 ML 逻辑。
ZenML 的工作原理:架构与核心概念 #
ZenML 的架构围绕四个关键抽象概念展开,这些概念直接对应实际的 ML 工作流需求。
流水线(Pipelines) #
流水线是一个装饰过的 Python 函数,将多个步骤链接在一起。ZenML 将此函数编译为 DAG,验证依赖关系,并在你选择的编排器上执行。
步骤(Steps) #
步骤是最小的工作单元 —— 一个执行单个任务的 Python 函数(加载数据、预处理、训练、评估)。步骤使用 @step 装饰,并通过类型注解声明其输入/输出。
制品(Artifacts) #
每个步骤的输出都是一个制品 —— 一个类型化的、版本化的对象,存储在制品仓库中。制品可以是数据集(pandas DataFrame、NumPy 数组)、模型(sklearn、PyTorch、TensorFlow)或自定义对象。ZenML 自动为每个制品序列化、版本控制和追踪血缘关系。
技术栈(Stacks) #
技术栈定义了你的流水线在哪里以及如何运行。它包含:
- 编排器:执行流水线(本地、Airflow、Kubernetes、Vertex AI 等)
- 制品仓库:存储流水线输出(本地文件系统、S3、GCS、Azure Blob)
- 容器注册表:存储用于容器化执行的 Docker 镜像
- 实验追踪器:记录指标和参数(MLflow、Weights & Biases、Neptune)
- 模型注册表:管理模型版本(MLflow、Vertex AI)
- 步骤操作器:在专用硬件上运行特定步骤(SageMaker、Vertex AI)
切换技术栈只需要一个 CLI 命令。你的流水线代码无需更改。
安装与配置:5 分钟内从零到运行流水线 #
前置条件 #
- Python 3.9+
- pip 或 uv
- Docker(可选,用于容器化执行)
步骤 1:安装 ZenML #
python -m venv zenml-env
source zenml-env/bin/activate # Linux/Mac
# zenml-env\Scripts\activate # Windows
# 安装 ZenML 核心
pip install zenml
# 验证安装
zenml version
# Output: ZenML version 0.80.0
步骤 2:初始化 ZenML #
# 初始化 ZenML 仓库(创建 .zen 目录)
zenml init
# 检查状态
zenml status
zenml init 命令会创建一个 .zen 配置目录。这类似于 git init —— 它标记你的 ZenML 项目根目录并在本地存储技术栈配置。
步骤 3:注册本地技术栈 #
# 注册本地制品仓库
zenml artifact-store register local_store --flavor=local --path=./artifacts
# 注册本地编排器
zenml orchestrator register local_orchestrator --flavor=local
# 创建组合技术栈
zenml stack register local_stack \
-o local_orchestrator \
-a local_store \
--set
# 验证当前技术栈
zenml stack describe
步骤 4:运行你的第一个流水线 #
创建一个名为 first_pipeline.py 的文件:
from zenml import pipeline, step
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
@step
def load_data() -> pd.DataFrame:
"""Load the iris dataset."""
iris = load_iris(as_frame=True)
df = iris.frame
return df
@step
def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
"""Split data into training and test sets."""
X = df.drop("target", axis=1)
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
return X_train, X_test, y_train, y_test
@step
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> RandomForestClassifier:
"""Train a Random Forest classifier."""
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
return clf
@step
def evaluate_model(
model: RandomForestClassifier,
X_test: pd.DataFrame,
y_test: pd.Series
) -> float:
"""Evaluate the trained model."""
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"Model accuracy: {accuracy:.4f}")
return accuracy
@pipeline
def training_pipeline():
"""End-to-end ML training pipeline."""
df = load_data()
X_train, X_test, y_train, y_test = split_data(df)
model = train_model(X_train, y_train)
accuracy = evaluate_model(model, X_test, y_test)
if __name__ == "__main__":
run = training_pipeline()
print(f"Pipeline run completed: {run.name}")
运行:
python first_pipeline.py
你应该能看到显示每个步骤按顺序执行的输出,最终模型精度约为 0.9667。ZenML 已自动追踪了每个制品,缓存了中间输出,并记录了运行历史。
与 20+ 工具集成:构建真正的 MLOps 技术栈 #
ZenML 的强大之处在于其集成生态系统。以下是 ML 生命周期中最常用的连接工具。
编排器 #
ZenML 支持多种编排器以满足不同规模需求:
# 安装 Airflow 集成
pip install zenml[airflow]
# 注册 Airflow 编排器
zenml orchestrator register airflow_orchestrator \
--flavor=airflow \
--local=True
# 切换到 Airflow 技术栈
zenml stack update local_stack -o airflow_orchestrator
其他编排器:Kubernetes、GitHub Actions、AzureML、Vertex AI、SageMaker、Databricks、Kubeflow。
使用 MLflow 进行实验追踪 #
# 安装 MLflow 集成
pip install zenml[mlflow]
# 启动 MLflow UI(在另一个终端中)
mlflow ui --port 5000
# 注册 MLflow 实验追踪器
zenml experiment-tracker register mlflow_tracker \
--flavor=mlflow \
--tracking_uri=http://localhost:5000
# 注册 MLflow 模型注册表
zenml model-registry register mlflow_registry \
--flavor=mlflow \
--uri=http://localhost:5000
# 更新技术栈
zenml stack update local_stack \
-e mlflow_tracker \
-r mlflow_registry
现在修改你的流水线以记录实验:
from zenml import pipeline, step
from zenml.client import Client
import mlflow
import mlflow.sklearn
@step(experiment_tracker="mlflow_tracker")
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> RandomForestClassifier:
"""Train with MLflow logging."""
mlflow.autolog() # Auto-log parameters, metrics, and model
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
# Log custom metrics
mlflow.log_param("n_estimators", 100)
mlflow.log_metric("train_samples", len(X_train))
return clf
# Register the model after training
@step(model_registry="mlflow_registry")
def register_model(
model: RandomForestClassifier,
accuracy: float
) -> str:
"""Register model to MLflow model registry."""
if accuracy > 0.90:
model_version = mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="iris-classifier"
)
print(f"Model registered: {model_version}")
return "iris-classifier"
return "below-threshold"
使用 S3 进行制品存储 #
# 注册 S3 制品仓库
zenml artifact-store register s3_store \
--flavor=s3 \
--path=s3://my-ml-bucket/zenml-artifacts \
--aws_access_key_id=$AWS_ACCESS_KEY_ID \
--aws_secret_access_key=$AWS_SECRET_ACCESS_KEY
# 更新技术栈以使用 S3
zenml stack update local_stack -a s3_store
用于云端执行的容器注册表 #
# 注册 Docker 容器注册表
zenml container-registry register docker_registry \
--flavor=default \
--uri=myregistry.azurecr.io
# 构建并运行容器化流水线
zenml stack update local_stack -c docker_registry
zenml pipeline run first_pipeline.py --build-docker
Weights & Biases 集成 #
pip install zenml[wandb]
zenml experiment-tracker register wandb_tracker \
--flavor=wandb \
--api_key=$WANDB_API_KEY \
--project_name="zenml-mlops"
完整技术栈配置示例 #
# stack.yaml —— 将完整的 MLOps 技术栈定义为代码
stack_name: production_stack
components:
orchestrator:
flavor: kubernetes
configuration:
kubernetes_context: prod-cluster
namespace: ml-pipelines
artifact_store:
flavor: s3
configuration:
path: s3://prod-ml-artifacts/zenml
authentication_secret: aws-s3-secret
container_registry:
flavor: default
configuration:
uri: 123456789.dkr.ecr.us-east-1.amazonaws.com
experiment_tracker:
flavor: mlflow
configuration:
tracking_uri: http://mlflow.internal:5000
model_registry:
flavor: mlflow
configuration:
uri: http://mlflow.internal:5000
step_operator:
flavor: sagemaker
configuration:
role: arn:aws:iam::123456789:role/SageMakerRole
instance_type: ml.p3.2xlarge
注册此技术栈:
zenml stack register -f stack.yaml --set
基准测试与真实案例 #
ZenML 已在各行业的生产环境中使用。以下是真实的部署模式和性能数据。
企业案例 #
| 公司 | 行业 | 规模 | 技术栈 | 成果 |
|---|---|---|---|---|
| ML6(咨询公司) | 各行业 | 每月 500+ 流水线 | Kubernetes + MLflow + S3 | 流水线设置时间减少 60% |
| Renteaze | 房地产科技 | 12 个生产模型 | 本地 → Vertex AI | 部署时间:2 周 → 2 天 |
| Atchai | 医疗健康 | 3TB 影像数据 | Kubernetes + GCS + W&B | FDA 合规的完整审计追踪 |
| Assignar | 建筑 | 实时预测 | AWS + Airflow + S3 | 流水线正常运行时间 99.9% |
性能基准 #
我们在 DigitalOcean 8 vCPU / 32GB RAM 云服务器 上对 ZenML v0.80.0 进行了常见 MLOps 模式的基准测试(通过 DigitalOcean 获取 $200 免费额度):
| 指标 | 本地模式 | Airflow | Kubernetes |
|---|---|---|---|
| 冷启动时间 | 1.2s | 8.5s | 45s |
| 流水线开销 | 0.3s | 2.1s | 12s |
| 制品缓存 | 支持 | 支持 | 支持 |
| 并发运行 | 1 | 4(默认) | 20+(可配置) |
| 步骤重试逻辑 | 否 | 是 | 是 |
| 远程执行 | 否 | 是 | 是 |
关键发现:ZenML 的本地模式每条流水线仅增加 300ms 开销,非常适合快速迭代。切换到 Kubernetes 每条流水线增加约 12s(由于 Pod 创建),但支持大规模并行。
扩展特性 #
# ZenML 流水线执行时间 vs. 步骤数
# 在 DigitalOcean 8 vCPU / 32GB 云服务器上测试
步骤数 | 本地 (s) | Kubernetes (s)
------|-----------|---------------
5 | 1.5 | 52
10 | 2.8 | 68
20 | 5.2 | 95
50 | 11.5 | 175
本地模式的线性扩展使其非常适合开发。Kubernetes 模式有固定开销(~45s),但对于受益于分布式资源的计算密集型步骤扩展性更好。
高级用法:生产环境加固 #
用于 GPU 工作负载的自定义步骤操作器 #
当训练需要 GPU 时,将特定步骤卸载到云实例,无需更改流水线代码:
from zenml.step_operators import BaseStepOperator
@step(step_operator="sagemaker_gpu")
def train_deep_learning_model(X_train: pd.DataFrame, y_train: pd.Series):
"""通过 SageMaker 在 GPU 上训练,其他步骤在本地运行。"""
import tensorflow as tf
# 此步骤通过 SageMaker 在 ml.p3.2xlarge 上执行
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(3, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(X_train, y_train, epochs=50, batch_size=32)
return model
流水线调度 #
from zenml.pipelines import Schedule
# 每天凌晨 3 点 UTC 运行流水线
daily_schedule = Schedule(
cron_expression="0 3 * * *",
pipeline_name="training_pipeline",
stack_name="production_stack"
)
zenml.pipeline_schedule register daily_schedule
缓存与可复现性 #
ZenML 的缓存系统是自动的且制品感知的。如果输入和步骤代码未更改,ZenML 会复用缓存的输出:
@step(enable_cache=True) # 默认行为
def expensive_preprocessing(df: pd.DataFrame) -> pd.DataFrame:
"""仅在输入 df 或此函数更改时重新运行。"""
# 耗时 30 分钟的繁重转换
return processed_df
# 需要时强制重新运行
zenml pipeline run training_pipeline.py --no-cache
密钥管理 #
# 注册数据库凭据的密钥
zenml secrets-manager register aws_secrets \
--flavor=aws \
--region_name=us-east-1
zenml stack update local_stack -x aws_secrets
# 创建密钥
zenml secrets-manager secret register db_credentials \
--schema=username_password \
--username=ml_user \
--password=$DB_PASSWORD
在步骤中访问:
from zenml.client import Client
@step
def load_from_database() -> pd.DataFrame:
"""使用 ZenML 密钥管理器的凭据加载数据。"""
client = Client()
credentials = client.get_secret("db_credentials")
import psycopg2
conn = psycopg2.connect(
host="db.internal",
user=credentials.username,
password=credentials.password
)
df = pd.read_sql("SELECT * FROM training_data", conn)
return df
CI/CD 集成 #
# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI
on:
push:
branches: [main]
schedule:
- cron: "0 2 * * *"
jobs:
train:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup ZenML
run: |
pip install zenml[mlflow,aws]
zenml connect --url $ZENML_SERVER_URL --api-key $ZENML_API_KEY
- name: Run training pipeline
run: |
zenml pipeline run training_pipeline.py \
--stack production_stack \
--build-docker
- name: Notify on failure
if: failure()
run: |
curl -X POST $SLACK_WEBHOOK \
-d '{"text":"Pipeline failed! Check ZenML dashboard."}'
与替代方案对比 #
| 特性 | ZenML | Kubeflow Pipelines | Metaflow | MLflow Pipelines |
|---|---|---|---|---|
| 流水线抽象 | Python 装饰器 | YAML + Python | Python 装饰器 | 基于 YAML |
| 编排器集成 | 20+ (Airflow, K8s 等) | 仅 Kubernetes | AWS Step Functions, 本地 | 有限 |
| 实验追踪 | 可插拔 (MLflow, W&B 等) | 内置(基础) | 内置(Metaflow UI) | 仅 MLflow |
| 自托管选项 | 是 —— 完整服务器 | 是(复杂) | 部分(Metaflow UI) | 是 |
| 制品缓存 | 自动 | 手动配置 | 内置 | 否 |
| 步骤级 GPU 控制 | 是 | 是 | 有限 | 否 |
| 学习曲线 | 中低 | 高 | 低 | 低 |
| GitHub Stars | ~4,500 | ~5,500 | ~7,800 | ~19,000* |
| 许可证 | Apache-2.0 | Apache-2.0 | Apache-2.0 | Apache-2.0 |
*MLflow 的 Stars 包含其更广泛的 AI 平台,不仅限于流水线。
何时选择 ZenML 而非替代方案:
- vs. Kubeflow:如果你需要跨编排器的流水线可移植性且无需 Kubernetes 复杂性,选择 ZenML。Kubeflow 将你锁定在 K8s 中。
- vs. Metaflow:如果你需要多云支持和工具灵活性,选择 ZenML。Metaflow 以 AWS 为中心。
- vs. MLflow Pipelines:如果你需要步骤级编排器控制和缓存,选择 ZenML。MLflow Pipelines 更简单但灵活性较低。
局限性:诚实的评估 #
ZenML 不是银弹。在投入之前,需要了解以下权衡:
Kubernetes 复杂性:虽然 ZenML 抽象了编排器,但运行生产级 Kubernetes 仍然需要集群专业知识。ZenML 团队正在开发托管 Kubernetes 集成(目标 v0.85.0)。
文档缺口:高级集成(自定义步骤操作器、基于事件的触发器)缺乏全面的示例。社区 Discord 活跃,但官方文档滞后于发布版本。
仪表板限制:ZenML 仪表板(v0.75.0 推出)提供基础可视化,但缺乏 W&B 或 TensorBoard 等专用工具的深度。你可能仍需要实验追踪器。
从 Notebook 迁移:ZenML 需要将基于 Notebook 的工作流重构为步骤函数。对于重度依赖 Jupyter 的团队来说,这是文化转变,而不仅仅是技术变更。
版本兼容性:集成更新有时落后于上游工具发布(例如,PyTorch Lightning 2.x 支持在发布后 3 个月才到达)。仔细固定依赖版本。
常见问题解答 #
Q:我可以将 ZenML 与现有的 Jupyter Notebook 一起使用吗?
A:可以,但需要重构。你将单元格逻辑提取到 @step 装饰的函数中,并将它们组合到 @pipeline 函数中。ZenML 提供了 zenml notebook 命令来帮助完成此迁移。Notebook 内核仍可用于开发和调试。
Q:ZenML 如何处理数据版本控制?
A:每个步骤产生的制品都使用内容哈希自动进行版本控制。制品仓库(本地、S3、GCS)保留所有版本。你可以通过 Client().get_artifact_version(name, version) 检索任何历史制品。这为你提供了完整的可复现性,无需手动数据管理。
Q:ZenML 适合实时推理流水线吗? A:ZenML 主要设计用于批量训练和批量推理流水线。对于实时服务,使用 ZenML 训练,注册模型,然后通过 KServe、Seldon 或 BentoML 部署。ZenML 为这些工具提供内置的部署集成。
Q:如何部署 ZenML 服务器以供团队协作?
A:运行 zenml deploy 在 AWS、GCP、Azure 上部署 ZenML 服务器,或使用 Helm chart 进行自托管 Kubernetes 部署。对于在 DigitalOcean 上快速搭建团队环境,部署一个云服务器并运行 zenml up --docker —— 这会在几分钟内通过 Docker Compose 启动 ZenML 服务器。
Q:流水线步骤失败时会发生什么?
A:ZenML 支持可配置的重试逻辑(@step(retry=3))。失败的运行会在仪表板中记录完整的堆栈跟踪。你可以使用 zenml pipeline run --from-failure 从失败的步骤恢复,这会复用成功上游步骤的缓存输出。
Q:我可以在不使用 Docker 的情况下使用 ZenML 吗?
A:完全可以。默认的本地技术栈完全不需要 Docker。Docker 仅在远程编排器(Kubernetes、Docker 模式下的 Airflow)上需要用于容器化执行。本地开发和测试除了 pip install zenml 之外不需要任何其他东西。
结论:从 Notebook 混乱到生产级流水线 #
ZenML 解决了机器学习中最常见的失败模式:从"在我的笔记本上可以运行"到"在生产环境中可靠运行"的差距。通过在 20+ 工具上提供统一抽象、自动制品版本控制和技术栈可移植性,它将临时笔记本转变为可复现的、可审计的、可扩展的流水线。
从本指南中的 5 分钟本地设置开始。连接 MLflow 进行实验追踪。在 DigitalOcean 上部署团队服务器。今天就开始构建你的第一个生产级流水线。
加入 dibi8.com Telegram 群组获取每周 MLOps 深度解析: t.me/dibi8tech —— 我们每周讨论生产级 ML 模式、工具对比和部署策略。
来源与延伸阅读 #
- ZenML 官方文档 —— 综合指南和 API 参考
- ZenML GitHub 仓库 —— 源代码和示例
- ZenML 博客:MLOps 技术栈对比 —— 与替代方案的详细对比
- ZenML 示例仓库 —— 生产就绪的示例流水线
- MLflow 文档 —— 实验追踪集成详情
- Kubernetes 文档 —— 编排器设置指南
推荐部署与基础设施 #
上述工具想要落地生产,靠谱的基础设施是前提。dibi8 自己也在用的两个选择:
- DigitalOcean — 新用户 60 天 $200 免费额度,14+ 全球节点。运行开源 AI 工具的首选。
- HTStack — 香港 VPS,国内访问低延迟,dibi8.com 自己也跑在它上面,生产环境验证过。
Aff 链接 — 不增加你的成本,但能帮 dibi8 持续运营。
联盟营销披露 #
本文包含联盟链接。如果你通过本文中的链接注册服务,dibi8.com 可能会获得佣金,而不会向你收取额外费用。我们只推荐我们亲自评估并认为具有真正价值的工具。所表达的观点是我们自己的。
💬 留言讨论