workflows-example-dags/Demo/stackit-09-dbt.py
2026-05-28 17:44:11 +02:00

68 lines
2.6 KiB
Python

"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG.
"""
import os
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProfileConfig, ProjectConfig, ExecutionConfig
from cosmos.profiles import BaseProfileMapping
from airflow.sdk import Connection
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dbt-demo"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
# Retrieve Dremio Credentials
# Airflow Connection "dbt_dremio" must be configured:
# host = <Dremio host URL>
# login = <Dremio username>
# password = <Dremio PAT (Personal Access Token)> or <Password>
# extras:
# dremio_space = <dbt target space in Dremio>
# dremio_space_folder = <folder within the space>
try:
dremio_connection = Connection.get("dbt_dremio")
os.environ["DREMIO_PAT"] = dremio_connection.password
os.environ["DREMIO_HOST"] = dremio_connection.host
os.environ["DREMIO_USER"] = dremio_connection.login
os.environ["DREMIO_SPACE"] = dremio_connection.extra_dejson["dremio_space"]
os.environ["DREMIO_FOLDER"] = dremio_connection.extra_dejson["dremio_space_folder"]
profile_config = ProfileConfig(
profile_name="dbt_demo",
target_name="dev",
profiles_yml_filepath= "/opt/airflow/dags/repo/dbt-demo/dbt_demo/profiles.yml",
)
basic_cosmos_dag = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "dbt_demo",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
dbt_executable_path=f"/app/packages/stackit-workflows/dbt_venv/bin/dbt",
),
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
},
# normal dag parameters
schedule=None,
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["demo", "stackit-demo"],
dag_id="stackit_09_dbt_dremio",
default_args={"retries": 2},
)
except Exception:
import warnings
warnings.warn(
"Airflow connection 'dbt_dremio' is not configured. "
"DAG '09_dbt_dremio' will not be registered until the connection is created. "
"Please create an Airflow connection with conn_id='dbt_dremio' with the following fields: "
"host (Dremio host URL), login (username), password (PAT), "
"and extras: dremio_space, dremio_space_folder.",
stacklevel=2,
)