workflows-example-dags/Demo/stackit-01-taskflow.py
2026-05-28 17:44:11 +02:00

40 lines
940 B
Python

import json
import pendulum
from airflow.sdk import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_01_taskflow",
)
def taskflow_api():
@task()
def extract():
data_string = '{"1001": 301.38, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_api()