# File-viewer metadata (from the page this file was copied from): Python, 150 lines, 5.8 KiB, no trailing newline.
import warnings

import pendulum
from airflow.sdk import dag, task, Connection
from stackit_workflows.airflow_plugin.decorators import stackit
# Keyword arguments shared by every Spark task in this file; they are splatted
# into `stackit.spark_kubernetes_task(...)` and currently only pin the image.
default_kwargs: dict[str, str] = {
    "image": "schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
}
# Probe for the 'lakehouse-rest' connection while the DAG file is being loaded.
# Any failure is deliberately downgraded to a warning so the DAG still shows up;
# `lakehouse_connection` is then bound to None instead of a connection object.
try:
    lakehouse_connection = Connection.get("lakehouse-rest")
except Exception as exc:  # best-effort on purpose: never break DAG parsing here
    warnings.warn(
        "Airflow connection 'lakehouse-rest' is not configured. "
        "This DAG will be visible but cannot be run until the connection is created. "
        "Please create an Airflow connection with conn_id='lakehouse-rest' — "
        "see the comments in Demo/06-lakehouse.py for the required fields."
        # Surface the real cause: the lookup can fail for reasons other than
        # a missing connection, and the fixed text above would hide that.
        f" Underlying error: {exc!r}",
        stacklevel=2,
    )
    lakehouse_connection = None
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "stackit-demo"],
    dag_id="stackit_08_table_maintenance",
)
def table_maintenance():
    """
    This DAG gives an example implementation of how you can operationalize your table maintenance using Airflow, Spark and Dremio.

    Two kinds of maintenance are shown, each once with Spark and once with Dremio:
    snapshot cleanup (Spark expire_snapshots / Dremio VACUUM) and
    data file compaction (Spark rewrite_data_files / Dremio OPTIMIZE).

    Full reference of all available Iceberg optimization procedures can be found here: https://iceberg.apache.org/docs/1.8.0/spark-procedures/.
    For further information refer to the StackIT Data Platform User Documentation: https://itdoc.schwarz/display/STACKIT/Dremio+-+Iceberg+Table+Maintenance+Guide.
    """

    @stackit.spark_kubernetes_task(**default_kwargs, dremio_connections=["lakehouse-rest"])
    def vacuum_table_using_spark():
        """
        Expire old snapshots of the target table with Spark.

        Each write/update/delete/upsert/compaction in Iceberg produces a new snapshot while keeping the old data
        and metadata around for snapshot isolation and time travel. The expire_snapshots procedure can be used
        to remove older snapshots and their files which are no longer needed.
        """
        # Imported inside the task body, as in the original; presumably the
        # module is only installed in the Spark image — confirm before hoisting.
        import stackit_spark

        # NOTE: the environment is intentionally not printed here. This task is
        # started with `dremio_connections`, so the environment may carry
        # connection credentials that must not end up in task logs.

        catalog_name_in_spark = "catalog-s3"
        target_table = "DEMO.user"

        spark = stackit_spark.get_spark()
        spark.sql(f"USE `{catalog_name_in_spark}`")

        result = spark.sql(
            f"CALL `{catalog_name_in_spark}`.system.expire_snapshots("
            f"table => '{target_table}', "
            "retain_last => 100"
            ")"
        )
        result.show()

    @stackit.spark_kubernetes_task(**default_kwargs, dremio_connections=["lakehouse-rest"])
    def rewrite_data_files():
        """
        Compact the data files of the target table with Spark.

        Iceberg tracks each data file in a table. More data files lead to more metadata stored in manifest files,
        and small data files cause an unnecessary amount of metadata and less efficient queries from file open costs.
        Iceberg can compact data files in parallel using Spark with the rewriteDataFiles action.
        This will combine small files into larger files to reduce metadata overhead and runtime file open cost.
        """
        # Imported inside the task body, as in the original; presumably the
        # module is only installed in the Spark image — confirm before hoisting.
        import stackit_spark

        # NOTE: the environment is intentionally not printed here (it may carry
        # connection credentials that must not end up in task logs).

        catalog_name_in_spark = "catalog-s3"
        target_table = "DEMO.user"

        spark = stackit_spark.get_spark(
            additional_config={
                "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1"
            }
        )
        spark.sql(f"USE `{catalog_name_in_spark}`")

        result = spark.sql(
            f"CALL `{catalog_name_in_spark}`.system.rewrite_data_files(table => '{target_table}')"
        )
        result.show()

    def _run_dremio_query(query_template):
        """
        Run one SQL statement through the 'dremio-flight' Airflow connection.

        ``query_template`` is a ``str.format`` template; ``{catalog}`` is replaced
        with the connection's ``schema`` field. Returns whatever
        ``read_query_pandas`` returns (presumably a pandas DataFrame — confirm
        against tools/execute_dremio_query.py).
        """
        import pathlib
        import sys

        # The `tools` package is resolved relative to this DAG file; only add
        # the directory once so repeated runs do not keep growing sys.path.
        dag_dir = str(pathlib.Path(__file__).parent)
        if dag_dir not in sys.path:
            sys.path.append(dag_dir)
        from tools.execute_dremio_query import initialize_flight, read_query_pandas
        from airflow.hooks.base import BaseHook

        dremio_connection = BaseHook.get_connection("dremio-flight")
        # Log only non-secret connection fields; the password must never be
        # written to the task log.
        print(f"Host: {dremio_connection.host}")
        print(f"Login: {dremio_connection.login}")
        print(f"Catalog: {dremio_connection.schema}")

        client, auth = initialize_flight(
            flight_uri=dremio_connection.host,
            username=dremio_connection.login,
            password=dremio_connection.password,
        )
        return read_query_pandas(
            client,
            auth,
            query=query_template.format(catalog=dremio_connection.schema),
        )

    @task(task_id="vacuum_catalog_using_dremio")
    def vacuum_catalog_using_dremio():
        """Run Dremio's VACUUM CATALOG on the catalog named by the connection's schema field."""
        data = _run_dremio_query('VACUUM CATALOG "{catalog}";')
        print(data)

    @task(task_id="optimize_table_using_dremio")
    def optimize_table_using_dremio():
        """Run Dremio's OPTIMIZE TABLE ... REWRITE DATA on the E2E_TEST.bronze_user table."""
        data = _run_dremio_query(
            'OPTIMIZE TABLE "{catalog}"."E2E_TEST"."bronze_user"\n'
            "REWRITE DATA (MIN_FILE_SIZE_MB=100, MAX_FILE_SIZE_MB=1000, TARGET_FILE_SIZE_MB=512);"
        )
        print(data)

    # Spark maintenance runs first and sequentially; both Dremio tasks then
    # start once the Spark compaction has finished.
    vacuum_table_using_spark() >> rewrite_data_files() >> [vacuum_catalog_using_dremio(), optimize_table_using_dremio()]
# Call the @dag-decorated function at module level so the DAG object is
# actually created when this file is loaded.
table_maintenance()