I recently got an opportunity to host a session with the Elastic community explaining the integration of Elasticsearch with Apache Spark: how to write Spark DataFrames to Elasticsearch and how to search on specific columns of those DataFrames. I also dived into a use case I worked on, creating a query template with 'should' and 'must' clauses to search for specific column values in Elasticsearch. So, this blog runs through the steps on how to achieve the same.
Let's start with a recap on Spark
- Spark is a big data processing framework that addresses the inefficiencies of Hadoop MapReduce; it can be up to 100x faster for in-memory workloads.
- Supports batch and real-time data processing.
- Supports High-Level APIs in Java, Scala, Python, and R
- Horizontally scalable architecture and fault tolerant
- Interactive Queries with Spark SQL
Execution Flow of a Spark Application
- App Submission: The user submits a Spark application to a cluster manager.
- Job Creation and DAG (Directed Acyclic Graph) Creation: The application is divided into jobs, which are represented as a DAG for execution planning.
- Stage Division and Task Scheduling: Jobs are divided into stages, and tasks are scheduled for execution on available workers.
- Task Execution on Worker: Tasks are executed on worker nodes, processing data in parallel.
- Result back to the driver: After execution, results are sent back to the driver program for further action.
Throughout this process, Spark uses several optimizations
- Lazy Evaluation: Spark delays execution until an action is called to optimize the processing plan (illustrated in the sketch after this list).
- Data Locality: Tasks are scheduled to run on nodes where data resides to minimize data transfer time.
- In-memory Computing: Spark processes data in memory to speed up data access and computation.
- Speculative Execution: Tasks that are running slowly may be re-executed on other nodes to improve overall performance.
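To make lazy evaluation concrete, here is a minimal PySpark sketch (the data and column names are made up for illustration): the filter and select only build an execution plan, and nothing actually runs until the count() action is called.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-eval-demo").getOrCreate()
df = spark.createDataFrame([(1, "error"), (2, "info"), (3, "error")], ["id", "level"])

errors = df.filter(df.level == "error").select("id")  # transformations only: nothing executed yet
print(errors.count())                                 # action: triggers the actual job
spark.stop()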
Let's continue with a recap on Elasticsearch
- Elasticsearch is a powerful search engine designed for quickly finding and analyzing large volumes of data.
- It stores data in a structured way using JSON format, making it easy to index and search.
- It provides full-text search capabilities, allowing users to perform complex searches efficiently.
- Built to scale easily by adding more servers, ensuring it can handle increased loads.
- Uses RESTful APIs for easy integration with applications.
How Elasticsearch Works
- Indexing Data: Data is stored as documents in indexes for fast retrieval.
- Querying Data: Users send search requests to Elasticsearch via APIs.
- Processing Queries: Elasticsearch quickly finds the relevant documents based on the search request.
- Returning Results: The search results are sent back to the user in JSON format.
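To illustrate that flow, here is a minimal sketch using Python's requests library against a local, unsecured Elasticsearch at http://localhost:9200 (the index name and document are made-up placeholders):
import requests

base = "http://localhost:9200"

# Indexing data: store a JSON document in the "products" index (refresh so it is searchable right away)
requests.put(f"{base}/products/_doc/1", json={"name": "laptop", "price": 999}, params={"refresh": "true"})

# Querying data: full-text match on the "name" field
resp = requests.post(f"{base}/products/_search", json={"query": {"match": {"name": "laptop"}}})

# Returning results: hits come back as JSON
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"])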
Key Features of Elasticsearch
- Real-Time Search: Search results are updated almost instantly after data is added.
- Scalable: Can grow by adding more servers to handle more data and users.
- Easy to Use: Integrates well with various programming languages and tools.
Why Integrate Spark with Elasticsearch?
- Combine Spark’s data processing capabilities with Elasticsearch’s powerful search and indexing features.
- Enables real-time querying, full-text search, and aggregation on large datasets processed by Spark.
- Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory.
ES-Spark support
- Elasticsearch–Spark support comes from Elastic's ES-Hadoop (elasticsearch-hadoop) connector, which includes native Spark support (including Spark SQL).
- This connector means you can run analytics against Elasticsearch data with Spark.
- Elasticsearch by itself only supports Lucene-style search queries. With Spark on top, you could write predictive and classification models to flag cybersecurity events, or do other analysis that Elasticsearch does not do by itself.
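For instance, here is a rough sketch (not from the session code) of reading an Elasticsearch index into a Spark DataFrame with the connector so that Spark SQL or MLlib can analyze it; the "security-events" index, the event_type column, and the es.nodes value are placeholders:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("es-analytics")
    .config("es.nodes", "localhost:9200")
    .config("es.nodes.wan.only", "true")
    .getOrCreate()
)

# Read the whole index as a DataFrame and run a simple aggregation on it
events = spark.read.format("org.elasticsearch.spark.sql").load("security-events")
events.groupBy("event_type").count().show()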
Finally, the steps to set up the environment
- Install Spark on Windows (https://spark.apache.org/downloads.html).
- Download a Spark version bundled with Hadoop binaries, or else download winutils.exe separately (commonly used link for winutils: https://github.com/steveloughran/winutils).
- Set the environment variables below:
SPARK_HOME = E:<PATH_TO_SPARK_HDP_DIR>\spark-hdp
HADOOP_HOME = E:<PATH_TO_HDP_DIR>\hadoop\hdp
PATH = E:<>\hadoop\hdp\bin
- Install Python and add it to the PATH variable (I have version 3.9.11):
Path = Programs\Python\Python39\
Path = Programs\Python\Python39\Scripts\
- Download the ES-Hadoop connector jar (download link: https://www.elastic.co/downloads/hadoop).
- Check that your Spark version is compatible with your ES-Hadoop connector (if it is >2, it should be fine); elasticsearch-hadoop supports Spark SQL 2.x and Spark SQL 3.x.
- Check the Java version used by Spark; it should be at least version 1.8 (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html).
- Unzip the downloaded file and place the jar in a directory.
Set the specific configurations needed to connect to ES from Spark.
Example configs:
("es.nodes.wan.only", “true") # When set to true, Spark treats the listed es.nodes as standalone and avoids node discovery.
("es.nodes", "myesclient:9200") # you can pass multiple machines using comma(,) inside one single string("es1:9200,es2:9200,es3:9200")
("es.net.http.auth.user", "spark") # authorized user to read indexes. If you dont have any auth mechanism. You don't need this.
("es.net.http.auth.pass", "youruserpassword") # users password
Use the command below to run a Spark job with the ES jar:
$SPARK_HOME/bin/spark-shell --packages org.elasticsearch:elasticsearch-hadoop:7.10.1
If you are building through a build.sbt file, add the connector to your Spark application with the following line:
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "7.10.1"
If you are building through Maven, add the corresponding dependency to your pom.xml file.
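The Maven entry would mirror the sbt coordinates above:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.10.1</version>
</dependency>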
Next comes the code
The code for a basic example is available in the GitHub repo below:
https://github.com/AbinayaRam19/data-nuggets/tree/pyspark
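As a complement to the repo, here is a minimal sketch of the 'must'/'should' query-template use case mentioned at the start, reusing the placeholder "people" index and column values from the write sketch above; the es.query option pushes the bool query down to Elasticsearch so only matching documents are read back into Spark.
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("es-query-template").getOrCreate()

query = {
    "query": {
        "bool": {
            "must": [{"match": {"name": "alice"}}],    # must match this column value
            "should": [{"match": {"age": 30}}]         # optional clause that boosts relevance
        }
    }
}

matches = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "myesclient:9200")
    .option("es.nodes.wan.only", "true")
    .option("es.query", json.dumps(query))
    .load("people")
)
matches.show()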
Once the code is created, set up a virtual env in Python following the traditional steps below and run spark-submit to see the data in Kibana. [P.S.: Setting up Elasticsearch with Kibana is needed as a prerequisite. I'm mostly focusing on establishing the connection from Spark and the required libraries for the same.]
We can follow the steps below from the IDE terminal. I'm using PyCharm.
1) python -m venv myenv
2) myenv/Scripts/activate
3) spark-submit --jars E:<>\jars\elasticsearch-spark-30_2.12-7.12.1.jar,E:<>\jars\commons-httpclient-3.0.jar /SparkWithES.py
[Note: HTTPClient jar is needed here when Spark connects to Elasticsearch because Elasticsearch communicates over HTTP or HTTPS for its RESTful APIs. Without the httpclient JAR, Spark won't be able to execute HTTP operations to push or query data from Elasticsearch, which would result in connection failures.]
Link to the live session for a detailed explanation (would recommend not to watch it though :P):
https://www.youtube.com/watch?v=J6FIyhXRGXg
Please reach out to me in case of any concerns or corrections!!
Thank you !!