Learn How to Configure Your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands-on Labs¶
Creating Table Buckets¶
In [ ]:
%%bash
# Create the S3 table bucket named demo-bucket1 in us-east-1.
aws s3tables create-table-bucket --name demo-bucket1 --region us-east-1
In [20]:
from pyspark.sql import SparkSession
import os
# Set the Java home (an OpenJDK 11 install path) and the AWS credential
# variables in one step. The "<>" values are placeholders to be replaced
# with real credentials before running.
os.environ.update(
    {
        "JAVA_HOME": "/opt/homebrew/opt/openjdk@11",
        "AWS_ACCESS_KEY_ID": "<>",
        "AWS_SECRET_ACCESS_KEY": "<>",
    }
)
In [2]:
# Lab configuration. The "spark" section maps Spark property names to values
# and registers two Iceberg catalogs side by side.
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",
        # Comma-separated Maven coordinates of the extra jars.
        "spark.jars.packages": ",".join(
            [
                "com.amazonaws:aws-java-sdk-bundle:1.12.661",
                "org.apache.hadoop:hadoop-aws:3.3.4",
                "software.amazon.awssdk:bundle:2.29.38",
                "com.github.ben-manes.caffeine:caffeine:3.1.8",
                "org.apache.commons:commons-configuration2:2.11.0",
                "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3",
                "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1",
            ]
        ),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # --- Managed catalog: Iceberg tables stored in an S3 table bucket ---
        "spark.sql.catalog.ManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.ManagedIcebergCatalog.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
        # Placeholder ARN; replace with the ARN of the table bucket.
        "spark.sql.catalog.ManagedIcebergCatalog.warehouse": "arn:XXXX/demo-bucket1",
        "spark.sql.catalog.ManagedIcebergCatalog.client.region": "us-east-1",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        # --- Unmanaged catalog: Hadoop-type Iceberg catalog on a file: path ---
        "spark.sql.catalog.UnManagedIcebergCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.UnManagedIcebergCatalog.type": "hadoop",
        "spark.sql.catalog.UnManagedIcebergCatalog.warehouse": "file:////warehouse/",
    }
}
Create Spark Session¶
In [3]:
def create_spark_session(spark_config, protocol):
    """Build a SparkSession from protocol defaults plus caller settings.

    Args:
        spark_config: Mapping of Spark property names to values. Applied
            after the protocol defaults, so its entries are set last.
        protocol: Storage protocol selector. Only the value "s3a" adds
            defaults: the S3A access/secret keys read from the
            AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
            (empty string when unset), the managed catalog's S3 endpoint,
            and the S3A filesystem and credentials-provider classes.

    Returns:
        The result of ``getOrCreate()`` on the configured builder.
    """
    session_builder = SparkSession.builder

    s3a_defaults = {}
    if protocol == "s3a":
        s3a_defaults = {
            "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            "spark.sql.catalog.ManagedIcebergCatalog.s3.endpoint": "https://s3.amazonaws.com",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        }

    # Defaults go first; caller-supplied settings are applied afterwards.
    for settings in (s3a_defaults, spark_config):
        for option, setting in settings.items():
            session_builder = session_builder.config(option, setting)

    return session_builder.getOrCreate()
In [ ]:
# Create the session from the "spark" section of conf, with the s3a defaults.
spark = create_spark_session(
    spark_config=conf.get("spark"),
    protocol="s3a",
)
In [21]:
# Bare expression: the notebook renders the session object as this cell's output.
spark
Out[21]:
Creating Managed Iceberg tables¶
In [7]:
# List the namespaces currently present in the managed catalog.
spark.sql("SHOW NAMESPACES IN ManagedIcebergCatalog").show()

# Namespace and table creation use IF NOT EXISTS, so re-running is harmless.
spark.sql("CREATE NAMESPACE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1")
spark.sql("""
CREATE TABLE IF NOT EXISTS ManagedIcebergCatalog.demo_poc_bucket1.customers (
customer_id INT,
name STRING,
email STRING
) USING iceberg
""")

# Seed the demo rows with INSERT OVERWRITE instead of INSERT INTO: a plain
# INSERT INTO appends the same five customers on every run of this cell,
# which duplicates rows in the later join. The table is unpartitioned, so
# the overwrite replaces its full contents and the cell can be re-run safely.
spark.sql("""
INSERT OVERWRITE ManagedIcebergCatalog.demo_poc_bucket1.customers VALUES
(1, 'John Doe', 'john@example.com'),
(2, 'Jane Smith', 'jane@example.com'),
(3, 'Bob Johnson', 'bob@example.com'),
(4, 'Alice Brown', 'alice@example.com'),
(5, 'Charlie Davis', 'charlie@example.com')
""")
Out[7]:
Unmanaged Iceberg¶
In [16]:
# Create the orders table in the unmanaged catalog; IF NOT EXISTS keeps
# this statement safe to re-run.
spark.sql("""
CREATE TABLE IF NOT EXISTS UnManagedIcebergCatalog.default.orders (
order_id INT,
customer_id INT,
order_date DATE,
total_amount DECIMAL(10, 2)
) USING iceberg
""")

# Seed the demo rows with INSERT OVERWRITE instead of INSERT INTO: a plain
# INSERT INTO appends the same seven orders on every run of this cell,
# which duplicates rows in the later join. The table is unpartitioned, so
# the overwrite replaces its full contents and the cell can be re-run safely.
spark.sql("""
INSERT OVERWRITE UnManagedIcebergCatalog.default.orders VALUES
(101, 1, DATE '2023-01-15', 150.50),
(102, 2, DATE '2023-01-16', 200.75),
(103, 3, DATE '2023-01-17', 75.25),
(104, 1, DATE '2023-01-18', 300.00),
(105, 4, DATE '2023-01-19', 125.00),
(106, 2, DATE '2023-01-20', 180.50),
(107, 5, DATE '2023-01-21', 95.75)
""")
Out[16]:
Join Test Across Both Catalogs¶
In [17]:
# Print the namespaces of each catalog, unmanaged first and then managed.
for catalog_name in ("UnManagedIcebergCatalog", "ManagedIcebergCatalog"):
    spark.sql(f"SHOW NAMESPACES IN {catalog_name}").show()
In [18]:
# Join customers (managed catalog) to orders (unmanaged catalog) on
# customer_id, ordered by customer and then by order date.
cross_catalog_join_sql = """
SELECT
c.customer_id,
c.name,
c.email,
o.order_id,
o.order_date,
o.total_amount
FROM ManagedIcebergCatalog.demo_poc_bucket1.customers c
JOIN UnManagedIcebergCatalog.default.orders o
ON c.customer_id = o.customer_id
ORDER BY c.customer_id, o.order_date
"""
result = spark.sql(cross_catalog_join_sql)
result.show()
Clean Up¶
In [19]:
%%bash
# Delete the customers table in namespace demo_poc_bucket1.
# "XXX" is a placeholder for the table bucket ARN.
aws s3tables delete-table --name customers --namespace demo_poc_bucket1 --table-bucket-arn "XXX"
In [ ]:
No comments:
Post a Comment