# File: workflows-example-dags/Demo/stackit-05-spark-full-configuration.py
# Snapshot metadata (from the repository browser): 2026-05-28 17:44:11 +02:00,
# 100 lines, 4.9 KiB, Python

import pendulum
import warnings
from airflow.sdk import dag, task
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows import config
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_05_spark_full_configuration",
)
def spark_full_configuration():
    """Demo DAG: a single Spark task showing every STACKIT configuration option.

    The DAG has no schedule (``schedule=None``) and ``catchup=False``, so it
    only runs when triggered manually. Each keyword argument passed to
    ``STACKITSparkScriptOperator`` below is preceded by a short description of
    what it controls. Options that are not supported yet are kept as
    commented-out examples.
    """
    # NOTE(review): per the original author, the same arguments are also
    # accepted by the corresponding task decorator -- confirm the decorator's
    # name in the STACKIT Workflows documentation.
    STACKITSparkScriptOperator(
        task_id="spark_full_configuration",
        # image : str, optional
        #     Container image to launch. If not specified, a current Spark
        #     image maintained by STACKIT is used.
        image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
        # cpu : float
        #     CPU request of the pod. If Spark runs without executors, this
        #     also determines the local parallelism (e.g. ``local[2]``, i.e.
        #     two worker threads). If executors are used, this is the CPU
        #     request of the driver.
        cpu=4,
        # env_vars : dict or list of k8s.V1EnvVar, optional
        #     Environment variables passed to the pod. As a dict, keys are the
        #     variable names and values their values
        #     (e.g. ``{"MY_ENV_VAR": 42}``). Numeric values are cast to
        #     strings before the Kubernetes pod template is created.
        env_vars={"MY_ENV_VAR": "my_value"},
        # The three ephemeral-volume options below are NOT SUPPORTED YET.
        # They are kept commented out to document the intended configuration.
        #
        # executor_ephemeral_gb : int, optional
        #     Size of the ephemeral volume added to each Spark executor,
        #     mounted at ``ephemeral_mount_path``. If set, ``spark.local.dir``
        #     is set to ``ephemeral_mount_path``. Default: None
        # executor_ephemeral_gb=5,
        #
        # ephemeral_mount_path : str, optional
        #     Mount path of the ephemeral volumes for the driver and the
        #     executors. Default: "/stackit-tmp"
        # ephemeral_mount_path="/stackit-tmp",
        #
        # driver_ephemeral_gb : int, optional
        #     Size of the ephemeral volume added to the Spark driver, mounted
        #     at ``ephemeral_mount_path``. If not specified, it defaults to
        #     1 GiB ('1Gi') whenever any executor ephemeral volume is
        #     specified, so that ``spark.local.dir`` points to an existing
        #     directory. If set, ``spark.local.dir`` is set to
        #     ``ephemeral_mount_path``.
        # driver_ephemeral_gb=1,
        #
        # executor_cpu : int or float, optional
        #     CPUs used by Spark for each executor. Decimal values are allowed.
        executor_cpu=3.5,
        # executor_memory_gb : int or float, optional
        #     Memory request of each executor in GB. Decimal values are
        #     allowed. If not specified, it is derived from ``executor_cpu`` as
        #     ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO``
        #     (default ratio: 6).
        executor_memory_gb=10,
        # executors : int, optional
        #     Number of Kubernetes executors. If specified, ``spark_mode``
        #     defaults to ``k8s`` instead of ``local``.
        executors=2,
        # get_logs : bool, optional
        #     Forward the container's stdout to the Airflow task log.
        #     Default: True
        get_logs=True,
        # dremio_connections : list of str, optional
        #     NOTE(review): not documented in the original example; presumably
        #     the names of Dremio connections made available to the job (here
        #     "lakehouse-rest") -- confirm against the STACKIT Workflows docs.
        dremio_connections=["lakehouse-rest"],
        # memory_gb : int or float, optional
        #     Memory request in GB for the (driver) pod. Decimal values are
        #     allowed. If not specified, it is derived from ``cpu`` as
        #     ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (default ratio: 6).
        memory_gb=10,
        # name : str, optional
        #     Name of the pod spawned for this task. Overriding it is not
        #     recommended; it is set here only to demonstrate the option. If
        #     you do override it, keep it unique.
        #     Default: ``"task-<dag_id>-<task_id>-<hash>"``
        name="ensure_this_is_unique",
        # provide_context : bool, optional
        #     Expose the Airflow context to the container as environment
        #     variables. Default: True. See example 03 for details.
        provide_context=True,
        # container_resources : k8s.V1ResourceRequirements, optional
        #     Not supported yet; use ``cpu`` and ``memory_gb`` instead.
        #
        # script : str
        #     Path of the script to execute. A relative path is resolved
        #     against the git-sync folder; an absolute path is used as is.
        script="Demo/scripts/my_spark_job.py",
        # Any other argument accepted by ``KubernetesPodOperator`` can be
        # passed as well, e.g. extra pod labels:
        labels={"foo": "bar"},
    )


# Calling the @dag-decorated function at module level creates the DAG so that
# Airflow can discover it when parsing this file.
spark_full_configuration()