import json

import pendulum
from airflow.sdk import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import (
    KubernetesPodOperator,
)

from stackit_workflows.kubernetes import POD_NAMESPACE, K8S
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
from stackit_workflows.models import GpuConfig, MLFramework


# Demo DAG showing two ways of running a task on a GPU node:
#   1. a plain KubernetesPodOperator steered by explicit node affinity and
#      tolerations, and
#   2. a STACKITSparkScriptOperator that is handed a GpuConfig.
# The DAG has no schedule (schedule=None) and catchup is disabled.
@dag(
    dag_id="stackit_12_on_gpu_node",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
)
def kubernetes_operator():
    # --- Node affinity -----------------------------------------------------
    # The target node carries the label dedicated=gpu. The selector below is
    # placed under "required_during_scheduling_ignored_during_execution", so
    # the pod MUST be scheduled on a matching node. A softer variant exists
    # (preferred_during_scheduling_ignored_during_execution) if the placement
    # should only be PREFERRED.
    gpu_label_requirement = K8S.V1NodeSelectorRequirement(
        key="dedicated",
        operator="In",
        values=["gpu"],
    )
    gpu_node_selector = K8S.V1NodeSelector(
        node_selector_terms=[
            K8S.V1NodeSelectorTerm(match_expressions=[gpu_label_requirement]),
        ],
    )
    gpu_affinity = K8S.V1Affinity(
        node_affinity=K8S.V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=gpu_node_selector,
        ),
    )

    # --- Tolerations -------------------------------------------------------
    # Tainted nodes reject pods that do not tolerate the taint. In this
    # example the GPU node is tainted dedicated=gpu:NoSchedule, so the pod
    # needs the matching toleration. Several tolerations can be passed as a
    # list when nodes dedicated to a workload carry more than one taint.
    gpu_toleration = K8S.V1Toleration(
        key="dedicated",
        operator="Equal",
        value="gpu",
        effect="NoSchedule",
    )

    gpu_task = KubernetesPodOperator(
        task_id="run_task_on_gpu",
        name="gpu-task",
        image="debian",
        cmds=["echo"],
        arguments=["This task is running in a gpu node"],
        affinity=gpu_affinity,
        tolerations=[gpu_toleration],
        # Pods must run as a non-root user; otherwise they will not start.
        # NOTE(review): this task uses UID 100 while the Spark task below
        # uses UID 1000 -- confirm whether the difference is intentional.
        security_context=K8S.V1PodSecurityContext(run_as_user=100),
    )

    # Second variant: no affinity/tolerations are passed here, only a
    # GpuConfig selecting the PyTorch framework.
    # NOTE(review): presumably the operator derives GPU placement from the
    # GpuConfig itself -- verify against the operator's documentation.
    gpu_flag_test = STACKITSparkScriptOperator(
        task_id="gpu_flag",
        script="Demo/scripts/basic_python_script.py",
        image="python:3.12-slim-bullseye",
        gpu=GpuConfig(ml_framework=MLFramework.PYTORCH),
        security_context={"runAsUser": 1000},
    )

    # No dependency is declared between the two tasks; they are simply
    # listed here as independent members of the DAG.
    gpu_task
    gpu_flag_test


# Calling the decorated function instantiates the DAG at module import time.
kubernetes_operator()