Tuesday, January 14, 2025

Learn How to Configure Your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands-on Labs


Creating Table Buckets

In [ ]:
%%bash
aws s3tables create-table-bucket \
    --region us-east-1 \
    --name demo-bucket1
In [20]:
from pyspark.sql import SparkSession
import os
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
# Replace the "<>" placeholders with your own AWS credentials.
os.environ["AWS_ACCESS_KEY_ID"] = "<>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<>"
In [2]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",

        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.661,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.29.38,com.github.ben-manes.caffeine:caffeine:3.1.8,org.apache.commons:commons-configuration2:2.11.0,software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        
        # Managed Iceberg Catalog (S3-based)
        "spark.sql.catalog.ManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.ManagedIcebergCatalog.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
        "spark.sql.catalog.ManagedIcebergCatalog.warehouse": "arn:XXXX/demo-bucket1",
        "spark.sql.catalog.ManagedIcebergCatalog.client.region": "us-east-1",

        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",

        # Unmanaged Iceberg Catalog (Hadoop-based)
        "spark.sql.catalog.UnManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.UnManagedIcebergCatalog.type": "hadoop",
        "spark.sql.catalog.UnManagedIcebergCatalog.warehouse": "file:///warehouse/"
    }
}
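
The "spark.jars.packages" value above is one long comma-separated string, which is easy to break when adding or bumping a dependency. A minimal sketch, assuming you prefer to keep the coordinates in a list: join_packages is a hypothetical helper (not part of Spark or Iceberg) that checks each entry looks like group:artifact:version and joins them with commas.

```python
# Maven coordinates copied from the "spark.jars.packages" value above.
PACKAGES = [
    "com.amazonaws:aws-java-sdk-bundle:1.12.661",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.29.38",
    "com.github.ben-manes.caffeine:caffeine:3.1.8",
    "org.apache.commons:commons-configuration2:2.11.0",
    "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3",
    "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1",
]


def join_packages(coordinates):
    """Hypothetical helper: validate group:artifact:version and join with commas."""
    for coord in coordinates:
        parts = coord.split(":")
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"expected group:artifact:version, got {coord!r}")
    return ",".join(coordinates)


packages_value = join_packages(PACKAGES)
print(packages_value)
```

The resulting string can be assigned to conf["spark"]["spark.jars.packages"] before the session is created.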

Create Spark Session

In [3]:
def create_spark_session(spark_config, protocol):
    """Build a SparkSession from spark_config, adding S3A defaults when protocol is "s3a"."""
    builder = SparkSession.builder

    if protocol == "s3a":
        # S3A defaults; the credentials come from the environment variables set earlier.
        default = {
            "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            "spark.sql.catalog.ManagedIcebergCatalog.s3.endpoint": "https://s3.amazonaws.com",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        }
        for key, value in default.items():
            builder = builder.config(key, value)

    # Applied after the defaults, so entries in spark_config override them.
    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
In [ ]:
spark = create_spark_session(spark_config=conf.get("spark"), protocol="s3a")
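
Because create_spark_session applies the S3A defaults first and the entries from conf second, a key that appears in both ends up with the value from conf. The sketch below mirrors that ordering with plain dictionaries so it can be checked without starting Spark. effective_config is a hypothetical helper, and the regional endpoint in overrides is an illustrative assumption, not a value from the lab.

```python
def effective_config(defaults, spark_config):
    """Hypothetical helper mirroring the order used in create_spark_session."""
    merged = dict(defaults)      # applied first
    merged.update(spark_config)  # applied second, wins on duplicate keys
    return merged


defaults = {
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.sql.catalog.ManagedIcebergCatalog.s3.endpoint": "https://s3.amazonaws.com",
}
overrides = {
    "spark.app.name": "iceberg_lab",
    # Assumed override, only to illustrate precedence.
    "spark.sql.catalog.ManagedIcebergCatalog.s3.endpoint": "https://s3.us-east-1.amazonaws.com",
}

merged = effective_config(defaults, overrides)
for key in sorted(merged):
    print(f"{key} = {merged[key]}")
```

Keys that only appear in the defaults pass through unchanged, which is why the duplicated fs.s3a settings in conf and in the function are harmless.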
In [21]:
spark
Out[21]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.0
Master
local[*]
AppName
iceberg_lab

Creating Managed Iceberg tables

In [7]:
spark.sql("SHOW NAMESPACES IN ManagedIcebergCatalog").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1")


spark.sql("""
CREATE TABLE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1.customers (
  customer_id INT,
  name STRING,
  email STRING
) USING iceberg
""")

spark.sql("""
INSERT INTO ManagedIcebergCatalog.demo_poc_bucket1.customers VALUES
  (1, 'John Doe', 'john@example.com'),
  (2, 'Jane Smith', 'jane@example.com'),
  (3, 'Bob Johnson', 'bob@example.com'),
  (4, 'Alice Brown', 'alice@example.com'),
  (5, 'Charlie Davis', 'charlie@example.com')
""")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+----------------+
|       namespace|
+----------------+
|demo_poc_bucket1|
+----------------+

                                                                                
Out[7]:
DataFrame[]

Creating Unmanaged Iceberg tables

In [16]:
spark.sql("""
CREATE TABLE IF NOT EXISTS UnManagedIcebergCatalog.default.orders (
  order_id INT,
  customer_id INT,
  order_date DATE,
  total_amount DECIMAL(10, 2)
) USING iceberg
""")

spark.sql("""
INSERT INTO UnManagedIcebergCatalog.default.orders VALUES
  (101, 1, DATE '2023-01-15', 150.50),
  (102, 2, DATE '2023-01-16', 200.75),
  (103, 3, DATE '2023-01-17', 75.25),
  (104, 1, DATE '2023-01-18', 300.00),
  (105, 4, DATE '2023-01-19', 125.00),
  (106, 2, DATE '2023-01-20', 180.50),
  (107, 5, DATE '2023-01-21', 95.75)
""")
Out[16]:
DataFrame[]
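
Both tables are addressed with three-part identifiers of the form catalog.namespace.table, and that is what lets a single query reach into two catalogs. A small sketch, assuming you want to build these names in code rather than retype them: qualified_name is a hypothetical helper that rejects parts containing anything other than letters, digits, and underscores.

```python
import re

# Conservative identifier pattern; an assumption for this sketch, not Spark's full grammar.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def qualified_name(catalog, namespace, table):
    """Hypothetical helper: build a catalog.namespace.table identifier."""
    parts = (catalog, namespace, table)
    for part in parts:
        if not _IDENT.fullmatch(part):
            raise ValueError(f"unexpected identifier part: {part!r}")
    return ".".join(parts)


customers = qualified_name("ManagedIcebergCatalog", "demo_poc_bucket1", "customers")
orders = qualified_name("UnManagedIcebergCatalog", "default", "orders")
print(customers)
print(orders)
```

The returned strings can be interpolated into the SQL passed to spark.sql.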

Join test across both catalogs

In [17]:
spark.sql("SHOW NAMESPACES IN UnManagedIcebergCatalog").show()
spark.sql("SHOW NAMESPACES IN ManagedIcebergCatalog").show()
+---------+
|namespace|
+---------+
|  default|
+---------+

+----------------+
|       namespace|
+----------------+
|demo_poc_bucket1|
+----------------+

In [18]:
result = spark.sql("""
SELECT 
    c.customer_id,
    c.name,
    c.email,
    o.order_id,
    o.order_date,
    o.total_amount
FROM ManagedIcebergCatalog.demo_poc_bucket1.customers c
JOIN UnManagedIcebergCatalog.default.orders o
ON c.customer_id = o.customer_id
ORDER BY c.customer_id, o.order_date
""")

result.show()
+-----------+-------------+-------------------+--------+----------+------------+
|customer_id|         name|              email|order_id|order_date|total_amount|
+-----------+-------------+-------------------+--------+----------+------------+
|          1|     John Doe|   john@example.com|     101|2023-01-15|      150.50|
|          1|     John Doe|   john@example.com|     104|2023-01-18|      300.00|
|          2|   Jane Smith|   jane@example.com|     102|2023-01-16|      200.75|
|          2|   Jane Smith|   jane@example.com|     106|2023-01-20|      180.50|
|          3|  Bob Johnson|    bob@example.com|     103|2023-01-17|       75.25|
|          4|  Alice Brown|  alice@example.com|     105|2023-01-19|      125.00|
|          5|Charlie Davis|charlie@example.com|     107|2023-01-21|       95.75|
+-----------+-------------+-------------------+--------+----------+------------+
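
As a sanity check on the output above, the same inner join can be reproduced in plain Python from the rows inserted earlier. This is only a sketch of the join logic for verification, not how Spark executes the query.

```python
from datetime import date
from decimal import Decimal

# Rows copied from the INSERT statements in this lab.
customers = [
    (1, "John Doe", "john@example.com"),
    (2, "Jane Smith", "jane@example.com"),
    (3, "Bob Johnson", "bob@example.com"),
    (4, "Alice Brown", "alice@example.com"),
    (5, "Charlie Davis", "charlie@example.com"),
]
orders = [
    (101, 1, date(2023, 1, 15), Decimal("150.50")),
    (102, 2, date(2023, 1, 16), Decimal("200.75")),
    (103, 3, date(2023, 1, 17), Decimal("75.25")),
    (104, 1, date(2023, 1, 18), Decimal("300.00")),
    (105, 4, date(2023, 1, 19), Decimal("125.00")),
    (106, 2, date(2023, 1, 20), Decimal("180.50")),
    (107, 5, date(2023, 1, 21), Decimal("95.75")),
]

# Index customers by id, then keep only orders with a matching customer (inner join).
by_id = {cid: (name, email) for cid, name, email in customers}
joined = [
    (cid, *by_id[cid], oid, odate, amount)
    for oid, cid, odate, amount in orders
    if cid in by_id
]

# ORDER BY c.customer_id, o.order_date
joined.sort(key=lambda row: (row[0], row[4]))

for row in joined:
    print(row)
```

Each of the seven orders matches exactly one customer, so the join returns seven rows, in the same order as the Spark result.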

                                                                                

Clean Up

In [19]:
%%bash
aws s3tables delete-table \
    --table-bucket-arn "XXX" \
    --namespace demo_poc_bucket1 \
    --name customers
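
The cell above removes only the customers table. A fuller teardown would typically also remove the namespace and the table bucket itself. The sketch below only builds and prints the commands as a dry run. It assumes the delete-namespace and delete-table-bucket subcommands of aws s3tables take the flags shown, so check `aws s3tables help` before running anything. cleanup_commands is a hypothetical helper, and "XXX" is the same ARN placeholder used above.

```python
import shlex


def cleanup_commands(table_bucket_arn, namespace, tables):
    """Hypothetical helper: CLI argument lists for tables, then namespace, then bucket."""
    commands = [
        ["aws", "s3tables", "delete-table",
         "--table-bucket-arn", table_bucket_arn,
         "--namespace", namespace,
         "--name", table]
        for table in tables
    ]
    # Assumed subcommands; verify against your installed AWS CLI version.
    commands.append(["aws", "s3tables", "delete-namespace",
                     "--table-bucket-arn", table_bucket_arn,
                     "--namespace", namespace])
    commands.append(["aws", "s3tables", "delete-table-bucket",
                     "--table-bucket-arn", table_bucket_arn])
    return commands


commands = cleanup_commands("XXX", "demo_poc_bucket1", ["customers"])
for cmd in commands:
    # Dry run: print only. To execute, pass cmd to subprocess.run(cmd, check=True).
    print(shlex.join(cmd))
```

Tables are listed before the namespace and the bucket because a container generally has to be empty before it can be deleted.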