Thursday, September 4, 2025

Storage Partitioned Joins (SPJ) in Iceberg and How to Use Them | Faster Joins, No Shuffle

Demo

Storage Partitioned Joins (SPJ) let Spark join two Iceberg tables that are partitioned the same way on the join key (here, bucket(5, id)) by reading and joining matching partitions directly, so neither side has to be shuffled.

Create Spark Session

In [1]:
from pyspark.sql import SparkSession
import pyspark
import os, sys

# Set Java Home environment variable if needed
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SPARK_VERSION = '3.4'
ICEBERG_VERSION = "1.9.1"

SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION},software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
print(SUBMIT_ARGS)



spark = SparkSession.builder \
    .appName("IcebergVariantDemo") \
    .config(
    "spark.jars.packages",
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
    f"org.apache.iceberg:iceberg-core:{ICEBERG_VERSION}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.dev.type", "hadoop") \
    .config("spark.sql.catalog.dev.warehouse", "file:///Users/soumilshah/Desktop/warehouse/") \
    .getOrCreate()
--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.9.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5c3ef495-fc32-401b-a30d-d6891e613dd7;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.9.1 in central
	found software.amazon.awssdk#bundle;2.20.160 in central
	found software.amazon.eventstream#eventstream;1.0.1 in local-m2-cache
	found software.amazon.awssdk#url-connection-client;2.20.160 in central
	found software.amazon.awssdk#utils;2.20.160 in central
	found org.reactivestreams#reactive-streams;1.0.4 in local-m2-cache
	found software.amazon.awssdk#annotations;2.20.160 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found software.amazon.awssdk#http-client-spi;2.20.160 in central
	found software.amazon.awssdk#metrics-spi;2.20.160 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in spark-list
:: resolution report :: resolve 801ms :: artifacts dl 33ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.9.1 from central in [default]
	org.reactivestreams#reactive-streams;1.0.4 from local-m2-cache in [default]
	org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from spark-list in [default]
	software.amazon.awssdk#annotations;2.20.160 from central in [default]
	software.amazon.awssdk#bundle;2.20.160 from central in [default]
	software.amazon.awssdk#http-client-spi;2.20.160 from central in [default]
	software.amazon.awssdk#metrics-spi;2.20.160 from central in [default]
	software.amazon.awssdk#url-connection-client;2.20.160 from central in [default]
	software.amazon.awssdk#utils;2.20.160 from central in [default]
	software.amazon.eventstream#eventstream;1.0.1 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   13  |   0   |   0   |   0   ||   13  |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5c3ef495-fc32-401b-a30d-d6891e613dd7
	confs: [default]
	0 artifacts copied, 13 already retrieved (0kB/20ms)
25/09/04 17:13:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/04 17:13:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
In [2]:
spark
Out[2]:

SparkSession - in-memory

SparkContext
Spark UI
Version: v3.4.0
Master: local[*]
AppName: IcebergVariantDemo
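Before creating any tables, a quick sanity check that the Iceberg extensions and the dev catalog are wired up. This is a minimal sketch against the hadoop catalog configured above:

# Optional sanity check: the dev catalog should answer metadata queries,
# and the Iceberg session extensions should be registered.
spark.sql("SHOW NAMESPACES IN dev").show()
print(spark.conf.get("spark.sql.extensions"))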

Create Mock Tables

In [3]:
# ----------------------------
# Create Iceberg tables with 5 buckets
# ----------------------------
spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed1_5 (
    id BIGINT,
    value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
  'write.format.default'='parquet'
)
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed2_5 (
    id BIGINT,
    value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
  'write.format.default'='parquet'
)
""")

# ----------------------------
# Insert ids 1 to 99 into ice_bucketed1_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed1_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")

# ----------------------------
# Insert ids 1 to 99 into ice_bucketed2_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")
                                                                                
Out[3]:
DataFrame[]
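Both tables share the same bucket(5, id) layout, which is exactly what SPJ relies on. One way to confirm it is to query Iceberg's partitions metadata table for each table (a sketch using the tables created above):

# Each row shows one partition (id_bucket 0..4) with its record_count and file_count.
spark.sql("SELECT * FROM dev.ice_bucketed1_5.partitions").show(truncate=False)
spark.sql("SELECT * FROM dev.ice_bucketed2_5.partitions").show(truncate=False)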

View Data

In [4]:
spark.sql("""
SELECT * FROM   dev.ice_bucketed1_5
""").show(2)

spark.sql("""
SELECT * FROM   dev.ice_bucketed2_5
""").show(2)
+---+-----+
| id|value|
+---+-----+
|  3|   30|
|  4|   40|
+---+-----+
only showing top 2 rows

+---+-----+
| id|value|
+---+-----+
|  3|   30|
|  4|   40|
+---+-----+
only showing top 2 rows
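Because both tables use bucket(5, id), a given id always hashes into the same bucket on both sides, so matching rows live in matching storage partitions. To see the mapping, recent Iceberg Spark runtimes expose the bucket transform as a SQL function under the catalog's system namespace (a sketch; availability depends on the Iceberg version):

# Show which of the 5 buckets each id hashes into.
spark.sql("""
SELECT id, dev.system.bucket(5, id) AS id_bucket
FROM dev.ice_bucketed1_5
ORDER BY id
""").show(5)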

Join test

No SPJ (baseline)

In [7]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Disable Iceberg bucketing/SPJ for the baseline run
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "false")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
                                                                                
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

                                                                                
Out[7]:
99
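With all the v2 bucketing settings turned off, Spark cannot use the tables' storage partitioning, so the plan for this join repartitions both inputs. You can confirm this by printing the physical plan and looking for Exchange nodes (a sketch; the exact plan text varies by Spark version):

# Expect Exchange hashpartitioning(id, ...) on both join inputs:
# Spark shuffles each table by id before joining.
df_join.explain()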
In [8]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

Out[8]:
99
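Same query, same result, but this time the bucketing settings let Spark treat the Iceberg buckets as the join partitioning. Printing the plan again should show that the Exchange nodes on the join inputs are gone (a sketch; plan text varies by Spark/Iceberg version):

# With SPJ applied, matching buckets from each table are joined directly,
# so there is no hashpartitioning Exchange before the join.
df_join.explain(mode="formatted")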

Partitions from both sides do not match

In [9]:
# ----------------------------
# Insert ids 100 to 109 into ice_bucketed2_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(100, 110)
""")
Out[9]:
DataFrame[]

View table

In [10]:
spark.sql("""
SELECT * FROM   dev.ice_bucketed1_5 where id > 98
""").show(5)

spark.sql("""
SELECT * FROM   dev.ice_bucketed2_5 where id > 98
""").show(5)
+---+-----+
| id|value|
+---+-----+
| 99|  990|
+---+-----+

+---+-----+
| id|value|
+---+-----+
|100| 1000|
|102| 1020|
|106| 1060|
|104| 1040|
|108| 1080|
+---+-----+
only showing top 5 rows
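ice_bucketed1_5 still tops out at id 99 while ice_bucketed2_5 now goes up to 109; a quick aggregate makes the difference between the two sides obvious (a minimal sketch):

# Compare id ranges and row counts on the two sides of the join.
spark.sql("SELECT MIN(id), MAX(id), COUNT(*) FROM dev.ice_bucketed1_5").show()
spark.sql("SELECT MIN(id), MAX(id), COUNT(*) FROM dev.ice_bucketed2_5").show()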

Join

In [11]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

Out[11]:
99
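The extra ids (100-109) in ice_bucketed2_5 have no match in ice_bucketed1_5, so the join still returns 99 rows. The interesting part is that spark.sql.sources.v2.bucketing.pushPartValues.enabled allows SPJ to go ahead even when the partition values reported by the two sides do not line up exactly, instead of falling back to a shuffle. To double-check what happened, you can compare each side's partitions and re-inspect the plan (a sketch using Iceberg's partitions metadata table):

# Partition-level row counts now differ between the two tables; with
# pushPartValues enabled the join can still be planned without an Exchange.
spark.sql("SELECT * FROM dev.ice_bucketed1_5.partitions").show(truncate=False)
spark.sql("SELECT * FROM dev.ice_bucketed2_5.partitions").show(truncate=False)
df_join.explain()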