Apache Hudi with Spark SQL on AWS S3 | Inserts | Updates | Deletes | Stored Procedures, Using AWS Glue Notebooks: A Hands-On Guide

For an AWS Glue notebook, please set the following configurations:

%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%%configure
{
    "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "--enable-glue-datacatalog": "true",
    "--datalake-formats": "hudi"
}
  • Since I am running an AWS Glue notebook locally using a Docker container, I will set the configurations below. If you are using a Glue notebook in an AWS environment, please set the above configurations.
In [ ]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}

Spark SQL Commands

In [2]:
-- List every database visible to the current catalog.
SHOW DATABASES;
In [3]:
-- Make `default` the current database for the statements that follow.
USE default;
In [4]:
-- List the tables in the current database.
SHOW TABLES;

Create Hudi Table

In [11]:
-- Set the Hudi config `hoodie.metadata.enable` to true for this Spark SQL session.
SET hoodie.metadata.enable=true;
In [12]:
-- Set the Hudi config `hoodie.metadata.column.stats.enable` to true for this Spark SQL session.
SET hoodie.metadata.column.stats.enable=true;
In [13]:
-- Create the demo Hudi table.
--   primaryKey      : `uuid` identifies each record.
--   preCombineField : `ts` is the pre-combine field.
--   path            : S3 location that backs the table.
-- The option list must be wrapped in OPTIONS ( ... ); without the wrapper the
-- statement does not parse.
CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING hudi
OPTIONS (
    primaryKey = 'uuid',
    preCombineField = 'ts',
    path 's3://soumilshah-dev-1995/hudi/table_name=hudi_table'
);

Insert Items using Spark SQL

In [14]:
-- Insert sample trip rows.
-- Values are positional and follow the table's column order:
--   ts, uuid, rider, driver, fare, city
INSERT INTO hudi_table
VALUES
    (1695091554788, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'),
    (1695046462179, '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'rider-D', 'driver-L', 33.90, 'san_francisco'),
    (1695516137016, 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'rider-F', 'driver-P', 34.15, 'sao_paulo'),
    (1695376420876, '7a84095f-737f-40bc-b62f-6b69664712d2', 'rider-G', 'driver-Q', 43.40, 'sao_paulo'),
    (1695173887231, '3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04', 'rider-I', 'driver-S', 41.06, 'chennai');

Selecting Items from Table

In [15]:
-- Trips whose fare is greater than 20.0.
SELECT
    ts,
    fare,
    rider,
    driver,
    city
FROM hudi_table
WHERE fare > 20.0;

Updating Items with Spark SQL

In [17]:
-- Set the fare to 25.0 on every row that belongs to rider-D.
UPDATE hudi_table
SET fare = 25.0
WHERE rider = 'rider-D';
In [18]:
-- Read back rider-D's rows to confirm the update; `*` is kept so every
-- column of the table is displayed.
SELECT *
FROM hudi_table
WHERE rider = 'rider-D';

Deleting Items from table

In [19]:
-- Delete a single record by its key.
-- NOTE(review): this uuid is not among the rows inserted earlier in this
-- notebook; if the table holds only those rows, the statement removes nothing.
DELETE FROM hudi_table
WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';

Stored procedures

Show commits

In [20]:
-- Show up to 10 commits for hudi_table.
CALL show_commits(
  table => 'hudi_table',
  limit => 10
);


In [21]:
-- Run the clean procedure on hudi_table with the KEEP_LATEST_FILE_VERSIONS
-- policy, retaining 1 file version, with trigger_max_commits set to 2.
-- The closing `);` is required; the call does not parse without it.
CALL run_clean(
  table => 'hudi_table',
  trigger_max_commits => 2,
  clean_policy => 'KEEP_LATEST_FILE_VERSIONS',
  file_versions_retained => 1
);

Save points

In [22]:
-- Create a savepoint on hudi_table at the given commit time.
-- NOTE(review): the commit_time literal comes from one specific run; presumably
-- it has to be replaced with a commit time listed by show_commits for your table.
CALL create_savepoint(
  table => 'hudi_table',
  commit_time => '20240630132252427'
);
In [23]:
-- List the savepoints that exist on hudi_table.
CALL show_savepoints(
  table => 'hudi_table'
);


In [25]:

-- Schedule a clustering plan for hudi_table (op => 'schedule').
-- The closing `);` is required; the call does not parse without it.
-- NOTE(review): `options` is expected to be a comma-separated list of
-- key=value Hudi configs, but this value carries no keys. It looks truncated;
-- confirm the intended settings before running.
CALL run_clustering(
  table => 'hudi_table',
  op => 'schedule',
  options => '*1024*1024,*1024*1024*1024'
);
In [26]:

-- Execute clustering for hudi_table (op => 'execute').
-- The closing `);` is required; the call does not parse without it.
-- NOTE(review): `options` is expected to be a comma-separated list of
-- key=value Hudi configs, but this value carries no keys. It looks truncated;
-- confirm the intended settings before running.
CALL run_clustering(
  table => 'hudi_table',
  op => 'execute',
  options => '*1024*1024,*1024*1024*1024'
);
In [27]:

-- Execute clustering for hudi_table again (same arguments as the previous
-- execute call in this notebook).
-- The closing `);` is required; the call does not parse without it.
-- NOTE(review): `options` is expected to be a comma-separated list of
-- key=value Hudi configs, but this value carries no keys. It looks truncated;
-- confirm the intended settings before running.
CALL run_clustering(
  table => 'hudi_table',
  op => 'execute',
  options => '*1024*1024,*1024*1024*1024'
);
In [29]:
-- Show the clustering information recorded for hudi_table.
CALL show_clustering(
  table => 'hudi_table'
);

