Abstract
This short article will show how to create an Apache Iceberg Data Lake using Apache Spark. We'll then add SingleStoreDB to the mix.
The notebook file used in this article is available on GitHub.
Introduction
In enterprise environments, data from different systems must be used together. In this quick example, we'll see how we can use Apache Spark Dataframes as a way to connect Apache Iceberg and SingleStoreDB.
For production environments, please use a robust file system for your Data Lake.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Iris Demo Group as our Workspace Group Name and iris-demo as our Workspace Name. We'll make a note of the password and store it in the secrets vault using the name password.
Notebook
Let's now start to fill out our notebook.
Install Apache Spark
First, we'll need to install Apache Spark:
!conda install -y --quiet -c conda-forge openjdk=8
!pip install pyspark --quiet
Once the installation is complete, we'll prepare our SparkSession:
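from pyspark.sql import SparkSession

# jar_urls (pairs of download URL and local destination) is not shown
# in this article; see the notebook file for its definition.

# Create a Spark session
spark = (SparkSession
    .builder
    .config("spark.jars", ",".join([destination for _, destination in jar_urls]))
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .appName("Spark Iceberg Test")
    .getOrCreate()
)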
spark.sparkContext.setLogLevel("ERROR")
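The session above reads jar_urls, which this article does not define. As a rough sketch, assuming it is a list of (download URL, local destination) pairs, the value passed to spark.jars could be built as follows. The URLs and file names here are placeholders for illustration only, not the real artifact locations.

```python
# Hypothetical (url, destination) pairs; the real list lives in the notebook.
jar_urls = [
    ("https://example.com/jars/iceberg-spark-runtime.jar",
     "jars/iceberg-spark-runtime.jar"),
    ("https://example.com/jars/singlestore-spark-connector.jar",
     "jars/singlestore-spark-connector.jar"),
]

# spark.jars takes a comma-separated list of jar locations, so only the
# destination half of each pair is used.
spark_jars = ",".join(destination for _, destination in jar_urls)
print(spark_jars)
```

Keeping the URL and destination together in one list means the same structure can drive both the download step and the Spark configuration.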
Create a Spark Dataframe
Next, we'll create a Spark Dataframe from CSV data:
import pandas as pd

url = "https://gist.githubusercontent.com/VeryFatBoy/9af771d443f5ec4dd6eec8d69a062638/raw/c03ef25a97f23a48ee408ac02114195b663a2364/iris.csv"

# Load the CSV with pandas, then convert it to a Spark Dataframe
pandas_df = pd.read_csv(url)
iris_df = spark.createDataFrame(pandas_df)
and we can view the data:
iris_df.show(5)
The output should be similar to the following:
+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows
Create Apache Iceberg Data Lake
First, we'll configure some settings:
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.local.type", "hadoop")
spark.conf.set("spark.sql.catalog.local.warehouse", "warehouse")
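Because these property names are plain strings, a typo in a key is easy to miss. The sketch below collects the catalog settings in a dictionary and applies them through a small helper that rejects keys containing whitespace. The helper name apply_settings and the dictionary name are assumptions made for this example and are not part of Spark or Iceberg.

```python
# Catalog settings gathered in one place for review.
ICEBERG_SETTINGS = {
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.local.warehouse": "warehouse",
}


def apply_settings(set_conf, settings):
    """Call set_conf(key, value) for each setting (hypothetical helper)."""
    for key, value in settings.items():
        # A key with stray whitespace would not match the property that
        # was intended, so fail loudly instead of passing it along.
        if any(ch.isspace() for ch in key):
            raise ValueError(f"malformed property name: {key!r}")
        set_conf(key, value)


# In the notebook this would be called as:
# apply_settings(spark.conf.set, ICEBERG_SETTINGS)
```

Passing the setter as an argument keeps the helper independent of a running Spark session, so it can be checked with an ordinary dictionary first.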
Next, we'll create a temporary view from the Spark Dataframe:
iris_df.createOrReplaceTempView("tempview")
We'll drop the iris table in our Data Lake if it already exists:
spark.sql("""
DROP TABLE IF EXISTS local.db.iris
""")
and then create our iris table:
spark.sql("""
CREATE TABLE local.db.iris
USING iceberg
PARTITIONED BY (species)
AS ( SELECT *
FROM tempview )
""")
We are partitioning our data by flower species, of which there are three.
We can obtain more information about the table, as follows:
spark.sql("""
SELECT file_path, file_format, partition, record_count
FROM local.db.iris.files
""").show()
The output should be similar to the following:
+--------------------+-----------+-----------------+------------+
|           file_path|file_format|        partition|record_count|
+--------------------+-----------+-----------------+------------+
|warehouse/db/iris...|    PARQUET|    {Iris-setosa}|          50|
|warehouse/db/iris...|    PARQUET|{Iris-versicolor}|          50|
|warehouse/db/iris...|    PARQUET| {Iris-virginica}|          50|
+--------------------+-----------+-----------------+------------+
Let's now select a subset of the data in a new Dataframe:
df = spark.sql("""
SELECT *
FROM local.db.iris
WHERE species = 'Iris-virginica'
""")
Write Dataframe to SingleStoreDB
First, we'll provide connection details for SingleStoreDB:
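from sqlalchemy import create_engine

# connection_url and get_secret are supplied by the SingleStoreDB Cloud
# notebook environment
db_connection = create_engine(connection_url)
url = db_connection.url

# Password stored earlier in the secrets vault under the name "password"
password = get_secret("password")

# Build the host:port endpoint expected by the Spark Connector
host = url.host
port = url.port
cluster = f"{host}:{port}"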
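To make the endpoint format concrete, here is a minimal sketch that derives the same host:port string using only the Python standard library. The URL below is a made-up placeholder; in the notebook the real value comes from connection_url.

```python
from urllib.parse import urlsplit

# Hypothetical connection URL, for illustration only.
example_url = "singlestoredb://admin:secret@svc-example.singlestore.com:3306/iris_db"

# Split the URL and keep only the pieces needed for the endpoint.
parts = urlsplit(example_url)
example_cluster = f"{parts.hostname}:{parts.port}"
print(example_cluster)  # svc-example.singlestore.com:3306
```

The resulting string has the same host:port shape as the cluster value built above.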
We'll now set some parameters for the SingleStore Spark Connector:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
When we are ready, we can save our Dataframe data into SingleStoreDB:
(df.write
.format("singlestore")
.option("loadDataCompression", "LZ4")
.mode("overwrite")
.save("iris_db.iris")
)
From SingleStoreDB Cloud, we can check that the iris table was created, and we can query the data:
USE iris_db;
SELECT * FROM iris LIMIT 5;
Summary
Using Spark Dataframes, we can work directly with our Data Lake data in Apache Iceberg and database data in SingleStoreDB. In our example, we wrote data into SingleStoreDB, but we could also retrieve existing data from SingleStoreDB into a Spark Dataframe, and use the data to perform queries with data already stored in an Apache Iceberg Data Lake.
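The reverse direction mentioned above could look roughly like the following sketch, which assumes the SingleStore Spark Connector is on the classpath and that the spark.datasource.singlestore settings from earlier are already in place. The function name read_from_singlestore and the view name s2_iris are assumptions made for this example.

```python
def read_from_singlestore(spark, table="iris_db.iris"):
    """Load a SingleStoreDB table into a Spark Dataframe (hypothetical helper)."""
    # The connector registers the "singlestore" data source format;
    # load() takes the table as "database.table".
    return spark.read.format("singlestore").load(table)


# In the notebook, with the existing Spark session:
# s2_df = read_from_singlestore(spark)
# s2_df.createOrReplaceTempView("s2_iris")
# spark.sql("SELECT species, COUNT(*) FROM s2_iris GROUP BY species").show()
```

Once the data is registered as a temporary view, it can be queried alongside the local.db.iris table in the same Spark SQL statement.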