141 lines
5.4 KiB
Python
141 lines
5.4 KiB
Python
"""
|
||
Demo: Data Assets (formerly "Datasets") in Apache Airflow 3
|
||
|
||
Data Assets let you create data-driven scheduling: one DAG produces/updates
|
||
an asset, and another DAG is automatically triggered when that asset changes.
|
||
|
||
This file defines:
|
||
- A "producer" DAG that updates two data assets
|
||
- A "consumer" DAG that is scheduled to run whenever BOTH assets are updated
|
||
- A "single asset consumer" DAG triggered by a single asset update
|
||
|
||
Key concepts:
|
||
- Asset: Represents a logical data artifact (file, table, topic, etc.)
|
||
- schedule=<Asset(...)>: Triggers the DAG when the asset is updated
|
||
- outlet: Marks a task as producing/updating an asset
|
||
|
||
Note on Data Assets vs. S3/external Triggers:
|
||
- Data Assets are for Airflow-internal coordination: DAG A produces data,
|
||
DAG B reacts. No polling, no cloud API calls, zero cost.
|
||
- S3KeySensor / external triggers are for watching files created by
|
||
processes OUTSIDE of Airflow (e.g., a third-party ETL tool drops a file
|
||
into S3). Those do involve polling and incur API costs.
|
||
|
||
See: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html
|
||
"""
|
||
|
||
import json
|
||
import pendulum
|
||
|
||
from airflow.sdk import Asset, dag, task
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Data Assets
#
# Assets are identified by a URI. The URI is purely a LOGICAL LABEL –
# Airflow never actually connects to, reads, or polls this URI!
# It is only used as a key to link producer tasks to consumer DAGs.
#
# When a producer task (with outlets=[...]) completes successfully, the
# Airflow scheduler internally records "this asset was updated" in its
# metadata database. Consumer DAGs scheduled on that asset are then
# triggered automatically – entirely internal, zero external API calls,
# no polling, no cloud costs.
#
# The URI could just as well be "my-orders" or "daily-report" – using
# URIs like "s3://..." is just a convention to document where the data
# actually lives, but Airflow itself ignores the scheme/path.
# ---------------------------------------------------------------------------
|
||
# Asset updated by the producer's produce_orders task.
orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
# Asset updated by the producer's produce_customers task.
customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")
|
||
|
||
|
||
# ===========================================================================
# Producer DAG – updates both assets
# ===========================================================================
|
||
@dag(
    dag_id="stackit_15a_data_assets_producer",
    schedule=None,  # no schedule: triggered manually or by an external system
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_producer():
    """Producer DAG: two tasks emit sample data, each declaring one asset as its outlet."""

    @task(outlets=[orders_asset])
    def produce_orders():
        """Build sample order records, print them, and return them (outlet: orders asset)."""
        order_rows = [
            {"order_id": 1001, "amount": 250.00, "customer": "Alice"},
            {"order_id": 1002, "amount": 125.50, "customer": "Bob"},
            {"order_id": 1003, "amount": 340.75, "customer": "Charlie"},
        ]
        # A real pipeline would persist the rows here (S3, a database, etc.).
        print(f"Produced {len(order_rows)} orders:")
        rendered = json.dumps(order_rows, indent=2)
        print(rendered)
        return order_rows

    @task(outlets=[customers_asset])
    def produce_customers():
        """Build sample customer records, print them, and return them (outlet: customers asset)."""
        customer_rows = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"},
        ]
        print(f"Produced {len(customer_rows)} customers:")
        rendered = json.dumps(customer_rows, indent=2)
        print(rendered)
        return customer_rows

    # Orders run first; customers run downstream of orders.
    orders_step = produce_orders()
    customers_step = produce_customers()
    orders_step >> customers_step


data_assets_producer()
|
||
|
||
|
||
# ===========================================================================
# Consumer DAG 1 – triggered when the orders asset is updated
# ===========================================================================
|
||
@dag(
    dag_id="stackit_15b_data_assets_consumer_orders",
    schedule=orders_asset,  # runs whenever orders_asset is updated
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_consumer_orders():
    """Consumer DAG scheduled on the orders asset alone."""

    @task()
    def process_orders():
        """Print the progress messages of a simulated order-processing run."""
        progress_messages = (
            "New order data detected! Running order processing pipeline...",
            "Aggregating daily totals...",
            "Order processing complete.",
        )
        for message in progress_messages:
            print(message)

    process_orders()


data_assets_consumer_orders()
|
||
|
||
|
||
# ===========================================================================
# Consumer DAG 2 – triggered when BOTH assets have been updated
# ===========================================================================
|
||
@dag(
    dag_id="stackit_15c_data_assets_consumer_combined",
    schedule=orders_asset & customers_asset,  # requires BOTH assets to be updated
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_consumer_combined():
    """Consumer DAG scheduled on the conjunction of the orders and customers assets."""

    @task()
    def join_orders_and_customers():
        """Print the progress messages of a simulated join of both datasets."""
        progress_messages = (
            "Both orders AND customers data updated!",
            "Joining datasets for the combined analytics report...",
            "Combined report generated successfully.",
        )
        for message in progress_messages:
            print(message)

    join_orders_and_customers()


data_assets_consumer_combined()
|