import pendulum
import warnings

from airflow.sdk import dag, task

from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows import config


# Demo DAG: a single Spark script task that sets (almost) every argument
# accepted by `STACKITSparkScriptOperator`. Each argument below is preceded
# by a short description of what it controls.
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_05_spark_full_configuration",
)
def spark_full_configuration():
    # The same arguments are supported by the decorator.
    STACKITSparkScriptOperator(
        task_id="spark_full_configuration",
        #
        # ------------------------------------------------------------------
        # What to run
        # ------------------------------------------------------------------
        # script : str
        #     Path of the script to execute. If a relative path is specified,
        #     it is interpreted relative to the git-sync folder.
        #     If an absolute path is specified, it is not adapted further.
        script="Demo/scripts/my_spark_job.py",
        # image : str, optional
        #     Image of the launched container.
        #     If not specified, a current spark image maintained by stackit
        #     is used.
        image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
        #
        # ------------------------------------------------------------------
        # Driver pod resources
        # ------------------------------------------------------------------
        # cpu : float
        #     CPU request of the pod. If spark is used with no executors,
        #     this is also used to determine the number of processes
        #     (i.e. local[2]). If executors are used, this is the CPU request
        #     of the driver.
        cpu=4,
        # memory_gb : int or float, optional
        #     Memory request in GB for the (driver) pod. Decimal values are
        #     allowed. If not specified, it is calculated from
        #     :paramref:`cpu` as:
        #     ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO``
        #     (Default is a factor of 6).
        memory_gb=10,
        # container_resources : K8S.V1ResourceRequirements, optional
        #     Currently not supported. Use :paramref:`cpu` and
        #     :paramref:`memory_gb` instead.
        #
        # ------------------------------------------------------------------
        # Executors
        # ------------------------------------------------------------------
        # executors : int, optional
        #     Number of kubernetes executors. If specified, `spark_mode`
        #     defaults to `k8s` instead of `local`.
        executors=2,
        # executor_cpu : int or float, optional
        #     CPUs to be used by Spark for each executor. Decimal values are
        #     allowed.
        executor_cpu=3.5,
        # executor_memory_gb : int, optional
        #     Requested memory of the executors in GB. Decimal values are
        #     allowed. If not specified, it is calculated from
        #     :paramref:`executor_cpu` as:
        #     ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO``
        #     (Default is a factor of 6).
        executor_memory_gb=10,
        #
        # ------------------------------------------------------------------
        # Environment, context and connections
        # ------------------------------------------------------------------
        # env_vars : dict, list of K8S.V1EnvVar, optional
        #     Env-vars to pass to the pod. The dictionary contains env-var
        #     names as keys and their values as values
        #     (i.e. {'MY_ENV_VAR': 42}). Please note that all numeric
        #     env-vars are cast to string before creating the Kubernetes
        #     Pod Template.
        env_vars={"MY_ENV_VAR": "my_value"},
        # provide_context : bool, optional
        #     Provide Airflow context as environment variables. Default: True
        #     Check example 03 for more information.
        provide_context=True,
        # dremio_connections : list of str
        #     NOTE(review): not described in this example. Presumably the
        #     names of the Dremio connections made available to the job;
        #     confirm against the operator documentation.
        dremio_connections=["lakehouse-rest"],
        # get_logs : bool, optional
        #     Read the stdout of the container as logs of the task.
        #     Default: True
        get_logs=True,
        #
        # ------------------------------------------------------------------
        # Pod metadata
        # ------------------------------------------------------------------
        # name : str, optional
        #     Name of the pod spawned for this task.
        #     We recommend to not change this value.
        #     Default: ``"task---"``
        name="ensure_this_is_unique",
        # Any other arguments as supported by the `KubernetesPodOperator`
        # can be used here.
        labels={"foo": "bar"},
        #
        # ------------------------------------------------------------------
        # Not yet supported: ephemeral volumes
        # ------------------------------------------------------------------
        # # executor_ephemeral_gb : int, optional
        # #     Size of the ephemeral Volumes added to each spark executor.
        # #     The mount-path in the pod is specified with
        # #     :paramref:`ephemeral_mount_path`.
        # #     If set, spark.local.dir is set to
        # #     :paramref:`ephemeral_mount_path`.
        # #     Default: None
        # executor_ephemeral_gb=5,
        #
        # # ephemeral_mount_path : str, optional
        # #     Path of the ephemeral volume mounts for the driver and the
        # #     executors.
        # #     Default: "/stackit-tmp"
        # ephemeral_mount_path="/stackit-tmp",
        #
        # # driver_ephemeral_gb : int, optional
        # #     Size of the ephemeral Volumes added to the spark driver.
        # #     The mount-path in the pod is specified with
        # #     :paramref:`ephemeral_mount_path`.
        # #     If not specified, the driver ephemeral volume is set to '1Gi'
        # #     if any executor ephemeral volume is specified. This is to
        # #     ensure that 'spark.local.dir' is set to an existing directory.
        # #     If set, spark.local.dir is set to
        # #     :paramref:`ephemeral_mount_path`.
    )


spark_full_configuration()