
Akmal Chaudhri for SingleStore


Quick tip: Adding SingleStoreDB to an Apache Iceberg Data Lake

Abstract

This short article will show how to create an Apache Iceberg Data Lake using Apache Spark. We'll then add SingleStoreDB to the mix. We'll use Deepnote as our development environment.

Introduction

In enterprise environments, data from different systems must be used together. In this quick example, we'll see how to use Apache Spark DataFrames to connect Apache Iceberg and SingleStoreDB.

Create a Deepnote account

We'll create a free account on the Deepnote website. Once logged in, we'll create a new Deepnote project to give us a new notebook. We'll also need to create three folders (jars, data and warehouse).

In the jars folder, we'll store the following files:

singlestore-jdbc-client-1.0.1.jar
singlestore-spark-connector_2.12-4.0.0-spark-3.2.0.jar
spray-json_3-1.3.6.jar
iceberg-spark-runtime-3.2_2.12-0.14.1.jar

In the data folder, we'll store a CSV file containing the Iris flower data set.

The warehouse folder will be used to store our Apache Iceberg Data Lake.
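
The folders can be created using Deepnote's file browser, or directly from a notebook cell. Here is a minimal sketch, assuming the notebook's working directory is the project's file root:

! mkdir -p jars data warehouse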

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Iris Demo Group as our Workspace Group Name and iris-demo as our Workspace Name. We'll make a note of our password and host name. Finally, we'll create a new database using the SQL Editor:

CREATE DATABASE iris_demo;

Deepnote notebook

Let's now start to fill out our notebook.

Install Apache Spark

First, we'll need to install Apache Spark:

! sudo apt-get update
! sudo mkdir -p /usr/share/man/man1
! sudo apt-get install -y openjdk-11-jdk
! pip install pyspark==3.2.1

Once the installation is complete, we'll prepare our SparkSession:

from pyspark.sql import SparkSession

spark = (SparkSession
            .builder
            .config("spark.jars",
                "jars/singlestore-jdbc-client-1.0.1.jar,"
                "jars/singlestore-spark-connector_2.12-4.0.0-spark-3.2.0.jar,"
                "jars/spray-json_3-1.3.6.jar,"
                "jars/iceberg-spark-runtime-3.2_2.12-0.14.1.jar"
            )
            .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
            )
            .getOrCreate()
        )

We can check the version of Spark as follows:

spark.version

The output should be:

'3.2.1'

Create a Spark DataFrame

Next, we'll create a Spark DataFrame from our CSV data:

iris_df = spark.read.csv(
                    "data/iris.csv",
                    header = True,
                    inferSchema = True
                )

and we can view the data:

iris_df.show(5)

The output should be similar to the following:

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows
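
Since we relied on inferSchema, we can also confirm the column types that Spark inferred:

iris_df.printSchema()

The output should be similar to the following:

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)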

Create Apache Iceberg Data Lake

First, we'll configure some settings:

spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("conf spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.local.type", "hadoop")
spark.conf.set("spark.sql.catalog.local.warehouse", "warehouse")

Next, we'll create a temporary view from the Spark DataFrame:

iris_df.createOrReplaceTempView("tempview")

We'll drop the iris table in our Data Lake if it already exists:

spark.sql("""
    DROP TABLE IF EXISTS local.db.iris
""")

and then create our iris table:

spark.sql("""
    CREATE TABLE local.db.iris
    USING iceberg
    PARTITIONED BY (species)
    AS ( SELECT *
         FROM tempview )
""")

We are partitioning our data by flower species, of which there are three.

We can obtain more information about the table, as follows:

spark.sql("""
    SELECT file_path, file_format, partition, record_count
    FROM local.db.iris.files
""").show()

The output should be similar to the following:

+--------------------+-----------+-----------------+------------+
|           file_path|file_format|        partition|record_count|
+--------------------+-----------+-----------------+------------+
|warehouse/db/iris...|    PARQUET|    {Iris-setosa}|          50|
|warehouse/db/iris...|    PARQUET|{Iris-versicolor}|          50|
|warehouse/db/iris...|    PARQUET| {Iris-virginica}|          50|
+--------------------+-----------+-----------------+------------+

Let's now select a subset of the data into a new DataFrame:

df = spark.sql("""
        SELECT *
        FROM local.db.iris
        WHERE species = 'Iris-virginica'
""")
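
As a quick sanity check, we can confirm that this DataFrame contains only the 50 Iris-virginica rows reported by the files metadata table above:

df.count()

The output should be:

50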

Write DataFrame to SingleStoreDB

First, we'll provide connection details for SingleStoreDB:

host = "<TO DO>"
password = "<TO DO>"

port = "3306"
cluster = host + ":" + port

We'll replace the <TO DO> placeholders for host and password with the values from our SingleStoreDB Cloud account.

We'll now set some parameters for the SingleStore Spark Connector:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

When we are ready, we can save our DataFrame data into SingleStoreDB:

(df.write
   .format("singlestore")
   .option("loadDataCompression", "LZ4")
   .mode("overwrite")
   .save("iris_demo.iris")
)

From SingleStoreDB Cloud, we can check that the iris table was created, and we can query the data:

USE iris_demo;

SELECT * FROM iris LIMIT 5;

Summary

Using Spark DataFrames, we can work directly with our Data Lake data in Apache Iceberg and our database data in SingleStoreDB. In this example, we wrote data into SingleStoreDB, but we could also read existing data from SingleStoreDB into a Spark DataFrame and query it alongside data already stored in an Apache Iceberg Data Lake.
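
For example, here is a minimal sketch of that second direction, reusing the spark.datasource.singlestore.* settings configured earlier; the view name iris_singlestore is just an illustrative choice:

# Read the iris table back from SingleStoreDB into a Spark DataFrame
s2_df = spark.read.format("singlestore").load("iris_demo.iris")
s2_df.createOrReplaceTempView("iris_singlestore")

# Query the SingleStoreDB data alongside the Iceberg table
spark.sql("""
    SELECT species, COUNT(*) AS row_count, 'singlestore' AS source
    FROM iris_singlestore
    GROUP BY species
    UNION ALL
    SELECT species, COUNT(*) AS row_count, 'iceberg' AS source
    FROM local.db.iris
    GROUP BY species
""").show()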
