Set up Elasticsearch for data transformation and data ingestion
In part 5, we figured out:
- a plan for transforming data before ingesting data into Elasticsearch
- a desired mapping for our data
In this blog, we will accomplish two tasks:
- create an ingest pipeline to transform the retrieved data
- create an index called earthquakes with the desired mapping
Before we get started, let’s talk about the data journey for our app.
We are building an app where users can search for earthquake data stored in Elasticsearch.
In part 7, we will set up our server to retrieve the data from the USGS API and send it to an Elasticsearch ingest pipeline.
An ingest pipeline is used to transform data before it is indexed.
It consists of a series of configurable tasks called processors. Each processor performs a specialized task. For example, a processor can remove fields, extract values from text, or enrich your data.
The processors run in the order you set them up, and each one makes a specific change to the incoming documents.
After the processors have run, Elasticsearch will add the transformed documents to the earthquakes index we will create.
If you want to delve deeper into ingest pipelines, check out the Elasticsearch ingest pipeline documentation!
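To make the "processors run in order" idea concrete, here is a minimal Python model of a pipeline as a list of functions applied one after another. It only illustrates the concept: `run_pipeline`, `drop_tz`, and `tag_source` are hypothetical names, not Elasticsearch APIs, and the real pipeline runs inside Elasticsearch.

```python
from typing import Callable

Processor = Callable[[dict], dict]


def run_pipeline(doc: dict, processors: list[Processor]) -> dict:
    """Apply each processor in list order, feeding it the previous output."""
    for processor in processors:
        doc = processor(doc)
    return doc


# Two toy processors (hypothetical, for illustration only).
def drop_tz(doc: dict) -> dict:
    return {k: v for k, v in doc.items() if k != "tz"}


def tag_source(doc: dict) -> dict:
    return {**doc, "source": "usgs"}


result = run_pipeline({"mag": 1.13, "tz": None}, [drop_tz, tag_source])
print(result)  # {'mag': 1.13, 'source': 'usgs'}
```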
Step 1: Review the data transformation requirements
Our ingest pipeline will be used to transform the data retrieved from the USGS API.
Before we create an ingest pipeline, let's review what changes we want to make to the data.
- remove the unnecessary info from the retrieved data
- change the Unix epoch time in the field time to a human-readable timestamp
- create the fields coordinates.lat and coordinates.lon, as shown in the sample document below
Step 2: Create an ingest pipeline
Ingest pipelines can be created and managed via Kibana's Ingest Pipelines feature or the ingest APIs.
We will be using Kibana to create this pipeline.
From the Kibana home page, click on the Stack Management option (red box).
From the Stack Management page, click on the Ingest Pipelines option (red box).
Click on the Create pipeline option (red box) and select the New pipeline option from the drop-down menu (blue box).
Give your pipeline a name that makes sense to you.
For this project, I named mine earthquake_data_pipeline (red box).
Step 3: Add the desired processors to the pipeline
Click on the Add a processor option (red box).
You should see the following pop-up menu.
Task 1: Remove the fields that we do not need from the retrieved data
Here is an example of an earthquake object from the USGS earthquake API:
{
"type":"Feature",
"properties":{
"mag":1.13,
"place":"11km ENE of Coachella, CA",
"time":1650316843970,
"updated":1650317059011,
"tz":null,
"url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"detail":"https://earthquake.usgs.gov/earthquakes/feed/v1.0/detail/ci40240408.geojson",
"felt":null,
"cdi":null,
"mmi":null,
"alert":null,
"status":"automatic",
"tsunami":0,
"sig":20,
"net":"ci",
"code":"40240408",
"ids":",ci40240408,",
"sources":",ci,",
"types":",nearby-cities,origin,phase-data,scitech-link,",
"nst":37,
"dmin":0.07687,
"rms":0.26,
"gap":48,
"magType":"ml",
"type":"earthquake",
"title":"M 1.1 - 11km ENE of Coachella, CA"
},
"geometry":{
"type":"Point",
"coordinates":[
-116.0736667,
33.7276667,
2.09
]
},
"id":"ci40240408"
}
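Note that the Rename processors in Task 3 look for top-level latitude and longitude fields, which the raw object above does not have. The sketch below assumes the server flattens each feature before sending it: it merges properties with the values from geometry.coordinates, which GeoJSON orders as [longitude, latitude, depth]. `flatten_feature` is a hypothetical helper written for illustration, not the Part 7 server code.

```python
def flatten_feature(feature: dict) -> dict:
    """Hypothetical helper: merge `properties` with the point coordinates.

    GeoJSON stores a point as [longitude, latitude, depth].
    """
    longitude, latitude, depth = feature["geometry"]["coordinates"]
    return {
        **feature["properties"],
        "longitude": longitude,
        "latitude": latitude,
        "depth": depth,
    }


# A trimmed copy of the USGS object above.
feature = {
    "type": "Feature",
    "properties": {"mag": 1.13, "place": "11km ENE of Coachella, CA",
                   "time": 1650316843970, "tz": None},
    "geometry": {"type": "Point", "coordinates": [-116.0736667, 33.7276667, 2.09]},
    "id": "ci40240408",
}
flat = flatten_feature(feature)
print(flat["latitude"], flat["longitude"], flat["depth"])  # 33.7276667 -116.0736667 2.09
```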
The following is a sample document with the desired fields we want to store in Elasticsearch.
{
"mag": 1.13,
"place": "11km ENE of Coachella, CA",
"@timestamp": "2022-04-18T21:20:43.970Z",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"sig": 20,
"type": "earthquake",
"depth": 2.09,
"coordinates": {
"lat": 33.7276667,
"lon": -116.0736667
}
}
As you can see, the earthquake object from the API has additional info we do not need.
By comparing the two, we can identify the fields that we want to remove from the retrieved data.
Here is the list of fields that we do not need:
updated, tz, detail, felt, cdi, mmi, alert, status, tsunami, net, code, ids, sources, types, nst, dmin, rms, gap, magType, title
To remove these fields, we can use the Remove processor.
Under the Processor section, type Remove in the search bar. Click on the Remove processor (red box).
Kibana will display the following menu. In the Fields section (red box), we will add the names of the fields that we wish to remove.
Type in the name of a field you wish to remove, then press Enter. You will see that the field name has been added.
In the same box, repeat this process for each of the fields we wish to remove.
Your screen should look like the image below (red box).
Activate the Ignore missing option (red box) so that the processor does not fail when a document lacks one of these fields.
Click on the Add button (red box).
You will see that the Remove processor has been added to the earthquake_data_pipeline (red box).
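The Remove processor's behavior, including what Ignore missing changes, can be approximated in Python as below. This is a local model for intuition only; `remove_fields` is a hypothetical helper, and its error text is illustrative rather than Elasticsearch's exact message.

```python
def remove_fields(doc: dict, fields: list[str], ignore_missing: bool = False) -> dict:
    """Model of the Remove processor: drop each listed field from a copy of doc."""
    result = dict(doc)
    for field in fields:
        if field in result:
            del result[field]
        elif not ignore_missing:
            # Without ignore_missing, one absent field makes the whole step fail.
            raise KeyError(f"field [{field}] not present")
    return result


doc = {"mag": 1.13, "tz": None, "felt": None}
# "cdi" is absent here; with ignore_missing=True it is simply skipped.
print(remove_fields(doc, ["tz", "felt", "cdi"], ignore_missing=True))  # {'mag': 1.13}
```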
Task 2: Change the Unix epoch time in the field time to a human-readable timestamp
To make this change, we will use the Date processor.
The Date processor parses a date from a field using the format(s) you specify and stores the result as a timestamp.
Click on the Add a processor option (red box).
Under the Processor section, type in Date and click on the Date option from the drop-down menu (red box).
You should see the following menu.
In the Field section (red box), type the name of the field we wish to convert (time).
In the Formats section (red box), we will specify the format of the incoming value. The USGS API returns time as milliseconds since the Unix epoch, a format Elasticsearch calls UNIX_MS.
Type UNIX_MS into this section and press Enter. Then, click on the Add button (green box) to add the Date processor to the ingest pipeline.
You will see that the Date processor has been added to the earthquake_data_pipeline (red box).
When the data goes through the Date processor, the value of the field time will be parsed as UNIX_MS, and the resulting timestamp will be stored in a new field called @timestamp (the processor's default target field).
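To check what value to expect in @timestamp, the conversion can be reproduced in plain Python. This sketch assumes the Date processor's default UTC timezone and approximates its default ISO 8601 output with milliseconds; `unix_ms_to_iso` is a hypothetical helper, not part of Elasticsearch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def unix_ms_to_iso(ms: int) -> str:
    """Convert milliseconds since the Unix epoch to an ISO 8601 UTC string."""
    # timedelta(milliseconds=...) keeps the arithmetic exact (no float rounding).
    dt = EPOCH + timedelta(milliseconds=ms)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


doc = {"mag": 1.13, "time": 1650316843970}
doc["@timestamp"] = unix_ms_to_iso(doc["time"])
print(doc["@timestamp"])  # 2022-04-18T21:20:43.970Z
```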
After this process is finished, we do not need the original field time. Therefore, we will remove the field time after the data goes through the date processor.
From the Create pipeline page, click on the Add a processor option (red box).
Under the Processor section, type in Remove and press Enter (red box).
Under the Fields section, type in time and press Enter (red box). Activate the Ignore missing option (blue box). Then, click on the Add button (green box).
You will see that the Remove processor for the field time has been added (red box).
Task 3: Create fields called coordinates.lat and coordinates.lon
From the same page, click on the Add a processor option (red box).
Under the Processor section, type in Rename and press Enter (red box).
Under the Field section, type in latitude (red box). Under the Target field section, type in coordinates.lat (blue box).
This step will rename the field latitude in the incoming data to coordinates.lat.
Activate the Ignore missing option (yellow box), then click on the Add button (green box) to add this processor to the earthquake_data_pipeline.
You will see that the Rename processor for the field latitude has been added (red box).
Next, we will repeat the same process to add a Rename processor that renames the field longitude in the incoming data to coordinates.lon.
This process is identical to the steps you have performed for the field latitude.
Once this process is done, you will see that a Rename processor for the field longitude has been added to the earthquake_data_pipeline (red box).
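Because the target fields contain a dot, the Rename processor writes them into a nested coordinates object, which is the {lat, lon} shape the geo_point mapping in Step 4 accepts. The Python model below shows the effect; `rename` is a hypothetical stand-in for the processor, not its implementation.

```python
import copy


def rename(doc: dict, field: str, target_field: str, ignore_missing: bool = False) -> dict:
    """Model of the Rename processor, treating dots in target_field as an object path."""
    result = copy.deepcopy(doc)  # leave the caller's document untouched
    if field not in result:
        if ignore_missing:
            return result
        raise KeyError(f"field [{field}] not present")
    value = result.pop(field)
    *parents, leaf = target_field.split(".")
    node = result
    for key in parents:
        node = node.setdefault(key, {})  # create "coordinates" on first use
    node[leaf] = value
    return result


doc = {"latitude": 33.7276667, "longitude": -116.0736667, "depth": 2.09}
doc = rename(doc, "latitude", "coordinates.lat", ignore_missing=True)
doc = rename(doc, "longitude", "coordinates.lon", ignore_missing=True)
print(doc)  # {'depth': 2.09, 'coordinates': {'lat': 33.7276667, 'lon': -116.0736667}}
```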
We have added all the necessary processors to transform our data.
Before creating the earthquake_data_pipeline, make sure the processors are listed in the order you want them to run.
These processors run sequentially!
Why does the order matter?
Let's say you accidentally reversed the order of the Date processor for the field time and the Remove processor for the field time.
When the data goes through the ingest pipeline, it will reach the Remove processor before the Date processor. As a result, the field time will be removed before it gets to the Date processor.
The Date processor will have nothing to work with!
This is why we double-check the order of the processors before creating the ingest pipeline!
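The failure described above can be reproduced with a tiny Python model. `date_processor`, `remove_time`, and `run` are hypothetical stand-ins; the date step skips the real UNIX_MS parsing to stay short and, in this model, raises an error when time is missing.

```python
def date_processor(doc: dict) -> dict:
    # Fails when the source field is absent (real UNIX_MS parsing omitted for brevity).
    if "time" not in doc:
        raise ValueError("field [time] not present")
    return {**doc, "@timestamp": doc["time"]}


def remove_time(doc: dict) -> dict:
    return {k: v for k, v in doc.items() if k != "time"}


def run(doc: dict, processors: list) -> dict:
    for processor in processors:
        doc = processor(doc)
    return doc


good = run({"time": 1650316843970}, [date_processor, remove_time])
print(good)  # {'@timestamp': 1650316843970}

try:
    run({"time": 1650316843970}, [remove_time, date_processor])
except ValueError as err:
    print("reversed order failed:", err)  # reversed order failed: field [time] not present
```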
Next, we will create the earthquake_data_pipeline by clicking on the Create pipeline button (blue box).
You will see that the earthquake_data_pipeline (red box) has been created. If you scroll down to the Processors section (blue box), you will see the list of processors we have added to this pipeline.
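If you would rather use the ingest APIs mentioned in Step 2, the same pipeline can be expressed as a JSON request body. The Python sketch below assembles that body from the processors added above and prints it; the description text is a hypothetical addition, and you would paste the output after PUT _ingest/pipeline/earthquake_data_pipeline in Dev Tools.

```python
import json

REMOVED_FIELDS = [
    "updated", "tz", "detail", "felt", "cdi", "mmi", "alert", "status", "tsunami",
    "net", "code", "ids", "sources", "types", "nst", "dmin", "rms", "gap",
    "magType", "title",
]

pipeline = {
    "description": "Transform USGS earthquake data",  # optional, hypothetical text
    "processors": [
        {"remove": {"field": REMOVED_FIELDS, "ignore_missing": True}},
        # Date writes to @timestamp by default.
        {"date": {"field": "time", "formats": ["UNIX_MS"]}},
        {"remove": {"field": "time", "ignore_missing": True}},
        {"rename": {"field": "latitude", "target_field": "coordinates.lat", "ignore_missing": True}},
        {"rename": {"field": "longitude", "target_field": "coordinates.lon", "ignore_missing": True}},
    ],
}

print(json.dumps(pipeline, indent=2))
```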
Step 4: Create an index called earthquakes with the desired mapping
We will accomplish this step using Kibana Dev Tools.
Click on the menu icon (red box).
Scroll down the drop-down menu and click on Dev Tools (red box).
You should see Dev Tools (also known as the Kibana console) on the screen.
In the left panel of the Kibana console, copy and paste the following.
PUT earthquakes
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"coordinates": {
"type": "geo_point"
},
"depth": {
"type": "float"
},
"mag": {
"type": "float"
},
"place": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"sig": {
"type": "short"
},
"type": {
"type": "keyword"
},
"url": {
"enabled": false
}
}
}
}
Your Kibana console should look like the following:
Click on the green arrow to send the request.
Expected output from Elasticsearch:
Elasticsearch will create an index called earthquakes with the desired mapping we defined above!
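Outside of Dev Tools, the same request can be sent over HTTP. The Python sketch below only builds the request with the standard library's urllib and does not send it; it assumes a local, unsecured cluster at http://localhost:9200, which is a hypothetical address. A secured or Elastic Cloud deployment would also need credentials.

```python
import json
import urllib.request

# Same mapping as the Dev Tools request above.
MAPPING = {
    "mappings": {
        "properties": {
            "@timestamp": {"type": "date"},
            "coordinates": {"type": "geo_point"},
            "depth": {"type": "float"},
            "mag": {"type": "float"},
            "place": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
            "sig": {"type": "short"},
            "type": {"type": "keyword"},
            "url": {"enabled": False},
        }
    }
}


def build_create_index_request(base_url: str, index: str) -> urllib.request.Request:
    """Build (but do not send) the HTTP equivalent of `PUT earthquakes`."""
    return urllib.request.Request(
        url=f"{base_url}/{index}",
        data=json.dumps(MAPPING).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Assumption: a local, unsecured cluster at localhost:9200.
req = build_create_index_request("http://localhost:9200", "earthquakes")
print(req.get_method(), req.full_url)  # PUT http://localhost:9200/earthquakes
# urllib.request.urlopen(req) would send it.
```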
Summary
In this blog, we have created:
- an ingest pipeline (earthquake_data_pipeline) to transform the data retrieved from the USGS API
- an index called earthquakes with the desired mapping
In the next blog, we will set up the server to retrieve earthquake data from the USGS API and send the data to the earthquake_data_pipeline in Elasticsearch.
Once the data transformation is complete, the transformed data will be ingested into the earthquakes index.
Move on to Part 7 to set up the Node.js server to retrieve API data and send the data to Elasticsearch for data transformation and data ingestion!
Top comments (1)
If you don't fancy typing the entire pipeline or mapping, you can paste mine. I considered creating a PR for the repositories, but I noticed the author has moved on from Elastic, so I decided this was the best alternative.
Pipeline I have used:
[
{
"remove": {
"field": [
"updated",
"tz",
"detail",
"felt",
"cdi",
"mmi",
"alert",
"status",
"tsunami",
"net",
"code",
"ids",
"sources",
"types",
"nst",
"dmin",
"rms",
"gap",
"magType",
"title"
],
"ignore_missing": true
}
},
{
"date": {
"field": "time",
"formats": [
"UNIX_MS"
]
}
},
{
"remove": {
"field": "time",
"ignore_missing": true
}
},
{
"rename": {
"field": "latitude",
"target_field": "coordinates.lat",
"ignore_missing": true
}
},
{
"rename": {
"field": "longitude",
"target_field": "coordinates.lon",
"ignore_missing": true
}
}
]
This is the mapping I used:
PUT earthquakes
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"coordinates": {
"type": "geo_point"
},
"depth": {
"type": "float"
},
"mag": {
"type": "float"
},
"place": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"sig": {
"type": "short"
},
"type": {
"type": "keyword"
},
"url": {
"enabled": false
}
}
}
}