Saturday, April 26, 2025

Let's Migrate Iceberg Tables from One Catalog to Another | Simple Hands-on Lab


Spin up the Postgres container below with Docker Compose; it will serve as the backing store for the JDBC catalog.

services:
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
    command: ["postgres", "-c", "wal_level=logical"]
    healthcheck:
      test: ["CMD", "psql", "-U", "hive", "-c", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5
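After `docker compose up -d`, the healthcheck can take a few seconds to pass. As a quick sanity check before starting Spark, you can poll the Postgres port from Python. This helper is a sketch of mine, not part of the lab:

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll a TCP port until it accepts connections or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2.0)
            if sock.connect_ex((host, port)) == 0:
                return True  # port is accepting connections
        time.sleep(1.0)
    return False

# Wait for the metastore Postgres from the compose file above:
# wait_for_port("localhost", 5432)
```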

Define Imports and Packages

In [1]:
import os
import sys
from pyspark.sql import SparkSession

os.environ['AWS_ACCESS_KEY_ID']=''
os.environ['AWS_SECRET_ACCESS_KEY']=""
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
os.environ["PYSPARK_PYTHON"] = sys.executable

SPARK_VERSION = '3.4'
ICEBERG_VERSION = '1.6.1'

SUBMIT_ARGS = (
    f"--packages "
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
    f"software.amazon.awssdk:bundle:2.20.160,"
    f"software.amazon.awssdk:url-connection-client:2.20.160,"
    f"org.postgresql:postgresql:42.7.2,"
    f"org.apache.hadoop:hadoop-aws:3.3.4 "
    f"pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

Define Spark Conf for Both Catalogs

In [9]:
conf = {
    "spark": {
        "spark.app.name": "iceberg_lab",
        "spark.jars.packages": ",".join([
            f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION}",
            "software.amazon.awssdk:bundle:2.20.160",
            "software.amazon.awssdk:url-connection-client:2.20.160",
            "org.postgresql:postgresql:42.7.2",
            "org.apache.hadoop:hadoop-aws:3.3.4"
        ]),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID"),
        "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "spark.hadoop.fs.s3a.endpoint": "s3.amazonaws.com",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",

        
        "spark.sql.catalog.glueCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glueCatalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glueCatalog.warehouse": "s3a://soumil-dev-bucket-1995/warehouse/",
        
        
        "spark.sql.catalog.jdbcCatalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.jdbcCatalog.type": "jdbc",
        "spark.sql.catalog.jdbcCatalog.uri": "jdbc:postgresql://localhost:5432/metastore",
        "spark.sql.catalog.jdbcCatalog.jdbc.user": "hive",
        "spark.sql.catalog.jdbcCatalog.jdbc.password": "hive",
        "spark.sql.catalog.jdbcCatalog.warehouse": "s3a://soumil-dev-bucket-1995/warehouse1/"
    }
}
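Spark discovers each catalog from configuration keys of the form `spark.sql.catalog.<name>`, so a small helper can confirm that both catalogs are registered before building the session. This function is a sketch of mine, not part of the lab:

```python
import re

def catalog_names(spark_conf: dict) -> set:
    """Extract Iceberg catalog names from spark.sql.catalog.<name>[.suffix] keys."""
    pattern = re.compile(r"^spark\.sql\.catalog\.([^.]+)")
    names = set()
    for key in spark_conf:
        match = pattern.match(key)
        if match:
            names.add(match.group(1))
    return names

# With the conf above, this should report both glueCatalog and jdbcCatalog:
# catalog_names(conf["spark"])
```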

Create Spark Session

In [ ]:
def create_spark_session(spark_config):
    builder = SparkSession.builder
    for key, value in spark_config.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

spark = create_spark_session(spark_config=conf["spark"])
In [3]:
spark
Out[3]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.0
Master
local[*]
AppName
iceberg_lab

Let's Create a Table in the Glue Catalog

In [5]:
# Create the Iceberg table (without specifying LOCATION)
spark.sql("""
CREATE TABLE IF NOT EXISTS glueCatalog.default.demo_table (
  id bigint,
  data string
)
USING iceberg
""")

print("Table created successfully.")

spark.sql("""
INSERT INTO glueCatalog.default.demo_table VALUES (2, 'b')
""")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Table created successfully.
Out[5]:
DataFrame[]

Migration to Another Catalog

Get the Latest Metadata File from the Source Catalog

In [4]:
latest_file = spark.sql("""
SELECT file FROM glueCatalog.default.demo_table.metadata_log_entries
ORDER BY timestamp DESC
LIMIT 1
""").collect()[0]['file']
In [5]:
latest_file
Out[5]:
's3a://soumil-dev-bucket-1995/warehouse/default.db/demo_table/metadata/00001-c4f232d6-83cc-420d-b3e8-af4c73bc64c3.metadata.json'
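The query above returns the newest metadata file by timestamp; Iceberg names these files `<version>-<uuid>.metadata.json`, so the numeric prefix is the table's metadata version. As a quick sanity check before registering the file elsewhere, you can parse that version out of the path. This helper is a sketch of mine, not part of the lab:

```python
import re

def metadata_version(metadata_path: str) -> int:
    """Return the numeric version prefix of an Iceberg metadata file,
    e.g. .../metadata/00001-<uuid>.metadata.json -> 1."""
    filename = metadata_path.split("/")[-1]
    match = re.match(r"^(\d+)-.*\.metadata\.json$", filename)
    if match is None:
        raise ValueError(f"not an Iceberg metadata file: {filename}")
    return int(match.group(1))

# metadata_version(latest_file) for the path above yields version 1
```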

Register the Table in the Target Catalog

The register_table procedure copies no data: it records the existing metadata file in the JDBC catalog, so both catalogs point at the same files in S3.

In [6]:
spark.sql(f"""
CALL jdbcCatalog.system.register_table(
  table => 'jdbcCatalog.default.demo_table',
  metadata_file => '{latest_file}'
)
""").show()
25/04/26 10:35:38 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
25/04/26 10:35:38 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
+-------------------+-------------------+----------------------+
|current_snapshot_id|total_records_count|total_data_files_count|
+-------------------+-------------------+----------------------+
|7229686390025629118|                  1|                     1|
+-------------------+-------------------+----------------------+

Query Both Catalogs

In [7]:
spark.sql("""
SELECT * FROM jdbcCatalog.default.demo_table  LIMIT 2
""").show()
+---+----+
| id|data|
+---+----+
|  2|   b|
+---+----+

In [8]:
spark.sql("""
SELECT * FROM glueCatalog.default.demo_table  LIMIT 2
""").show()
+---+----+
| id|data|
+---+----+
|  2|   b|
+---+----+
