Let's Migrate Iceberg Tables from One Catalog to Another: A Simple Hands-On Lab¶
Spin up the Postgres container¶
services:
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      # -d metastore is needed: psql defaults to a database named after the
      # user ("hive"), which this container never creates
      test: ["CMD", "psql", "-U", "hive", "-d", "metastore", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
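With the snippet saved as docker-compose.yml, run docker compose up -d and wait for the healthcheck to pass (docker compose ps should show metastore_db as healthy) before starting Spark: the JDBC catalog configured below needs a reachable Postgres at localhost:5432.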
Define Imports and Packages¶
In [1]:
import os
import sys

from pyspark.sql import SparkSession

# AWS credentials for S3 access (left blank here; fill in your own)
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
os.environ["PYSPARK_PYTHON"] = sys.executable

SPARK_VERSION = '3.4'
ICEBERG_VERSION = '1.6.1'

SUBMIT_ARGS = (
    f"--packages "
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
    f"software.amazon.awssdk:bundle:2.20.160,"
    f"software.amazon.awssdk:url-connection-client:2.20.160,"
    f"org.postgresql:postgresql:42.7.2,"
    f"org.apache.hadoop:hadoop-aws:3.3.4 "
    f"pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
Define Spark Conf for Both Catalogs¶
In [9]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",
        "spark.jars.packages": ",".join([
            f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION}",
            "software.amazon.awssdk:bundle:2.20.160",
            "software.amazon.awssdk:url-connection-client:2.20.160",
            "org.postgresql:postgresql:42.7.2",
            "org.apache.hadoop:hadoop-aws:3.3.4"
        ]),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",

        # S3A access shared by both catalogs
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID"),
        "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "spark.hadoop.fs.s3a.endpoint": "s3.amazonaws.com",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",

        # Source catalog: AWS Glue Data Catalog
        "spark.sql.catalog.glueCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glueCatalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glueCatalog.warehouse": "s3a://soumil-dev-bucket-1995/warehouse/",

        # Target catalog: Iceberg JDBC catalog backed by the Postgres container
        "spark.sql.catalog.jdbcCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.jdbcCatalog.type": "jdbc",
        "spark.sql.catalog.jdbcCatalog.uri": "jdbc:postgresql://localhost:5432/metastore",
        "spark.sql.catalog.jdbcCatalog.jdbc.user": "hive",
        "spark.sql.catalog.jdbcCatalog.jdbc.password": "hive",
        "spark.sql.catalog.jdbcCatalog.warehouse": "s3a://soumil-dev-bucket-1995/warehouse1/"
    }
}
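This wires a single SparkSession to two independent Iceberg catalogs: glueCatalog, backed by the AWS Glue Data Catalog via org.apache.iceberg.aws.glue.GlueCatalog, and jdbcCatalog, Iceberg's JDBC catalog backed by the Postgres container started earlier. Both share the same S3A credentials, so either catalog can reach the warehouse paths in S3.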
Create Spark Session¶
In [ ]:
def create_spark_session(spark_config):
    builder = SparkSession.builder
    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

spark = create_spark_session(spark_config=conf["spark"])
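Once the session is up, a quick optional sanity check (a minimal sketch, assuming the catalog names configured above) is to list namespaces through each catalog:

# Both catalogs should answer without errors if the config above is correct;
# the JDBC catalog creates its backing tables in Postgres on first use.
spark.sql("SHOW NAMESPACES IN glueCatalog").show()
spark.sql("SHOW NAMESPACES IN jdbcCatalog").show()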
In [3]:
spark
Let's Create a Table in the Glue Catalog¶
In [5]:
# Create the Iceberg table (without specifying LOCATION)
spark.sql("""
CREATE TABLE IF NOT EXISTS glueCatalog.default.demo_table (
id bigint,
data string
)
USING iceberg
""")
print("Table created successfully.")
spark.sql("""
INSERT INTO glueCatalog.default.demo_table VALUES (2, 'b')
""")
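A Glue catalog entry for an Iceberg table is essentially a pointer to the table's current metadata.json; the data files and metadata files themselves live under the S3 warehouse path. That separation is what makes the migration below cheap: switching catalogs only means registering the existing metadata file with the new catalog, with no data movement at all.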
Migration to Another Catalog¶
Get the Latest Metadata File from the Source Catalog¶
In [4]:
# Grab the path of the most recent metadata.json for the source table
latest_file = spark.sql("""
    SELECT file FROM glueCatalog.default.demo_table.metadata_log_entries
    ORDER BY timestamp DESC
    LIMIT 1
""").collect()[0]['file']
In [5]:
latest_file
Register the Table in the Target Catalog¶
In [6]:
spark.sql(f"""
CALL jdbcCatalog.system.register_table(
table => 'jdbcCatalog.default.demo_table',
metadata_file => '{latest_file}'
)
""").show()
Query Both Catalogs¶
In [7]:
spark.sql("""
SELECT * FROM jdbcCatalog.default.demo_table LIMIT 2
""").show()
In [8]:
spark.sql("""
SELECT * FROM glueCatalog.default.demo_table LIMIT 2
""").show()