Initial commit

This commit is contained in:
Tim_Doernemann 2026-05-28 17:44:11 +02:00
parent d4028fca11
commit a578239c4f
32 changed files with 2559 additions and 0 deletions

View file

@ -0,0 +1,8 @@
# Print current date
import datetime
# You can libraries baked into your own Docker image here!
# import awesome_lib
print("Hello from basic_python_script.py")
print(f"The current date is {datetime.datetime.now()}")

View file

@ -0,0 +1,42 @@
"""
Copyright (C) 2017-2021 Dremio Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from dremio.arguments.parse import options_default_validator
from dremio.flight.endpoint import DremioFlightEndpoint
def execute_query(dremio_host: str, dremio_port: int, username: str, token: str, query: str):
args = {
'hostname': dremio_host,
'port': dremio_port,
'tls': True,
'username': username,
'token': token,
'query': query
}
config = options_default_validator["default"] | args
# Instantiate DremioFlightEndpoint object
dremio_flight_endpoint = DremioFlightEndpoint(config)
# Connect to Dremio Arrow Flight server endpoint.
flight_client = dremio_flight_endpoint.connect()
# Get reader
reader = dremio_flight_endpoint.get_reader(flight_client)
# Print out the data as a dataframe
print(reader.read_pandas())

View file

@ -0,0 +1,9 @@
import stackit_spark
import time
import pandas as pd
spark = stackit_spark.get_spark()
data = pd.DataFrame({"number": [10]})
df = spark.createDataFrame(data)
time.sleep(30)
df.show()

View file

@ -0,0 +1,10 @@
import stackit_spark
import pandas as pd
from my_tools.say_hello import say_hello
say_hello()
spark = stackit_spark.get_spark()
data = pd.DataFrame({"number": [10]})
df = spark.createDataFrame(data)
df.show()

View file

@ -0,0 +1,72 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from stackit_spark import get_spark\n",
"\n",
"from my_tools.catalog_spark import get_nessie_token\n",
"\n",
"if \"STACKIT__PAPERMILL\" in os.environ:\n",
" # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog\n",
" catalog_name_in_spark = \"stackit\"\n",
" spark = get_spark()\n",
" spark.sql(f\"USE {catalog_name_in_spark}\")\n",
"\n",
" sdf = spark.sql(f\"SELECT * FROM DEMO.user\")\n",
" sdf.show()\n",
" \n",
"else:\n",
" tokenendpoint = os.environ[\"TOKEN_ENDPOINT\"]\n",
" catalogendpoint = os.environ[\"CATALOG_ENDPOINT\"]\n",
" password = os.environ[\"DREMIO_PAT\"]\n",
"\n",
"\n",
" nessie_token = get_nessie_token(tokenendpoint, password)\n",
"\n",
" # Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog\n",
" catalog_name_in_spark = \"stackit\"\n",
" spark = get_spark(\n",
" additional_config={\n",
" \"spark.jars.packages\": \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}\": \"org.apache.iceberg.spark.SparkCatalog\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.type\": \"rest\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.warehouse\": \"catalog-s3\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.uri\": catalogendpoint,\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.token\": nessie_token,\n",
" }\n",
" )\n",
"\n",
" spark.sql(f\"USE {catalog_name_in_spark}\")\n",
"\n",
" sdf = spark.sql(f\"SELECT * FROM DEMO.user\")\n",
" sdf.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "stackit-papermill",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@ -0,0 +1,35 @@
def get_nessie_token(tokenendpoint, password):
import requests
# Exchange Dremio PAT to Nessie token
token_request_body = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"scope": "dremio.all",
"subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
"subject_token": password,
}
x = requests.post(tokenendpoint, data=token_request_body)
x.raise_for_status()
return x.json()["access_token"]
def get_spark_session(host, nessie_token):
import stackit_spark
catalog_name_in_spark = "stackit"
return stackit_spark.get_spark(
additional_config={
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1",
f"spark.sql.catalog.{catalog_name_in_spark}": "org.apache.iceberg.spark.SparkCatalog",
f"spark.sql.catalog.{catalog_name_in_spark}.type": "rest",
f"spark.sql.catalog.{catalog_name_in_spark}.uri": host,
f"spark.sql.catalog.{catalog_name_in_spark}.token": nessie_token,
}
)
if __name__ == "__main__":
get_nessie_token(
"https://dremio-internal.data-platform-dev.stackit.run/oauth/token",
"xxxxx",
)

View file

@ -0,0 +1,65 @@
def read_and_write_data():
import stackit_spark
import pandas as pd
import os
import boto3
import pyarrow.parquet as pq
from io import BytesIO
# Initialize the S3 client
s3 = boto3.client(
's3',
aws_access_key_id='J902D929PQ1BC1HG93ZS',
aws_secret_access_key='J7x29/4s6eDQ0T1UywB8xo1byWTetwXrCdvYoudH',
endpoint_url='https://object.storage.eu01.onstackit.cloud'
)
bucket_name = 'data-tpcds'
folder_names = [
's1000/time_dim',
's1000/item',
's1000/date_dim',
's1000/customer_demographics',
's1000/web_sales_',
's1000/customer1'
]
result_dfs = {}
spark = stackit_spark.get_spark()
spark.sql("USE lakehouse")
spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO_USECASE_AIRFLOW")
for folder_name in folder_names:
objs = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)['Contents']
df_list = []
for obj in objs:
key = obj['Key']
try:
# Get object from S3
response = s3.get_object(Bucket=bucket_name, Key=key)
body = response['Body'].read() # Read the object's bytes
# Read Parquet file using PyArrow
table = pq.read_table(BytesIO(body))
df_part = table.to_pandas()
df_list.append(df_part)
except Exception as e:
print(f"Error processing file {key} in folder {folder_name}: {e}")
# Concatenate the DataFrames for the current folder
if df_list:
df = pd.concat(df_list, ignore_index=True)
result_dfs[folder_name] = df
else:
result_dfs[folder_name] = None
for df in result_dfs:
table_name=f"DEMO_USECASE_AIRFLOW.{df.replace('/', '_')}"
spark_df = spark.createDataFrame(result_dfs[df])
# Save Spark DataFrame as an Iceberg table
spark_df.write.mode("overwrite").saveAsTable(table_name)
read_and_write_data()

View file

@ -0,0 +1,65 @@
def read_and_write_data():
import stackit_spark
import pandas as pd
import os
import boto3
import pyarrow.parquet as pq
from io import BytesIO
# Initialize the S3 client
s3 = boto3.client(
's3',
aws_access_key_id='J902D929PQ1BC1HG93ZS',
aws_secret_access_key='J7x29/4s6eDQ0T1UywB8xo1byWTetwXrCdvYoudH',
endpoint_url='https://object.storage.eu01.onstackit.cloud'
)
bucket_name = 'data-tpcds'
folder_name = 's1000/customer_address'
objs = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)['Contents']
objs = objs[:1]
df_list = []
for obj in objs:
key = obj['Key']
# Get object from S3
response = s3.get_object(Bucket=bucket_name, Key=key)
body = response['Body'].read() # Read the object's bytes
table = pq.read_table(BytesIO(body))
df_part = table.to_pandas()
df_list.append(df_part)
df = pd.concat(df_list, ignore_index=True)
spark = stackit_spark.get_spark()
spark.sql("USE lakehouse")
spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO_USECASE_AIRFLOW")
spark_df = spark.createDataFrame(df)
# Save Spark DataFrame as an Iceberg table
spark_df.write.mode("overwrite").saveAsTable("DEMO_USECASE_AIRFLOW.customer_address")
if __name__ == "__main__":
#from dotenv import load_dotenv
import os
#load_dotenv()
#for key, value in os.environ.items():
# print(f'{key}: {value}')
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__S3__ACCESS-KEY-ID'] = 'DJ6STN2PRVEH6XIMP56V'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__CATALOG-IMPL'] = 'org.apache.iceberg.nessie.NessieCatalog'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__S3__SECRET-ACCESS-KEY'] = 'nG/AheODBfMcEhZL/cR+BQrQmO79Hia7nqweMu+n'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__IO-IMPL'] = 'org.apache.iceberg.aws.s3.S3FileIO'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE'] = 'org.apache.iceberg.spark.SparkCatalog'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__URI'] = 'http://nessie-internal.nessie-ns.svc.cluster.local:19120/api/v1'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__S3__ENDPOINT'] = 'https://object.storage.eu01.onstackit.cloud'
os.environ['STACKIT__SPARK__SQL__CATALOG__LAKEHOUSE__WAREHOUSE'] = 's3://data-platform-playground-internal/warehouse'
read_and_write_data()

View file

@ -0,0 +1,2 @@
def say_hello():
print("Hello from say_hello()")

View file

@ -0,0 +1,182 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "8aae8ae7-4cb4-4b66-b325-aa3eabdeb455",
"metadata": {},
"outputs": [],
"source": [
"run_date = \"2025-05-08\"\n",
"input_sales = \"local.retail_sales\"\n",
"input_inventory = \"local.retail_inventory\"\n",
"output_table = \"local.restock_plan\"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3598df66-6027-4f02-b7bd-c279b9d20c32",
"metadata": {},
"outputs": [],
"source": [
"# create spark session with authentication to the catalog\n",
"\n",
"def get_catalog_token(tokenendpoint, password):\n",
" import requests\n",
"\n",
" # Exchange Dremio PAT to Nessie token\n",
" token_request_body = {\n",
" \"grant_type\": \"urn:ietf:params:oauth:grant-type:token-exchange\",\n",
" \"scope\": \"dremio.all\",\n",
" \"subject_token_type\": \"urn:ietf:params:oauth:token-type:dremio:personal-access-token\",\n",
" \"subject_token\": password,\n",
" }\n",
" x = requests.post(tokenendpoint, data=token_request_body)\n",
" x.raise_for_status()\n",
" return x.json()[\"access_token\"]\n",
"\n",
"\n",
"def get_spark_session(host, catalog_token):\n",
" import stackit_spark\n",
"\n",
" catalog_name_in_spark = \"stackit\"\n",
" return stackit_spark.get_spark(\n",
" additional_config={\n",
" \"spark.jars.packages\": \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}\": \"org.apache.iceberg.spark.SparkCatalog\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.warehouse\": \"catalog-s3\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.type\": \"rest\",\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.uri\": host,\n",
" f\"spark.sql.catalog.{catalog_name_in_spark}.token\": catalog_token,\n",
" }\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f9071bfb-1767-47b3-907f-723ce4389d9e",
"metadata": {},
"outputs": [],
"source": [
"catalog_token = get_catalog_token(\"https://dremio-internal.data-platform.stackit.run/oauth/token\", \"JnHzzS1LRFeZw4HIJQNP+iGJereEuCehcZwyGwSxcZPSrX4H7NL6FGqOxf/lRw==\")\n",
"spark = get_spark_session(\"https://dremio-internal-catalog.data-platform.stackit.run/iceberg/main\", catalog_token)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8c250ade-1565-4c95-b1ae-134724a2e77b",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(f\"USE stackit\")\n",
"# Step 1: Create a namespace\n",
"spark.sql(\"CREATE NAMESPACE IF NOT EXISTS retail_demo\")\n",
"\n",
"# Step 2: Create synthetic sales data\n",
"sales_df = spark.range(100).selectExpr(\n",
" \"date_add('2025-05-01', cast(rand() * 7 as int)) as sale_date\",\n",
" \"cast(rand() * 10 + 1 as int) as units_sold\",\n",
" \"case when cast(rand() * 3 as int) = 0 then 'A123' \"\n",
" \" when cast(rand() * 3 as int) = 1 then 'B456' \"\n",
" \" else 'C789' end as product_id\",\n",
" \"case when cast(rand() * 2 as int) = 0 then '101' else '102' end as store_id\"\n",
")\n",
"\n",
"# Step 3: Create synthetic inventory data\n",
"inventory_df = spark.range(10).selectExpr(\n",
" \"'2025-05-08' as inventory_date\",\n",
" \"cast(rand() * 20 + 10 as int) as current_stock\",\n",
" \"case when cast(rand() * 3 as int) = 0 then 'A123' \"\n",
" \" when cast(rand() * 3 as int) = 1 then 'B456' \"\n",
" \" else 'C789' end as product_id\",\n",
" \"case when cast(rand() * 2 as int) = 0 then '101' else '102' end as store_id\"\n",
")\n",
"\n",
"# Step 4: Write to Iceberg tables\n",
"sales_df.write.mode(\"overwrite\").saveAsTable(\"retail_demo.sales_data\")\n",
"inventory_df.write.mode(\"overwrite\").saveAsTable(\"retail_demo.inventory_data\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5bd53006-ddbf-4ecf-bf3b-fa173cf9893b",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.functions import expr\n",
"\n",
"# Load sales and inventory data\n",
"sales = spark.sql(\"\"\"\n",
" SELECT store_id, product_id, sale_date, units_sold\n",
" FROM retail_demo.sales_data\n",
"\"\"\")\n",
"\n",
"inventory = spark.sql(\"\"\"\n",
" SELECT store_id, product_id, inventory_date, current_stock\n",
" FROM retail_demo.inventory_data\n",
"\"\"\")\n",
"\n",
"# Step 2: Aggregate demand per store/product (e.g. last 7 days)\n",
"demand = sales.groupBy(\"store_id\", \"product_id\") \\\n",
" .agg({\"units_sold\": \"avg\"}) \\\n",
" .withColumnRenamed(\"avg(units_sold)\", \"predicted_demand\")\n",
"\n",
"# Step 3: Join with inventory\n",
"restock_plan = demand.join(inventory, on=[\"store_id\", \"product_id\"], how=\"inner\")\n",
"\n",
"# Step 4: Calculate restock quantity\n",
"restock_plan = restock_plan.withColumn(\n",
" \"restock_qty\",\n",
" expr(\"CASE WHEN predicted_demand - current_stock > 0 THEN int(predicted_demand - current_stock) ELSE 0 END\")\n",
")\n",
"\n",
"# Step 5: Add metadata\n",
"restock_plan = restock_plan.withColumn(\"run_date\", expr(\"current_date()\"))\n",
"\n",
"# Step 6: Save to Iceberg\n",
"restock_plan.write.mode(\"overwrite\").saveAsTable(\"retail_demo.restock_plan\")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5fea9096-b7b3-44d4-91aa-189ac9ea4a33",
"metadata": {},
"outputs": [],
"source": [
"# Convert the Spark DataFrame to a Pandas DataFrame for better display in Jupyter\n",
"restock_df = restock_plan.toPandas()\n",
"\n",
"# Display the restock plan nicely in Jupyter using HTML table format\n",
"import IPython.display as display\n",
"display.display(display.HTML(restock_df.to_html(index=False)))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

14
Demo/setup-dbt.md Normal file
View file

@ -0,0 +1,14 @@
# Quick Guide How to setup the orchestration of dbt models using StackIT Managed Airflow
This guide gives an overview on how to pragmatically set up orchestrating dbt models through Airflow. It targets users that want to have version control over their SQL Code used in Dremio.
### Step 1: Add you dbt-repo as submodule
```bash
git submodule add git@ssh.dev.azure.com:v3/schwarzit-wiking/schwarzit.data-platform-playground/dbt-demo
```
### Step 2: update your submodule
On every change in the dbt submodule it needs to be updated using the following command:
```bash
git submodule update --remote dbt-demo
```

View file

@ -0,0 +1,40 @@
import json
import pendulum
from airflow.sdk import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_01_taskflow",
)
def taskflow_api():
@task()
def extract():
data_string = '{"1001": 301.38, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_api()

View file

@ -0,0 +1,48 @@
import pendulum
from airflow.sdk import dag, task
from airflow.operators.bash import BashOperator
from textwrap import dedent
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_02_regular_operators",
)
def regular_operators():
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 10",
retries=3,
)
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
)
t1 >> [t2, t3]
regular_operators()

View file

@ -0,0 +1,79 @@
import json
import pendulum
from airflow.sdk import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import (
KubernetesPodOperator,
)
from stackit_workflows.kubernetes import POD_NAMESPACE, K8S
from stackit_workflows.airflow_plugin.decorators import stackit
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_03_kubernetes_operator",
)
def kubernetes_operator():
# You may use the KubernetesPodOperator to launch any image you want!
# You may even launch additional pods from your pod - the default serviceAccount
# has permissions to mange pods, configmaps and services of type ClusterIP.
# Not all fields may be set freely on the pod. There is an admission controller in place
# that modifies taints and other fields. Pods run in a dedicated namespace.
k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
#image="unknown_image",
cmds=["bash", "-c"],
arguments=["echo hello world"],
labels={"foo": "bar"},
task_id="dry_run_demo",
log_events_on_failure=True,
do_xcom_push=True,
get_logs=True,
#Container resources requests and limits. Mandatory to set
container_resources=K8S.V1ResourceRequirements(
requests={"cpu": "100m", "memory": "50Mi"}, #optional: "ephemeral-storage": "1Gi"
limits={"cpu": "200m", "memory": "100Mi"}, #optional "ephemeral-storage": "2Gi"
),
# All pods launched must run as non-root users. Otherwise they won't start.
security_context=K8S.V1PodSecurityContext(run_as_user=100),
)
# The stackit python kubernetes decorator is an extension of the kubernetes decorator.
# It does:
# - Specify a default up-to-date data transformation image with python, pandas and other libraries pre-installed
# - Set the namespace correctly
# - Provide the Airflow "context" as environment variables
# - Synchronizes this repository into the launched pod so that you can use imports
@stackit.python_kubernetes_task()
def stackit_python_kubernetes():
import os
# This is not possible with the KubernetesPodOperator unless you
# clone the DAG repo yourself!
from scripts.my_tools.say_hello import say_hello
say_hello()
# We provide the most common Airflow context variables as environment variables
task_id = os.environ["AIRFLOW__CONTEXT__TASK__TASK_ID"]
print(f"Task ID: {task_id}")
# Lets print all available environment variables
env_vars = {
key: value
for key, value in os.environ.items()
if key.startswith("AIRFLOW__CONTEXT") or key.startswith("STACKIT__")
}
for key, value in sorted(env_vars.items()):
print(f"{key}: {value}")
stackit_python_kubernetes()
kubernetes_operator()

View file

@ -0,0 +1,149 @@
import pendulum
from airflow.sdk import dag, task
from stackit_workflows.airflow_plugin.decorators import stackit
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
# It is good practice but not required to specify the image to use.
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_04_simple_spark",
)
def simple_spark():
# This is a regular task using the airflow taskflow API
@task()
def generate_number():
import random
return random.randint(1, 100)
# This is our own provider which makes it easy to run spark jobs from airflow.
# By default, this launches a single node spark with 1 CPU and 6 GB RAM.
@stackit.spark_kubernetes_task(**default_kwargs)
def spark_single_node(random_number: int):
import stackit_spark
import pandas as pd
spark = stackit_spark.get_spark()
df = spark.createDataFrame(pd.DataFrame({"random_number": [random_number]}))
df.show()
# If we need more resources, we can specify them with the `cpu` and `memory_gb` parameters.
# By default, memory is calculated as: Memory in GB = 6 * CPU.
# Memory can also be specified explicitly.
@stackit.spark_kubernetes_task(cpu=3, **default_kwargs)
def spark_single_node_large(random_number: int):
import stackit_spark
import pandas as pd
import time
spark = stackit_spark.get_spark()
data = pd.DataFrame({"random_number": [random_number]})
df = spark.createDataFrame(data)
df.show()
time.sleep(60)
# You can optionally specify the resources and also the number of executors.
# `cpu` and `memory_gb` set resources for the driver, `executor_cpu` and `executor_memory_gb`
# set resources for the executors.
# If memory is not specified, it is calculated based on the specified CPUs. (1 CPU = 6 GB RAM)
# `executors` sets the number of executors.
# This uses by default a stackit provided spark image. It can be overridden.
# For many tasks one spark node is enough.
# However, for very large transformations, we might need to spin up a cluster.
#
# Take this step only if you need more than ~16 CPUs and 128 GB RAM.
# Running Spark in cluster node can sometimes even hurt performance as
# data needs to be shuffled between nodes - and Network IO is often the bottleneck.
#
# `cpu` and `memory_gb` can still be used to configure the driver.
# Executors are configured with `executor_cpu` and `executor_memory_gb`.
# The number of executors can be set with `executors`. Each executor
# gets the resources specified in `executor_cpu` and `executor_memory_gb`.
@stackit.spark_kubernetes_task(cpu=2, executors=3, executor_cpu=2, **default_kwargs)
def spark_cluster(random_number: int):
import stackit_spark
import pandas as pd
import time
# You can customize spark here:
spark = stackit_spark.get_spark(
additional_config={
"spark.sql.shuffle.partitions": 200,
"spark.executor.memoryOverhead": "1g",
# We add additional jars from maven
"spark.jars.packages": "org.apache.hadoop:hadoop-azure-datalake:3.3.4,org.apache.hadoop:hadoop-azure:3.3.4,",
}
)
data = pd.DataFrame({"random_number": [random_number]})
df = spark.createDataFrame(data)
df.show()
sc = spark._jsc.sc()
executors = [
executor.host() for executor in sc.statusTracker().getExecutorInfos()
]
print(f"Executors: {executors}")
time.sleep(60)
# Instead of using the taskflow API, you can also use an operator and specify a script to
# be executed.
t1 = STACKITSparkScriptOperator(
task_id="spark_operator",
# You can use here all parameters that you already learned about above.
cpu=2,
# Specify a relative path here from the root of the DAGs Repository
script="Demo/scripts/my_spark_job.py",
**default_kwargs,
)
# When using the StackITSparkScriptOperator, the script is run in the context of
# the git repository it is contained in. This means you can use imports inside
# of scripts!
# Per default, the script is expected to be in the same repository as this DAG.
# If it is in another repository, you can specify the git_* arguments of the
# operator. Check the `05-spark-full-configuration.py` for an example.
t1 = STACKITSparkScriptOperator(
task_id="spark_operator_with_imports",
# You can use here all parameters that you already learned about above.
cpu=2,
# Specify a relative path here from the root of the DAGs Repository
script="Demo/scripts/my_spark_job_with_imports.py",
**default_kwargs,
)
# Even when using your own image, you can still use this operator to execute
# a python script in any git repository in the context of your image. You
# won't be able to use the `stackit_spark` module in this case.
# `python` needs to be installed in the image.
t1 = STACKITSparkScriptOperator(
task_id="run_script_in_custom_image",
# You can use here all parameters that you already learned about above.
# This will still set the resource requests of your pod!
cpu=2,
# Specify a relative path here from the root of the DAGs Repository
script="Demo/scripts/basic_python_script.py",
image="python:3.12-slim-bullseye",
# All pods must run as non-root users. We recommend to use images
# that already run as non-root users.
security_context={"runAsUser": 1000},
)
random_number = generate_number()
spark_single_node(random_number)
spark_single_node_large(random_number)
spark_cluster(random_number)
random_number >> t1
simple_spark()

View file

@ -0,0 +1,100 @@
import pendulum
import warnings
from airflow.sdk import dag, task
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows import config
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_05_spark_full_configuration",
)
def spark_full_configuration():
# The same arguments are supported by the decorator.
STACKITSparkScriptOperator(
task_id="spark_full_configuration",
# image : str, optional
# Image of the launched container.
# If not specified, a current spark image maintained by stackit is used.
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
# cpu : float
# CPU Request of the pod. If spark is used with no executors,
# this is also used to determine the number of processes (i.e. local[2]).
# If executors are used, this is the CPU request of the driver.
cpu=4,
# env_vars : dict, list of K8S.V1EnvVar, optional
# Env-Vars to pass to the pod. Dictionary contains env-vars as keys and
# their values as values. (i.e. {'MY_ENV_VAR': 42}). Please note that
# all numeric env-vars are cast to string before creating the Kubernetes
# Pod Template.
env_vars={"MY_ENV_VAR": "my_value"},
# # Not yet supported:
# # executor_ephemeral_gb : int, optional
# # Size of the ephemeral Volumes added to each spark executor.
# # The mount-path in the pod is specified with :paramref:`ephemeral_mount_path`.
# # If set, spark.local.dir is set to :paramref:`ephemeral_mount_path`.
# # Default: None
# executor_ephemeral_gb=5,
# # Not yet supported:
# # ephemeral_mount_path : str, optional
# # Not yet supported:
# # ephemeral_mount_path : str, optional
# # Path of the ephemeral volume mounts for the driver and the executors.
# # Default: "/stackit-tmp"
# ephemeral_mount_path="/stackit-tmp",
# # Not yet supported:
# # driver_ephemeral_gb: int, optional
# # Size of the ephemeral Volumes added to the spark driver.
# # The mount-path in the pod is specified with :paramref:`ephemeral_mount_path`.
# # If not specified, the driver ephemeral volume is set to '1Gi' if any
# # executor ephemeral volume is specified. This is to ensure that
# # 'spark.local.dir' is set to an existing directory.
# # If set, spark.local.dir is set to :paramref:`ephemeral_mount_path`.
# executor_cpu : int or float, optional
# CPUs to be used by Spark for each executor. Decimal values are allowed.
executor_cpu=3.5,
# executor_memory_gb : int, optional
# Requested memory of the executors in gb. Decimal values are allowed.
# If not specified, it is calculated from :paramref:`executor_cpu` as:
# ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
executor_memory_gb=10,
# executors: int, optional
# Number of kubernetes executors. If specified, `spark_mode` defaults to `k8s`
# instead of `local`.
executors=2,
# get_logs : bool, optional
# Read stdout get the stdout of the container as logs of the tasks. Default: True
get_logs=True,
dremio_connections=["lakehouse-rest"],
# memory_gb : int or float, optional
# Memory request in GB for the (driver) pod. Decimal values are allowed.
# If not specified, it is calculated from :paramref:`cpu` as:
# ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
memory_gb=10,
# name : str, optional
# Name of the pod spawned for this task.
# We recommend to not change this value.
# Default: ``"task-<dag_id>-<task_id>-<hash>"``
name="ensure_this_is_unique",
# provide_context : bool, optional
# Provide Airflow context as environment variables. Default: True
# Check example 03 for more information.
provide_context=True,
# container_resources : K8S.V1ResourceRequirements, optional
# Currently not supported. Use :paramref:`cpu` and
# :paramref:`memory_gb` instead.
# script : str
# Path of the script to execute. If a relative path is specified, it
# is interpreted relative to the git-sync folder.
# If an absolute path is specified it is not adapted further.
script="Demo/scripts/my_spark_job.py",
# Any other arguments as supported by the `KubernetesPodOperator` can be used here.
labels={"foo": "bar"},
)
spark_full_configuration()

View file

@ -0,0 +1,101 @@
# Identical to '05-spark-full-configuration.py', but uses the legacy 'stackit_airflow' package
import pendulum
from airflow.sdk import dag
from stackit_airflow.airflow_plugin.operators import StackITSparkOperator
from stackit_workflows import config
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_05b_spark_full_configuration",
)
def spark_full_configuration():
# The same arguments are supported by the decorator.
StackITSparkOperator(
task_id="spark_full_configuration",
# image : str, optional
# Image of the launched container.
# If not specified, a current spark image maintained by stackit is used.
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
# cpu : float
# CPU Request of the pod. If spark is used with no executors,
# this is also used to determine the number of processes (i.e. local[2]).
# If executors are used, this is the CPU request of the driver.
cpu=4,
# env_vars : dict, list of K8S.V1EnvVar, optional
# Env-Vars to pass to the pod. Dictionary contains env-vars as keys and
# their values as values. (i.e. {'MY_ENV_VAR': 42}). Please note that
# all numeric env-vars are cast to string before creating the Kubernetes
# Pod Template.
env_vars={"MY_ENV_VAR": "my_value"},
# # Not yet supported:
# # executor_ephemeral_gb : int, optional
# # Size of the ephemeral Volumes added to each spark executor.
# # The mount-path in the pod is specified with :paramref:`ephemeral_mount_path`.
# # If set, spark.local.dir is set to :paramref:`ephemeral_mount_path`.
# # Default: None
# executor_ephemeral_gb=5,
# # Not yet supported:
# # ephemeral_mount_path : str, optional
# # Not yet supported:
# # ephemeral_mount_path : str, optional
# # Path of the ephemeral volume mounts for the driver and the executors.
# # Default: "/stackit-tmp"
# ephemeral_mount_path="/stackit-tmp",
# # Not yet supported:
# # driver_ephemeral_gb: int, optional
# # Size of the ephemeral Volumes added to the spark driver.
# # The mount-path in the pod is specified with :paramref:`ephemeral_mount_path`.
# # If not specified, the driver ephemeral volume is set to '1Gi' if any
# # executor ephemeral volume is specified. This is to ensure that
# # 'spark.local.dir' is set to an existing directory.
# # If set, spark.local.dir is set to :paramref:`ephemeral_mount_path`.
# executor_cpu : int or float, optional
# CPUs to be used by Spark for each executor. Decimal values are allowed.
executor_cpu=3.5,
# executor_memory_gb : int, optional
# Requested memory of the executors in gb. Decimal values are allowed.
# If not specified, it is calculated from :paramref:`executor_cpu` as:
# ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
executor_memory_gb=10,
# executors: int, optional
# Number of kubernetes executors. If specified, `spark_mode` defaults to `k8s`
# instead of `local`.
executors=2,
# get_logs : bool, optional
# Read stdout get the stdout of the container as logs of the tasks. Default: True
get_logs=True,
dremio_connections=["lakehouse-rest"],
# memory_gb : int or float, optional
# Memory request in GB for the (driver) pod. Decimal values are allowed.
# If not specified, it is calculated from :paramref:`cpu` as:
# ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
memory_gb=10,
# name : str, optional
# Name of the pod spawned for this task.
# We recommend to not change this value.
# Default: ``"task-<dag_id>-<task_id>-<hash>"``
name="ensure_this_is_unique",
# provide_context : bool, optional
# Provide Airflow context as environment variables. Default: True
# Check example 03 for more information.
provide_context=True,
# container_resources : K8S.V1ResourceRequirements, optional
# Currently not supported. Use :paramref:`cpu` and
# :paramref:`memory_gb` instead.
# script : str
# Path of the script to execute. If a relative path is specified, it
# is interpreted relative to the git-sync folder.
# If an absolute path is specified it is not adapted further.
script="Demo/scripts/my_spark_job.py",
# Any other arguments as supported by the `KubernetesPodOperator` can be used here.
labels={"foo": "bar"},
)
spark_full_configuration()

View file

@ -0,0 +1,213 @@
import pendulum
from airflow.sdk import dag, task, Connection
from stackit_workflows.airflow_plugin.decorators import stackit
from stackit_workflows import config
from kubernetes.client import models as K8S
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.6-0.4.0rc1"
}
# SNA / "Data Platform Innovation Team" Instances setup
# Airflow Connection "lakehouse-rest" must be configured for Dremio 26 / Polaris:
# host = https://dremio-internal-v26.… (Dremio/OAuth host)
# password = <Dremio PAT (Personal Access Token)> or <Password>
# extras:
# token_endpoint = https://<host>/oauth/token (optional, derived from host if absent)
# warehouse = default (optional, defaults to "default")
# catalog_name = stackit (optional, defaults to conn_id)
# catalog_uri= https://iceberg-catalog-internal-v26.…/api/catalog for Dremio 26+ or
# https://iceberg-catalog-internal-v26.…/api/v3/iceberg for Dremio 25 and Nessie
# Note: catalog_uri *must* be set. It will otherwise default to the setting for Dremio-as-a-Service
# which does *not* work for SNA instances
try:
lakehouse_connection = Connection.get("lakehouse-rest")
except Exception:
import warnings
warnings.warn(
"Airflow connection 'lakehouse-rest' is not configured. "
"This DAG will be visible but cannot be run until the connection is created. "
"Please create an Airflow connection with conn_id='lakehouse-rest'"
"see the comments at the top of this file for the required fields.",
stacklevel=2,
)
lakehouse_connection = None
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_06_lakehouse",
)
def lakehouse():
# When using the stackit operator or decorator, lakehouse connections can be passed as a list.
# Spark is pre-configured to use the specified lakehouse connection.
# Check the documentation or the 05-spark-full-configuration.py dag for more information.
@stackit.spark_kubernetes_task(
**default_kwargs, dremio_connections=["lakehouse-rest"]
)
def write_to_lakehouse():
import stackit_spark
import pandas as pd
import os
for k, v in os.environ.items():
print(f"{k}: {v}")
catalog_name_in_spark = "stackit"
target_table = "DEMO.user"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO")
data = pd.DataFrame([[1, "Alice"], [2, "Bob"]], columns=["id", "name"])
sdf = spark.createDataFrame(data)
sdf.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
return target_table
@stackit.spark_kubernetes_task(
**default_kwargs, dremio_connections=["lakehouse-rest"]
)
def read_from_lakehouse(target_table):
import stackit_spark
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest"],
ephemeral_gb=1,
executor_ephemeral_gb=2,
cpu=1,
executor_cpu=1,
executors=2,
)
def ephemeral_volumes_cluster(target_table):
import stackit_spark
import time
import subprocess
print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
time.sleep(60)
print("Done!")
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest"],
ephemeral_gb=1,
cpu=1,
)
def ephemeral_volumes_driver(target_table):
import stackit_spark
import time
import subprocess
print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
time.sleep(60)
print("Done!")
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest"],
)
def read_from_lakehouse_long_running(target_table):
import stackit_spark
import time
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
# Re-run every minute and show that its still alive vor 70 Minutes
for i in range(3 * 70):
time.sleep(60)
print(f"Running for {i} minutes", flush=True)
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
@stackit.spark_kubernetes_task(**default_kwargs)
def read_from_lakehouse_manual(target_table, host, lh_tokenendpoint, lh_password, lh_catalog_uri):
import stackit_spark
# Dremio 26 / Polaris: use the OAuth2Manager for token exchange.
# This requires the authmgr-oauth2-runtime JAR in addition to the Iceberg JARs.
# The token endpoint and catalog URI come from the 'lakehouse-rest' connection extras:
# token_endpoint = https://<dremio-host>/oauth/token
# catalog_uri = https://<iceberg-catalog-host>/api/catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark(
additional_config={
"spark.jars.packages": (
"org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,"
"org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
"com.dremio.iceberg.authmgr:authmgr-oauth2-runtime:1.0.0"
),
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
f"spark.sql.catalog.{catalog_name_in_spark}": "org.apache.iceberg.spark.SparkCatalog",
f"spark.sql.catalog.{catalog_name_in_spark}.cache-enabled": "false",
f"spark.sql.catalog.{catalog_name_in_spark}.type": "rest",
f"spark.sql.catalog.{catalog_name_in_spark}.warehouse": "default",
f"spark.sql.catalog.{catalog_name_in_spark}.uri": lh_catalog_uri,
f"spark.sql.catalog.{catalog_name_in_spark}.header.X-Iceberg-Access-Delegation": "vended-credentials",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-endpoint": lh_tokenendpoint,
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-id": "dremio",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-auth": "none",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.scope": "dremio.all",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token": lh_password,
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
}
)
spark.sql(f"USE {catalog_name_in_spark}")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
lh_host = lakehouse_connection.host if lakehouse_connection else None
lh_password = lakehouse_connection.password if lakehouse_connection else None
lh_tokenendpoint = lakehouse_connection.extra_dejson.get("token_endpoint") if lakehouse_connection else None
lh_catalog_uri = lakehouse_connection.extra_dejson.get("catalog_uri") if lakehouse_connection else None
target_table = write_to_lakehouse()
ephemeral_volumes_cluster(target_table)
ephemeral_volumes_driver(target_table)
read_from_lakehouse_long_running(target_table)
read_from_lakehouse(target_table)
read_from_lakehouse_manual(target_table, lh_host, lh_tokenendpoint, lh_password, lh_catalog_uri)
lakehouse()

View file

@ -0,0 +1,211 @@
import pendulum
from airflow.sdk import dag, task, Connection
from stackit_workflows.airflow_plugin.decorators import stackit
from stackit_workflows import config
from kubernetes.client import models as K8S
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.6-0.4.0rc1"
}
# Dremio-as-a-service setup
# Airflow Connection "lakehouse-rest-daas" must be configured for Dremio 26 / Polaris:
# host = https://dremio-internal-v26.… (Dremio/OAuth host)
# password = <Dremio PAT (Personal Access Token)> or <Password>
# extras:
# token_endpoint = https://<host>/oauth/token (optional, derived from host if absent)
# warehouse = default (optional, defaults to "default")
# catalog_name = stackit (optional, defaults to conn_id)
# catalog_uri = https://catalog.$host/ (optional, derived from host if absent)
try:
lakehouse_connection = Connection.get("lakehouse-rest-daas")
except Exception:
import warnings
warnings.warn(
"Airflow connection 'lakehouse-rest-daas' is not configured. "
"This DAG will be visible but cannot be run until the connection is created. "
"Please create an Airflow connection with conn_id='lakehouse-rest-daas'"
"see the comments at the top of this file for the required fields.",
stacklevel=2,
)
lakehouse_connection = None
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_06daas_lakehouse",
)
def lakehouse():
# When using the stackit operator or decorator, lakehouse connections can be passed as a list.
# Spark is pre-configured to use the specified lakehouse connection.
# Check the documentation or the 05-spark-full-configuration.py dag for more information.
@stackit.spark_kubernetes_task(
**default_kwargs, dremio_connections=["lakehouse-rest-daas"]
)
def write_to_lakehouse():
import stackit_spark
import pandas as pd
import os
for k, v in os.environ.items():
print(f"{k}: {v}")
catalog_name_in_spark = "stackit"
target_table = "DEMO.user"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
spark.sql("CREATE NAMESPACE IF NOT EXISTS DEMO")
data = pd.DataFrame([[1, "Alice"], [2, "Bob"]], columns=["id", "name"])
sdf = spark.createDataFrame(data)
sdf.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
return target_table
@stackit.spark_kubernetes_task(
**default_kwargs, dremio_connections=["lakehouse-rest-daas"]
)
def read_from_lakehouse(target_table):
import stackit_spark
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest-daas"],
ephemeral_gb=1,
executor_ephemeral_gb=2,
cpu=1,
executor_cpu=1,
executors=2,
)
def ephemeral_volumes_cluster(target_table):
import stackit_spark
import time
import subprocess
print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
time.sleep(60)
print("Done!")
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest-daas"],
ephemeral_gb=1,
cpu=1,
)
def ephemeral_volumes_driver(target_table):
import stackit_spark
import time
import subprocess
print(subprocess.run(["df", "-h"], capture_output=True, text=True).stdout)
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
time.sleep(60)
print("Done!")
@stackit.spark_kubernetes_task(
**default_kwargs,
dremio_connections=["lakehouse-rest-daas"],
)
def read_from_lakehouse_long_running(target_table):
import stackit_spark
import time
# Create Spark Session with Iceberg Rest Credentials for Dremio Enterprise Catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
# Re-run every minute and show that its still alive vor 70 Minutes
for i in range(3 * 70):
time.sleep(60)
print(f"Running for {i} minutes", flush=True)
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
@stackit.spark_kubernetes_task(**default_kwargs)
def read_from_lakehouse_manual(target_table, host, lh_tokenendpoint, lh_password, lh_catalog_uri):
import stackit_spark
# Dremio 26 / Polaris: use the OAuth2Manager for token exchange.
# This requires the authmgr-oauth2-runtime JAR in addition to the Iceberg JARs.
# The token endpoint and catalog URI come from the 'lakehouse-rest-daas' connection extras:
# token_endpoint = https://<dremio-host>/oauth/token
# catalog_uri = https://<iceberg-catalog-host>/api/catalog
catalog_name_in_spark = "stackit"
spark = stackit_spark.get_spark(
additional_config={
"spark.jars.packages": (
"org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,"
"org.apache.iceberg:iceberg-aws-bundle:1.10.1,"
"com.dremio.iceberg.authmgr:authmgr-oauth2-runtime:1.0.0"
),
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
f"spark.sql.catalog.{catalog_name_in_spark}": "org.apache.iceberg.spark.SparkCatalog",
f"spark.sql.catalog.{catalog_name_in_spark}.cache-enabled": "false",
f"spark.sql.catalog.{catalog_name_in_spark}.type": "rest",
f"spark.sql.catalog.{catalog_name_in_spark}.warehouse": "default",
f"spark.sql.catalog.{catalog_name_in_spark}.uri": lh_catalog_uri,
f"spark.sql.catalog.{catalog_name_in_spark}.header.X-Iceberg-Access-Delegation": "vended-credentials",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-endpoint": lh_tokenendpoint,
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-id": "dremio",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.client-auth": "none",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.scope": "dremio.all",
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token": lh_password,
f"spark.sql.catalog.{catalog_name_in_spark}.rest.auth.oauth2.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
}
)
spark.sql(f"USE {catalog_name_in_spark}")
sdf = spark.sql(f"SELECT * FROM {target_table}")
sdf.show()
lh_host = lakehouse_connection.host if lakehouse_connection else None
lh_password = lakehouse_connection.password if lakehouse_connection else None
lh_tokenendpoint = lakehouse_connection.extra_dejson.get("token_endpoint") if lakehouse_connection else None
lh_catalog_uri = lakehouse_connection.extra_dejson.get("catalog_uri") if lakehouse_connection else None
target_table = write_to_lakehouse()
ephemeral_volumes_cluster(target_table)
ephemeral_volumes_driver(target_table)
read_from_lakehouse_long_running(target_table)
read_from_lakehouse(target_table)
read_from_lakehouse_manual(target_table, lh_host, lh_tokenendpoint, lh_password, lh_catalog_uri)
lakehouse()

View file

@ -0,0 +1,41 @@
import pendulum
from airflow.sdk import dag
from stackit_workflows.airflow_plugin.decorators import stackit
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_07_extra_packages",
)
def packages():
# In the stackit provided images, the python environment is owned by the runtime user.
# Thus, packages can be installed on the fly using `pip`, `conda`, or `mamba`.
# We recommend using `mamba` whenever possible, as mamba ships optimized binaries and
# resolves dependencies instead of just warning about incompatibilities.
# This process takes compute resources every time the image is started. If you need
# to run this task frequently, we recommend to build a custom image with the required packages.
@stackit.spark_kubernetes_task(**default_kwargs)
def tell_jokes():
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "Joking"])
import Joking
print(Joking.random_joke())
tell_jokes()
packages()

View file

@ -0,0 +1,42 @@
#Identical to '07-extra-packages.py', but uses 'spark_kubernetes' instead of 'spark_kubernetes_task'
import pendulum
from airflow.sdk import dag
from stackit_workflows.airflow_plugin.decorators import stackit
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_07b_extra_packages",
)
def packages():
# In the stackit provided images, the python environment is owned by the runtime user.
# Thus, packages can be installed on the fly using `pip`, `conda`, or `mamba`.
# We recommend using `mamba` whenever possible, as mamba ships optimized binaries and
# resolves dependencies instead of just warning about incompatibilities.
# This process takes compute resources every time the image is started. If you need
# to run this task frequently, we recommend to build a custom image with the required packages.
@stackit.spark_kubernetes(**default_kwargs)
def tell_jokes():
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "Joking"])
import Joking
print(Joking.random_joke())
tell_jokes()
packages()

View file

@ -0,0 +1,150 @@
import pendulum
from airflow.sdk import dag, task, Connection
from stackit_workflows.airflow_plugin.decorators import stackit
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
try:
lakehouse_connection = Connection.get("lakehouse-rest")
except Exception:
import warnings
warnings.warn(
"Airflow connection 'lakehouse-rest' is not configured. "
"This DAG will be visible but cannot be run until the connection is created. "
"Please create an Airflow connection with conn_id='lakehouse-rest'"
"see the comments in Demo/06-lakehouse.py for the required fields.",
stacklevel=2,
)
lakehouse_connection = None
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_08_table_maintenance",
)
def table_maintenance():
"""
This DAG gives an example implementation on how you can operationalize your table maintenance using Airflow and Dremio.
There are two examples: One for the OPTIMZE/expire_snapshot and one for the VACUUM/rewrite_data_files command.
Full reference of all available iceberg optmization procedures can be found here:https://iceberg.apache.org/docs/1.8.0/spark-procedures/.
For further information refer to the StackIT Data Platform User Documentation: https://itdoc.schwarz/display/STACKIT/Dremio+-+Iceberg+Table+Maintenance+Guide.
"""
@stackit.spark_kubernetes_task(**default_kwargs, dremio_connections=["lakehouse-rest"])
def vacuum_table_using_spark():
"""
Each write/update/delete/upsert/compaction in Iceberg produces a new snapshot while keeping the old data
and metadata around for snapshot isolation and time travel. The expire_snapshots procedure can be used
to remove older snapshots and their files which are no longer needed.
"""
import stackit_spark
import os
for k, v in os.environ.items():
print(f"{k}: {v}")
catalog_name_in_spark = "catalog-s3"
target_table = "DEMO.user"
spark = stackit_spark.get_spark()
spark.sql(f"USE `{catalog_name_in_spark}`")
result = spark.sql(f"CALL `{catalog_name_in_spark}`.system.expire_snapshots( \
table => 'DEMO.user', \
retain_last => 100 \
)")
result.show()
@stackit.spark_kubernetes_task(**default_kwargs, dremio_connections=["lakehouse-rest"])
def rewrite_data_files():
"""
Iceberg tracks each data file in a table. More data files leads to more metadata stored in manifest files,
and small data files causes an unnecessary amount of metadata and less efficient queries from file open costs.
Iceberg can compact data files in parallel using Spark with the rewriteDataFiles action.
This will combine small files into larger files to reduce metadata overhead and runtime file open cost.
"""
import stackit_spark
import os
for k, v in os.environ.items():
print(f"{k}: {v}")
catalog_name_in_spark = "catalog-s3"
target_table = "DEMO.user"
spark = stackit_spark.get_spark(
additional_config={
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1"
})
spark.sql(f"USE `{catalog_name_in_spark}`")
result = spark.sql(f"CALL `{catalog_name_in_spark}`.system.rewrite_data_files(table => 'DEMO.user')")
result.show()
@task(task_id="vacuum_catalog_using_dremio")
def vacuum_catalog_using_dremio():
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent))
from tools.execute_dremio_query import initialize_flight, read_query_pandas
from airflow.hooks.base import BaseHook
dremio_connection = BaseHook.get_connection("dremio-flight")
print(f"Host: {dremio_connection.host}")
print(f"Login: {dremio_connection.login}")
print(f"Password: {dremio_connection.password}")
print(f"Catalog: {dremio_connection.schema}")
client, auth = initialize_flight(
flight_uri=dremio_connection.host,
username=dremio_connection.login,
password=dremio_connection.password,
)
data = read_query_pandas(
client,
auth,
query=f'VACUUM CATALOG "{dremio_connection.schema}";',
)
print(data)
@task(task_id="optimize_table_using_dremio")
def optimize_table_using_dremio():
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent))
from tools.execute_dremio_query import initialize_flight, read_query_pandas
from airflow.hooks.base import BaseHook
dremio_connection = BaseHook.get_connection("dremio-flight")
print(f"Host: {dremio_connection.host}")
print(f"Login: {dremio_connection.login}")
print(f"Password: {dremio_connection.password}")
print(f"Catalog: {dremio_connection.schema}")
client, auth = initialize_flight(
flight_uri=dremio_connection.host,
username=dremio_connection.login,
password=dremio_connection.password,
)
data = read_query_pandas(
client,
auth,
query=f'''
OPTIMIZE TABLE "{dremio_connection.schema}"."E2E_TEST"."bronze_user"
REWRITE DATA (MIN_FILE_SIZE_MB=100, MAX_FILE_SIZE_MB=1000, TARGET_FILE_SIZE_MB=512);
''',
)
print(data)
vacuum_table_using_spark() >> rewrite_data_files() >> [vacuum_catalog_using_dremio(), optimize_table_using_dremio()]
table_maintenance()

68
Demo/stackit-09-dbt.py Normal file
View file

@ -0,0 +1,68 @@
"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG.
"""
import os
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProfileConfig, ProjectConfig, ExecutionConfig
from cosmos.profiles import BaseProfileMapping
from airflow.sdk import Connection
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dbt-demo"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
# Retrieve Dremio Credentials
# Airflow Connection "dbt_dremio" must be configured:
# host = <Dremio host URL>
# login = <Dremio username>
# password = <Dremio PAT (Personal Access Token)> or <Password>
# extras:
# dremio_space = <dbt target space in Dremio>
# dremio_space_folder = <folder within the space>
try:
dremio_connection = Connection.get("dbt_dremio")
os.environ["DREMIO_PAT"] = dremio_connection.password
os.environ["DREMIO_HOST"] = dremio_connection.host
os.environ["DREMIO_USER"] = dremio_connection.login
os.environ["DREMIO_SPACE"] = dremio_connection.extra_dejson["dremio_space"]
os.environ["DREMIO_FOLDER"] = dremio_connection.extra_dejson["dremio_space_folder"]
profile_config = ProfileConfig(
profile_name="dbt_demo",
target_name="dev",
profiles_yml_filepath= "/opt/airflow/dags/repo/dbt-demo/dbt_demo/profiles.yml",
)
basic_cosmos_dag = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "dbt_demo",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
dbt_executable_path=f"/app/packages/stackit-workflows/dbt_venv/bin/dbt",
),
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
},
# normal dag parameters
schedule=None,
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["demo", "stackit-demo"],
dag_id="stackit_09_dbt_dremio",
default_args={"retries": 2},
)
except Exception:
import warnings
warnings.warn(
"Airflow connection 'dbt_dremio' is not configured. "
"DAG '09_dbt_dremio' will not be registered until the connection is created. "
"Please create an Airflow connection with conn_id='dbt_dremio' with the following fields: "
"host (Dremio host URL), login (username), password (PAT), "
"and extras: dremio_space, dremio_space_folder.",
stacklevel=2,
)

108
Demo/stackit-10-notebook.py Normal file
View file

@ -0,0 +1,108 @@
import pendulum
from airflow.sdk import dag, task
from airflow.models.connection import Connection
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows import config
# Airflow Connection "s3_papermill_output" must be configured to write notebook output to S3:
# host = <S3 endpoint URL> (used as BOTO3_ENDPOINT_URL)
# login = <AWS_ACCESS_KEY_ID>
# password = <AWS_SECRET_ACCESS_KEY>
try:
s3_connection = Connection.get_connection_from_secrets("s3_papermill_output")
except Exception:
import warnings
warnings.warn(
"Airflow connection 's3_papermill_output' is not configured. "
"This DAG will be visible but cannot be run until the connection is created. "
"Please create an Airflow connection with conn_id='s3_papermill_output': "
"host=<S3 endpoint URL>, login=<AWS_ACCESS_KEY_ID>, password=<AWS_SECRET_ACCESS_KEY>.",
stacklevel=2,
)
s3_connection = None
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_10_jupyter_notebook",
)
def spark_full_configuration():
if s3_connection is None:
# S3 connection not available — define a placeholder task so the DAG
# can still be imported and inspected.
@task()
def setup_required():
raise RuntimeError(
"Airflow connection 's3_papermill_output' is not configured. "
"Please create it with: host=<S3 endpoint URL>, "
"login=<AWS_ACCESS_KEY_ID>, password=<AWS_SECRET_ACCESS_KEY>."
)
setup_required()
return
output_s3 = STACKITSparkScriptOperator(
task_id="papermill_s3",
# image : str, optional
# Image of the launched container.
# If not specified, a current spark image maintained by stackit is used.
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
# get_logs : bool, optional
# Read stdout get the stdout of the container as logs of the tasks. Default: True
get_logs=True,
# is_papermill : bool, optional
# Flag to define if the script defined in the `script` parameter
# is a jupyter notebook that is supposed to be executed using papermill.
# This parameter does not need to be set in order to use papermill. If
# it is not set the spark operator identifies notebooks automatically
# and uses papermill.
is_papermill=True,
# script : str
# Path of the script to execute. If a relative path is specified, it
# is interpreted relative to the git-sync folder.
# If an absolute path is specified it is not adapted further.
script="Demo/scripts/my_spark_notebook.ipynb",
# papermill_target_folder: str
# If papermill is supposed to be used, some target folder needs to be defined
# here in order to write the target notebook to the corresponding folder.
# If this is set to "-" the outpout is written to stdout.
papermill_target_folder="s3://internal-airflow-papermill-output/notebooks",
# s3_arguments : dict
# Arguments used to authenticate to S3. Should be used to pass
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and BOTO3_ENDPOINT_URL
# which are required for authentication to external s3 for pushing
# the results to.
s3_arguments={
"AWS_ACCESS_KEY_ID": s3_connection.login,
"AWS_SECRET_ACCESS_KEY": s3_connection.password,
"BOTO3_ENDPOINT_URL": s3_connection.host,
},
# dremio_connections : list of str, optional
# List of connection ids that are used to configure spark.
# The "lakehouse" connection is pre-configured and connects to
# the same lakehouse as Dremio.
dremio_connections=["lakehouse-rest"],
)
output_stdout = STACKITSparkScriptOperator(
task_id="papermill_stdout",
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
get_logs=True,
is_papermill=True,
script="Demo/scripts/my_spark_notebook.ipynb",
papermill_target_folder="-",
s3_arguments={
"AWS_ACCESS_KEY_ID": s3_connection.login,
"AWS_SECRET_ACCESS_KEY": s3_connection.password,
"BOTO3_ENDPOINT_URL": s3_connection.host,
},
dremio_connections=["lakehouse-rest"],
)
output_stdout >> output_s3
spark_full_configuration()

View file

@ -0,0 +1,46 @@
import pendulum
import base64
from airflow.sdk import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import (
KubernetesPodOperator,
)
from stackit_workflows.kubernetes import K8S, V1, client
from airflow.settings import json
from kubernetes.client import models as k8s
# It is good practice but not required to specify the image to use.
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_11_custom_secret_podoperator",
)
def simple_spark():
k = KubernetesPodOperator(
name="hello-dry-run",
image="schwarzit-xx-sit-ragstudio-docker-local.jfrog.io/airflowrunner:latest",
image_pull_secrets=[K8S.V1LocalObjectReference("internal-dockerpull-secret")],
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
log_events_on_failure=True,
do_xcom_push=True,
#Container resources requests and limits. Mandatory to set
container_resources=K8S.V1ResourceRequirements(
requests={"cpu": "100m", "memory": "50Mi"}, #optional: "ephemeral-storage": "1Gi"
limits={"cpu": "200m", "memory": "100Mi"}, #optional "ephemeral-storage": "2Gi"
),
# All pods launched must run as non-root users. Otherwise they won't start.
security_context=K8S.V1PodSecurityContext(run_as_user=100),
)
return k
simple_spark()

View file

@ -0,0 +1,80 @@
import json
import pendulum
from airflow.sdk import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import (
KubernetesPodOperator,
)
from stackit_workflows.kubernetes import POD_NAMESPACE, K8S
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows.models import GpuConfig, MLFramework
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_12_on_gpu_node",
)
def kubernetes_operator():
gpu_task = KubernetesPodOperator(
name="gpu-task",
#image=<use spark image with GPU support>,
image="debian",
cmds=["echo"],
arguments=["This task is running in a gpu node"],
task_id="run_task_on_gpu",
# Use affinity parameter to schedule pods on dedicated gpu nodes. In this example the node we want to
# schedule pods on is labelled dedicated=gpu. In this case the pod is REQUIRED to be scheduled on the
# specified node, but it can also be a more relaxed criteria where the schedule is PREFERRED (
# preferred_during_scheduling_ignored_during_execution).
affinity=K8S.V1Affinity(
node_affinity=K8S.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=K8S.V1NodeSelector(
node_selector_terms=[
K8S.V1NodeSelectorTerm(
match_expressions=[
K8S.V1NodeSelectorRequirement(
key='dedicated',
operator='In',
values=['gpu']
)
]
)
]
)
)
),
# When nodes we want to schedule pods on have taints, we need to add the respective tolerations. In this
# example the gpu node has the taint dedicated=gpu:NoSchedule. You can pass a list of such tolerations that
# enables the pod to be scheduled on these tainted nodes dedicated for specific workload.
tolerations=[
K8S.V1Toleration(
key='dedicated',
operator='Equal',
value='gpu',
effect='NoSchedule'
)
],
# All pods launched must run as non-root users. Otherwise they won't start.
security_context=K8S.V1PodSecurityContext(run_as_user=100),
)
gpu_flag_test = STACKITSparkScriptOperator(
task_id="gpu_flag",
gpu=GpuConfig(ml_framework=MLFramework.PYTORCH),
script="Demo/scripts/basic_python_script.py",
image="python:3.12-slim-bullseye",
security_context={"runAsUser": 1000},
)
gpu_task
gpu_flag_test
kubernetes_operator()

View file

@ -0,0 +1,109 @@
import pendulum
import os
from airflow.sdk import dag, task
from airflow.models.connection import Connection
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows import config
# Airflow Connection "s3_papermill_output" must be configured to write notebook output to S3:
# host = <S3 endpoint URL> (used as BOTO3_ENDPOINT_URL)
# login = <AWS_ACCESS_KEY_ID>
# password = <AWS_SECRET_ACCESS_KEY>
try:
s3_connection = Connection.get_connection_from_secrets("s3_papermill_output")
except Exception:
import warnings
warnings.warn(
"Airflow connection 's3_papermill_output' is not configured. "
"This DAG will be visible but cannot be run until the connection is created. "
"Please create an Airflow connection with conn_id='s3_papermill_output': "
"host=<S3 endpoint URL>, login=<AWS_ACCESS_KEY_ID>, password=<AWS_SECRET_ACCESS_KEY>.",
stacklevel=2,
)
s3_connection = None
@dag(
schedule="0 7 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_13_demo_sales_usecase",
)
def spark_full_configuration():
if s3_connection is None:
# S3 connection not available — define a placeholder task so the DAG
# can still be imported and inspected.
@task()
def setup_required():
raise RuntimeError(
"Airflow connection 's3_papermill_output' is not configured. "
"Please create it with: host=<S3 endpoint URL>, "
"login=<AWS_ACCESS_KEY_ID>, password=<AWS_SECRET_ACCESS_KEY>."
)
setup_required()
return
output_s3 = STACKITSparkScriptOperator(
task_id="save_results_in_s3",
# image : str, optional
# Image of the launched container.
# If not specified, a current spark image maintained by stackit is used.
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
# get_logs : bool, optional
# Read stdout get the stdout of the container as logs of the tasks. Default: True
get_logs=True,
# is_papermill : bool, optional
# Flag to define if the script defined in the `script` parameter
# is a jupyter notebook that is supposed to be executed using papermill.
# This parameter does not need to be set in order to use papermill. If
# it is not set the spark operator identifies notebooks automatically
# and uses papermill.
is_papermill=True,
# script : str
# Path of the script to execute. If a relative path is specified, it
# is interpreted relative to the git-sync folder.
# If an absolute path is specified it is not adapted further.
script="Demo/scripts/sales_prediction.ipynb",
# papermill_target_folder: str
# If papermill is supposed to be used, some target folder needs to be defined
# here in order to write the target notebook to the corresponding folder.
# If this is set to "-" the outpout is written to stdout.
papermill_target_folder="s3://internal-airflow-papermill-output/notebooks",
# s3_arguments : dict
# Arguments used to authenticate to S3. Should be used to pass
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and BOTO3_ENDPOINT_URL
# which are required for authentication to external s3 for pushing
# the results to.
s3_arguments={
"AWS_ACCESS_KEY_ID": s3_connection.login,
"AWS_SECRET_ACCESS_KEY": s3_connection.password,
"BOTO3_ENDPOINT_URL": s3_connection.host,
},
# dremio_connections : list of str, optional
# List of connection ids that are used to configure spark.
# The "lakehouse" connection is pre-configured and connects to
# the same lakehouse as Dremio.
dremio_connections=["lakehouse-rest"],
)
output_stdout = STACKITSparkScriptOperator(
task_id="run_prediction",
image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
get_logs=True,
is_papermill=True,
script="Demo/scripts/sales_prediction.ipynb",
papermill_target_folder="-",
s3_arguments={
"AWS_ACCESS_KEY_ID": s3_connection.login,
"AWS_SECRET_ACCESS_KEY": s3_connection.password,
"BOTO3_ENDPOINT_URL": s3_connection.host,
},
dremio_connections=["lakehouse-rest"],
)
output_stdout >> output_s3
spark_full_configuration()

View file

@ -0,0 +1,141 @@
import pendulum
from airflow.sdk import dag
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator, STACKITPythonScriptOperator
from stackit_workflows.airflow_plugin.decorators import stackit
# -------------------------------------------------------------------------
# DEMO DAG: How to use the STACKIT Operators
# -------------------------------------------------------------------------
# This DAG demonstrates the different ways to run workloads on Kubernetes
# using the STACKIT operator suite. It covers three main scenarios:
#
# 1. Running Python code directly with the @stackit decorator
# 2. Running Spark workloads via the @stackit Spark decorator
# 3. Running standalone Python or Spark scripts with the ScriptOperators
#
# Each operator abstracts away boilerplate (like syncing the repo,
# configuring the namespace, injecting context, mounting secrets, etc.),
# so you can focus only on your workload.
# -------------------------------------------------------------------------
# It is good practice (but not required) to specify the image you want to run on.
# By default, the STACKIT Spark image contains Spark, Java, Python, and common libs.
default_kwargs = {
"image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2"
}
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo","stackit-demo"],
dag_id="stackit_14_stackit_operators",
)
def stackit_operators():
# ---------------------------------------------------------------------
# 1) PYTHON DECORATOR
# ---------------------------------------------------------------------
# The @stackit.python_kubernetes_task decorator is the simplest way
# to run Python code in Kubernetes.
#
# Features provided out-of-the-box:
# - Uses a prebuilt Python image with pandas, requests, etc.
# - Injects Airflow context (dag_id, run_id, task_id, etc.) as ENV vars
# - Git-syncs your DAG repo into the pod (so imports just work)
# - Runs in the correct namespace with STACKIT defaults
#
# This is the preferred way if you want to write Python directly inside
# your DAG file without worrying about container setup.
@stackit.python_kubernetes_task()
def stackit_decorated_python_kubernetes():
import os
# Example: import a helper module from the repo
from scripts.my_tools.say_hello import say_hello
say_hello()
# Show how Airflow context is injected as environment variables
task_id = os.environ["AIRFLOW__CONTEXT__TASK__TASK_ID"]
print(f"Task ID: {task_id}")
# Print all relevant injected ENV variables
env_vars = {
key: value
for key, value in os.environ.items()
if key.startswith("AIRFLOW__CONTEXT") or key.startswith("STACKIT__")
}
for key, value in sorted(env_vars.items()):
print(f"{key}: {value}")
# ---------------------------------------------------------------------
# 2) SPARK DECORATOR
# ---------------------------------------------------------------------
# The @stackit.spark_kubernetes_task decorator makes it easy to run
# Spark jobs from Airflow, without setting up Spark-on-K8s manually.
#
# Features:
# - Starts a single-node Spark cluster by default
# - Includes the stackit_spark helper library for easy SparkSession setup
# - Allows you to scale CPU/memory via parameters
#
# This is the preferred way to run Spark transformations as functions.
@stackit.spark_kubernetes_task(**default_kwargs)
def stackit_decorated_spark_kubernetes(random_number: int):
import stackit_spark
import pandas as pd
spark = stackit_spark.get_spark()
df = spark.createDataFrame(pd.DataFrame({"random_number": [random_number]}))
df.show()
# ---------------------------------------------------------------------
# 3) PYTHON SCRIPT OPERATOR
# ---------------------------------------------------------------------
# If you already have a Python script in your repository (instead of
# embedding code directly in the DAG), use STACKITPythonScriptOperator.
#
# Features:
# - Executes a .py file from your DAG repo
# - Still benefits from repo sync and injected Airflow context
# - Works with any image that has Python installed
#
# Note: Unlike the Spark operator, this does not provide the
# stackit_spark helper module, unless your image includes it.
stackit_python_kubernetes = STACKITPythonScriptOperator(
task_id="stackit_python_kubernetes",
script="Demo/scripts/basic_python_script.py", # relative to repo root
)
# ---------------------------------------------------------------------
# 4) SPARK SCRIPT OPERATOR
# ---------------------------------------------------------------------
# Use STACKITSparkScriptOperator if you have Spark jobs stored as
# standalone scripts (in this repo or in another git repo).
#
# Features:
# - Executes a .py Spark job script
# - Supports imports inside the script (thanks to git-sync)
# - Runs on the specified Spark image with configurable resources
#
# For cross-repo jobs, you can set `git_repo`, `git_ref`, etc.
stackit_spark_kubernetes = STACKITSparkScriptOperator(
task_id="stackit_spark_kubernetes",
script="Demo/scripts/my_spark_job_with_imports.py", # relative to repo root
cpu=2,
**default_kwargs,
)
# ---------------------------------------------------------------------
# DAG FLOW
# ---------------------------------------------------------------------
# First run some inline Python, then a Spark function,
# then trigger the script-based operators.
stackit_decorated_python_kubernetes() >> stackit_decorated_spark_kubernetes(12) >> [
stackit_python_kubernetes,
stackit_spark_kubernetes,
]
stackit_operators()

View file

@ -0,0 +1,141 @@
"""
Demo: Data Assets (formerly "Datasets") in Apache Airflow 3
Data Assets let you create data-driven scheduling: one DAG produces/updates
an asset, and another DAG is automatically triggered when that asset changes.
This file defines:
- A "producer" DAG that updates two data assets
- A "consumer" DAG that is scheduled to run whenever BOTH assets are updated
- A "single asset consumer" DAG triggered by a single asset update
Key concepts:
- Asset: Represents a logical data artifact (file, table, topic, etc.)
- schedule=<Asset(...)>: Triggers the DAG when the asset is updated
- outlet: Marks a task as producing/updating an asset
Note on Data Assets vs. S3/external Triggers:
- Data Assets are for Airflow-internal coordination: DAG A produces data,
DAG B reacts. No polling, no cloud API calls, zero cost.
- S3KeySensor / external triggers are for watching files created by
processes OUTSIDE of Airflow (e.g., a third-party ETL tool drops a file
into S3). Those do involve polling and incur API costs.
See: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html
"""
import json
import pendulum
from airflow.sdk import Asset, dag, task
# ---------------------------------------------------------------------------
# Data Assets
#
# Assets are identified by a URI. The URI is purely a LOGICAL LABEL
# Airflow never actually connects to, reads, or polls this URI!
# It is only used as a key to link producer tasks to consumer DAGs.
#
# When a producer task (with outlets=[...]) completes successfully, the
# Airflow scheduler internally records "this asset was updated" in its
# metadata database. Consumer DAGs scheduled on that asset are then
# triggered automatically entirely internal, zero external API calls,
# no polling, no cloud costs.
#
# The URI could just as well be "my-orders" or "daily-report" using
# URIs like "s3://..." is just a convention to document where the data
# actually lives, but Airflow itself ignores the scheme/path.
# ---------------------------------------------------------------------------
orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")
# ===========================================================================
# Producer DAG updates both assets
# ===========================================================================
@dag(
schedule=None, # triggered manually or by an external system
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo", "data-assets"],
dag_id="stackit_15a_data_assets_producer",
)
def data_assets_producer():
@task(outlets=[orders_asset])
def produce_orders():
"""Simulate producing order data and updating the orders asset."""
orders = [
{"order_id": 1001, "amount": 250.00, "customer": "Alice"},
{"order_id": 1002, "amount": 125.50, "customer": "Bob"},
{"order_id": 1003, "amount": 340.75, "customer": "Charlie"},
]
# In a real scenario you would write to S3, a database, etc.
print(f"Produced {len(orders)} orders:")
print(json.dumps(orders, indent=2))
return orders
@task(outlets=[customers_asset])
def produce_customers():
"""Simulate producing customer data and updating the customers asset."""
customers = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
{"name": "Charlie", "email": "charlie@example.com"},
]
print(f"Produced {len(customers)} customers:")
print(json.dumps(customers, indent=2))
return customers
produce_orders() >> produce_customers()
data_assets_producer()
# ===========================================================================
# Consumer DAG 1 triggered when the orders asset is updated
# ===========================================================================
@dag(
schedule=orders_asset, # runs whenever orders_asset is updated
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo", "data-assets"],
dag_id="stackit_15b_data_assets_consumer_orders",
)
def data_assets_consumer_orders():
@task()
def process_orders():
"""React to new order data being available."""
print("New order data detected! Running order processing pipeline...")
print("Aggregating daily totals...")
print("Order processing complete.")
process_orders()
data_assets_consumer_orders()
# ===========================================================================
# Consumer DAG 2 triggered when BOTH assets have been updated
# ===========================================================================
@dag(
schedule=(orders_asset & customers_asset), # requires BOTH assets to be updated
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["demo", "data-assets"],
dag_id="stackit_15c_data_assets_consumer_combined",
)
def data_assets_consumer_combined():
@task()
def join_orders_and_customers():
"""React when both orders and customers data are refreshed."""
print("Both orders AND customers data updated!")
print("Joining datasets for the combined analytics report...")
print("Combined report generated successfully.")
join_orders_and_customers()
data_assets_consumer_combined()

View file

@ -0,0 +1,36 @@
# Dremio 25 only
def get_nessie_token(tokenendpoint, password):
import requests
# Exchange Dremio PAT to Nessie token
token_request_body = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"scope": "dremio.all",
"subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
"subject_token": password,
}
x = requests.post(tokenendpoint, data=token_request_body)
x.raise_for_status()
return x.json()["access_token"]
def get_spark_session(host, nessie_token):
import stackit_spark
catalog_name_in_spark = "stackit"
return stackit_spark.get_spark(
additional_config={
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1",
f"spark.sql.catalog.{catalog_name_in_spark}": "org.apache.iceberg.spark.SparkCatalog",
f"spark.sql.catalog.{catalog_name_in_spark}.type": "rest",
f"spark.sql.catalog.{catalog_name_in_spark}.uri": host,
f"spark.sql.catalog.{catalog_name_in_spark}.token": nessie_token,
}
)
if __name__ == "__main__":
get_nessie_token(
"https://dremio-internal.data-platform-dev.stackit.run/oauth/token",
"xxxxx",
)

View file

@ -0,0 +1,150 @@
import time
import pyarrow as pa
from pyarrow import flight
import logging
logger = logging.getLogger(__file__)
def initialize_flight(flight_uri: str, username, password):
"""Initialize flight connection for a configuration"""
client = flight.FlightClient(flight_uri)
headers = []
initial_options = flight.FlightCallOptions(headers=headers)
auth_header = client.authenticate_basic_token(username, password, initial_options)
return client, auth_header
def read_query_arrow(
client: flight.FlightClient,
auth_header: tuple,
query: str,
return_iterator: bool = False,
tqdm=None,
):
"""Load Dremio query to Arrow Table using Flight
Parameters
----------
client: flight.FlightClient
The flight client to be used.
auth_header: (str, str)
The bearer token to be used.
query : str
The dremio query to be executed.
return_iterator : bool, optional
If True, an iterator yielding Arrow Tables is returned instead of the
whole DataFrame.
return_only_info : bool, optional
If True, only the flight info is returned. This is useful if you want to
get the schema of a query without actually loading the data.
Default: False
tqdm : tqdm.tqdm
tqdm used for progress bar.
Returns
-------
tbl : pyarrow.Table
The query result. If return_iterator is True, returns an Iterator yielding
Arrow Tables
"""
options = flight.FlightCallOptions(
headers=[auth_header],
)
flight_desc = flight.FlightDescriptor.for_command(command=query)
flight_info = client.get_flight_info(flight_desc, options)
# Retrieve the result set as a stream of Arrow record batches.
reader = client.do_get(flight_info.endpoints[0].ticket, options)
def _generator(_return_table=False):
pbar = None if tqdm is None else tqdm(desc="Flight Transfer Chunk")
while True:
if pbar is not None:
pbar.update()
try:
batch, _ = reader.read_chunk()
if _return_table:
batch = pa.Table.from_batches([batch])
yield batch
except StopIteration:
break
batches = _generator(_return_table=return_iterator)
if not return_iterator:
t = time.time()
data = pa.Table.from_batches(batches)
logger.debug(f"Loaded query {query} via flight in {time.time()-t:.2f}s.")
return data
else:
return batches
def read_query_pandas(
client: flight.FlightClient,
auth_header: tuple,
query,
pandas_options: dict = None,
return_iterator: bool = False,
tqdm=None,
):
"""Load Dremio query to Pandas DataFrame using Flight
Parameters
----------
client: flight.FlightClient
The flight client to be used.
auth_header: (str, str)
The bearer token to be used.
query : str
The dremio query to be executed.
additional_headers : list, optional
Additional headers to be passed in the ``flight.FlightCallOptions``.
pandas_options : dict , optional
Additional kwargs passed to ``pyarrow._flight.FlightStreamReader.read_pandas``.
return_iterator : bool, optional
If True, an iterator yielding DataFrames is returned instead of the
whole DataFrame.
tqdm : tqdm.tqdm
tqdm used for progress bar.
Returns
-------
df : pandas.DataFrame
The query result. If ``return_iterator`` is True, returns an Iterator yielding
pandas DataFrames.
"""
pandas_options = pandas_options or {}
r = read_query_arrow(
query=query,
auth_header=auth_header,
client=client,
return_iterator=return_iterator,
tqdm=tqdm,
)
if return_iterator:
return (df.to_pandas(**pandas_options) for df in r)
else:
return r.to_pandas(**pandas_options)
if __name__ == "__main__":
client, auth_header = initialize_flight(
flight_uri="grpc+tls://dremio-internal-sql.data-platform-dev.stackit.run:32010",
username="Christian.Thiel_ext@external.mail.schwarz",
password="xxxx",
)
data = read_query_pandas(
client,
auth_header,
query="SELECT 1",
)
print(data)

2
Demo/tools/say_hello.py Normal file
View file

@ -0,0 +1,2 @@
def say_hello():
print("Hello from imported function!")