Saturday, January 25, 2025

Learn How to Connect to the Glue Data Catalog using AWS Glue Iceberg REST endpoint

Create Sample Iceberg Table using Athena DDL

CREATE TABLE default.customers (
    customer_id STRING,
    name STRING,
    email STRING,
    address STRING,
    phone STRING
)
PARTITIONED BY (customer_id)
LOCATION 's3://<BUCKET>/warehouse/customers'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'PARQUET'
);

INSERT INTO default.customers (customer_id, name, email, address, phone) VALUES
('C001', 'John Doe', 'john.doe@email.com', '123 Main St, City A', '555-0101'),
('C002', 'Jane Smith', 'jane.smith@email.com', '456 Oak Rd, City B', '555-0202'),
('C003', 'Bob Johnson', 'bob.johnson@email.com', '789 Pine Ave, City C', '555-0303'),
('C004', 'Alice Brown', 'alice.brown@email.com', '321 Elm St, City D', '555-0404'),
('C005', 'Charlie Davis', 'charlie.davis@email.com', '654 Maple Dr, City E', '555-0505');
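If you prefer to script the table setup instead of pasting the DDL into the Athena console, the same statements can be submitted through the Athena API. A minimal sketch, assuming boto3 is installed and configured; the helper name and the results bucket are placeholders, not part of the original walkthrough:

```python
# Hypothetical helper to submit a SQL statement to Athena programmatically.
# Replace <BUCKET> with a bucket you own for Athena query results.
def start_athena_query(sql: str, database: str = "default",
                       output_location: str = "s3://<BUCKET>/athena-results/") -> str:
    import boto3  # imported lazily so the rest of the notebook runs without it
    athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    # Athena runs asynchronously; poll get_query_execution with this ID
    # to wait for completion before running the INSERT.
    return response["QueryExecutionId"]
```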


Define imports

In [8]:
from pyspark.sql import SparkSession
import os

# Point Spark at a local JDK 11 install (Homebrew path on macOS)
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

catalog_name = "mydatacatalog"
aws_account_id = ""  # fill in your AWS account ID
aws_region = "us-east-1"

Define Packages

In [9]:
packages = [
    "com.amazonaws:aws-java-sdk-bundle:1.12.661",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.29.38",
    "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1"
]

Define Spark conf

In [10]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",
        "spark.jars.packages": ",".join(packages),
        "spark.sql.defaultCatalog": catalog_name,
        "spark.sql.catalog.mydatacatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.mydatacatalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.mydatacatalog.warehouse": f"s3://{aws_account_id}/iceberg-warehouse/",  # adjust to your warehouse bucket/path
        "spark.sql.catalog.mydatacatalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}
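The configuration above wires the catalog through the `GlueCatalog` implementation. To connect through the AWS Glue Iceberg REST endpoint that this post's title refers to, the catalog can instead be pointed at Glue's REST URI with SigV4 signing. A sketch, assuming the endpoint format `https://glue.<region>.amazonaws.com/iceberg` and the SigV4 REST properties from the Iceberg AWS integration; fill in your own account ID and region before use:

```python
# Sketch: same catalog name, but served by the Glue Iceberg REST endpoint.
# These two values come from the earlier setup cell; shown here as placeholders.
aws_region = "us-east-1"
aws_account_id = ""  # your AWS account ID acts as the REST warehouse

rest_conf = {
    "spark.sql.catalog.mydatacatalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.mydatacatalog.type": "rest",
    "spark.sql.catalog.mydatacatalog.uri": f"https://glue.{aws_region}.amazonaws.com/iceberg",
    "spark.sql.catalog.mydatacatalog.warehouse": aws_account_id,
    # Sign REST requests with SigV4 against the Glue service
    "spark.sql.catalog.mydatacatalog.rest.sigv4-enabled": "true",
    "spark.sql.catalog.mydatacatalog.rest.signing-name": "glue",
    "spark.sql.catalog.mydatacatalog.rest.signing-region": aws_region,
}
```

These keys would replace the `catalog-impl`/`warehouse` entries in the `conf` dict above; the rest of the session setup stays the same.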


def create_spark_session(spark_config, protocol):
    builder = SparkSession.builder

    if protocol == "s3a":
        default = {
            "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            "spark.hadoop.fs.s3a.endpoint": f"s3.{aws_region}.amazonaws.com",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        }
        for key, value in default.items():
            builder = builder.config(key, value)

    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

Create Spark Session

In [11]:
spark = create_spark_session(spark_config=conf.get("spark"), protocol="s3a")
In [12]:
spark
Out[12]:

SparkSession - in-memory
SparkContext (Spark UI)
Version: v3.4.0
Master: local[*]
AppName: iceberg_lab

Query data

In [13]:
# Show catalogs
print("Showing catalogs:")
spark.sql("SHOW CATALOGS").show()

# Show namespaces
print("\nShowing namespaces in mydatacatalog:")
spark.sql("SHOW NAMESPACES IN mydatacatalog").show()

# Show tables
print("\nShowing tables in mydatacatalog.default:")
spark.sql("SHOW TABLES IN mydatacatalog.default").show()

# Describe customers table
print("\nDescribing mydatacatalog.default.customers table:")
spark.sql("DESCRIBE TABLE mydatacatalog.default.customers").show(truncate=False)

# Select from customers table
print("\nSelecting from mydatacatalog.default.customers table:")
spark.sql("SELECT * FROM mydatacatalog.default.customers LIMIT 5").show()
Showing catalogs:
+-------------+
|      catalog|
+-------------+
|mydatacatalog|
|spark_catalog|
+-------------+


Showing namespaces in mydatacatalog:
+---------+
|namespace|
+---------+
|  default|
+---------+


Showing tables in mydatacatalog.default:
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|customers|      false|
+---------+---------+-----------+


Describing mydatacatalog.default.customers table:
+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|customer_id            |string   |null   |
|name                   |string   |null   |
|email                  |string   |null   |
|address                |string   |null   |
|phone                  |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|customer_id            |string   |null   |
+-----------------------+---------+-------+


Selecting from mydatacatalog.default.customers table:
+-----------+-------------+--------------------+--------------------+--------+
|customer_id|         name|               email|             address|   phone|
+-----------+-------------+--------------------+--------------------+--------+
|       C001|     John Doe|  john.doe@email.com| 123 Main St, City A|555-0101|
|       C002|   Jane Smith|jane.smith@email.com|  456 Oak Rd, City B|555-0202|
|       C003|  Bob Johnson|bob.johnson@email...|789 Pine Ave, City C|555-0303|
|       C004|  Alice Brown|alice.brown@email...|  321 Elm St, City D|555-0404|
|       C005|Charlie Davis|charlie.davis@ema...|654 Maple Dr, City E|555-0505|
+-----------+-------------+--------------------+--------------------+--------+
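Beyond the data itself, Iceberg tables expose metadata tables you can query through the same catalog, for example snapshot history and data-file listings. A sketch against the session created above, assuming the catalog and table names used in this post (the column selections are illustrative):

```python
# Iceberg metadata tables: suffix the table identifier with .snapshots / .files
snapshot_query = (
    "SELECT snapshot_id, committed_at, operation "
    "FROM mydatacatalog.default.customers.snapshots"
)
files_query = (
    "SELECT file_path, record_count "
    "FROM mydatacatalog.default.customers.files"
)

# With the live session from the cells above:
# spark.sql(snapshot_query).show(truncate=False)
# spark.sql(files_query).show(truncate=False)
```

The snapshots table is also the entry point for time travel (`VERSION AS OF <snapshot_id>`) if you need to query the table as of an earlier commit.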
