213 lines
9 KiB
Python
213 lines
9 KiB
Python
import pendulum
|
|
|
|
from airflow.sdk import dag, task, Connection
|
|
from stackit_workflows.airflow_plugin.decorators import stackit
|
|
from stackit_workflows import config
|
|
from kubernetes.client import models as K8S
|
|
|
|
# Keyword arguments unpacked into every ``stackit.spark_kubernetes_task`` decorator below.
# Currently this only pins the container image the Spark tasks run in.
default_kwargs = dict(
    image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.6-0.4.0rc1",
)
|
|
|
|
# SNA / "Data Platform Innovation Team" instances setup
#
# The Airflow connection "lakehouse-rest" must be configured for Dremio 26 / Polaris:
#   host     = https://dremio-internal-v26.… (Dremio/OAuth host)
#   password = <Dremio PAT (Personal Access Token)> or <Password>
#   extras:
#     token_endpoint = https://<host>/oauth/token (optional, derived from host if absent)
#     warehouse      = default (optional, defaults to "default")
#     catalog_name   = stackit (optional, defaults to conn_id)
#     catalog_uri    = https://iceberg-catalog-internal-v26.…/api/catalog for Dremio 26+, or
#                      https://iceberg-catalog-internal-v26.…/api/v3/iceberg for Dremio 25 and Nessie
#
# Note: catalog_uri *must* be set. Otherwise it defaults to the setting for Dremio-as-a-Service,
# which does *not* work for SNA instances.
|
|
# Resolve the "lakehouse-rest" connection while the DAG file is being parsed.
# Start from None so that any failure of the lookup leaves a well-defined value
# and the DAG can still be loaded and displayed.
lakehouse_connection = None
try:
    lakehouse_connection = Connection.get("lakehouse-rest")
except Exception:
    # Best-effort on purpose: whatever the lookup raised, only warn instead of
    # failing the import of this module.
    import warnings

    warnings.warn(
        (
            "Airflow connection 'lakehouse-rest' is not configured. "
            "This DAG will be visible but cannot be run until the connection is created. "
            "Please create an Airflow connection with conn_id='lakehouse-rest' — "
            "see the comments at the top of this file for the required fields."
        ),
        UserWarning,
        stacklevel=2,
    )
|
|
|
|
|
|
# Demo DAG "stackit_06_lakehouse": one task writes a small Iceberg table, several
# tasks read it back using different Spark / resource configurations.
# It has no schedule (schedule=None), so it only runs when triggered.
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_06_lakehouse",
)
def lakehouse():

    # When using the stackit operator or decorator, lakehouse connections can be passed as a list.
    # Spark is pre-configured to use the specified lakehouse connection.
    # Check the documentation or the 05-spark-full-configuration.py dag for more information.
    @stackit.spark_kubernetes_task(
        **default_kwargs, dremio_connections=["lakehouse-rest"]
    )
    def write_to_lakehouse():
        # Writes a two-row demo table and returns its name for the downstream tasks.
        import stackit_spark
        import pandas as pd
        import os

        # Dump the task's environment for debugging.
        # NOTE(review): this prints every environment variable verbatim; if credentials
        # are injected via the environment they will end up in the task log — confirm
        # this is acceptable outside of a demo.
        for k, v in os.environ.items():
            print(f"{k}: {v}")

        catalog_name_in_spark = "stackit"
        target_table = "DEMO.user"

        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO")
        data = pd.DataFrame([[1, "Alice"], [2, "Bob"]], columns=["id", "name"])
        sdf = spark.createDataFrame(data)
        # mode("overwrite") replaces the table contents on every run.
        sdf.write.format("iceberg").mode("overwrite").saveAsTable(target_table)

        return target_table

    @stackit.spark_kubernetes_task(
        **default_kwargs, dremio_connections=["lakehouse-rest"]
    )
    def read_from_lakehouse(target_table):
        # Reads the given table through the pre-configured catalog and prints it.
        import stackit_spark

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest"],
        ephemeral_gb=1,
        executor_ephemeral_gb=2,
        cpu=1,
        executor_cpu=1,
        executors=2,
    )
    def ephemeral_volumes_cluster(target_table):
        # Same read as above, but requested with ephemeral storage for the driver
        # and for two executors; prints the disk layout first.
        import stackit_spark
        import time
        import subprocess

        # Show the mounted filesystems and their sizes.
        print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()
        # Keep the task alive for a minute before finishing.
        time.sleep(60)
        print("Done!")

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest"],
        ephemeral_gb=1,
        cpu=1,
    )
    def ephemeral_volumes_driver(target_table):
        # Same read as above, but requested with ephemeral storage for the driver only;
        # prints the disk layout first.
        import stackit_spark
        import time
        import subprocess

        # Show the mounted filesystems and their sizes.
        print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()
        # Keep the task alive for a minute before finishing.
        time.sleep(60)
        print("Done!")

    @stackit.spark_kubernetes_task(
        **default_kwargs,
        dremio_connections=["lakehouse-rest"],
    )
    def read_from_lakehouse_long_running(target_table):
        # Repeats the read once per minute over a long period to show that the
        # Spark session keeps working.
        import stackit_spark
        import time

        # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

        # Re-run the query once per minute for 3 * 70 = 210 minutes to show that it is still alive.
        # NOTE(review): the previous comment said 70 minutes; the iteration count is
        # unchanged here — confirm the intended duration.
        # Counting starts at 1 because the message is printed after the sleep.
        for minutes_elapsed in range(1, 3 * 70 + 1):
            time.sleep(60)
            print(f"Running for {minutes_elapsed} minutes", flush=True)
            sdf = spark.sql(f"SELECT * FROM {target_table}")
            sdf.show()

    @stackit.spark_kubernetes_task(**default_kwargs)
    def read_from_lakehouse_manual(target_table, host, lh_tokenendpoint, lh_password, lh_catalog_uri):
        # Reads the table with a fully hand-written catalog configuration instead of
        # passing dremio_connections to the decorator.
        # `host` is accepted but not used in this body.
        import stackit_spark

        # Dremio 26 / Polaris: use the OAuth2Manager for token exchange.
        # This requires the authmgr-oauth2-runtime JAR in addition to the Iceberg JARs.
        # The token endpoint and catalog URI come from the 'lakehouse-rest' connection extras:
        #   token_endpoint = https://<dremio-host>/oauth/token
        #   catalog_uri    = https://<iceberg-catalog-host>/api/catalog
        catalog_name_in_spark = "stackit"
        spark = stackit_spark.get_spark(
            additional_config={
                # NOTE(review): the runtime artifact below targets Spark 4.0 / Scala 2.13,
                # while the image tag in default_kwargs reads "spark3.5.6" — confirm they match.
                "spark.jars.packages": (
                    "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,"
                    "org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
                    "com.dremio.iceberg.authmgr:authmgr-oauth2-runtime:1.0.0"
                ),
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                f"spark.sql.catalog.{catalog_name_in_spark}": "org.apache.iceberg.spark.SparkCatalog",
                f"spark.sql.catalog.{catalog_name_in_spark}.cache-enabled": "false",
                f"spark.sql.catalog.{catalog_name_in_spark}.type": "rest",
                f"spark.sql.catalog.{catalog_name_in_spark}.warehouse": "default",
                f"spark.sql.catalog.{catalog_name_in_spark}.uri": lh_catalog_uri,
                f"spark.sql.catalog.{catalog_name_in_spark}.header.X-Iceberg-Access-Delegation": "vended-credentials",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-endpoint": lh_tokenendpoint,
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-id": "dremio",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-auth": "none",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.scope": "dremio.all",
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token": lh_password,
                f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
            }
        )

        # Backtick-quoted like in the other tasks of this DAG.
        spark.sql(f"USE `{catalog_name_in_spark}`")

        sdf = spark.sql(f"SELECT * FROM {target_table}")
        sdf.show()

    # Values for the manual task, taken from the connection looked up at module level.
    # All of them are None when that lookup failed.
    # NOTE(review): lh_password is handed to the task as a plain argument; it may be
    # visible wherever task arguments are stored or rendered — confirm this is acceptable.
    lh_host = lakehouse_connection.host if lakehouse_connection else None
    lh_password = lakehouse_connection.password if lakehouse_connection else None
    lh_tokenendpoint = lakehouse_connection.extra_dejson.get("token_endpoint") if lakehouse_connection else None
    lh_catalog_uri = lakehouse_connection.extra_dejson.get("catalog_uri") if lakehouse_connection else None

    # Wiring: every reader consumes the table name returned by the writer,
    # so all of them run after write_to_lakehouse.
    target_table = write_to_lakehouse()
    ephemeral_volumes_cluster(target_table)
    ephemeral_volumes_driver(target_table)
    read_from_lakehouse_long_running(target_table)
    read_from_lakehouse(target_table)
    read_from_lakehouse_manual(target_table, lh_host, lh_tokenendpoint, lh_password, lh_catalog_uri)
|
|
|
|
|
# Call the decorated function at import time so the DAG defined above is actually created.
lakehouse()
|