A quick test of whether you can join two managed Iceberg tables that live in different S3 table buckets, and of how the Spark session has to be configured to make that work. The approach that works, shown below, is to register each table bucket as its own Iceberg catalog in a single Spark session.
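Before creating anything, note that the aws s3tables subcommands only exist in recent AWS CLI v2 releases, so a quick sanity check is worthwhile (a minimal sketch, assuming your default profile carries the S3 Tables permissions used throughout this post):
In [ ]:
%%bash
# Both commands should succeed on a recent AWS CLI v2 with s3tables support.
aws --version
aws s3tables help > /dev/null && echo "s3tables commands available"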
Creating Table Buckets
In [ ]:
%%bash
aws s3tables create-table-bucket \
--region us-east-1 \
--name demo-bucket1
aws s3tables create-table-bucket \
--region us-east-1 \
--name demo-bucket2
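To confirm both buckets exist before moving on, list them (the --query path assumes the usual ListTableBuckets response shape, tableBuckets[].name):
In [ ]:
%%bash
# demo-bucket1 and demo-bucket2 should both appear in the output.
aws s3tables list-table-buckets --region us-east-1 \
  --query "tableBuckets[].name"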
In [2]:
from pyspark.sql import SparkSession
import os
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
packages = [
    "com.amazonaws:aws-java-sdk-bundle:1.12.661",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.29.38",
    "com.github.ben-manes.caffeine:caffeine:3.1.8",
    "org.apache.commons:commons-configuration2:2.11.0",
    "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3",
    "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1"
]
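One easy failure mode with this dependency list: iceberg-spark-runtime-3.4_2.12 is built for Spark 3.4 on Scala 2.12, and the session will not start if the installed PySpark is a different minor version. A minimal sanity check, assuming PySpark is the only Spark install on the machine:
In [ ]:
import pyspark

# The Iceberg runtime artifact above targets Spark 3.4 / Scala 2.12; if this
# assertion fails, swap the artifact for one matching your Spark version.
assert pyspark.__version__.startswith("3.4"), pyspark.__version__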
Creating a Table in Each Bucket
In [21]:
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
WAREHOUSE_PATH = "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket1"
spark = SparkSession.builder \
    .appName("iceberg_lab") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.sql.catalog.catalog1", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.catalog1.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
    .config("spark.sql.catalog.catalog1.warehouse", WAREHOUSE_PATH) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "catalog1") \
    .config("spark.sql.catalog.catalog1.client.region", "us-east-1") \
    .getOrCreate()
spark.sql("SHOW NAMESPACES IN catalog1").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS catalog1.demo_poc_bucket1")
spark.sql("""
CREATE TABLE IF NOT EXISTS catalog1.demo_poc_bucket1.customers (
customer_id INT,
name STRING,
email STRING
) USING iceberg
""")
spark.sql("""
INSERT INTO catalog1.demo_poc_bucket1.customers VALUES
(1, 'John Doe', 'john@example.com'),
(2, 'Jane Smith', 'jane@example.com'),
(3, 'Bob Johnson', 'bob@example.com'),
(4, 'Alice Brown', 'alice@example.com'),
(5, 'Charlie Davis', 'charlie@example.com')
""")
spark.stop()
Table Bucket 2
In [3]:
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
WAREHOUSE_PATH = "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket2"
spark = SparkSession.builder \
    .appName("iceberg_lab") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.sql.catalog.catalog2", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.catalog2.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
    .config("spark.sql.catalog.catalog2.warehouse", WAREHOUSE_PATH) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "catalog2") \
    .config("spark.sql.catalog.catalog2.client.region", "us-east-1") \
    .getOrCreate()
spark.sql("SHOW NAMESPACES IN catalog2").show()
spark.sql("CREATE NAMESPACE IF NOT EXISTS catalog2.demo_poc_bucket2")
spark.sql("""
CREATE TABLE IF NOT EXISTS catalog2.demo_poc_bucket2.orders (
order_id INT,
customer_id INT,
order_date DATE,
total_amount DECIMAL(10, 2)
) USING iceberg
""")
spark.sql("""
INSERT INTO catalog2.demo_poc_bucket2.orders VALUES
(101, 1, DATE '2023-01-15', 150.50),
(102, 2, DATE '2023-01-16', 200.75),
(103, 3, DATE '2023-01-17', 75.25),
(104, 1, DATE '2023-01-18', 300.00),
(105, 4, DATE '2023-01-19', 125.00),
(106, 2, DATE '2023-01-20', 180.50),
(107, 5, DATE '2023-01-21', 95.75)
""")
spark.stop()
Join Test Across Both Catalogs
Each S3 table bucket is registered as its own Spark catalog, so one session can reference tables in both buckets and join across them. The helper below builds such a session from a list of bucket ARNs.
In [1]:
from pyspark.sql import SparkSession
import os
def create_spark_session(catalogs):
    os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
    packages = [
        "com.amazonaws:aws-java-sdk-bundle:1.12.661",
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "software.amazon.awssdk:bundle:2.29.38",
        "com.github.ben-manes.caffeine:caffeine:3.1.8",
        "org.apache.commons:commons-configuration2:2.11.0",
        "software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3",
        "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1"
    ]
    builder = SparkSession.builder \
        .appName("iceberg_lab") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Register one Iceberg catalog per S3 table bucket so a single session
    # can address tables in every bucket.
    for catalog in catalogs:
        catalog_name = catalog['catalog_name']
        builder = builder \
            .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
            .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
            .config(f"spark.sql.catalog.{catalog_name}.warehouse", catalog['arn']) \
            .config(f"spark.sql.catalog.{catalog_name}.client.region", "us-east-1")
    # Keep the built-in session catalog as the default so neither bucket
    # catalog silently captures unqualified table references.
    builder = builder \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        .config("spark.sql.defaultCatalog", "spark_catalog")
    spark = builder.getOrCreate()
    for catalog in catalogs:
        spark.sql(f"SHOW NAMESPACES IN {catalog['catalog_name']}").show()
    return spark
# Example usage:
catalogs = [
    {
        "catalog_name": "catalog1",
        "arn": "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket1"
    },
    {
        "catalog_name": "catalog2",
        "arn": "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket2"
    }
]
spark = create_spark_session(catalogs)
In [ ]:
spark
In [3]:
spark.sql("SHOW NAMESPACES IN catalog1").show()
spark.sql("SHOW NAMESPACES IN catalog2").show()
In [2]:
result = spark.sql("""
    SELECT
        c.customer_id,
        c.name,
        c.email,
        o.order_id,
        o.order_date,
        o.total_amount
    FROM catalog1.demo_poc_bucket1.customers c
    JOIN catalog2.demo_poc_bucket2.orders o
        ON c.customer_id = o.customer_id
    ORDER BY c.customer_id, o.order_date
""")
result.show()
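The join working is the main result, but the two catalogs behave like any other pair of Spark catalogs, so aggregates across buckets work too. A small follow-up sketch against the same tables:
In [ ]:
# Total spend per customer, aggregated across the two table buckets.
spark.sql("""
    SELECT c.name, SUM(o.total_amount) AS total_spend
    FROM catalog1.demo_poc_bucket1.customers c
    JOIN catalog2.demo_poc_bucket2.orders o
        ON c.customer_id = o.customer_id
    GROUP BY c.name
    ORDER BY total_spend DESC
""").show()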
Clean Up
In [3]:
%%bash
# Tables and namespaces must be deleted before the bucket itself.
aws s3tables delete-table \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket1" \
  --namespace demo_poc_bucket1 \
  --name customers
aws s3tables delete-table \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket2" \
  --namespace demo_poc_bucket2 \
  --name orders
aws s3tables delete-namespace \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket1" \
  --name demo_poc_bucket1
aws s3tables delete-namespace \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket2" \
  --name demo_poc_bucket2
aws s3tables delete-table-bucket \
  --region us-east-1 \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket1"
aws s3tables delete-table-bucket \
  --region us-east-1 \
  --table-bucket-arn "arn:aws:s3tables:us-east-1:<ACCOUNT_ID>:bucket/demo-bucket2"
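As a final check (same ListTableBuckets response-shape assumption as earlier), neither demo bucket should appear any more:
In [ ]:
%%bash
aws s3tables list-table-buckets --region us-east-1 \
  --query "tableBuckets[].name"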