How to Use the Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide
from pyspark.sql import SparkSession
import pyspark
import os, sys
import datetime
from datetime import datetime
# Point PySpark at the JDK it should launch (a Homebrew openjdk@11 path here);
# change this to match the local machine.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

# Version pins that together select the Iceberg Spark runtime artifact.
SPARK_VERSION = '3.4'
ICEBERG_VERSION = '1.3.0'
# Build the package coordinate from BOTH constants so that editing
# SPARK_VERSION actually changes the artifact that gets downloaded
# (it was previously hard-coded in the string and the constant was unused).
SUBMIT_ARGS = (
    "--packages "
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION} "
    "pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
# Run Spark's Python workers with the same interpreter as this notebook.
os.environ['PYSPARK_PYTHON'] = sys.executable

# Define the local warehouse path
local_warehouse_path = "/Users/soumilshah/IdeaProjects/SparkProject/iceberg"

# Create (or reuse) a session with the Iceberg SQL extensions enabled and a
# catalog named `dev`: an Iceberg SparkCatalog of type "hadoop" whose
# warehouse is the local path above.
spark = (
    SparkSession.builder
    .appName("IcebergReadExample")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.dev.type", "hadoop")
    .config("spark.sql.catalog.dev.warehouse", local_warehouse_path)
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark
SparkSession - in-memory
Step 2: Create the Marketing Campaign Table in Iceberg
# Start from a clean slate: drop the table if it exists and create a new one
spark.sql("DROP TABLE IF EXISTS dev.default.marketing_campaign")
# Create the marketing_campaign table
spark.sql("""
CREATE TABLE IF NOT EXISTS dev.default.marketing_campaign (
campaign_id INT,
campaign_name STRING,
status STRING,
budget DOUBLE,
target_demographic STRING
) USING iceberg
""")
# Remove any leftover audit branch before the walkthrough begins. The name
# must be `audit_branch` -- the branch every later step creates, writes to,
# merges from and drops -- otherwise this reset can never affect it.
spark.sql("""
ALTER TABLE dev.default.marketing_campaign
DROP BRANCH IF EXISTS audit_branch
""").show(truncate=False)
++ || ++ ++
Step 3: Insert Initial Data into the Main Branch
Next, we insert some sample campaigns into the main branch.
# Seed the table with three sample campaigns. No branch is named in the
# target, so the rows are written through the plain table identifier.
seed_campaigns_sql = """
INSERT INTO dev.default.marketing_campaign VALUES
(1, 'Spring Sale', 'Active', 15000.00, 'All'),
(2, 'Summer Promotion', 'Active', 20000.00, 'Adults'),
(3, 'Fall Discounts', 'Inactive', 10000.00, 'Families')
"""
spark.sql(seed_campaigns_sql)
DataFrame[]
Step 4: Create an Audit Branch
Now, we will create a new audit branch for making changes to our campaign data.
# Add a branch called `audit_branch` to the table; IF NOT EXISTS makes the
# statement safe to re-run when the branch is already there.
create_branch_sql = "ALTER TABLE dev.default.marketing_campaign CREATE BRANCH IF NOT EXISTS audit_branch"
spark.sql(create_branch_sql)
24/11/03 10:17:05 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
DataFrame[]
Step 5: Make Changes in the Audit Branch
Here, we insert a new campaign into the audit branch; updates to existing campaigns could be staged there in the same way.
# Stage one new campaign on the audit branch only. The `.branch_audit_branch`
# suffix on the table identifier directs the write to that branch.
audit_insert_sql = """
INSERT INTO dev.default.marketing_campaign.branch_audit_branch VALUES
(4, 'Winter Clearance', 'Pending', 12000.00, 'Seniors')
"""
spark.sql(audit_insert_sql)
DataFrame[]
Step 6: View Changes Before Publishing
Before merging, let’s view the table’s references and the records in both branches.
# List the table's references (branches) with their current snapshot ids.
spark.sql("SELECT * FROM dev.default.marketing_campaign.refs").show(truncate=False)

# Print each branch's rows under a banner -- audit branch first, then main --
# so the staged change can be compared with the published data before merging.
for banner, relation in (
    ("*** AUDIT BRANCH ***", "dev.default.marketing_campaign.branch_audit_branch"),
    ("*** MAIN BRANCH BEFORE MERGE ***", "dev.default.marketing_campaign"),
):
    print(banner)
    spark.sql("SELECT * FROM " + relation).show()
+------------+------+-------------------+-----------------------+---------------------+----------------------+ |name |type |snapshot_id |max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms| +------------+------+-------------------+-----------------------+---------------------+----------------------+ |audit_branch|BRANCH|7795978767973798506|null |null |null | |main |BRANCH|6952653409920777163|null |null |null | +------------+------+-------------------+-----------------------+---------------------+----------------------+ *** AUDIT BRANCH *** +-----------+----------------+--------+-------+------------------+ |campaign_id| campaign_name| status| budget|target_demographic| +-----------+----------------+--------+-------+------------------+ | 4|Winter Clearance| Pending|12000.0| Seniors| | 1| Spring Sale| Active|15000.0| All| | 2|Summer Promotion| Active|20000.0| Adults| | 3| Fall Discounts|Inactive|10000.0| Families| +-----------+----------------+--------+-------+------------------+ *** MAIN BRANCH BEFORE MERGE *** +-----------+----------------+--------+-------+------------------+ |campaign_id| campaign_name| status| budget|target_demographic| +-----------+----------------+--------+-------+------------------+ | 1| Spring Sale| Active|15000.0| All| | 2|Summer Promotion| Active|20000.0| Adults| | 3| Fall Discounts|Inactive|10000.0| Families| +-----------+----------------+--------+-------+------------------+
Step 7: Merge Changes from the Audit Branch to the Main Branch
Once the changes are validated, we can merge the updates from the audit branch into the main branch.
table_name = "marketing_campaign"

# Fully qualified identifiers for the two sides of the merge.
target_ref = f"dev.default.{table_name}.branch_main"
source_ref = f"dev.default.{table_name}.branch_audit_branch"

# Row-level upsert keyed on campaign_id: rows present on both branches are
# overwritten with the audit branch's values; rows found only on the audit
# branch are inserted. There is no clause for rows that exist only on main,
# so those are left untouched.
# NOTE(review): Iceberg also provides a fast-forward procedure for publishing
# a branch -- confirm whether a row-level MERGE is the intended mechanism here.
merge_query = f"""
MERGE INTO {target_ref} t
USING {source_ref} s
ON t.campaign_id = s.campaign_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
"""
spark.sql(merge_query)
DataFrame[]
Step 8: Verify the Merge and Clean Up the Audit Branch
# The same references listing is shown before and after the branch is dropped.
refs_query = "SELECT * FROM dev.default.marketing_campaign.refs"

# Before cleanup: list the table's references.
spark.sql(refs_query).show(truncate=False)

# Drop the audit branch now that its changes have been merged.
delete_branch_query = """
ALTER TABLE dev.default.marketing_campaign DROP BRANCH IF EXISTS audit_branch
"""
spark.sql(delete_branch_query)

# After cleanup: list the references again to confirm the drop.
spark.sql(refs_query).show(truncate=False)
+------------+------+-------------------+-----------------------+---------------------+----------------------+ |name |type |snapshot_id |max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms| +------------+------+-------------------+-----------------------+---------------------+----------------------+ |audit_branch|BRANCH|7795978767973798506|null |null |null | |main |BRANCH|6605149902935913243|null |null |null | +------------+------+-------------------+-----------------------+---------------------+----------------------+ +----+------+-------------------+-----------------------+---------------------+----------------------+ |name|type |snapshot_id |max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms| +----+------+-------------------+-----------------------+---------------------+----------------------+ |main|BRANCH|6605149902935913243|null |null |null | +----+------+-------------------+-----------------------+---------------------+----------------------+
24/11/03 10:17:07 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Step 9: Final View of Main Table
# Show the table's contents once the merge and branch cleanup are done.
# NOTE(review): the banner says "PRODUCT CATALOG" although the table holds
# marketing campaigns; the text is kept unchanged to match the recorded output.
final_rows = spark.sql("SELECT * FROM dev.default.marketing_campaign")
print("*** FINAL PRODUCT CATALOG ***")
final_rows.show()
*** FINAL PRODUCT CATALOG *** +-----------+----------------+--------+-------+------------------+ |campaign_id| campaign_name| status| budget|target_demographic| +-----------+----------------+--------+-------+------------------+ | 1| Spring Sale| Active|15000.0| All| | 2|Summer Promotion| Active|20000.0| Adults| | 3| Fall Discounts|Inactive|10000.0| Families| | 4|Winter Clearance| Pending|12000.0| Seniors| +-----------+----------------+--------+-------+------------------+