Sunday, June 30, 2024

Apache Hudi Using Spark SQL on AWS S3 | Insert | Update | Deletes | Stored Procedures on AWS using Glue Notebooks a Hands on Guide

sparksql (1)

Apache Hudi Using Spark SQL on AWS S3 | Insert | Update | Deletes | Stored Procedures on AWS using Glue Notebooks a Hands on Guide

AWS Glue Notebook please set following Configurations


%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%%configure
{
    "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "--enable-glue-datacatalog": "true",
    "--datalake-formats": "hudi"
}
  • Since I am running an AWS Glue notebook locally using a Docker container, I will set the configurations below. If you are using a Glue notebook in an AWS environment, please set the above configurations.
In [ ]:
# 
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}

Spark SQL Commands

In [2]:
%%sql
show databases
Starting Spark application
IDYARN Application IDKindStateSpark UIDriver logUserCurrent session?
0NonepysparkidleNone
SparkSession available as 'spark'.
In [3]:
%%sql
use  default;
In [4]:
%%sql
show tables;

Create Hudi Table

In [11]:
%%sql
SET hoodie.metadata.enable=true;
In [12]:
%%sql
SET hoodie.metadata.column.stats.enable=true;
In [13]:
%%sql
CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING hudi
OPTIONS (
    primaryKey = 'uuid',
    preCombineField = 'ts',
    path 's3://soumilshah-dev-1995/hudi/table_name=hudi_table'
)
PARTITIONED BY (city);

Insert Items using Spark SQL

In [14]:
%%sql
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

Selecting Items from Table

In [15]:
%%sql 
SELECT ts, fare, rider, driver, city FROM  hudi_table WHERE fare > 20.0;

Updating Items with Spark SQL

In [17]:
%%sql
UPDATE hudi_table SET fare = 25.0 WHERE rider = 'rider-D';
In [18]:
%%sql 
SELECT * FROM hudi_table WHERE rider = 'rider-D';

Deleting Items from table

In [19]:
%%sql
DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';

Stored procedures

Show commits

In [20]:
%%sql
call show_commits(table => 'hudi_table', limit => 10);

cleaner

In [21]:
%%sql
CALL run_clean(
  table => 'hudi_table',
  trigger_max_commits => 2,
  clean_policy => 'KEEP_LATEST_FILE_VERSIONS',
  file_versions_retained => 1
);

Save points

In [22]:
%%sql
call create_savepoint(table => 'hudi_table', commit_time => '20240630132252427');
In [23]:
%%sql
call show_savepoints(table => 'hudi_table');

Clustering

In [25]:
%%sql

CALL run_clustering(
  table => 'hudi_table',
  op => 'schedule',
  options => 'hoodie.clustering.plan.strategy.target.file.max.bytes=1024*1024*1024,hoodie.clustering.plan.strategy.max.bytes.per.group=2*1024*1024*1024'
);
In [26]:
%%sql

CALL run_clustering(
  table => 'hudi_table',
  op => 'execute',
  options => 'hoodie.clustering.plan.strategy.target.file.max.bytes=1024*1024*1024,hoodie.clustering.plan.strategy.max.bytes.per.group=2*1024*1024*1024'
);
In [27]:
%%sql

CALL run_clustering(
  table => 'hudi_table',
  op => 'execute',
  options => 'hoodie.clustering.plan.strategy.target.file.max.bytes=1024*1024*1024,hoodie.clustering.plan.strategy.max.bytes.per.group=2*1024*1024*1024'
);
In [29]:
%%sql
call show_clustering(table => 'hudi_table');

Learn How to configure your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands on Labs

test-tble-bucket-joins Learn How to configure your Spark Session to Join Managed (S...