import os
import warnings

import pendulum
from airflow.models.connection import Connection
from airflow.sdk import dag, task

from stackit_workflows import config
from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator

# Airflow Connection "s3_papermill_output" must be configured to write notebook output to S3:
#   host     = <S3 endpoint URL>        (used as BOTO3_ENDPOINT_URL)
#   login    = <S3 access key id>       (used as AWS_ACCESS_KEY_ID)
#   password = <S3 secret access key>   (used as AWS_SECRET_ACCESS_KEY)
S3_CONNECTION_ID = "s3_papermill_output"

# Container image used by both Spark tasks of this DAG.
SPARK_IMAGE = (
    "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/"
    "stackit-spark:spark3.5.3-0.1.2"
)

# Notebook executed by both Spark tasks (relative to the git-sync folder).
NOTEBOOK_SCRIPT = "Demo/scripts/sales_prediction.ipynb"

# Shared wording for the "connection is missing" warning and task error, so
# both tell the user exactly which connection fields have to be filled in.
_CONNECTION_MISSING = f"Airflow connection '{S3_CONNECTION_ID}' is not configured. "
_CONNECTION_FIELDS = (
    "host=<S3 endpoint URL>, "
    "login=<S3 access key id>, password=<S3 secret access key>."
)

try:
    s3_connection = Connection.get_connection_from_secrets(S3_CONNECTION_ID)
except Exception:
    # Deliberately broad: whatever the reason the lookup failed, the DAG file
    # must still import so the DAG stays visible with a placeholder task.
    warnings.warn(
        _CONNECTION_MISSING
        + "This DAG will be visible but cannot be run until the connection is created. "
        + f"Please create an Airflow connection with conn_id='{S3_CONNECTION_ID}': "
        + _CONNECTION_FIELDS,
        stacklevel=2,
    )
    s3_connection = None


def _build_s3_arguments(connection):
    """Map the fields of the S3 Airflow connection to the operator's S3 arguments.

    A new dict is returned on every call so that no two operators share the
    same mutable object.
    """
    return {
        "AWS_ACCESS_KEY_ID": connection.login,
        "AWS_SECRET_ACCESS_KEY": connection.password,
        "BOTO3_ENDPOINT_URL": connection.host,
    }


@dag(
    schedule="0 7 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_13_demo_sales_usecase",
)
def spark_full_configuration():
    """Run the sales prediction notebook with papermill, to stdout and then to S3.

    If the S3 connection could not be loaded when this file was imported, the
    DAG only contains a single task that fails with setup instructions.
    """
    if s3_connection is None:
        # S3 connection not available — define a placeholder task so the DAG
        # can still be imported and inspected.
        @task()
        def setup_required():
            """Fail with instructions for creating the missing connection."""
            raise RuntimeError(
                _CONNECTION_MISSING + "Please create it with: " + _CONNECTION_FIELDS
            )

        setup_required()
        return

    output_s3 = STACKITSparkScriptOperator(
        task_id="save_results_in_s3",
        # image : str, optional
        #     Image of the launched container.
        #     If not specified, a current spark image maintained by stackit is used.
        image=SPARK_IMAGE,
        # get_logs : bool, optional
        #     Read the stdout of the container as logs of the task. Default: True
        get_logs=True,
        # is_papermill : bool, optional
        #     Flag to define if the script defined in the `script` parameter
        #     is a jupyter notebook that is supposed to be executed using papermill.
        #     This parameter does not need to be set in order to use papermill. If
        #     it is not set the spark operator identifies notebooks automatically
        #     and uses papermill.
        is_papermill=True,
        # script : str
        #     Path of the script to execute. If a relative path is specified, it
        #     is interpreted relative to the git-sync folder.
        #     If an absolute path is specified it is not adapted further.
        script=NOTEBOOK_SCRIPT,
        # papermill_target_folder : str
        #     If papermill is supposed to be used, some target folder needs to be
        #     defined here in order to write the target notebook to the
        #     corresponding folder.
        #     If this is set to "-" the output is written to stdout.
        papermill_target_folder="s3://internal-airflow-papermill-output/notebooks",
        # s3_arguments : dict
        #     Arguments used to authenticate to S3. Should be used to pass
        #     AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and BOTO3_ENDPOINT_URL
        #     which are required for authentication to the external S3 that
        #     the results are pushed to.
        s3_arguments=_build_s3_arguments(s3_connection),
        # dremio_connections : list of str, optional
        #     List of connection ids that are used to configure spark.
        #     The "lakehouse" connection is pre-configured and connects to
        #     the same lakehouse as Dremio.
        dremio_connections=["lakehouse-rest"],
    )

    # Same notebook and configuration as above, but the executed notebook is
    # written to stdout ("-") instead of the S3 target folder.
    output_stdout = STACKITSparkScriptOperator(
        task_id="run_prediction",
        image=SPARK_IMAGE,
        get_logs=True,
        is_papermill=True,
        script=NOTEBOOK_SCRIPT,
        papermill_target_folder="-",
        s3_arguments=_build_s3_arguments(s3_connection),
        dremio_connections=["lakehouse-rest"],
    )

    # The stdout run has to finish before the results are saved to S3.
    output_stdout >> output_s3


spark_full_configuration()