
Valery C. Briz


Bulk load to Elasticsearch with PySpark

If you use Spark to transform your data and plan to load it directly from Spark into Elasticsearch, then perhaps this short article is for you.

Not so long ago I was in the same situation and started reading the Elasticsearch documentation to build my PySpark job, but to be honest, by the end of my reading I still had a lot of questions.

Thanks to multiple answers on StackOverflow and other forums, I was able to implement a PySpark job that actually works.

That is why I decided to write this small tutorial and explain the options that are available.

If you want to check the full code then you can find it in this Gist.

Connection parameters

These are the base parameters we need to send in order to connect to Elasticsearch.

df.write.format("org.elasticsearch.spark.sql") \
.option("es.nodes", <elastic_host>) \
.option("es.port", <elastic_port>) 

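If you are using a cloud environment like AWS or Google Cloud, you also need to add the following options. They make the connector talk only to the nodes declared in es.nodes and skip discovery of the other cluster nodes.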

.option("es.nodes.wan.only", "true") \
.option("es.nodes.discovery", "false")
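
As a rough sketch, the connection options above can be collected in a plain dictionary and passed to the writer in one call with `options(**...)`. The helper name `es_connection_options`, its arguments, and the sample host are made up for illustration; only the `es.*` keys come from the snippets above.

```python
def es_connection_options(host, port=9200, cloud=False):
    """Collect the connection settings shown above in one dict.

    Hypothetical helper: the name and arguments are assumptions,
    only the es.* keys come from the article.
    """
    options = {
        "es.nodes": host,
        # Option values are passed as strings.
        "es.port": str(port),
    }
    if cloud:
        # Talk only to the declared nodes and skip node discovery.
        options["es.nodes.wan.only"] = "true"
        options["es.nodes.discovery"] = "false"
    return options


if __name__ == "__main__":
    # "my-cluster.example.com" is a placeholder host.
    print(es_connection_options("my-cluster.example.com", 9243, cloud=True))
```

With a DataFrame at hand, the dict would be applied as `df.write.format("org.elasticsearch.spark.sql").options(**es_connection_options(...))`.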

Authentication and SSL

This part depends on how your Elasticsearch instance is configured, but these are the main options:

SSL enabled or disabled:

Setting this option to true enables SSL for the connection.

.option("es.net.ssl", "true") \

Authentication by user name and password:

If you choose to authenticate with a user name and password, these two options need to be added.

.option("es.net.http.auth.user", <user_name>) \
.option("es.net.http.auth.pass", <password>) \

Authentication by ApiKey:

A better way to authenticate is to create an API key and send it in the Authorization header, using the ApiKey scheme, as an option.

AUTH_TOKEN = f"ApiKey {API_KEY}"
.option("es.net.http.header.Authorization", AUTH_TOKEN) \
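
The two authentication styles above could be wrapped in small helpers like the following. This is only a sketch: the function names are hypothetical, and it assumes you hold the API key id and secret separately, in which case Elasticsearch expects the header value `ApiKey ` followed by the base64 encoding of `id:api_key`. If your key is already base64 encoded, skip the encoding step and use it directly.

```python
import base64


def basic_auth_options(user, password):
    """User name and password authentication (hypothetical helper)."""
    return {
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
    }


def api_key_auth_options(key_id, key_secret):
    """API key authentication through the Authorization header.

    Assumption: key_id and key_secret are the raw id and secret of the key,
    so they still need to be joined and base64 encoded.
    """
    raw = f"{key_id}:{key_secret}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"es.net.http.header.Authorization": f"ApiKey {token}"}


if __name__ == "__main__":
    # Placeholder credentials, for illustration only.
    print(api_key_auth_options("my-key-id", "my-key-secret"))
```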

Use of a SSL Certificate:

This one can be a little tricky, since you need to copy the .jks truststore file of your Elasticsearch instance to every node of the cluster and then add the following options.

.option("es.net.ssl.truststore.location", <truststore_location>) \
.option("es.net.ssl.truststore.pass", <truststore_pass>) \
.option("es.net.ssl.cert.allow.self.signed", "true")

Remember that this truststore location must be accessible from your Spark application, otherwise the job will fail.
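
A minimal sketch of how the truststore options might be assembled is shown here. It assumes the .jks file sits at the same local path on every node, and it converts that path to a `file://` URI because the connector documentation describes the location as typically a URL; both the helper name `truststore_options` and that conversion are my assumptions, so adjust them to your setup.

```python
from pathlib import Path


def truststore_options(truststore_path, truststore_pass, self_signed=False):
    """Build the SSL truststore options (hypothetical helper).

    Assumption: truststore_path is a local path that exists on every node.
    """
    # Turn the local path into an absolute file:// URI.
    location = Path(truststore_path).resolve().as_uri()
    options = {
        "es.net.ssl": "true",
        "es.net.ssl.truststore.location": location,
        "es.net.ssl.truststore.pass": truststore_pass,
    }
    if self_signed:
        options["es.net.ssl.cert.allow.self.signed"] = "true"
    return options


if __name__ == "__main__":
    # Placeholder path and password, for illustration only.
    print(truststore_options("/opt/certs/truststore.jks", "changeit", self_signed=True))
```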

Index resource options

WRITE_MODE = "append"

(
    df.write.format("org.elasticsearch.spark.sql")
    # Create the index if it doesn't exist
    .option("es.index.auto.create", "true")
    # Write operation: 'index', 'create', 'update' or 'upsert'
    .option("es.write.operation", "index")
    # This is the actual name of the index
    .option("es.resource", <elastic_index>)
    # DataFrame column used as the _id field in Elasticsearch
    .option("es.mapping.id", "id")
    # Treat empty fields as null (this setting applies when reading)
    .option("es.field.read.empty.as.null", "true")
    # The write mode can be 'append' or 'overwrite'
    .mode(WRITE_MODE)
    .save()
)
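
Because `es.write.operation` and `es.mapping.id` interact (an update needs to know which document to update), it can help to validate them before the job starts. The helper below is a sketch under assumptions: the name `es_index_options` is hypothetical, and the list of accepted operations reflects the ones I am sure of; your connector version may accept others.

```python
# Operations assumed to be supported; check your connector version.
VALID_OPERATIONS = {"index", "create", "update", "upsert"}


def es_index_options(index, operation="index", id_field=None):
    """Build and validate the index-related options (hypothetical helper)."""
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unsupported write operation: {operation!r}")
    if operation in {"update", "upsert"} and id_field is None:
        # Updates are matched by document id, so an id column is required.
        raise ValueError(f"{operation!r} requires id_field (es.mapping.id)")
    options = {
        "es.resource": index,
        "es.write.operation": operation,
    }
    if id_field is not None:
        options["es.mapping.id"] = id_field
    return options


if __name__ == "__main__":
    # "my-index" is a placeholder index name.
    print(es_index_options("my-index", operation="upsert", id_field="id"))
```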

In general, the configuration options for the Spark Elasticsearch connector are self-explanatory; we just need to find the right ones for our specific case.
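
To tie the pieces together, here is one possible shape for the final write step, assuming all the `es.*` settings have been gathered in a single dictionary. The function name `write_to_elasticsearch` is hypothetical, and the sketch assumes the elasticsearch-hadoop connector jar is already available to your Spark session.

```python
def write_to_elasticsearch(df, options, mode="append"):
    """Write a Spark DataFrame to Elasticsearch (hypothetical helper).

    df      -- a pyspark.sql.DataFrame
    options -- dict of es.* settings, with string values
    mode    -- Spark save mode, for example 'append' or 'overwrite'
    """
    (
        df.write.format("org.elasticsearch.spark.sql")
        .options(**options)
        .mode(mode)
        .save()
    )


# Usage sketch (needs a running Spark session and a reachable cluster):
# write_to_elasticsearch(
#     df,
#     {"es.nodes": "localhost", "es.port": "9200", "es.resource": "my-index"},
# )
```

Keeping the options in a dictionary makes it easy to load them from a config file or to swap authentication settings between environments without touching the write call.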

Hope this can help you!
