Learn How to Connect to the Glue Data Catalog using AWS Glue Iceberg REST endpoint¶
Create Sample Iceberg Table using Athena DDL¶
CREATE TABLE default.customers (
    customer_id STRING,
    name STRING,
    email STRING,
    address STRING,
    phone STRING
)
PARTITIONED BY (customer_id)
LOCATION 's3://<BUCKET>/warehouse/customers'
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format' = 'PARQUET'
);
INSERT INTO default.customers (customer_id, name, email, address, phone) VALUES
('C001', 'John Doe', 'john.doe@email.com', '123 Main St, City A', '555-0101'),
('C002', 'Jane Smith', 'jane.smith@email.com', '456 Oak Rd, City B', '555-0202'),
('C003', 'Bob Johnson', 'bob.johnson@email.com', '789 Pine Ave, City C', '555-0303'),
('C004', 'Alice Brown', 'alice.brown@email.com', '321 Elm St, City D', '555-0404'),
('C005', 'Charlie Davis', 'charlie.davis@email.com', '654 Maple Dr, City E', '555-0505');
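If you prefer to confirm the new table from code rather than the Athena console, a minimal boto3 sketch along these lines can run a count query against it. The query-results location is a placeholder and the client settings are assumptions; adjust them for your environment.

import time
import boto3

# Hypothetical check that the Athena DDL/DML above worked; replace the
# query-results location with a bucket you own.
athena = boto3.client("athena", region_name="us-east-1")

run = athena.start_query_execution(
    QueryString="SELECT count(*) AS row_count FROM customers",
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://<BUCKET>/athena-query-results/"},
)
query_id = run["QueryExecutionId"]

# Poll until the query finishes, then read the result set.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(rows)  # first row is the header; the second should show 5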
Define imports¶
In [8]:
from pyspark.sql import SparkSession
import os

# Point PySpark at a local JDK 11 install (adjust the path for your machine)
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

# Glue Data Catalog settings: fill in your AWS account id and region
catalog_name = "mydatacatalog"
aws_account_id = ""
aws_region = "us-east-1"
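The aws_account_id above is intentionally left blank. If you would rather not hard-code it, one option (an assumption on my part, not part of the original notebook) is to resolve it from the active credentials via STS:

import boto3

# Optional: derive the account id from whatever credentials are currently active.
aws_account_id = boto3.client("sts", region_name=aws_region).get_caller_identity()["Account"]
print(aws_account_id)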
Define Packages¶
In [9]:
packages = [
    "com.amazonaws:aws-java-sdk-bundle:1.12.661",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "software.amazon.awssdk:bundle:2.29.38",
    "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.6.1",
]
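The Iceberg runtime artifact above (iceberg-spark-runtime-3.4_2.12) is built for Spark 3.4 and Scala 2.12, so it should match the PySpark version installed locally. A quick sanity check, as a sketch:

import pyspark

# The first two version components should line up with the runtime artifact (3.4 here).
print(pyspark.__version__)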
Define Spark Conf¶
In [10]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",
        # Pull the Iceberg/AWS jars declared above at session startup
        "spark.jars.packages": ",".join(packages),
        "spark.sql.defaultCatalog": catalog_name,
        # Register the Glue Data Catalog as an Iceberg catalog named mydatacatalog
        "spark.sql.catalog.mydatacatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.mydatacatalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # Warehouse location for new tables (here the bucket is named after the account id)
        "spark.sql.catalog.mydatacatalog.warehouse": f"s3://{aws_account_id}/iceberg-warehouse/",
        "spark.sql.catalog.mydatacatalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }
}
def create_spark_session(spark_config, protocol):
    """Build a SparkSession from the given config, optionally adding S3A credentials."""
    builder = SparkSession.builder
    if protocol == "s3a":
        # Wire S3A up to the same AWS credentials and regional endpoint
        default = {
            "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
            "spark.hadoop.fs.s3a.endpoint": f"s3.{aws_region}.amazonaws.com",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        }
        for key, value in default.items():
            builder = builder.config(key, value)
    # Apply the catalog/Iceberg settings defined above
    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
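Note that the configuration above registers the catalog through org.apache.iceberg.aws.glue.GlueCatalog. If you want to connect through the AWS Glue Iceberg REST endpoint referenced in the title instead, the catalog properties would look roughly like the sketch below. The endpoint URI, SigV4 settings, and the use of the account id as the warehouse reflect my reading of the AWS documentation, so treat them as assumptions and verify against the current docs.

# Sketch only: REST-endpoint variant of the catalog settings (verify against AWS docs).
rest_conf = {
    "spark.sql.catalog.mydatacatalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.mydatacatalog.type": "rest",
    "spark.sql.catalog.mydatacatalog.uri": f"https://glue.{aws_region}.amazonaws.com/iceberg",
    "spark.sql.catalog.mydatacatalog.warehouse": aws_account_id,  # Glue catalog id
    "spark.sql.catalog.mydatacatalog.rest.sigv4-enabled": "true",
    "spark.sql.catalog.mydatacatalog.rest.signing-name": "glue",
    "spark.sql.catalog.mydatacatalog.rest.signing-region": aws_region,
    "spark.sql.catalog.mydatacatalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
}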
Create Spark Session¶
In [11]:
spark = create_spark_session(spark_config=conf.get("spark"), protocol="s3a")
In [12]:
spark
Out[12]:
Query data¶
In [13]:
# Show catalogs
print("Showing catalogs:")
spark.sql("SHOW CATALOGS").show()
# Show namespaces
print("\nShowing namespaces in mydatacatalog:")
spark.sql("SHOW NAMESPACES IN mydatacatalog").show()
# Show tables
print("\nShowing tables in mydatacatalog.default:")
spark.sql("SHOW TABLES IN mydatacatalog.default").show()
# Describe customers table
print("\nDescribing mydatacatalog.default.customers table:")
spark.sql("DESCRIBE TABLE mydatacatalog.default.customers").show(truncate=False)
# Select from customers table
print("\nSelecting from mydatacatalog.default.customers table:")
spark.sql("SELECT * FROM mydatacatalog.default.customers LIMIT 5").show()