"""
Demo: Data Assets (formerly "Datasets") in Apache Airflow 3

Data Assets let you create data-driven scheduling: one DAG produces/updates
an asset, and another DAG is automatically triggered when that asset changes.

This file defines:
  - A "producer" DAG that updates two data assets
  - A "consumer" DAG that is scheduled to run whenever BOTH assets are updated
  - A "single asset consumer" DAG triggered by a single asset update

Key concepts:
  - Asset:     Represents a logical data artifact (file, table, topic, etc.)
  - schedule=: Triggers the DAG when the asset is updated
  - outlets=:  Marks a task as producing/updating an asset

Note on Data Assets vs. S3/external Triggers:
  - Data Assets are for Airflow-internal coordination: DAG A produces data,
    DAG B reacts. No polling, no cloud API calls, zero cost.
  - S3KeySensor / external triggers are for watching files created by
    processes OUTSIDE of Airflow (e.g., a third-party ETL tool drops a file
    into S3). Those do involve polling and incur API costs.

See: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/asset-scheduling.html
"""

import json

import pendulum

from airflow.sdk import Asset, dag, task

# ---------------------------------------------------------------------------
# Data Assets
#
# Assets are identified by a URI. The URI is purely a LOGICAL LABEL –
# Airflow never actually connects to, reads, or polls this URI!
# It is only used as a key to link producer tasks to consumer DAGs.
#
# When a producer task (with outlets=[...]) completes successfully, the
# Airflow scheduler internally records "this asset was updated" in its
# metadata database. Consumer DAGs scheduled on that asset are then
# triggered automatically – entirely internal, zero external API calls,
# no polling, no cloud costs.
#
# The URI could just as well be "my-orders" or "daily-report" – using
# URIs like "s3://..." is just a convention to document where the data
# actually lives, but Airflow itself ignores the scheme/path.
# ---------------------------------------------------------------------------
orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")


# ===========================================================================
# Producer DAG – updates both assets
# ===========================================================================
@dag(
    dag_id="stackit_15a_data_assets_producer",
    schedule=None,  # no schedule of its own: triggered manually or externally
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_producer():
    # NOTE: documented with comments rather than a docstring on purpose, so
    # the DAG's rendered documentation stays exactly as it was.
    #
    # Each task below lists one asset in `outlets`; the two tasks are chained
    # so the orders task runs before the customers task.

    @task(outlets=[orders_asset])
    def produce_orders():
        """Simulate producing order data and updating the orders asset."""
        # In a real scenario you would write to S3, a database, etc.
        order_records = [
            {"order_id": 1001, "amount": 250.00, "customer": "Alice"},
            {"order_id": 1002, "amount": 125.50, "customer": "Bob"},
            {"order_id": 1003, "amount": 340.75, "customer": "Charlie"},
        ]
        order_count = len(order_records)
        print(f"Produced {order_count} orders:")
        print(json.dumps(order_records, indent=2))
        return order_records

    @task(outlets=[customers_asset])
    def produce_customers():
        """Simulate producing customer data and updating the customers asset."""
        customer_records = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"},
        ]
        customer_count = len(customer_records)
        print(f"Produced {customer_count} customers:")
        print(json.dumps(customer_records, indent=2))
        return customer_records

    # Instantiate both tasks, then declare the ordering: orders -> customers.
    orders_step = produce_orders()
    customers_step = produce_customers()
    orders_step >> customers_step


data_assets_producer()


# ===========================================================================
# Consumer DAG 1 – triggered when the orders asset is updated
# ===========================================================================
@dag(
    dag_id="stackit_15b_data_assets_consumer_orders",
    schedule=orders_asset,  # scheduled on the orders asset alone
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_consumer_orders():
    # Single-task DAG that only prints progress messages; no data is read.

    @task()
    def process_orders():
        """React to new order data being available."""
        progress_messages = (
            "New order data detected! Running order processing pipeline...",
            "Aggregating daily totals...",
            "Order processing complete.",
        )
        for message in progress_messages:
            print(message)

    process_orders()


data_assets_consumer_orders()


# ===========================================================================
# Consumer DAG 2 – triggered when BOTH assets have been updated
# ===========================================================================
@dag(
    dag_id="stackit_15c_data_assets_consumer_combined",
    schedule=orders_asset & customers_asset,  # AND condition: both assets required
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
)
def data_assets_consumer_combined():
    # Single-task DAG that only prints progress messages; no data is read.

    @task()
    def join_orders_and_customers():
        """React when both orders and customers data are refreshed."""
        progress_messages = (
            "Both orders AND customers data updated!",
            "Joining datasets for the combined analytics report...",
            "Combined report generated successfully.",
        )
        for message in progress_messages:
            print(message)

    join_orders_and_customers()


data_assets_consumer_combined()