"""Demo DAG: write to and read from a Dremio-as-a-service lakehouse with Spark.

The DAG ``stackit_06daas_lakehouse`` writes a tiny Iceberg table and then reads
it back through several differently configured Spark-on-Kubernetes tasks.
"""

import warnings

import pendulum
from airflow.sdk import dag, task, Connection
from kubernetes.client import models as K8S
from stackit_workflows import config
from stackit_workflows.airflow_plugin.decorators import stackit

# Keyword arguments shared by every Spark task in this DAG.
default_kwargs = {
    "image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.6-0.4.0rc1"
}

# Dremio-as-a-service setup
# Airflow Connection "lakehouse-rest-daas" must be configured for Dremio 26 / Polaris:
#   host     = https://dremio-internal-v26.…   (Dremio/OAuth host)
#   password = <personal access token>
#              (used below as the OAuth2 token-exchange subject token;
#               NOTE(review): the original comment listed a second accepted
#               value after "or" that was lost - confirm and restore it)
#   extras:
#     token_endpoint = https://<host>/oauth/token   (optional, derived from host if absent)
#     warehouse      = default                      (optional, defaults to "default")
#     catalog_name   = stackit                      (optional, defaults to conn_id)
#     catalog_uri    = https://catalog.$host/       (optional, derived from host if absent)
#
# NOTE: the "derived from host if absent" defaults are described for the
# `dremio_connections=[...]` integration. The manual task at the bottom of this
# file reads `token_endpoint` and `catalog_uri` straight from the extras and
# receives None when they are missing.
try:
    lakehouse_connection = Connection.get("lakehouse-rest-daas")
except Exception:
    # Deliberately broad: a missing connection must not break DAG parsing, so
    # the DAG stays visible in the UI and the problem is surfaced as a warning.
    warnings.warn(
        "Airflow connection 'lakehouse-rest-daas' is not configured. "
        "This DAG will be visible but cannot be run until the connection is created. "
        "Please create an Airflow connection with conn_id='lakehouse-rest-daas' — "
        "see the comments at the top of this file for the required fields.",
        stacklevel=2,
    )
    lakehouse_connection = None


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_06daas_lakehouse",
)
def lakehouse():
    """Write a demo Iceberg table, then read it back from several Spark tasks."""

    # When using the stackit operator or decorator, lakehouse connections can be passed as a list.
    # Spark is pre-configured to use the specified lakehouse connection.
    # Check the documentation or the 05-spark-full-configuration.py dag for more information.
    @stackit.spark_kubernetes_task(
        **default_kwargs, dremio_connections=["lakehouse-rest-daas"]
    )
    def write_to_lakehouse():
        """Create namespace ``DEMO`` and overwrite table ``DEMO.user``.

        Returns:
            The name of the written table, consumed by the downstream tasks.
        """
        import stackit_spark
        import pandas as pd
        import os

        # Dump the environment for debugging, but never write secret values
        # into the task log: mask everything whose name looks sensitive.
        # The marker list is a heuristic, not an exhaustive secret detector.
        sensitive_markers = (
            "PASSWORD", "PASSWD", "SECRET", "TOKEN", "KEY", "CREDENTIAL", "CONN",
        )
        for k, v in os.environ.items():
            if any(marker in k.upper() for marker in sensitive_markers):
                v = "***"
            print(f"{k}: {v}")

        catalog_name_in_spark = "stackit"
        target_table = "DEMO.user"

        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")
        spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO")

        data = pd.DataFrame([[1, "Alice"], [2, "Bob"]], columns=["id", "name"])
        sdf = spark.createDataFrame(data)
        sdf.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
        return target_table

    @stackit.spark_kubernetes_task(
        **default_kwargs, dremio_connections=["lakehouse-rest-daas"]
    )
    def read_from_lakehouse(target_table):
        """Select all rows of ``target_table`` and print them."""
        import stackit_spark

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")
        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest-daas"],
        ephemeral_gb=1,
        executor_ephemeral_gb=2,
        cpu=1,
        executor_cpu=1,
        executors=2,
    )
    def ephemeral_volumes_cluster(target_table):
        """Read ``target_table`` in a task configured with driver and executor ephemeral storage."""
        import stackit_spark
        import time
        import subprocess

        # Print the mounted file systems so the volumes can be inspected in the log.
        print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")
        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

        # Keep the task alive for a minute before it finishes.
        time.sleep(60)
        print("Done!")

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest-daas"],
        ephemeral_gb=1,
        cpu=1,
    )
    def ephemeral_volumes_driver(target_table):
        """Read ``target_table`` in a task configured with ephemeral storage only via ``ephemeral_gb``."""
        import stackit_spark
        import time
        import subprocess

        # Print the mounted file systems so the volumes can be inspected in the log.
        print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")
        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

        # Keep the task alive for a minute before it finishes.
        time.sleep(60)
        print("Done!")

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest-daas"],
    )
    def read_from_lakehouse_long_running(target_table):
        """Query ``target_table`` once a minute to show the session stays usable."""
        import stackit_spark
        import time

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")
        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

        # Re-run the query every minute and show that the session is still
        # alive. The loop performs 3 * 70 = 210 one-minute iterations.
        # NOTE(review): the original comment said 70 minutes - confirm which
        # duration is intended.
        # Counting starts at 1 so the message reports the minutes that have
        # actually elapsed once the sleep has finished.
        for minute in range(1, 3 * 70 + 1):
            time.sleep(60)
            print(f"Running for {minute} minutes", flush=True)
            sdf = spark.sql(f"SELECT * FROM {target_table}")
            sdf.show()

    @stackit.spark_kubernetes_task(**default_kwargs)
    def read_from_lakehouse_manual(target_table, host, lh_tokenendpoint, lh_password, lh_catalog_uri):
        """Read ``target_table`` with a hand-written Iceberg REST catalog configuration.

        Args:
            target_table: Table to select from.
            host: Host of the lakehouse connection (currently not used here).
            lh_tokenendpoint: OAuth2 token endpoint used for the token exchange.
            lh_password: Secret sent as the token-exchange subject token.
            lh_catalog_uri: URI of the Iceberg REST catalog.
        """
        import stackit_spark

        # Dremio 26 / Polaris: use the OAuth2Manager for token exchange.
        # This requires the authmgr-oauth2-runtime JAR in addition to the Iceberg JARs.
        # The token endpoint and catalog URI come from the 'lakehouse-rest-daas' connection extras:
        #   token_endpoint = https://<host>/oauth/token
        #   catalog_uri    = https://<catalog host>/api/catalog
        # NOTE(review): the artifact below is the Iceberg runtime for Spark 4.0 /
        # Scala 2.13, while the image tag in `default_kwargs` says spark3.5.6 -
        # confirm that the artifact matches the Spark version inside the image.
        catalog_name_in_spark = "stackit"
        catalog_prefix = f"spark.sql.catalog.{catalog_name_in_spark}"
        oauth2_prefix = f"{catalog_prefix}.rest.auth.oauth2"
        spark = stackit_spark.get_spark(
            additional_config={
                "spark.jars.packages": (
                    "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,"
                    "org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
                    "com.dremio.iceberg.authmgr:authmgr-oauth2-runtime:1.0.0"
                ),
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                catalog_prefix: "org.apache.iceberg.spark.SparkCatalog",
                f"{catalog_prefix}.cache-enabled": "false",
                f"{catalog_prefix}.type": "rest",
                f"{catalog_prefix}.warehouse": "default",
                f"{catalog_prefix}.uri": lh_catalog_uri,
                f"{catalog_prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
                f"{catalog_prefix}.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
                f"{oauth2_prefix}.token-endpoint": lh_tokenendpoint,
                f"{oauth2_prefix}.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
                f"{oauth2_prefix}.client-id": "dremio",
                f"{oauth2_prefix}.client-auth": "none",
                f"{oauth2_prefix}.scope": "dremio.all",
                f"{oauth2_prefix}.token-exchange.subject-token": lh_password,
                f"{oauth2_prefix}.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
            }
        )
        spark.sql(f"USE {catalog_name_in_spark}")
        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

    # Values for the manual task; all None when the connection is not configured,
    # so the DAG can still be parsed and displayed.
    if lakehouse_connection:
        connection_extras = lakehouse_connection.extra_dejson
        lh_host = lakehouse_connection.host
        lh_password = lakehouse_connection.password
        lh_tokenendpoint = connection_extras.get("token_endpoint")
        lh_catalog_uri = connection_extras.get("catalog_uri")
    else:
        lh_host = None
        lh_password = None
        lh_tokenendpoint = None
        lh_catalog_uri = None

    # Every reader consumes the table name returned by the writer, which makes
    # all of them downstream of `write_to_lakehouse`.
    target_table = write_to_lakehouse()
    ephemeral_volumes_cluster(target_table)
    ephemeral_volumes_driver(target_table)
    read_from_lakehouse_long_running(target_table)
    read_from_lakehouse(target_table)
    # NOTE(review): `lh_password` is handed over as a plain task argument and
    # may therefore be persisted or displayed together with the other task
    # arguments - consider resolving the secret inside the task instead.
    read_from_lakehouse_manual(target_table, lh_host, lh_tokenendpoint, lh_password, lh_catalog_uri)


lakehouse()