Dagster:基于资产调度的数据管道编排器——2026年生产环境设置指南

Complete production guide to Dagster 1.13: asset-based orchestration, data-aware scheduling, partitioning, backfills, and self-hosted deployment with Docker Compose.

  • Apache-2.0
  • 更新于 2026-05-19

{{< 资源信息 >}} ## 简介:盲目管道故障的噩梦 凌晨 3:47,您的 Snowflake 表停止更新。 Airflow DAG 显示绿色——每项任务都成功——但下游仪表板显示周二以来的过时数据。 您花了四个小时跟踪任务日志,结果发现上游 CSV 导出为空,并且由于 Airflow 跟踪任务执行而不是数据质量,因此管道“成功”,行数为零。 这是以任务为中心的编排的根本问题:它跟踪作业是否运行,而不是数据是否正确。 输入 Dagster — 以资产为中心的数据编排器,它将您的数据产品(表、模型、文件、ML 模型)视为一等公民。 Dagster 拥有 14,000 多个 GitHub star,并由 Dagster Labs 团队维护,已成为构建现代数据平台的数据团队的首选协调器。 1.13 版本(2026 年初发布)使“dg”CLI 和组件框架普遍可用,巩固了其作为市场上最具数据感知能力的编排器的地位。 在本指南中,您将在五分钟内在本地部署 Dagster,将其连接到 dbt 和 Snowflake,并了解为什么 Stripe、Flexport 和 Vimeo 的团队已将其关键数据管道从 Airflow 迁移到 Dagster。 ## 什么是 Dagster? Dagster 是一个开源数据管道编排器,围绕 软件定义资产 的概念构建 - Python 修饰的函数,表示数据表、ML 模型、文件或任何其他数据工件。 您无需调度恰好生成数据的任务,而是定义数据资产本身,然后 Dagster 协调实现它们所需的计算。 Dagster 由 Elementl(现为 Dagster Labs)团队于 2019 年推出,随着 Components 的 GA 版本和“dg” CLI,Dagster 于 2026 年初达到 1.13。 它根据 Apache-2.0 获得许可,并由 3500 万美元以上的风险投资支持。 该项目位于现代数据堆栈的中心,提供与 dbt、Snowflake、BigQuery、Airbyte、Fivetran 和 40 多个其他工具的本机集成。 ## Dagster 的工作原理:以资产为中心的架构 Apache Airflow 等传统编排器将管道建模为“任务”——有向非循环操作图。 Dagster 翻转了这个模型:核心抽象是资产,而不是任务。 ### 软件定义资产 Dagster 中的资产是一个用“@asset”修饰的 Python 函数,它返回一个数据对象。 资产之间的依赖关系表示为函数参数: 蟒蛇 从 dagster 导入资产,定义 将 pandas 导入为 pd @asset(键=“raw_customers”) def raw_customers(): """从上游 CSV 加载原始客户数据。""" df = pd.read_csv("s3://data-lake/raw/customers.csv") 返回df @asset(键=“cleaned_customers”) def clean_customers(raw_customers): """清理并删除重复的客户记录。""" df = raw_customers.drop_duplicates(subset="email") df["电子邮件"] = df["电子邮件"].str.lower().str.strip() 返回df @asset(key="客户指标") 客户指标(清理后的客户): """汇总客户指标以进行报告。""" 返回 clean_customers.groupby("国家").agg( Total_customers=("customer_id", "count"), avg_lifetime_value=("ltv", "平均值") ).reset_index() # 定义资产存储库 defs = 定义(资产=[raw_customers、cleaned_customers、customer_metrics]) Dagster 自动根据这些函数签名构建依赖关系图。 当请求“cleaned_customers”时,Dagster 知道它必须首先具体化“raw_customers”。 无需显式 DAG 接线。 ### 数据感知调度 Dagster 的调度程序了解数据依赖性,而不仅仅是时间。 可以安排资产: 蟒蛇 从 dagster 导入 AssetSelection、define_asset_job、ScheduleDefinition # 每天早上 6 点(世界标准时间)运行 日常工作 = 定义资产工作( 名称=“daily_customer_pipeline”, 选择=AssetSelection.all() ) daily_schedule = ScheduleDefinition( 工作=每日工作, cron_schedule="0 6 * * *", # 世界标准时间 (UTC) 每天上午 6 点 default_status=DefaultScheduleStatus.RUNNING ) 更重要的是,资产可以通过自动实现策略自动触发下游运行: 蟒蛇 从 dagster 导入 AutoMaterializePolicy @资产( auto_materialize_policy=AutoMaterializePolicy.eager() ) def customer_metrics(cleaned_customers): """只要上游数据发生变化,就会自动重建。""" 返回 clean_customers.groupby("country").agg(...) 通过“AutoMaterializePolicy.eager()”,只要“cleaned_customers”更新,“customer_metrics”就会自动重建,无需手动计划管理。 ### 资产检查和数据质量 Dagster 将数据质量检查融入到资产模型中: ````蟒蛇 从 dagster 导入 asset_check, AssetCheckResult @asset_check(资产=raw_customers) def no_empty_customers(raw_customers): “““验证客户表不为空。””” row_count = len(raw_customers) 返回资产检查结果( 通过=row_count > 0, 元数据={“row_count”: row_count} ) @asset_check(资产=cleaned_customers) def unique_emails(cleaned_customers): “““重复数据删除后验证电子邮件的唯一性。””” duplicated_count = clean_customers[“email”].duplicated().sum() 返回资产检查结果( 通过=duplicate_count == 0, 元数据={“duplicate_emails”:duplicate_count} )

- 点或紫外线
- Docker(用于本地开发UI) ### 第 1 步:安装 Dagster ````
bas
h
# 创建虚拟环境
python -m venv .venv
源 .venv/bin/activate # 安装 Dagster 和网络服务器
pip install dagster dagster-webserver dagster-graphql # 验证安装
达格斯特——版本
# 达格斯特,版本 1.13.2
```` ### 步骤 2:使用 dg CLI 搭建新项目 Dagster 1.13 引入了用于项目脚手架的“dg”CLI: ````
bas
h
# 安装 dg CLI 工具
pip 安装 dagster-dg # 搭建一个新项目
dg 脚手架项目 my_data_platform --python-版本 3.11
cd my_data_platform # 脚手架创建:
# 我的数据平台/
# ├── my_data_platform/
# │ ├── __init__.py
# │ ├── 定义.py
# │ └── 资产.py
# ├── pyproject.toml
# └── setup.py
```` ### 第 3 步:定义您的第一个资产 ````蟒蛇
# my_data_platform/assets.py
从 dagster 导入资产,定义
将 pandas 导入为 pd @资产
def hello_world(): """第一个资产:创建示例数据集。""" 返回 pd.DataFrame({ “名字”:[“爱丽丝”,“鲍勃”,“查理”], “分数”:[85,92,78] }) defs = 定义(资产=[hello_world])
```` ### 步骤 4:启动开发服务器 ````
bas
h
# 从项目根目录开始
达格斯特开发-h 0.0.0.0-p 3000
```` 在浏览器中打开“http://localhost:3000”。 您将看到带有资产图表的 Dagster UI,随时可以实现。 ### 步骤 5:用于生产本地开发的 Docker Compose ````
yam
l
# docker-compose.yml
版本:“3.8”
服务: 达格斯特-postgres: 图片:postgres:15-alpine 环境: POSTGRES_USER:达格斯特 POSTGRES_PASSWORD:达格斯特 POSTGRES_DB:达格斯特 卷: - postgres_data:/var/lib/postgresql/data dagster 守护进程: 建造: 。 命令:dagster-daemon 运行 环境: DAGSTER_POSTGRES_USER:达格斯特 DAGSTER_POSTGRES_PASSWORD:达格斯特 DAGSTER_POSTGRES_DB:达格斯特 DAGSTER_POSTGRES_HOST:dagster-postgres 取决于: - 达格斯特-postgres dagster-网络服务器: 建造: 。 命令:dagster-webserver -h 0.0.0.0 -p 3000 端口: - “3000:3000” 环境: DAGSTER_POSTGRES_USER:达格斯特 DAGSTER_POSTGRES_PASSWORD:达格斯特 DAGSTER_POSTGRES_DB:达格斯特 DAGSTER_POSTGRES_HOST:dagster-postgres 取决于: - 达格斯特-postgres 卷: postgres_数据:
```` 构建并启动: ````
bas
h
docker-compose up --build -d
```` 您的 Dagster 实例现在正在使用持久 PostgreSQL 存储运行,用于存储运行历史记录、事件日志和计划。 ## 与现代数据堆栈集成 ### dbt 集成(一流) Dagster 的 dbt 集成是编排领域最深入的。 资产直接从您的“manifest.json”生成: ````蟒蛇
# 将 dbt 模型集成为 Dagster 资产
从 dagster_dbt 导入 DbtProject、dbt_assets
从 dagster 导入 AssetExecutionContext dbt_project = DbtProject( project_dir="./dbt_project", profile_dir="./dbt_project/profiles"
) @dbt_assets(清单=dbt_project.manifest_path)
def dbt_models(上下文:AssetExecutionContext,dbt:DbtCliResource): """每个 dbt 模型都会自动成为 Dagster 资产。""" 从 dbt.cli(["build"], context=context).stream() 中产生
```` 这为您提供了:列级沿袭、映射到 dbt 测试的资产检查以及分区感知回填 — 所有这些都无需编写任何 YAML 行。 ### 雪花/BigQuery 集成 ````蟒蛇
从 dagster_snowflake 导入 SnowflakeResource
从 dagster 导入资产,定义 @资产
def Snowflake_raw_orders(上下文,雪花:SnowflakeResource): """查询 Snowflake 的原始订单。""" 以 Snowflake.get_connection() 作为 conn: return conn.execute("SELECT * FROM RAW.ORDERS").fetch_pandas_all() defs = 定义( 资产=[snowflake_raw_orders], 资源={ “雪花”:雪花资源( 帐户=“xyz123”, 用户=“ETL_USER”, 密码={"env": "SNOWFLAKE_PASSWORD"}, 数据库=“分析”, 仓库=“ETL_WH” ) }
)
```` ### Airbyte / Fivetran 同步触发器 ````蟒蛇
从 dagster_airbyte 导入 AirbyteResource,sync_assets 空气字节 = 空气字节资源( 主机=“本地主机”, 端口=“8000”, 用户名=“airbyte”, 密码={"env": "AIRBYTE_PASSWORD"}
) # 从 Airbyte 连接生成资产
空气字节_资产=同步_资产( 连接id =“123e4567-e89b-12d3-a456-426614174000”, 空字节=空字节
)
```` ### 集成汇总表 | 工具| 集成类型| 主要特点|
|

💬 留言讨论