Tuesday, January 14, 2025

Learn How to Configure Your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands-on Labs


Creating Table Buckets

In [ ]:
%%bash
aws s3tables create-table-bucket \
    --region us-east-1 \
    --name demo-bucket1
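
If the call succeeds, the new bucket shows up when you list table buckets. A quick check from Python, assuming a recent boto3 release that includes the s3tables client:

In [ ]:
import boto3

# List table buckets in the region to confirm creation
s3tables = boto3.client("s3tables", region_name="us-east-1")
for bucket in s3tables.list_table_buckets()["tableBuckets"]:
    print(bucket["name"], bucket["arn"])
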
In [20]:
from pyspark.sql import SparkSession
import os

# Point Spark at a local JDK 11 install (Homebrew path on macOS)
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

# Replace the placeholders with your AWS credentials
os.environ["AWS_ACCESS_KEY_ID"] = "<>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<>"
In [2]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",

        # The Iceberg runtime must match the Spark/Scala build (here Spark 3.4, Scala 2.12)
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.661,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.29.38,com.github.ben-manes.caffeine:caffeine:3.1.8,org.apache.commons:commons-configuration2:2.11.0,software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",

        # Managed Iceberg catalog (S3 Table Buckets)
        "spark.sql.catalog.ManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.ManagedIcebergCatalog.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
        "spark.sql.catalog.ManagedIcebergCatalog.warehouse": "arn:XXXX/demo-bucket1",
        "spark.sql.catalog.ManagedIcebergCatalog.client.region": "us-east-1",

        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",

        # Unmanaged Iceberg catalog (Hadoop catalog on the local filesystem)
        "spark.sql.catalog.UnManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.UnManagedIcebergCatalog.type": "hadoop",
        "spark.sql.catalog.UnManagedIcebergCatalog.warehouse": "file:///warehouse/"
    }
}

Create Spark Session

In [3]:
def create_spark_session(spark_config, protocol):
    """Build a SparkSession, layering s3a defaults under the supplied config."""
    builder = SparkSession.builder

    if protocol == "s3a":
        # s3a defaults; any key repeated in spark_config is applied later and wins
        default = {
            "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            "spark.sql.catalog.ManagedIcebergCatalog.s3.endpoint": "https://s3.amazonaws.com",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        }
        for key, value in default.items():
            builder = builder.config(key, value)

    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
In [ ]:
spark = create_spark_session(spark_config=conf.get("spark"), protocol="s3a")
In [21]:
spark
Out[21]:

SparkSession - in-memory

SparkContext
Version: v3.4.0
Master: local[*]
AppName: iceberg_lab
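
Before creating tables, it is worth confirming that both catalog definitions made it into the session:

In [ ]:
# Both catalogs should resolve to the Iceberg SparkCatalog implementation
print(spark.conf.get("spark.sql.catalog.ManagedIcebergCatalog"))
print(spark.conf.get("spark.sql.catalog.UnManagedIcebergCatalog"))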

Creating Managed Iceberg Tables

In [7]:
spark.sql("SHOW NAMESPACES IN ManagedIcebergCatalog").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1")


spark.sql("""
CREATE TABLE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1.customers (
  customer_id INT,
  name STRING,
  email STRING
) USING iceberg
""")

spark.sql("""
INSERT INTO ManagedIcebergCatalog.demo_poc_bucket1.customers VALUES
  (1, 'John Doe', 'john@example.com'),
  (2, 'Jane Smith', 'jane@example.com'),
  (3, 'Bob Johnson', 'bob@example.com'),
  (4, 'Alice Brown', 'alice@example.com'),
  (5, 'Charlie Davis', 'charlie@example.com')
""")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+----------------+
|       namespace|
+----------------+
|demo_poc_bucket1|
+----------------+

Out[7]:
DataFrame[]
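
As a quick sanity check, read the managed table back before moving on:

In [ ]:
# Confirm the rows landed in the managed table
spark.sql("""
SELECT * FROM ManagedIcebergCatalog.demo_poc_bucket1.customers
ORDER BY customer_id
""").show()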

Creating Unmanaged Iceberg Tables

In [16]:
spark.sql("""
CREATE TABLE IF NOT EXISTS UnManagedIcebergCatalog.default.orders (
  order_id INT,
  customer_id INT,
  order_date DATE,
  total_amount DECIMAL(10, 2)
) USING iceberg
""")

spark.sql("""
INSERT INTO UnManagedIcebergCatalog.default.orders VALUES
  (101, 1, DATE '2023-01-15', 150.50),
  (102, 2, DATE '2023-01-16', 200.75),
  (103, 3, DATE '2023-01-17', 75.25),
  (104, 1, DATE '2023-01-18', 300.00),
  (105, 4, DATE '2023-01-19', 125.00),
  (106, 2, DATE '2023-01-20', 180.50),
  (107, 5, DATE '2023-01-21', 95.75)
""")
Out[16]:
DataFrame[]
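
The same sanity check works against the unmanaged catalog:

In [ ]:
# Confirm the rows landed in the unmanaged table
spark.sql("SELECT COUNT(*) AS order_count FROM UnManagedIcebergCatalog.default.orders").show()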

Joining Tables Across Both Catalogs

In [17]:
spark.sql("SHOW NAMESPACES IN UnManagedIcebergCatalog").show()
spark.sql("SHOW NAMESPACES IN ManagedIcebergCatalog").show()
+---------+
|namespace|
+---------+
|  default|
+---------+

+----------------+
|       namespace|
+----------------+
|demo_poc_bucket1|
+----------------+
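
Beyond namespaces, you can also list the tables each catalog exposes:

In [ ]:
# List the tables registered in each catalog
spark.sql("SHOW TABLES IN ManagedIcebergCatalog.demo_poc_bucket1").show()
spark.sql("SHOW TABLES IN UnManagedIcebergCatalog.default").show()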

In [18]:
result = spark.sql("""
SELECT 
    c.customer_id,
    c.name,
    c.email,
    o.order_id,
    o.order_date,
    o.total_amount
FROM ManagedIcebergCatalog.demo_poc_bucket1.customers c
JOIN UnManagedIcebergCatalog.default.orders o
ON c.customer_id = o.customer_id
ORDER BY c.customer_id, o.order_date
""")

result.show()
+-----------+-------------+-------------------+--------+----------+------------+
|customer_id|         name|              email|order_id|order_date|total_amount|
+-----------+-------------+-------------------+--------+----------+------------+
|          1|     John Doe|   john@example.com|     101|2023-01-15|      150.50|
|          1|     John Doe|   john@example.com|     104|2023-01-18|      300.00|
|          2|   Jane Smith|   jane@example.com|     102|2023-01-16|      200.75|
|          2|   Jane Smith|   jane@example.com|     106|2023-01-20|      180.50|
|          3|  Bob Johnson|    bob@example.com|     103|2023-01-17|       75.25|
|          4|  Alice Brown|  alice@example.com|     105|2023-01-19|      125.00|
|          5|Charlie Davis|charlie@example.com|     107|2023-01-21|       95.75|
+-----------+-------------+-------------------+--------+----------+------------+


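The same cross-catalog join can be expressed with the DataFrame API. A minimal sketch that aggregates spend per customer, using the tables created above:

In [ ]:
from pyspark.sql import functions as F

# Cross-catalog join via the DataFrame API: total spend per customer
customers = spark.table("ManagedIcebergCatalog.demo_poc_bucket1.customers")
orders = spark.table("UnManagedIcebergCatalog.default.orders")

(customers.join(orders, "customer_id")
    .groupBy("customer_id", "name")
    .agg(F.sum("total_amount").alias("total_spent"))
    .orderBy("customer_id")
    .show())
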
Clean Up

In [19]:
%%bash
aws s3tables delete-table \
    --table-bucket-arn "XXX" \
    --namespace demo_poc_bucket1 \
    --name customers
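
Deleting the table alone leaves the namespace and the table bucket behind. A sketch of the remaining cleanup, assuming a boto3 release that ships the s3tables client; the ARN below is a placeholder to substitute:

In [ ]:
import boto3

s3tables = boto3.client("s3tables", region_name="us-east-1")

# Placeholder: substitute your table bucket's actual ARN
table_bucket_arn = "XXX"

# A namespace must be empty before deletion, and a bucket must
# have no namespaces left before it can be removed.
s3tables.delete_namespace(tableBucketARN=table_bucket_arn, namespace="demo_poc_bucket1")
s3tables.delete_table_bucket(tableBucketARN=table_bucket_arn)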