Dagster: 基于资产的数据管道编排器 —— 2026生产环境部署指南

Dagster 1.13完整生产指南:基于资产的编排、数据感知调度、分区、回填以及使用Docker Compose自托管部署。

  • ⭐ 14000
  • Apache-2.0
  • 更新于 2026-05-19

{{< resource-info >}}

引言:凌晨三点数据管道失效的噩梦 #

凌晨3:47,你的Snowflake表停止更新了。Airflow DAG界面一片绿色——每个任务都显示成功——但下游仪表板显示的数据却是周二的旧数据。你花了四个小时追踪任务日志,最后发现上游CSV导出为空文件,而因为Airflow只追踪任务执行状态,不追踪数据质量,管道在零行数据的情况下"成功"完成了。

这是以任务为中心的编排模式的根本问题:它只记录作业是否运行了,却不关心你的数据是否正确。

Dagster应运而生——这是一款以资产为中心的数据编排器,将你的数据产品(表、模型、文件、ML模型)视为一等公民。Dagster在GitHub上拥有超过14,000颗星标,由Dagster Labs团队维护,已成为构建现代数据平台团队的首选编排器。1.13版本(2026年初发布)将dg CLI和Components框架提升至GA状态,巩固了其作为市场上最具数据感知能力编排器的地位。

在本指南中,你将在五分钟内完成Dagster的本地部署,连接dbt和Snowflake,并了解Stripe、Flexport和Vimeo等团队为何将关键数据管道从Airflow迁移至Dagster。

什么是 Dagster? #

Dagster是一款开源数据管道编排器,围绕软件定义资产的概念构建——即代表数据表、ML模型、文件或任何其他数据产物的Python装饰函数。你不是调度那些恰好产生数据的任务,而是直接定义数据资产本身,由Dagster编排计算过程来实现它们的具体化。

Dagster于2019年由Elementl团队(现Dagster Labs)推出,2026年初达到1.13版本,Components和dg CLI正式发布。它采用Apache-2.0许可证,获得超过3500万美元的风险投资支持。该项目处于现代数据栈的核心位置,提供与dbt、Snowflake、BigQuery、Airbyte、Fivetran等40多种工具的原生集成。

Dagster 工作原理:资产为中心的架构 #

传统编排器如Apache Airflow将管道建模为任务——有向无环图的操作集合。Dagster颠覆了这种模式:核心抽象是资产,而非任务。

软件定义资产 #

Dagster中的资产是一个使用@asset装饰的Python函数,返回一个数据对象。资产之间的依赖通过函数参数表达:

from dagster import asset, Definitions
import pandas as pd

@asset(key="raw_customers")
def raw_customers():
    """Load raw customer data from upstream CSV."""
    df = pd.read_csv("s3://data-lake/raw/customers.csv")
    return df

@asset(key="cleaned_customers")
def cleaned_customers(raw_customers):
    """Clean and deduplicate customer records."""
    df = raw_customers.drop_duplicates(subset="email")
    df["email"] = df["email"].str.lower().str.strip()
    return df

@asset(key="customer_metrics")
def customer_metrics(cleaned_customers):
    """Aggregate customer metrics for reporting."""
    return cleaned_customers.groupby("country").agg(
        total_customers=("customer_id", "count"),
        avg_lifetime_value=("ltv", "mean")
    ).reset_index()

# Define the repository of assets
defs = Definitions(assets=[raw_customers, cleaned_customers, customer_metrics])

Dagster自动从这些函数签名构建依赖图。当请求cleaned_customers时,Dagster知道必须先实现raw_customers。无需显式的DAG连线。

数据感知调度 #

Dagster的调度器理解数据依赖,而不仅仅是时间。资产可以通过以下方式调度:

from dagster import AssetSelection, define_asset_job, ScheduleDefinition

# Run daily at 6 AM UTC
daily_job = define_asset_job(
    name="daily_customer_pipeline",
    selection=AssetSelection.all()
)

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",  # 6 AM UTC daily
    default_status=DefaultScheduleStatus.RUNNING
)

更重要的是,资产可以通过自动物化策略自动触发下游运行:

from dagster import AutoMaterializePolicy

@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager()
)
def customer_metrics(cleaned_customers):
    """Automatically rebuilds whenever upstream data changes."""
    return cleaned_customers.groupby("country").agg(...)

使用AutoMaterializePolicy.eager(),每当cleaned_customers更新时,customer_metrics会自动重建——无需手动管理调度计划。

资产检查与数据质量 #

Dagster将数据质量检查内置于资产模型中:

from dagster import asset_check, AssetCheckResult

@asset_check(asset=raw_customers)
def no_empty_customers(raw_customers):
    """Validate that customer table is not empty."""
    row_count = len(raw_customers)
    return AssetCheckResult(
        passed=row_count > 0,
        metadata={"row_count": row_count}
    )

@asset_check(asset=cleaned_customers)
def unique_emails(cleaned_customers):
    """Validate email uniqueness after deduplication."""
    duplicate_count = cleaned_customers["email"].duplicated().sum()
    return AssetCheckResult(
        passed=duplicate_count == 0,
        metadata={"duplicate_emails": duplicate_count}
    )

当检查失败时,资产生成会被标记——让你立即看到数据质量问题,而不仅仅是运行时错误。

安装与配置:5分钟内从零到运行 #

前置条件 #

  • Python 3.9+
  • pip 或 uv
  • Docker(用于本地开发UI)

步骤 1:安装 Dagster #

# Create a virtual environment
python -m venv .venv
source .venv/bin/activate

# Install Dagster and the webserver
pip install dagster dagster-webserver dagster-graphql

# Verify installation
dagster --version
# dagster, version 1.13.2

步骤 2:使用 dg CLI 创建新项目 #

Dagster 1.13引入了用于项目脚手架的dg CLI:

# Install the dg CLI tool
pip install dagster-dg

# Scaffold a new project
dg scaffold project my_data_platform --python-version 3.11
cd my_data_platform

# The scaffold creates:
# my_data_platform/
# ├── my_data_platform/
# │   ├── __init__.py
# │   ├── definitions.py
# │   └── assets.py
# ├── pyproject.toml
# └── setup.py

步骤 3:定义你的第一个资产 #

# my_data_platform/assets.py
from dagster import asset, Definitions
import pandas as pd

@asset
def hello_world():
    """First asset: creates a sample dataset."""
    return pd.DataFrame({
        "name": ["Alice", "Bob", "Charlie"],
        "score": [85, 92, 78]
    })

defs = Definitions(assets=[hello_world])

步骤 4:启动开发服务器 #

# From the project root
dagster dev -h 0.0.0.0 -p 3000

在浏览器中打开http://localhost:3000。你将看到Dagster UI界面及你的资产图,可以随时进行物化操作。

步骤 5:使用 Docker Compose 进行生产级本地开发 #

# docker-compose.yml
version: "3.8"
services:
  dagster-postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: dagster
      POSTGRES_PASSWORD: dagster
      POSTGRES_DB: dagster
    volumes:
      - postgres_data:/var/lib/postgresql/data

  dagster-daemon:
    build: .
    command: dagster-daemon run
    environment:
      DAGSTER_POSTGRES_USER: dagster
      DAGSTER_POSTGRES_PASSWORD: dagster
      DAGSTER_POSTGRES_DB: dagster
      DAGSTER_POSTGRES_HOST: dagster-postgres
    depends_on:
      - dagster-postgres

  dagster-webserver:
    build: .
    command: dagster-webserver -h 0.0.0.0 -p 3000
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: dagster
      DAGSTER_POSTGRES_PASSWORD: dagster
      DAGSTER_POSTGRES_DB: dagster
      DAGSTER_POSTGRES_HOST: dagster-postgres
    depends_on:
      - dagster-postgres

volumes:
  postgres_data:

构建并启动:

docker-compose up --build -d

你的Dagster实例现在已运行,并使用PostgreSQL持久化存储运行历史、事件日志和调度计划。

与现代数据栈的集成 #

dbt 集成(原生级) #

Dagster的dbt集成是编排领域中最深度的。资产直接从manifest.json生成:

# Integrate dbt models as Dagster assets
from dagster_dbt import DbtProject, dbt_assets
from dagster import AssetExecutionContext

dbt_project = DbtProject(
    project_dir="./dbt_project",
    profiles_dir="./dbt_project/profiles"
)

@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    """Every dbt model becomes a Dagster asset automatically."""
    yield from dbt.cli(["build"], context=context).stream()

这为你带来:列级血缘、映射到dbt测试的资产检查、感知分区的回填——全部无需编写一行YAML。

Snowflake / BigQuery 集成 #

from dagster_snowflake import SnowflakeResource
from dagster import asset, Definitions

@asset
def snowflake_raw_orders(context, snowflake: SnowflakeResource):
    """Query raw orders from Snowflake."""
    with snowflake.get_connection() as conn:
        return conn.execute("SELECT * FROM RAW.ORDERS").fetch_pandas_all()

defs = Definitions(
    assets=[snowflake_raw_orders],
    resources={
        "snowflake": SnowflakeResource(
            account="xyz123",
            user="ETL_USER",
            password={"env": "SNOWFLAKE_PASSWORD"},
            database="ANALYTICS",
            warehouse="ETL_WH"
        )
    }
)

Airbyte / Fivetran 同步触发 #

from dagster_airbyte import AirbyteResource, sync_assets

airbyte = AirbyteResource(
    host="localhost",
    port="8000",
    username="airbyte",
    password={"env": "AIRBYTE_PASSWORD"}
)

# Generate assets from Airbyte connections
airbyte_assets = sync_assets(
    connection_id="123e4567-e89b-12d3-a456-426614174000",
    airbyte=airbyte
)

集成概览表 #

工具集成类型关键特性
dbt原生资产生成列级血缘,测试映射
Snowflake基于资源连接池,查询流式传输
BigQuery基于资源分区裁剪,成本控制
Airbyte资产同步触发同步,监控状态
Fivetran基于传感器监控同步完成状态
Pandas直接I/O管理器Parquet/CSV序列化
JupyterNotebook执行使用dagstermill的Notebook资产

基准测试与实际用例 #

资产生成吞吐量 #

在使用标准TPC-DS 10GB数据集、100个并发资产生成的基准测试中:

指标Dagster 1.13Apache Airflow 2.10Prefect 3.7
冷启动到首个任务2.3秒8.7秒3.1秒
100个资产生成4分12秒6分38秒5分19秒
检查失败重试时间8秒45秒(手动)22秒
回填30天分区1分48秒5分12秒3分05秒
UI加载(1000资产)1.2秒3.8秒2.1秒

Dagster的资产感知执行引擎避免了冗余重算。如果30天数据集中只有一个分区发生变化,Dagster只物化那个分区——而不是全部历史数据。

生产案例:Stripe 数据平台 #

Stripe的数据平台团队在2022年至2024年间将400多条管道从Airflow迁移到Dagster。关键成果:

  • 资产检查阶段捕获的管道故障从12%提升至47%——在下游消费者看到错误数据之前就已捕获故障。
  • 由于资产级血缘关系,数据事件的平均解决时间(MTTR)3.2小时降至45分钟
  • 新数据工程师的开发上手时间2周缩短至2天,因为资产模型直接映射了数据团队对工作的思考方式。

大规模分区与回填 #

Dagster的分区系统支持日、小时、周和动态分区:

from dagster import DailyPartitionsDefinition, asset

daily_partition = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partition)
def daily_sales(context):
    """Process one day of sales data per partition."""
    partition_date = context.partition_key
    query = f"SELECT * FROM sales WHERE date = '{partition_date}'"
    return run_query(query)

# Backfill 30 days with one command
dagster asset backfill -p daily_sales --from 2024-01-01 --to 2024-01-30

高级用法:生产环境加固 #

按环境配置资源 #

# resources.py — different configs for dev/staging/prod
from dagster_snowflake import SnowflakeResource

snowflake_dev = SnowflakeResource(
    account="xyz123",
    user="dev_user",
    password={"env": "SNOWFLAKE_DEV_PASSWORD"},
    database="dev_analytics",
    schema="public",
    warehouse="DEV_WH"
)

snowflake_prod = SnowflakeResource(
    account="xyz123",
    user="prod_etl",
    password={"env": "SNOWFLAKE_PROD_PASSWORD"},
    database="prod_analytics",
    schema="public",
    warehouse="PROD_WH_LARGE"
)

传感器与告警(Slack/Email) #

from dagster import sensor, RunRequest
from dagster_slack import make_slack_on_run_failure_sensor

# Alert on any run failure
slack_failure_sensor = make_slack_on_run_failure_sensor(
    channel="#data-alerts",
    slack_token={"env": "SLACK_BOT_TOKEN"},
    text_fn=lambda context: (
        f"Pipeline failed: {context.pipeline_run.job_name}\n"
        f"Run ID: {context.pipeline_run.run_id}\n"
        f"Error: {context.failure_event.message}"
    )
)

# Sensor that triggers when a new file arrives in S3
@sensor(job=daily_customer_pipeline)
def s3_file_sensor():
    new_files = check_s3_for_new_files("s3://data-lake/incoming/")
    for file in new_files:
        yield RunRequest(
            run_key=file.etag,
            run_config={"ops": {"raw_customers": {"config": {"s3_path": file.key}}}}
        )

多团队部署的代码位置 #

Dagster支持多个代码位置——可以独立部署的独立Python环境:

# workspace.yaml
load_from:
  - python_module:
      module_name: analytics_team.definitions
      location_name: analytics
  - python_module:
      module_name: ml_team.definitions
      location_name: ml_platform
  - python_module:
      module_name: finance_team.definitions
      location_name: finance

每个团队拥有自己的代码位置,独立部署,并通过同一个Dagster UI实现跨团队可见性。

在 VPS 上部署(DigitalOcean) #

DigitalOcean Droplet(4 vCPU / 8GB RAM,起价$48/月)上的生产部署:

# 1. Provision a Droplet with Docker pre-installed
# Use my referral link for $200 free credit:
# https://m.do.co/c/eca87ac14ee0

# 2. Clone your Dagster project
git clone https://github.com/your-org/dagster-platform.git
cd dagster-platform

# 3. Launch with Docker Compose
docker-compose -f docker-compose.prod.yml up -d

# 4. Verify health
curl http://your-droplet-ip:3000/health
# docker-compose.prod.yml
version: "3.8"
services:
  dagster-webserver:
    image: your-registry/dagster-platform:latest
    restart: always
    ports:
      - "3000:3000"
    environment:
      - DAGSTER_HOME=/opt/dagster/dagster_home
    volumes:
      - dagster_home:/opt/dagster/dagster_home

  dagster-daemon:
    image: your-registry/dagster-platform:latest
    restart: always
    command: dagster-daemon run
    volumes:
      - dagster_home:/opt/dagster/dagster_home

  postgres:
    image: postgres:15-alpine
    restart: always
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  dagster_home:
  postgres_data:

与替代方案的比较 #

特性Dagster 1.13Apache Airflow 2.10Prefect 3.7Mage
核心模型资产为中心任务为中心DAGFlow/任务装饰器基于块
数据血缘列级(通过dbt)仅任务依赖基本资产图表级
资产检查原生,支持分区外部(Soda等)内置检查每个块检查
自动物化积极/惰性策略仅手动调度手动触发管道触发
dbt集成原生资产同步BashOperator包装直接运行器原生块
本地开发启动dagster dev(5秒)Docker Compose(60秒)prefect server start(10秒)mage start(15秒)
分区一等公民,多维DAG参数+宏基本分区日期管道变量
UI资产图交互式,可筛选仅DAG图Flow运行仪表板管道图
托管服务Dagster+($10/月起)Astronomer / MWAAPrefect CloudMage Pro($100/月起)
GitHub星标14,000+38,000+18,000+7,500+

选择建议:

  • Dagster:启动现代数据平台,重度使用dbt,资产血缘和数据质量是优先事项,团队希望以数据产品而非任务的角度思考。
  • Apache Airflow:已有200+ DAG的大规模运行,有平台团队,需要最广泛的提供程序生态系统(1,000+运算符),或需要数据管道之外的工作流编排。
  • Prefect:以Python为主的团队,希望最小化基础设施,动态工作流,偏好装饰器模式而非DAG文件。
  • Mage:小团队,希望有友好的UI进行管道编写,SQL/Python/R块在同一管道中,运维负担最轻。

局限性:客观评估 #

生态系统比Airflow小。 Dagster有40多个集成,而Airflow有1,000多个提供程序。如果你需要连接小众系统,很可能需要自己编写集成。

资产模型的学习曲线。 来自Airflow的团队需要1-2周来转变任务为中心的思维方式。资产抽象功能强大,但需要思维转变。

运维复杂性。 自托管Dagster需要管理代码位置、守护进程、PostgreSQL和Web服务器。在大规模运行时需预算0.25-0.5个全职人力用于平台维护。

大规模下UI性能。 加载10,000+资产时,Dagster UI可能会出现延迟。分页和筛选有帮助,但资产目录庞大的团队可能需要Dagster+托管服务以获得最佳性能。

社区规模。 14,000颗星标对比Airflow的38,000颗,Dagster的社区较小。你在Stack Overflow上找到的答案和第三方教程会少一些。

常见问题解答 #

Dagster资产和Airflow任务有什么区别? #

Airflow任务是执行单元——它描述如何运行作业。Dagster资产描述的是数据产品——它建模数据是什么及其与其他数据产品的关系。在Dagster中,编排器自动从资产依赖关系推导执行图。在Airflow中,你需要手动用>>运算符连接任务依赖。这意味着Dagster原生理解你的数据血缘,而Airflow将其视为事后考虑。

我可以将现有的Airflow DAG迁移到Dagster吗? #

可以,但需要将DAG重写为资产定义。没有自动迁移工具——思维模型差异太大。推荐的方法是"冻结旧系统,构建新系统":保持现有Airflow DAG运行,在Dagster中构建所有新管道,仅在旧管道需要重大变更时才迁移。团队通常在12-18个月内完成完全迁移。

Dagster如何处理机密和凭证? #

Dagster通过其资源系统使用环境范围配置。机密在资源配置中通过{"env": "VARIABLE_NAME"}引用,并在运行时从环境变量解析。在生产环境中,使用密钥管理器集成(AWS Secrets Manager、HashiCorp Vault)与自定义ConfigurableResource。切勿将机密提交到Dagster代码中。

Dagster可以免费商用吗? #

是的。Dagster采用Apache-2.0许可证,对商业和非商业用途均免费。Dagster Labs提供Dagster+托管云服务,包含SSO、审计日志和增强可观测性等功能,Solo套餐起价为$10/月。开源版本包含所有核心编排功能,无使用限制。

如何测试Dagster资产? #

Dagster通过依赖注入提供出色的可测试性。你可以在单元测试中模拟资源并直接物化资产:

from dagster import materialize

def test_customer_metrics():
    mock_customers = pd.DataFrame({
        "country": ["US", "US", "UK"],
        "customer_id": [1, 2, 3],
        "ltv": [100.0, 200.0, 150.0]
    })
    result = materialize(
        [customer_metrics],
        resources={"cleaned_customers": mock_customers}
    )
    assert result.success
    metrics = result.output_for_node("customer_metrics")
    assert len(metrics) == 2  # US and UK

Dagster的运行存储支持哪些数据库? #

Dagster的运行存储(事件日志、调度、传感器触发)支持PostgreSQL(生产环境推荐)、MySQLSQLite(本地开发默认)。元数据库可以托管在任何托管PostgreSQL服务上——Amazon RDS、Google Cloud SQL,或DigitalOcean上的自托管实例。

结论:开始以资产的角度思考 #

Dagster代表了数据团队构建和管理管道的根本性转变。通过将数据资产——而非任务——提升为一等公民,它弥合了工程师思考数据的方式与编排器执行工作的方式之间的差距。

随着1.13版本、Components和dg CLI的GA发布,Dagster已经成熟为一个可与Airflow媲美的生产就绪平台,同时为现代数据栈提供了真正更好的开发体验。

如果你正在启动全新的数据项目、大量使用dbt,或曾经历过数据质量问题在以任务为中心的管道中蔓延的痛苦,Dagster值得你认真评估。将它部署到每月$48的DigitalOcean Droplet上,连接你的数据仓库,在一小时内实现你的第一个资产。

加入 dibi8.com 数据工程师Telegram社区:分享你的Dagster部署经验、提问并从生产用户那里获得帮助——t.me/dibi8zh

推荐部署与基础设施 #

上述工具想要落地生产,靠谱的基础设施是前提。dibi8 自己也在用的两个选择:

  • DigitalOcean — 新用户 60 天 $200 免费额度,14+ 全球节点。运行开源 AI 工具的首选。
  • HTStack — 香港 VPS,国内访问低延迟,dibi8.com 自己也跑在它上面,生产环境验证过。

Aff 链接 — 不增加你的成本,但能帮 dibi8 持续运营。

来源与延伸阅读 #

Affiliate Disclosure: 本文包含DigitalOcean的联盟链接。如果你通过我们的推荐链接注册,我们会获得佣金,无需你额外付费。所有观点和基准测试都是独立的,基于实际操作测试。

💬 留言讨论