{
 "cells": [
{
 "cell_type": "code",
 "execution_count": null,
 "id": "8aae8ae7-4cb4-4b66-b325-aa3eabdeb455",
 "metadata": {},
 "outputs": [],
 "source": [
  "# Notebook parameters.\n",
  "# Reporting date (YYYY-MM-DD).\n",
  "run_date = \"2025-05-08\"\n",
  "\n",
  "# Iceberg tables (namespace.table) in the `stackit` catalog; these match the\n",
  "# table names used by the cells below.\n",
  "input_sales = \"retail_demo.sales_data\"\n",
  "input_inventory = \"retail_demo.inventory_data\"\n",
  "output_table = \"retail_demo.restock_plan\"\n"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "3598df66-6027-4f02-b7bd-c279b9d20c32",
 "metadata": {},
 "outputs": [],
 "source": [
  "# Helpers that create a Spark session authenticated against the Iceberg REST catalog.\n",
  "\n",
  "\n",
  "def get_catalog_token(tokenendpoint, password, timeout=30):\n",
  "    \"\"\"Exchange a Dremio personal access token (PAT) for a catalog access token.\n",
  "\n",
  "    Args:\n",
  "        tokenendpoint: URL of the OAuth token endpoint.\n",
  "        password: The Dremio personal access token to exchange.\n",
  "        timeout: Seconds to wait for the HTTP response. Without a timeout,\n",
  "            requests waits indefinitely and a stalled endpoint hangs the kernel.\n",
  "\n",
  "    Returns:\n",
  "        The ``access_token`` field of the JSON response.\n",
  "\n",
  "    Raises:\n",
  "        requests.HTTPError: If the endpoint answers with an error status.\n",
  "    \"\"\"\n",
  "    import requests\n",
  "\n",
  "    # OAuth 2.0 token exchange (RFC 8693): Dremio PAT in, catalog (Nessie) token out.\n",
  "    token_request_body = {\n",
  "        \"grant_type\": \"urn:ietf:params:oauth:grant-type:token-exchange\",\n",
  "        \"scope\": \"dremio.all\",\n",
  "        \"subject_token_type\": \"urn:ietf:params:oauth:token-type:dremio:personal-access-token\",\n",
  "        \"subject_token\": password,\n",
  "    }\n",
  "    response = requests.post(tokenendpoint, data=token_request_body, timeout=timeout)\n",
  "    response.raise_for_status()\n",
  "    return response.json()[\"access_token\"]\n",
  "\n",
  "\n",
  "def get_spark_session(host, catalog_token, catalog_name=\"stackit\"):\n",
  "    \"\"\"Create a Spark session with an Iceberg REST catalog registered.\n",
  "\n",
  "    Args:\n",
  "        host: URI of the Iceberg REST catalog.\n",
  "        catalog_token: Bearer token for the catalog, e.g. from ``get_catalog_token``.\n",
  "        catalog_name: Name the catalog is registered under in Spark, i.e. what\n",
  "            ``USE <catalog_name>`` selects. Defaults to ``\"stackit\"``.\n",
  "\n",
  "    Returns:\n",
  "        The object returned by ``stackit_spark.get_spark`` (presumably a SparkSession).\n",
  "    \"\"\"\n",
  "    import stackit_spark\n",
  "\n",
  "    prefix = f\"spark.sql.catalog.{catalog_name}\"\n",
  "    return stackit_spark.get_spark(\n",
  "        additional_config={\n",
  "            \"spark.jars.packages\": \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1\",\n",
  "            prefix: \"org.apache.iceberg.spark.SparkCatalog\",\n",
  "            f\"{prefix}.warehouse\": \"catalog-s3\",\n",
  "            f\"{prefix}.type\": \"rest\",\n",
  "            f\"{prefix}.uri\": host,\n",
  "            f\"{prefix}.token\": catalog_token,\n",
  "        }\n",
  "    )"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "f9071bfb-1767-47b3-907f-723ce4389d9e",
 "metadata": {},
 "outputs": [],
 "source": [
  "import os\n",
  "from getpass import getpass\n",
  "\n",
  "# Connect to the catalog. Credentials must never be stored in the notebook: the Dremio\n",
  "# personal access token is read from the DREMIO_PAT environment variable, or prompted\n",
  "# for interactively (input hidden) when that variable is unset or empty.\n",
  "DREMIO_TOKEN_ENDPOINT = \"https://dremio-internal.data-platform.stackit.run/oauth/token\"\n",
  "CATALOG_URI = \"https://dremio-internal-catalog.data-platform.stackit.run/iceberg/main\"\n",
  "\n",
  "dremio_pat = os.environ.get(\"DREMIO_PAT\") or getpass(\"Dremio personal access token: \")\n",
  "catalog_token = get_catalog_token(DREMIO_TOKEN_ENDPOINT, dremio_pat)\n",
  "del dremio_pat  # the PAT is no longer needed once it has been exchanged\n",
  "spark = get_spark_session(CATALOG_URI, catalog_token)"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "8c250ade-1565-4c95-b1ae-134724a2e77b",
 "metadata": {},
 "outputs": [],
 "source": [
  "import itertools\n",
  "\n",
  "from pyspark.sql import functions as F\n",
  "\n",
  "# Synthetic-data parameters. Fixed rand() seeds make \"Restart & Run All\" reproducible.\n",
  "PRODUCT_IDS = [\"A123\", \"B456\", \"C789\"]\n",
  "STORE_IDS = [\"101\", \"102\"]\n",
  "N_SALES_ROWS = 100\n",
  "SEED = 42\n",
  "\n",
  "spark.sql(\"USE stackit\")\n",
  "# Step 1: Create a namespace\n",
  "spark.sql(\"CREATE NAMESPACE IF NOT EXISTS retail_demo\")\n",
  "\n",
  "run_day = F.to_date(F.lit(run_date))\n",
  "\n",
  "\n",
  "def random_choice(options, seed):\n",
  "    \"\"\"Return a Column holding one of `options`, chosen uniformly per row.\n",
  "\n",
  "    Uses a single rand() draw per row. A CASE expression that calls rand() afresh\n",
  "    in each WHEN branch is skewed (1/3, 2/9, 4/9 for three options).\n",
  "    \"\"\"\n",
  "    index = (F.rand(seed) * len(options)).cast(\"int\") + 1  # element_at is 1-based\n",
  "    return F.element_at(F.array(*[F.lit(option) for option in options]), index)\n",
  "\n",
  "\n",
  "# Step 2: Create synthetic sales data for the 7 days before run_date.\n",
  "# Each random column gets its own seed: rand() with equal seeds yields equal values.\n",
  "sales_df = spark.range(N_SALES_ROWS, numPartitions=1).select(\n",
  "    F.date_sub(run_day, (F.rand(SEED) * 7).cast(\"int\") + 1).alias(\"sale_date\"),\n",
  "    (F.rand(SEED + 1) * 10 + 1).cast(\"int\").alias(\"units_sold\"),\n",
  "    random_choice(PRODUCT_IDS, SEED + 2).alias(\"product_id\"),\n",
  "    random_choice(STORE_IDS, SEED + 3).alias(\"store_id\"),\n",
  ")\n",
  "\n",
  "# Step 3: Create synthetic inventory data: exactly one snapshot row per\n",
  "# (product, store) pair, dated run_date, so joins on those keys cannot fan out.\n",
  "inventory_df = (\n",
  "    spark.createDataFrame(list(itertools.product(PRODUCT_IDS, STORE_IDS)), [\"product_id\", \"store_id\"])\n",
  "    .coalesce(1)  # one partition keeps the seeded rand() deterministic\n",
  "    .select(\n",
  "        run_day.alias(\"inventory_date\"),\n",
  "        (F.rand(SEED + 4) * 20 + 10).cast(\"int\").alias(\"current_stock\"),\n",
  "        \"product_id\",\n",
  "        \"store_id\",\n",
  "    )\n",
  ")\n",
  "\n",
  "# Step 4: Write to Iceberg tables\n",
  "sales_df.write.mode(\"overwrite\").saveAsTable(\"retail_demo.sales_data\")\n",
  "inventory_df.write.mode(\"overwrite\").saveAsTable(\"retail_demo.inventory_data\")\n"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "5bd53006-ddbf-4ecf-bf3b-fa173cf9893b",
 "metadata": {},
 "outputs": [],
 "source": [
  "from pyspark.sql import functions as F\n",
  "\n",
  "# Restock heuristic: order the amount by which recent sales exceed current stock.\n",
  "DEMAND_WINDOW_DAYS = 7\n",
  "run_day = F.to_date(F.lit(run_date))\n",
  "\n",
  "# Step 1: Load sales and inventory data\n",
  "sales = spark.sql(\"\"\"\n",
  "    SELECT store_id, product_id, sale_date, units_sold\n",
  "    FROM retail_demo.sales_data\n",
  "\"\"\")\n",
  "\n",
  "inventory = spark.sql(\"\"\"\n",
  "    SELECT store_id, product_id, inventory_date, current_stock\n",
  "    FROM retail_demo.inventory_data\n",
  "\"\"\")\n",
  "\n",
  "# Step 2: Aggregate demand per store/product over the DEMAND_WINDOW_DAYS days before\n",
  "# run_date. Demand is the total units sold: an average units-per-sale figure is not\n",
  "# comparable with a stock level.\n",
  "demand = (\n",
  "    sales\n",
  "    .where((F.col(\"sale_date\") >= F.date_sub(run_day, DEMAND_WINDOW_DAYS)) & (F.col(\"sale_date\") < run_day))\n",
  "    .groupBy(\"store_id\", \"product_id\")\n",
  "    .agg(F.sum(\"units_sold\").alias(\"predicted_demand\"))\n",
  ")\n",
  "\n",
  "# Step 3: Join with the inventory snapshot taken on run_date\n",
  "# (assumes one snapshot row per store/product/date; duplicates would fan out the join).\n",
  "inventory_on_run_date = inventory.where(F.col(\"inventory_date\").cast(\"date\") == run_day)\n",
  "\n",
  "restock_plan = (\n",
  "    demand.join(inventory_on_run_date, on=[\"store_id\", \"product_id\"], how=\"inner\")\n",
  "    # Step 4: Calculate restock quantity (never negative)\n",
  "    .withColumn(\n",
  "        \"restock_qty\",\n",
  "        F.greatest(F.col(\"predicted_demand\") - F.col(\"current_stock\"), F.lit(0)).cast(\"int\"),\n",
  "    )\n",
  "    # Step 5: Add metadata: stamp the plan with run_date, not the wall-clock date\n",
  "    .withColumn(\"run_date\", run_day)\n",
  ")\n",
  "\n",
  "# Step 6: Save to Iceberg\n",
  "restock_plan.write.mode(\"overwrite\").saveAsTable(\"retail_demo.restock_plan\")\n"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "5fea9096-b7b3-44d4-91aa-189ac9ea4a33",
 "metadata": {},
 "outputs": [],
 "source": [
  "from IPython.display import HTML, display\n",
  "\n",
  "# Collect the restock plan to pandas for rendering (assumes it is small enough to\n",
  "# collect to the driver). Sorting makes the row order stable across runs, and\n",
  "# index=False hides the meaningless pandas index. Importing the display *function*\n",
  "# (not the IPython.display module) avoids shadowing Jupyter's display().\n",
  "restock_df = restock_plan.orderBy(\"store_id\", \"product_id\").toPandas()\n",
  "display(HTML(restock_df.to_html(index=False)))"
 ]
}
],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}