Saturday, April 4, 2026

Getting Started with LocalStack and AWS S3Tables

Jupyter Notebook — generated with runcell
 

Getting Started with LocalStack and AWS S3Tables

 
`
# Auth token for LocalStack Pro features (s3tables requires Pro).
# Left empty here — set your own token before running.
export LOCALSTACK_AUTH_TOKEN=""

# Local directory that persists LocalStack state between restarts;
# remove any previous container so the run starts fresh.
mkdir -p .localstack-volume
docker rm -f localstack-main 2>/dev/null || true

# Start LocalStack: 4566 is the edge port all AWS APIs go through,
# 4510-4559 is the range used for externally exposed service ports.
# The docker socket mount lets LocalStack spawn helper containers.
docker run -d \
--name localstack-main \
-p 4566:4566 \
-p 4510-4559:4510-4559 \
-e LOCALSTACK_AUTH_TOKEN \
-e SERVICES=s3tables \
-v /var/run/docker.sock:/var/run/docker.sock \
-v "$(pwd)/.localstack-volume:/var/lib/localstack" \
localstack/localstack
`
In [32]:
! java --version
openjdk 11.0.25 2024-10-15 OpenJDK Runtime Environment Homebrew (build 11.0.25+0) OpenJDK 64-Bit Server VM Homebrew (build 11.0.25+0, mixed mode)
 
#### Export ENV Variable
In [33]:
import os

# Dummy "test" credentials plus the LocalStack edge endpoint so the AWS
# CLI and SDKs talk to the local container instead of real AWS.
_aws_env = {
    "AWS_ACCESS_KEY_ID": "test",
    "AWS_SECRET_ACCESS_KEY": "test",
    "AWS_DEFAULT_REGION": "us-east-1",
    "AWS_ENDPOINT_URL": "http://localhost.localstack.cloud:4566",
    "AWS_PROFILE": "localstack",
}
os.environ.update(_aws_env)
 

Create S3Tables

In [34]:
! aws  s3tables list-table-buckets
{ "tableBuckets": [] }
In [35]:
! aws s3tables create-table-bucket --name my-table-bucket
{ "arn": "arn:aws:s3tables:us-east-1:000000000000:bucket/my-table-bucket" }
 

🧪 Step 1 — Add Iceberg + AWS Packages

In [36]:
def iceberg_runtime_package(spark_version=None, iceberg_ver="1.10.0"):
    """Build the Maven coordinate of the Iceberg Spark runtime jar.

    Parameters
    ----------
    spark_version : str, optional
        Spark version string such as "3.4.1". Defaults to the installed
        pyspark's version (the original behavior).
    iceberg_ver : str, optional
        Iceberg release to pin.

    Returns
    -------
    str
        e.g. "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.10.0"
    """
    if spark_version is None:
        # Deferred import: only needed when auto-detecting the version.
        import pyspark
        spark_version = pyspark.__version__
    major, minor = map(int, spark_version.split(".")[:2])
    # Spark 4.x runtimes are published for Scala 2.13; 3.x for 2.12.
    scala = "2.12" if major < 4 else "2.13"
    return f"org.apache.iceberg:iceberg-spark-runtime-{major}.{minor}_{scala}:{iceberg_ver}"

# Resolve the Iceberg runtime matching the installed pyspark, then pin the
# AWS SDK / S3 Tables catalog jars Spark must fetch at startup.
ICEBERG_RUNTIME = iceberg_runtime_package()

_extra_jars = (
    "com.amazonaws:aws-java-sdk-bundle:1.12.661",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.29.38",
    "com.github.ben-manes.caffeine:caffeine:3.1.8",
    "org.apache.commons:commons-configuration2:2.11.0",
    "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.8",
    ICEBERG_RUNTIME,
)

# pyspark reads this env var when it launches the JVM for pyspark-shell.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages {} pyspark-shell".format(",".join(_extra_jars))
print("Using Iceberg package:", ICEBERG_RUNTIME)
Using Iceberg package: org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.10.0
 

🧪 Step 2 — Create Spark Session


In [37]:
from pyspark.sql import SparkSession
import os

# Config (ensure these are already set)
TABLE_BUCKET_ARN = "arn:aws:s3tables:us-east-1:000000000000:bucket/my-table-bucket"
CATALOG_NAME = "ManagedIcebergCatalog"

# All catalog settings share the same key prefix; build them once and
# apply in a loop instead of one long .config() chain.
_prefix = f"spark.sql.catalog.{CATALOG_NAME}"
_conf = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    _prefix: "org.apache.iceberg.spark.SparkCatalog",
    f"{_prefix}.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
    f"{_prefix}.warehouse": TABLE_BUCKET_ARN,
    # Route S3 traffic at the LocalStack endpoint exported earlier.
    f"{_prefix}.s3.endpoint": os.getenv("AWS_ENDPOINT_URL"),
    f"{_prefix}.s3.path-style-access": "true",
    f"{_prefix}.s3.access-key-id": os.getenv("AWS_ACCESS_KEY_ID"),
    f"{_prefix}.s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
}

_builder = SparkSession.builder.appName("LocalStackS3TablesIceberg")
for _key, _value in _conf.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

print("✅ Spark Session Created")
✅ Spark Session Created
 

Perform Actions

In [38]:
# Short aliases reused by the DML cells below.
cat = CATALOG_NAME
ns = "my_namespace"
table = "demo_table1"

# In S3 Tables, a namespace plays the role of a database/schema.
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {CATALOG_NAME}.{ns}")
print("[ok] CREATE NAMESPACE IF NOT EXISTS", f"{CATALOG_NAME}.{ns}")
[ok] CREATE NAMESPACE IF NOT EXISTS ManagedIcebergCatalog.my_namespace
In [39]:
# Create a region-partitioned Iceberg v2 table in the S3 Tables catalog.
# DDL returns an empty DataFrame, so calling .show() on it only printed a
# meaningless empty grid; report success with the notebook's "[ok]" style
# instead.
spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{ns}.{table} (
            id STRING,
            name STRING,
            region STRING
        )
        USING iceberg
        PARTITIONED BY (region)
        TBLPROPERTIES ('format-version' = '2')
        """
    )
print("[ok] CREATE TABLE IF NOT EXISTS", f"{CATALOG_NAME}.{ns}.{table}")
++ || ++ ++
In [40]:
! aws  s3tables list-table-buckets
{ "tableBuckets": [ { "arn": "arn:aws:s3tables:us-east-1:000000000000:bucket/my-table-bucket", "name": "my-table-bucket", "ownerAccountId": "000000000000", "createdAt": "2026-04-04T14:16:27.393500+00:00" } ] }
In [41]:
! aws  s3tables list-tables --table-bucket-arn arn:aws:s3tables:us-east-1:000000000000:bucket/my-table-bucket
{ "tables": [ { "namespace": [ "my_namespace" ], "name": "demo_table1", "type": "customer", "tableARN": "arn:aws:s3tables:us-east-1:000000000000:bucket/my-table-bucket/table/demo_table1", "createdAt": "2026-04-04T14:17:29.839974+00:00", "modifiedAt": "2026-04-04T14:17:29.839974+00:00" } ] }
 
In [42]:
# Insert two rows — one per region partition — then read them back.
spark.sql(
        f"""
        INSERT INTO {cat}.{ns}.{table} (id, name, region) VALUES
        ('1', 'Alice', 'us-east-1'),
        ('2', 'Bob', 'eu-west-1')
        """
    )
print("[ok] INSERT INTO ... (id, name, region) VALUES ...")

# truncate=False keeps full column values visible in the rendered table.
spark.sql(f"SELECT * FROM {cat}.{ns}.{table} ORDER BY id").show(truncate=False)
print("[ok] SELECT * ORDER BY id")
[ok] INSERT INTO ... (id, name, region) VALUES ... +---+-----+---------+ |id |name |region | +---+-----+---------+ |1 |Alice|us-east-1| |2 |Bob |eu-west-1| +---+-----+---------+ [ok] SELECT * ORDER BY id
In [ ]:
Exported with runcell — convert notebooks to HTML or PDF anytime at runcell.dev.

Thursday, September 4, 2025

SPJ Joins in Iceberg: How to Use Them | Faster Joins by Avoiding Shuffles

demo

Create Spark Session

In [1]:
from pyspark.sql import SparkSession
import pyspark
import os, sys

# Point Spark at the Homebrew JDK 11 install.
# NOTE(review): hardcoded local path — adjust or remove on other machines.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SPARK_VERSION = '3.4'
ICEBERG_VERSION = "1.9.1"

# Jars fetched when the JVM is launched by pyspark-shell: Iceberg runtime
# plus the AWS SDK and hadoop-aws bundles.
SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION},software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
print(SUBMIT_ARGS)

# (removed a duplicate `from pyspark.sql import SparkSession` that shadowed
# the import at the top of this cell)

# Hadoop-type Iceberg catalog "dev" backed by a local warehouse directory.
# NOTE(review): hardcoded absolute warehouse path — parameterize for reuse.
spark = SparkSession.builder \
    .appName("IcebergVariantDemo") \
    .config(
    "spark.jars.packages",
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
    f"org.apache.iceberg:iceberg-core:{ICEBERG_VERSION}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.dev.type", "hadoop") \
    .config("spark.sql.catalog.dev.warehouse", "file:///Users/soumilshah/Desktop/warehouse/") \
    .getOrCreate()
--packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.9.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5c3ef495-fc32-401b-a30d-d6891e613dd7;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.9.1 in central
	found software.amazon.awssdk#bundle;2.20.160 in central
	found software.amazon.eventstream#eventstream;1.0.1 in local-m2-cache
	found software.amazon.awssdk#url-connection-client;2.20.160 in central
	found software.amazon.awssdk#utils;2.20.160 in central
	found org.reactivestreams#reactive-streams;1.0.4 in local-m2-cache
	found software.amazon.awssdk#annotations;2.20.160 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found software.amazon.awssdk#http-client-spi;2.20.160 in central
	found software.amazon.awssdk#metrics-spi;2.20.160 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in spark-list
:: resolution report :: resolve 801ms :: artifacts dl 33ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.9.1 from central in [default]
	org.reactivestreams#reactive-streams;1.0.4 from local-m2-cache in [default]
	org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from spark-list in [default]
	software.amazon.awssdk#annotations;2.20.160 from central in [default]
	software.amazon.awssdk#bundle;2.20.160 from central in [default]
	software.amazon.awssdk#http-client-spi;2.20.160 from central in [default]
	software.amazon.awssdk#metrics-spi;2.20.160 from central in [default]
	software.amazon.awssdk#url-connection-client;2.20.160 from central in [default]
	software.amazon.awssdk#utils;2.20.160 from central in [default]
	software.amazon.eventstream#eventstream;1.0.1 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   13  |   0   |   0   |   0   ||   13  |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5c3ef495-fc32-401b-a30d-d6891e613dd7
	confs: [default]
	0 artifacts copied, 13 already retrieved (0kB/20ms)
25/09/04 17:13:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/04 17:13:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
In [2]:
spark
Out[2]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.0
Master
local[*]
AppName
IcebergVariantDemo

Create Mock Tables

In [3]:
# ----------------------------
# Create Iceberg tables with 5 buckets
# ----------------------------
spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed1_5 (
    id BIGINT,
    value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
  'write.format.default'='parquet'
)
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS dev.ice_bucketed2_5 (
    id BIGINT,
    value BIGINT
)
USING iceberg
PARTITIONED BY (bucket(5, id))
TBLPROPERTIES (
  'write.format.default'='parquet'
)
""")

# ----------------------------
# Insert 1 to 10 into ice_bucketed1_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed1_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")

# ----------------------------
# Insert 1 to 10 into ice_bucketed2_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(1, 100)
""")
                                                                                
Out[3]:
DataFrame[]

View Data

In [4]:
# Spot-check two rows from each table to confirm the inserts landed.
spark.sql("""
SELECT * FROM   dev.ice_bucketed1_5
""").show(2)

spark.sql("""
SELECT * FROM   dev.ice_bucketed2_5
""").show(2)
+---+-----+
| id|value|
+---+-----+
|  3|   30|
|  4|   40|
+---+-----+
only showing top 2 rows

+---+-----+
| id|value|
+---+-----+
|  3|   30|
|  4|   40|
+---+-----+
only showing top 2 rows

Join test

NO SPJ

In [7]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "false")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
                                                                                
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

                                                                                
Out[7]:
99
In [ ]:

In [8]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

Out[8]:
99

Partitions from both sides do not match

In [9]:
# ----------------------------
# Insert 1 to 10 into ice_bucketed2_5
# ----------------------------
spark.sql("""
INSERT INTO dev.ice_bucketed2_5
SELECT id, id * 10 AS value
FROM range(100, 110)
""")
Out[9]:
DataFrame[]

View table

In [10]:
# Show the divergence: table 1 tops out at id 99, table 2 now has 100+.
spark.sql("""
SELECT * FROM   dev.ice_bucketed1_5 where id > 98
""").show(5)

spark.sql("""
SELECT * FROM   dev.ice_bucketed2_5 where id > 98
""").show(5)
+---+-----+
| id|value|
+---+-----+
| 99|  990|
+---+-----+

+---+-----+
| id|value|
+---+-----+
|100| 1000|
|102| 1020|
|106| 1060|
|104| 1040|
|108| 1080|
+---+-----+
only showing top 5 rows

Join

In [11]:
# ----------------------------
# SPJ / bucketing configuration
# ----------------------------
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Enable Iceberg bucketing/SPJ
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

# ----------------------------
# Join query with SPJ
# ----------------------------
df_join = spark.sql("""
SELECT a.id, COUNT(*) AS cnt
FROM dev.ice_bucketed1_5 a
JOIN dev.ice_bucketed2_5 b
ON a.id = b.id
GROUP BY a.id
ORDER BY a.id
""")
df_join.show(2)
df_join.count()
+---+---+
| id|cnt|
+---+---+
|  1|  1|
|  2|  1|
+---+---+
only showing top 2 rows

Out[11]:
99
In [ ]:

Getting Started with LocalStack and AWS S3Tables

Jupyter Notebook — generated with runcell   Getti...