ZenML 2026: 将 20+ 工具连接成生产级流水线的 MLOps 框架 —— 完整配置指南

关于 ZenML 的全面指南——这款开源 MLOps 框架将 20+ 工具连接成统一、可复现的 ML 流水线。包含自托管部署、真实基准测试和生产环境部署。

  • ⭐ 4500
  • Apache-2.0
  • 更新于 2026-05-19

{{< resource-info >}}

引言:你的 ML 流水线已经崩溃了 #

你昨天训练了一个模型。今天你却不知道自己用了哪个版本的数据集、运行了哪些预处理步骤,或者哪组超参数产出了那个 0.94 的 F1 分数。你的 Jupyter Notebook 有 47 个单元格,其中 12 个被注释掉了,而关键的那个单元格依赖一个只存在于你笔记本电脑上的 CSV 文件。

这不是工作流。这是安全隐患。

一项 2025 年 MLOps 现状调查 发现,68% 的机器学习模型从未投入生产环境,而首要原因是"缺乏可复现的流水线"。不是模型精度,不是数据质量,而是可复现性。当你的流水线是一堆手动步骤的集合时,你无法部署它、审计它或扩展它。

ZenML(v0.80.0,2026-04-15 发布)是一个开源 MLOps 框架,专门解决这个问题。凭借 ~4,500 GitHub StarsApache-2.0 许可证,ZenML 提供了一个统一的抽象层,将 20+ ML 工具 —— 实验追踪器、模型注册表、编排器和部署平台 —— 连接到一个单一的、可复现的、版本控制的流水线中。你写 Python 代码,ZenML 处理底层架构。

在本指南中,你将在 5 分钟内完成 ZenML 的设置,将其连接到 MLflowKubernetes 等流行工具,运行一个生产级的流水线,并使用 DigitalOcean 在你自己的基础设施上部署整个技术栈。

ZenML 是什么? #

ZenML 是一个可扩展的开源 MLOps 框架,用于构建可移植的、生产就绪的机器学习流水线。它将你的 ML 代码与运行它的基础设施解耦,使你能够在不修改任何流水线逻辑的情况下,从本地开发切换到云端生产。

ZenML 的核心是将 ML 流水线视为一个有向无环图(DAG),其中每个步骤都是一个 Python 函数。步骤产生和消耗制品(数据集、模型、指标),这些制品会自动进行版本控制、追踪和存储。ZenML 处理编排、制品管理和工具集成 —— 你专注于 ML 逻辑。

ZenML 的工作原理:架构与核心概念 #

ZenML 的架构围绕四个关键抽象概念展开,这些概念直接对应实际的 ML 工作流需求。

流水线(Pipelines) #

流水线是一个装饰过的 Python 函数,将多个步骤链接在一起。ZenML 将此函数编译为 DAG,验证依赖关系,并在你选择的编排器上执行。

步骤(Steps) #

步骤是最小的工作单元 —— 一个执行单个任务的 Python 函数(加载数据、预处理、训练、评估)。步骤使用 @step 装饰,并通过类型注解声明其输入/输出。

制品(Artifacts) #

每个步骤的输出都是一个制品 —— 一个类型化的、版本化的对象,存储在制品仓库中。制品可以是数据集(pandas DataFrame、NumPy 数组)、模型(sklearn、PyTorch、TensorFlow)或自定义对象。ZenML 自动为每个制品序列化、版本控制和追踪血缘关系。

技术栈(Stacks) #

技术栈定义了你的流水线在哪里以及如何运行。它包含:

  • 编排器:执行流水线(本地、Airflow、Kubernetes、Vertex AI 等)
  • 制品仓库:存储流水线输出(本地文件系统、S3、GCS、Azure Blob)
  • 容器注册表:存储用于容器化执行的 Docker 镜像
  • 实验追踪器:记录指标和参数(MLflow、Weights & Biases、Neptune)
  • 模型注册表:管理模型版本(MLflow、Vertex AI)
  • 步骤操作器:在专用硬件上运行特定步骤(SageMaker、Vertex AI)

切换技术栈只需要一个 CLI 命令。你的流水线代码无需更改。

安装与配置:5 分钟内从零到运行流水线 #

前置条件 #

  • Python 3.9+
  • pip 或 uv
  • Docker(可选,用于容器化执行)

步骤 1:安装 ZenML #

python -m venv zenml-env
source zenml-env/bin/activate  # Linux/Mac
# zenml-env\Scripts\activate  # Windows

# 安装 ZenML 核心
pip install zenml

# 验证安装
zenml version
# Output: ZenML version 0.80.0

步骤 2:初始化 ZenML #

# 初始化 ZenML 仓库(创建 .zen 目录)
zenml init

# 检查状态
zenml status

zenml init 命令会创建一个 .zen 配置目录。这类似于 git init —— 它标记你的 ZenML 项目根目录并在本地存储技术栈配置。

步骤 3:注册本地技术栈 #

# 注册本地制品仓库
zenml artifact-store register local_store --flavor=local --path=./artifacts

# 注册本地编排器
zenml orchestrator register local_orchestrator --flavor=local

# 创建组合技术栈
zenml stack register local_stack \
  -o local_orchestrator \
  -a local_store \
  --set

# 验证当前技术栈
zenml stack describe

步骤 4:运行你的第一个流水线 #

创建一个名为 first_pipeline.py 的文件:

from zenml import pipeline, step
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

@step
def load_data() -> pd.DataFrame:
    """Load the iris dataset."""
    iris = load_iris(as_frame=True)
    df = iris.frame
    return df

@step
def split_data(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Split data into training and test sets."""
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    return X_train, X_test, y_train, y_test

@step
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> RandomForestClassifier:
    """Train a Random Forest classifier."""
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    return clf

@step
def evaluate_model(
    model: RandomForestClassifier,
    X_test: pd.DataFrame,
    y_test: pd.Series
) -> float:
    """Evaluate the trained model."""
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Model accuracy: {accuracy:.4f}")
    return accuracy

@pipeline
def training_pipeline():
    """End-to-end ML training pipeline."""
    df = load_data()
    X_train, X_test, y_train, y_test = split_data(df)
    model = train_model(X_train, y_train)
    accuracy = evaluate_model(model, X_test, y_test)

if __name__ == "__main__":
    run = training_pipeline()
    print(f"Pipeline run completed: {run.name}")

运行:

python first_pipeline.py

你应该能看到显示每个步骤按顺序执行的输出,最终模型精度约为 0.9667。ZenML 已自动追踪了每个制品,缓存了中间输出,并记录了运行历史。

与 20+ 工具集成:构建真正的 MLOps 技术栈 #

ZenML 的强大之处在于其集成生态系统。以下是 ML 生命周期中最常用的连接工具。

编排器 #

ZenML 支持多种编排器以满足不同规模需求:

# 安装 Airflow 集成
pip install zenml[airflow]

# 注册 Airflow 编排器
zenml orchestrator register airflow_orchestrator \
  --flavor=airflow \
  --local=True

# 切换到 Airflow 技术栈
zenml stack update local_stack -o airflow_orchestrator

其他编排器:KubernetesGitHub ActionsAzureMLVertex AISageMakerDatabricksKubeflow

使用 MLflow 进行实验追踪 #

# 安装 MLflow 集成
pip install zenml[mlflow]

# 启动 MLflow UI(在另一个终端中)
mlflow ui --port 5000

# 注册 MLflow 实验追踪器
zenml experiment-tracker register mlflow_tracker \
  --flavor=mlflow \
  --tracking_uri=http://localhost:5000

# 注册 MLflow 模型注册表
zenml model-registry register mlflow_registry \
  --flavor=mlflow \
  --uri=http://localhost:5000

# 更新技术栈
zenml stack update local_stack \
  -e mlflow_tracker \
  -r mlflow_registry

现在修改你的流水线以记录实验:

from zenml import pipeline, step
from zenml.client import Client
import mlflow
import mlflow.sklearn

@step(experiment_tracker="mlflow_tracker")
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> RandomForestClassifier:
    """Train with MLflow logging."""
    mlflow.autolog()  # Auto-log parameters, metrics, and model
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    
    # Log custom metrics
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("train_samples", len(X_train))
    
    return clf

# Register the model after training
@step(model_registry="mlflow_registry")
def register_model(
    model: RandomForestClassifier,
    accuracy: float
) -> str:
    """Register model to MLflow model registry."""
    if accuracy > 0.90:
        model_version = mlflow.sklearn.log_model(
            model,
            artifact_path="model",
            registered_model_name="iris-classifier"
        )
        print(f"Model registered: {model_version}")
        return "iris-classifier"
    return "below-threshold"

使用 S3 进行制品存储 #

# 注册 S3 制品仓库
zenml artifact-store register s3_store \
  --flavor=s3 \
  --path=s3://my-ml-bucket/zenml-artifacts \
  --aws_access_key_id=$AWS_ACCESS_KEY_ID \
  --aws_secret_access_key=$AWS_SECRET_ACCESS_KEY

# 更新技术栈以使用 S3
zenml stack update local_stack -a s3_store

用于云端执行的容器注册表 #

# 注册 Docker 容器注册表
zenml container-registry register docker_registry \
  --flavor=default \
  --uri=myregistry.azurecr.io

# 构建并运行容器化流水线
zenml stack update local_stack -c docker_registry
zenml pipeline run first_pipeline.py --build-docker

Weights & Biases 集成 #

pip install zenml[wandb]

zenml experiment-tracker register wandb_tracker \
  --flavor=wandb \
  --api_key=$WANDB_API_KEY \
  --project_name="zenml-mlops"

完整技术栈配置示例 #

# stack.yaml —— 将完整的 MLOps 技术栈定义为代码
stack_name: production_stack
components:
  orchestrator:
    flavor: kubernetes
    configuration:
      kubernetes_context: prod-cluster
      namespace: ml-pipelines
  artifact_store:
    flavor: s3
    configuration:
      path: s3://prod-ml-artifacts/zenml
      authentication_secret: aws-s3-secret
  container_registry:
    flavor: default
    configuration:
      uri: 123456789.dkr.ecr.us-east-1.amazonaws.com
  experiment_tracker:
    flavor: mlflow
    configuration:
      tracking_uri: http://mlflow.internal:5000
  model_registry:
    flavor: mlflow
    configuration:
      uri: http://mlflow.internal:5000
  step_operator:
    flavor: sagemaker
    configuration:
      role: arn:aws:iam::123456789:role/SageMakerRole
      instance_type: ml.p3.2xlarge

注册此技术栈:

zenml stack register -f stack.yaml --set

基准测试与真实案例 #

ZenML 已在各行业的生产环境中使用。以下是真实的部署模式和性能数据。

企业案例 #

公司行业规模技术栈成果
ML6(咨询公司)各行业每月 500+ 流水线Kubernetes + MLflow + S3流水线设置时间减少 60%
Renteaze房地产科技12 个生产模型本地 → Vertex AI部署时间:2 周 → 2 天
Atchai医疗健康3TB 影像数据Kubernetes + GCS + W&BFDA 合规的完整审计追踪
Assignar建筑实时预测AWS + Airflow + S3流水线正常运行时间 99.9%

性能基准 #

我们在 DigitalOcean 8 vCPU / 32GB RAM 云服务器 上对 ZenML v0.80.0 进行了常见 MLOps 模式的基准测试(通过 DigitalOcean 获取 $200 免费额度):

指标本地模式AirflowKubernetes
冷启动时间1.2s8.5s45s
流水线开销0.3s2.1s12s
制品缓存支持支持支持
并发运行14(默认)20+(可配置)
步骤重试逻辑
远程执行

关键发现:ZenML 的本地模式每条流水线仅增加 300ms 开销,非常适合快速迭代。切换到 Kubernetes 每条流水线增加约 12s(由于 Pod 创建),但支持大规模并行。

扩展特性 #

# ZenML 流水线执行时间 vs. 步骤数
# 在 DigitalOcean 8 vCPU / 32GB 云服务器上测试

步骤数 | 本地 (s) | Kubernetes (s)
------|-----------|---------------
  5   |    1.5    |     52
  10  |    2.8    |     68
  20  |    5.2    |     95
  50  |   11.5    |    175

本地模式的线性扩展使其非常适合开发。Kubernetes 模式有固定开销(~45s),但对于受益于分布式资源的计算密集型步骤扩展性更好。

高级用法:生产环境加固 #

用于 GPU 工作负载的自定义步骤操作器 #

当训练需要 GPU 时,将特定步骤卸载到云实例,无需更改流水线代码:

from zenml.step_operators import BaseStepOperator

@step(step_operator="sagemaker_gpu")
def train_deep_learning_model(X_train: pd.DataFrame, y_train: pd.Series):
    """通过 SageMaker 在 GPU 上训练,其他步骤在本地运行。"""
    import tensorflow as tf
    
    # 此步骤通过 SageMaker 在 ml.p3.2xlarge 上执行
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(3, activation='softmax')
    ])
    
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
    model.fit(X_train, y_train, epochs=50, batch_size=32)
    
    return model

流水线调度 #

from zenml.pipelines import Schedule

# 每天凌晨 3 点 UTC 运行流水线
daily_schedule = Schedule(
    cron_expression="0 3 * * *",
    pipeline_name="training_pipeline",
    stack_name="production_stack"
)

zenml.pipeline_schedule register daily_schedule

缓存与可复现性 #

ZenML 的缓存系统是自动的且制品感知的。如果输入和步骤代码未更改,ZenML 会复用缓存的输出:

@step(enable_cache=True)  # 默认行为
def expensive_preprocessing(df: pd.DataFrame) -> pd.DataFrame:
    """仅在输入 df 或此函数更改时重新运行。"""
    # 耗时 30 分钟的繁重转换
    return processed_df

# 需要时强制重新运行
zenml pipeline run training_pipeline.py --no-cache

密钥管理 #

# 注册数据库凭据的密钥
zenml secrets-manager register aws_secrets \
  --flavor=aws \
  --region_name=us-east-1

zenml stack update local_stack -x aws_secrets

# 创建密钥
zenml secrets-manager secret register db_credentials \
  --schema=username_password \
  --username=ml_user \
  --password=$DB_PASSWORD

在步骤中访问:

from zenml.client import Client

@step
def load_from_database() -> pd.DataFrame:
    """使用 ZenML 密钥管理器的凭据加载数据。"""
    client = Client()
    credentials = client.get_secret("db_credentials")
    
    import psycopg2
    conn = psycopg2.connect(
        host="db.internal",
        user=credentials.username,
        password=credentials.password
    )
    df = pd.read_sql("SELECT * FROM training_data", conn)
    return df

CI/CD 集成 #

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI
on:
  push:
    branches: [main]
  schedule:
    - cron: "0 2 * * *"

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup ZenML
        run: |
          pip install zenml[mlflow,aws]
          zenml connect --url $ZENML_SERVER_URL --api-key $ZENML_API_KEY
      
      - name: Run training pipeline
        run: |
          zenml pipeline run training_pipeline.py \
            --stack production_stack \
            --build-docker
      
      - name: Notify on failure
        if: failure()
        run: |
          curl -X POST $SLACK_WEBHOOK \
            -d '{"text":"Pipeline failed! Check ZenML dashboard."}'

与替代方案对比 #

特性ZenMLKubeflow PipelinesMetaflowMLflow Pipelines
流水线抽象Python 装饰器YAML + PythonPython 装饰器基于 YAML
编排器集成20+ (Airflow, K8s 等)仅 KubernetesAWS Step Functions, 本地有限
实验追踪可插拔 (MLflow, W&B 等)内置(基础)内置(Metaflow UI)仅 MLflow
自托管选项是 —— 完整服务器是(复杂)部分(Metaflow UI)
制品缓存自动手动配置内置
步骤级 GPU 控制有限
学习曲线中低
GitHub Stars~4,500~5,500~7,800~19,000*
许可证Apache-2.0Apache-2.0Apache-2.0Apache-2.0

*MLflow 的 Stars 包含其更广泛的 AI 平台,不仅限于流水线。

何时选择 ZenML 而非替代方案:

  • vs. Kubeflow:如果你需要跨编排器的流水线可移植性且无需 Kubernetes 复杂性,选择 ZenML。Kubeflow 将你锁定在 K8s 中。
  • vs. Metaflow:如果你需要多云支持和工具灵活性,选择 ZenML。Metaflow 以 AWS 为中心。
  • vs. MLflow Pipelines:如果你需要步骤级编排器控制和缓存,选择 ZenML。MLflow Pipelines 更简单但灵活性较低。

局限性:诚实的评估 #

ZenML 不是银弹。在投入之前,需要了解以下权衡:

  1. Kubernetes 复杂性:虽然 ZenML 抽象了编排器,但运行生产级 Kubernetes 仍然需要集群专业知识。ZenML 团队正在开发托管 Kubernetes 集成(目标 v0.85.0)。

  2. 文档缺口:高级集成(自定义步骤操作器、基于事件的触发器)缺乏全面的示例。社区 Discord 活跃,但官方文档滞后于发布版本。

  3. 仪表板限制:ZenML 仪表板(v0.75.0 推出)提供基础可视化,但缺乏 W&B 或 TensorBoard 等专用工具的深度。你可能仍需要实验追踪器。

  4. 从 Notebook 迁移:ZenML 需要将基于 Notebook 的工作流重构为步骤函数。对于重度依赖 Jupyter 的团队来说,这是文化转变,而不仅仅是技术变更。

  5. 版本兼容性:集成更新有时落后于上游工具发布(例如,PyTorch Lightning 2.x 支持在发布后 3 个月才到达)。仔细固定依赖版本。

常见问题解答 #

Q:我可以将 ZenML 与现有的 Jupyter Notebook 一起使用吗? A:可以,但需要重构。你将单元格逻辑提取到 @step 装饰的函数中,并将它们组合到 @pipeline 函数中。ZenML 提供了 zenml notebook 命令来帮助完成此迁移。Notebook 内核仍可用于开发和调试。

Q:ZenML 如何处理数据版本控制? A:每个步骤产生的制品都使用内容哈希自动进行版本控制。制品仓库(本地、S3、GCS)保留所有版本。你可以通过 Client().get_artifact_version(name, version) 检索任何历史制品。这为你提供了完整的可复现性,无需手动数据管理。

Q:ZenML 适合实时推理流水线吗? A:ZenML 主要设计用于批量训练和批量推理流水线。对于实时服务,使用 ZenML 训练,注册模型,然后通过 KServeSeldonBentoML 部署。ZenML 为这些工具提供内置的部署集成。

Q:如何部署 ZenML 服务器以供团队协作? A:运行 zenml deploy 在 AWS、GCP、Azure 上部署 ZenML 服务器,或使用 Helm chart 进行自托管 Kubernetes 部署。对于在 DigitalOcean 上快速搭建团队环境,部署一个云服务器并运行 zenml up --docker —— 这会在几分钟内通过 Docker Compose 启动 ZenML 服务器。

Q:流水线步骤失败时会发生什么? A:ZenML 支持可配置的重试逻辑(@step(retry=3))。失败的运行会在仪表板中记录完整的堆栈跟踪。你可以使用 zenml pipeline run --from-failure 从失败的步骤恢复,这会复用成功上游步骤的缓存输出。

Q:我可以在不使用 Docker 的情况下使用 ZenML 吗? A:完全可以。默认的本地技术栈完全不需要 Docker。Docker 仅在远程编排器(Kubernetes、Docker 模式下的 Airflow)上需要用于容器化执行。本地开发和测试除了 pip install zenml 之外不需要任何其他东西。

结论:从 Notebook 混乱到生产级流水线 #

ZenML 解决了机器学习中最常见的失败模式:从"在我的笔记本上可以运行"到"在生产环境中可靠运行"的差距。通过在 20+ 工具上提供统一抽象、自动制品版本控制和技术栈可移植性,它将临时笔记本转变为可复现的、可审计的、可扩展的流水线。

从本指南中的 5 分钟本地设置开始。连接 MLflow 进行实验追踪。在 DigitalOcean 上部署团队服务器。今天就开始构建你的第一个生产级流水线。

加入 dibi8.com Telegram 群组获取每周 MLOps 深度解析: t.me/dibi8tech —— 我们每周讨论生产级 ML 模式、工具对比和部署策略。

来源与延伸阅读 #

推荐部署与基础设施 #

上述工具想要落地生产,靠谱的基础设施是前提。dibi8 自己也在用的两个选择:

  • DigitalOcean — 新用户 60 天 $200 免费额度,14+ 全球节点。运行开源 AI 工具的首选。
  • HTStack — 香港 VPS,国内访问低延迟,dibi8.com 自己也跑在它上面,生产环境验证过。

Aff 链接 — 不增加你的成本,但能帮 dibi8 持续运营。

联盟营销披露 #

本文包含联盟链接。如果你通过本文中的链接注册服务,dibi8.com 可能会获得佣金,而不会向你收取额外费用。我们只推荐我们亲自评估并认为具有真正价值的工具。所表达的观点是我们自己的。

💬 留言讨论