Create Spark Session¶
In [1]:
from pyspark.sql import SparkSession
import pyspark
import os, sys
# Set Java Home environment variable if needed
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SPARK_VERSION = '3.4'
ICEBERG_VERSION = "1.9.1"
SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION},software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
print(SUBMIT_ARGS)
# Build the session with a local Hadoop catalog named "dev"
spark = SparkSession.builder \
    .appName("IcebergVariantDemo") \
    .config(
        "spark.jars.packages",
        f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
        f"org.apache.iceberg:iceberg-core:{ICEBERG_VERSION}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.dev.type", "hadoop") \
    .config("spark.sql.catalog.dev.warehouse", "file:///Users/soumilshah/Desktop/warehouse/") \
    .getOrCreate()
In [2]:
spark
Out[2]:
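Quick sanity check (an extra cell, not part of the original run): confirm the Iceberg extension and the dev catalog settings actually landed on the session.
In [ ]:
# Sanity check: print the Spark version and the Iceberg-related settings
print(spark.version)
print(spark.conf.get("spark.sql.extensions"))
print(spark.conf.get("spark.sql.catalog.dev"))
print(spark.conf.get("spark.sql.catalog.dev.warehouse"))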
Create Mock Tables¶
In [3]:
# ----------------------------
# Create Iceberg tables with 5 buckets
# ----------------------------
spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed1_5 (
id BIGINT,
value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
'write.format.default'='parquet'
)
""")
spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed2_5 (
id BIGINT,
value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
'write.format.default'='parquet'
)
""")
# ----------------------------
# Insert ids 1 to 99 into ice_bucketed1_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed1_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")
# ----------------------------
# Insert ids 1 to 99 into ice_bucketed2_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")
Out[3]:
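bucket(5, id) tells Iceberg to hash each id into one of 5 buckets, so both tables end up with the same partition layout. You can verify this with Iceberg's partitions metadata table; this extra cell is illustrative, and with ids 1 to 99 you should see up to five bucket partitions per table.
In [ ]:
# Inspect the bucket partitions Iceberg created for each table
spark.sql("SELECT partition, record_count FROM dev.ice_bucketed1_5.partitions").show()
spark.sql("SELECT partition, record_count FROM dev.ice_bucketed2_5.partitions").show()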
View Data¶
In [4]:
spark.sql("""
SELECT * FROM dev.ice_bucketed1_5
""").show(2)
spark.sql("""
SELECT * FROM dev.ice_bucketed2_5
""").show(2)
Join Test¶
NO SPJ¶
In [7]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")
# Disable Iceberg bucketing/SPJ (baseline run)
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "false")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
# ----------------------------
# Join query without SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
Out[7]:
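With every SPJ-related flag off, Spark must co-locate matching rows itself. An easy way to confirm the shuffle (extra cell, added for illustration) is to print the physical plan; expect Exchange hashpartitioning nodes on both join inputs.
In [ ]:
# With bucketing/SPJ disabled, the physical plan should show
# Exchange (shuffle) nodes feeding the join
df_join.explain()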
In [8]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")
# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
Out[8]:
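Now Spark can exploit the fact that both tables are bucketed by bucket(5, id) and scan them already co-partitioned. Printing the plan again (illustrative extra cell) should show no Exchange in front of the join.
In [ ]:
# With SPJ enabled, the shuffle before the join should disappear:
# both sides are scanned already grouped by bucket(5, id)
df_join.explain()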
Partitions on both sides do not match¶
In [9]:
# ----------------------------
# Insert ids 100 to 109 into ice_bucketed2_5 only
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(100, 110)
""")
Out[9]:
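Ids 100 to 109 now exist only in ice_bucketed2_5, so the per-bucket record counts of the two tables no longer line up. The partitions metadata tables (extra cell, for illustration) make that visible.
In [ ]:
# Compare per-bucket record counts; table 2 now has 10 extra rows
spark.sql("SELECT partition, record_count FROM dev.ice_bucketed1_5.partitions").show()
spark.sql("SELECT partition, record_count FROM dev.ice_bucketed2_5.partitions").show()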
View Table¶
In [10]:
spark.sql("""
SELECT * FROM dev.ice_bucketed1_5 WHERE id > 98
""").show(5)
spark.sql("""
SELECT * FROM dev.ice_bucketed2_5 WHERE id > 98
""").show(5)
Join¶
In [11]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")
# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
Out[11]:
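The join output should be unchanged, since ids 100 to 109 have no match in ice_bucketed1_5, and SPJ should still kick in: pushPartValues.enabled lets Spark fill in partition values that exist on only one side instead of falling back to a shuffle. Re-checking the plan (illustrative extra cell) should confirm it.
In [ ]:
# SPJ should still hold despite the mismatch; expect no Exchange
# in front of the join in the printed plan
df_join.explain()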