Abstract
Continuing our series on using Apache Spark with SingleStore, we'll use a simple example to demonstrate how we can write a Spark DataFrame to SingleStore and read the data back from SingleStore into a new Spark DataFrame, using the SingleStore Spark Connector.
The notebook file used in this article is available on GitHub.
Create a SingleStore Cloud account
A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:
- Workspace Group Name: Spark Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: spark-demo
- Size: S-00
We'll make a note of the password and store it in the secrets vault using the name password
.
Create a new notebook
From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.
In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.
We'll call the notebook spark_connector_demo, select a Blank notebook template from the available options, and save it in the Personal location.
Fill out the notebook
First, let's install Java:
!conda install -y --quiet -c conda-forge openjdk=8
Next, we'll create a directory to store some jar files:
os.makedirs("jars", exist_ok = True)
We'll now download some jar files, as follows:
def download_jar(url, destination):
response = requests.get(url)
with open(destination, "wb") as f:
f.write(response.content)
jar_urls = [
("https://repo1.maven.org/maven2/com/singlestore/singlestore-jdbc-client/1.2.4/singlestore-jdbc-client-1.2.4.jar", "jars/singlestore-jdbc-client-1.2.4.jar"),
("https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.8-spark-3.5.0/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar", "jars/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar"),
("https://repo1.maven.org/maven2/org/apache/commons/commons-dbcp2/2.12.0/commons-dbcp2-2.12.0.jar", "jars/commons-dbcp2-2.12.0.jar"),
("https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar", "jars/commons-pool2-2.12.0.jar"),
("https://repo1.maven.org/maven2/io/spray/spray-json_3/1.3.6/spray-json_3-1.3.6.jar", "jars/spray-json_3-1.3.6.jar")
]
for url, destination in jar_urls:
download_jar(url, destination)
print("JAR files downloaded successfully")
These jar files include the SingleStore JDBC Client and the SingleStore Spark Connector, as well as several other jar files needed for connectivity and data management.
Now we are ready to create a SparkSession
:
# Create a Spark session
spark = (SparkSession
.builder
.config("spark.jars", ",".join([destination for _, destination in jar_urls]))
.appName("Spark Connector Test")
.getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
Next, we'll create a simple DataFrame
:
# Create a DataFrame
data = [("Peter", 27), ("Paul", 28), ("Mary", 29)]
df = spark.createDataFrame(data, ["Name", "Age"])
Now we'll show the DataFrame
:
# Show the content of the DataFrame
df.show()
The output should be as follows:
+-----+---+
| Name|Age|
+-----+---+
|Peter| 27|
| Paul| 28|
| Mary| 29|
+-----+---+
A database is required, so we'll create one:
DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;
We'll now prepare the connection to SingleStore:
from sqlalchemy import *
db_connection = create_engine(connection_url)
url = db_connection.url
Now we'll create the Spark connection to SingleStore:
password = get_secret("password")
host = url.host
port = url.port
cluster = host + ":" + str(port)
We also need to set some configuration parameters:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
We'll now write the DataFrame
:
(df.write
.format("singlestore")
.option("loadDataCompression", "LZ4")
.mode("overwrite")
.save("spark_demo.demo")
)
In this case, the demo
table will be created for us.
Next, we'll read the data back into a new DataFrame
:
new_df = (spark.read
.format("singlestore")
.load("spark_demo.demo")
)
Now we'll show the new DataFrame
:
# Show the content of the DataFrame
new_df.show()
The output could be as follows:
+-----+---+
| Name|Age|
+-----+---+
| Paul| 28|
| Mary| 29|
|Peter| 27|
+-----+---+
Finally, we'll stop the SparkSession
:
# Stop the Spark session
spark.stop()
Summary
In this simple example, we saw how to configure Spark with the appropriate jar files, create a simple Spark DataFrame and write this to a SingleStore database. We then read the data back into a new DataFrame and confirmed that the data were the same. In the next article, we'll look at some additional features of the SingleStore Spark Connector.
Top comments (0)