Install lakeFS¶
pip install lakefs
python -m lakefs.quickstart
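The quickstart needs a lakeFS server listening on port 8000. If the Python quickstart module is not available in your environment, the Docker quickstart from the lakeFS docs is an alternative; it prints throwaway access keys on startup:
docker run --pull always -p 8000:8000 treeverse/lakefs:latest run --quickstart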
Step 1: Define Imports¶
In [1]:
import os
import sys
from pyspark.sql import SparkSession

# Point Spark at a local JDK 11; adjust this path for your machine
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

SPARK_VERSION = '3.4'
ICEBERG_VERSION = '1.3.0'
LAKEFS_ICEBERG_VERSION = '0.1.2'

# Fetch the Iceberg Spark runtime, the AWS S3A stack, and the lakeFS Iceberg
# catalog from Maven when the Spark session starts
SUBMIT_ARGS = (
    "--packages "
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
    "com.amazonaws:aws-java-sdk-bundle:1.12.661,"
    "org.apache.hadoop:hadoop-aws:3.3.4,"
    f"io.lakefs:lakefs-iceberg:v{LAKEFS_ICEBERG_VERSION} "
    "pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# lakeFS quickstart credentials (placeholders; substitute your own keys)
lakefs_access_key = "XX"
lakefs_secret_key = "X"
lakefs_endpoint = "http://127.0.0.1:8000"
repo_name = "learn-demo"
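The lakefs package installed above also ships a high-level Python SDK, so the repository can be created from code instead of the UI or lakectl. This is a minimal sketch, assuming the SDK's Client and Repository APIs and an illustrative local:// storage namespace; verify the calls against your SDK version:
In [ ]:
import lakefs
from lakefs.client import Client

# Explicit client pointed at the local quickstart server (assumed SDK API)
client = Client(
    host=lakefs_endpoint,
    username=lakefs_access_key,
    password=lakefs_secret_key,
)

# Create the repo if it doesn't exist yet; the storage namespace is illustrative
repo = lakefs.Repository(repo_name, client=client).create(
    storage_namespace=f"local://{repo_name}",
    exist_ok=True,
)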
Create Spark Session¶
In [2]:
# Register a "lakefs" Iceberg catalog backed by the lakeFS catalog implementation,
# make it the session default, and route S3A traffic through the lakeFS endpoint
spark = SparkSession.builder \
    .appName("Iceberg-lakeFS-Example") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
    .config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") \
    .config("spark.sql.catalog.lakefs.uri", lakefs_endpoint) \
    .config("spark.sql.defaultCatalog", "lakefs") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", lakefs_endpoint) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", lakefs_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", lakefs_secret_key) \
    .getOrCreate()
In [3]:
spark
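With spark.sql.defaultCatalog set to lakefs, table identifiers resolve against the lakeFS catalog as <branch>.<schema>.<table>. A quick sanity check on the session wiring, using nothing lakeFS-specific:
In [ ]:
# Confirm the lakefs catalog is the session default
print(spark.conf.get("spark.sql.defaultCatalog"))  # expected: lakefs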
Write Data to Main Branch¶
In [6]:
from pyspark.sql import Row

# Create a sample DataFrame
data = [Row(id=1, data="foo"), Row(id=2, data="bar")]
df = spark.createDataFrame(data)
df.show()

# Create (or replace) an Iceberg table on the main branch
df.writeTo("main.sampledb.my_table").using("iceberg").createOrReplace()
In [7]:
spark.sql("select * from main.sampledb.my_table").show()
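Iceberg tables also expose metadata tables. Assuming the lakeFS catalog surfaces them like a stock Iceberg catalog (worth verifying for your lakefs-iceberg version), you can inspect the table's snapshot history on main:
In [ ]:
# One row per Iceberg snapshot of my_table on the main branch
spark.sql("select committed_at, snapshot_id, operation from main.sampledb.my_table.snapshots").show(truncate=False)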
Commit to Main Branch¶
In [8]:
! lakectl commit lakefs://learn-demo/main -m "Add foo and bar rows to my_table"
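The same commit can be made from Python rather than shelling out. A hedged sketch that reuses the client from the SDK example earlier; Branch.commit is the assumed call:
In [ ]:
# Commit staged changes on main via the Python SDK (assumed API)
main_branch = lakefs.Repository(repo_name, client=client).branch("main")
main_branch.commit(message="Add foo and bar rows to my_table")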
Let's Create a New Branch¶
In [12]:
! lakectl branch create lakefs://learn-demo/dev -s lakefs://learn-demo/main
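The Python SDK equivalent, again hedged against the SDK surface; Branch.create with a source_reference is the assumed call:
In [ ]:
# Create dev from main via the Python SDK (assumed API)
dev_branch = lakefs.Repository(repo_name, client=client).branch("dev")
dev_branch.create(source_reference="main", exist_ok=True)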
In [9]:
# The new dev branch starts out with the same data as main
spark.sql("select * from dev.sampledb.my_table").show()
In [10]:
from pyspark.sql import Row

# Create a sample DataFrame with two more rows
data = [Row(id=3, data="foo***"), Row(id=4, data="bar***")]
df = spark.createDataFrame(data)
df.show()

# Append to the table on the dev branch only; main is untouched
df.writeTo("dev.sampledb.my_table").append()
In [11]:
spark.sql("select * from dev.sampledb.my_table order by id asc").show()
In [16]:
! lakectl commit lakefs://learn-demo/dev -m "Add foo*** and bar*** rows to my_table"
Let's Compare Both Branches¶
In [12]:
spark.sql("select * from main.sampledb.my_table").show()
spark.sql("select * from dev.sampledb.my_table").show()
Merge Dev into Main¶
In [15]:
! lakectl merge lakefs://learn-demo/dev lakefs://learn-demo/main
In [13]:
spark.sql("select * from main.sampledb.my_table").show()
spark.sql("select * from dev.sampledb.my_table").show()
Delete Dev Branch¶
In [14]:
! lakectl branch delete lakefs://learn-demo/dev -y
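Finally, once you are done experimenting, stop the Spark session to release local resources:
In [ ]:
spark.stop()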