DEV Community

Kiran Gutha

Building a Cluster Using Elasticsearch, Kibana, Zookeeper, Kafka and Rsyslog

In this tutorial you will learn how to build a log collection cluster using Elasticsearch, Kibana, ZooKeeper, Kafka and rsyslog. Developers can run into many difficulties when building such a cluster, so the procedure is explained step by step.

Environmental difficulties:
  1. Developers cannot log in to the servers
  2. Every system keeps its own logs, so the log data is scattered and hard to find
  3. The volume of log data is large, queries are slow, and results are not available in real time

Environmental requirements:
  1. The log needs to be standardized

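Required software for building this cluster (versions as used in the steps below):

  1. Elasticsearch 2.1.0
  2. Logstash 2.0.0
  3. Kibana 4.2.1
  4. Kafka (Scala 2.11 build) with its bundled ZooKeeper
  5. rsyslog 8.30.0 with the rsyslog-kafka module
  6. JDK 1.8

The above software can be downloaded from each project's official website.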

Deployment steps:

  1. ES cluster installation and configuration
  2. Rsyslog client configuration
  3. Kafka (zookeeper) cluster configuration
  4. Kibana deployment
  5. Case: nginx log collection and message log collection
  6. Kibana report basic use

Pre-deployment operations:

Disable the firewall and SELinux (in a production environment, turn them on or off as required).
Synchronize the server clocks, using either a public NTP server or a self-hosted one.

[root@es1 ~]# crontab -l   # A public time server is used here for convenience
# update time
*/5 * * * * /usr/bin/rdate -s &>/dev/null

1. ES cluster installation and configuration

1. Install the JVM environment

ELK needs a JVM at runtime. The 2.x versions require Oracle JDK 1.7 or OpenJDK 1.7.0; the 5.x versions require Oracle JDK 1.8 or OpenJDK 1.8.0. Every node should run the same JDK version, down to the minor version number; otherwise a node may hit errors when joining the cluster. That is why I did not install the JDK with yum.

[root@es1 ~]# rpm -ivh jdk-8u25-x64.rpm   # 5.x requires 1.8, so install 1.8 directly to make a later upgrade easier
Preparing... ########################################### [100%]
1:jdk1.8.0_131 ########################################### [100%]

Setting up the Java environment

[root@es1 ~]# cat /etc/profile.d/ #Edit the Java environment configuration file
export JAVA_HOME=/usr/java/latest
export CLASSPATH=$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
[root@es1 ~]# . /etc/profile.d/

[root@es1 ~]# java -version   # Confirm the configuration
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
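Since every node is expected to run the same JDK build, it can help to pull out just the version string and compare it across hosts. The following is a minimal sketch; the helper name `jdk_version` is hypothetical and not part of the original setup.

```shell
# Hypothetical helper: extract the JDK version string (for example 1.8.0_131)
# from the first line of `java -version` output read on stdin.
jdk_version() {
    sed -n '1s/.*version "\([^"]*\)".*/\1/p'
}

# Demo with the first line of the output shown above.
# On a real node you would run:  java -version 2>&1 | jdk_version
printf '%s\n' 'java version "1.8.0_131"' | jdk_version
```

Running the same command on each node and comparing the results is a quick way to spot a mismatched minor version before a node tries to join the cluster.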

2. Install and configure elasticsearch

[root@es1 ~]# tar xf elasticsearch-2.1.0.tar.gz -C /usr/local/
[root@es1 ~]# cd /usr/local/
[root@es1 local]# ln -sv elasticsearch-2.1.0 elasticsearch
"elasticsearch" -> "elasticsearch-2.1.0"
[root@es1 local]# cd elasticsearch
[root@es1 elasticsearch]# vim config/elasticsearch.yml
[root@es1 elasticsearch]# grep "^[a-z]" config/elasticsearch.yml
cluster.name: pwb-cluster   # Cluster name; must be identical on every node of the same cluster
node.name: pwb-node1   # Node name; unique within the cluster
path.data: /Data/es/data   # Data directory
path.logs: /Data/es/logs   # Log directory
bootstrap.mlockall: true
network.host: 10.1.1.243   # Local IP address
http.port: 9200
discovery.zen.ping.unicast.hosts: ["", ""]
discovery.zen.minimum_master_nodes: 1
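The value 1 for discovery.zen.minimum_master_nodes is fine for a small test setup. For larger clusters, the usual guidance for zen discovery is a majority of the master-eligible nodes, that is (N / 2) + 1 with integer division, to reduce the risk of split brain. A small sketch of that arithmetic follows; the function name `min_master_nodes` is made up for illustration.

```shell
# Hypothetical helper: majority quorum for N master-eligible nodes,
# computed as (N / 2) + 1 using integer division.
min_master_nodes() {
    echo $(( $1 / 2 + 1 ))
}

# Example: a cluster with three master-eligible nodes.
min_master_nodes 3
```

With three master-eligible nodes this gives 2, so the cluster can lose one node and still elect a master.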

3. Create related directories

[root@es1 elasticsearch]# mkdir -pv /Data/es/{data,logs}
mkdir: created directory "/Data"
mkdir: created directory "/Data/es"
mkdir: created directory "/Data/es/data"
mkdir: created directory "/Data/es/logs"

4. For security reasons, Elasticsearch refuses to start as root. Create a dedicated user and perform the related operations as that user.

Create an unprivileged user; granting it the appropriate sudo permissions is recommended
[root@es1 elasticsearch]# useradd elasticsearch
[root@es1 elasticsearch]# chown -R elasticsearch:elasticsearch /Data/es/
[root@es1 elasticsearch]# chown -R elasticsearch:elasticsearch /usr/local/elasticsearch-2.1.0/

5. Configure other environment parameters (required; otherwise Elasticsearch reports errors at startup)

[root@es1 elasticsearch]# echo "elasticsearch hard nofile 65536" >> /etc/security/limits.conf
[root@es1 elasticsearch]# echo "elasticsearch soft nofile 65536" >> /etc/security/limits.conf
[root@es1 elasticsearch]# sed -i 's/1024/2048/g' /etc/security/limits.d/90-nproc.conf
[root@es1 elasticsearch]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf
[root@es1 elasticsearch]# sysctl -p
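
After changing the limits, it is worth confirming that the new values are actually in effect before starting Elasticsearch. The snippet below is only a sketch: `meets_minimum` is a hypothetical helper, and the thresholds are the ones set in the commands above.

```shell
# Hypothetical helper: succeed when a measured limit meets the required minimum.
# The value "unlimited" (as printed by ulimit) always passes.
meets_minimum() {
    actual=$1
    required=$2
    [ "$actual" = "unlimited" ] && return 0
    [ "$actual" -ge "$required" ]
}

# On a real node, logged in as the elasticsearch user, the checks would be:
#   meets_minimum "$(ulimit -n)" 65536
#   meets_minimum "$(sysctl -n vm.max_map_count)" 262144
# Demo with a literal value:
if meets_minimum 262144 262144; then
    echo "limit ok"
else
    echo "limit too low"
fi
```

The nofile limits apply to new login sessions, so run the check in a fresh session of the elasticsearch user.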

[root@es1 elasticsearch]# grep "ES_HEAP_SIZE=" bin/elasticsearch   # Set the Elasticsearch heap size; in principle larger is better, but do not exceed 32 GB
export ES_HEAP_SIZE=100m   # The test environment has limited memory

The other nodes use the same configuration, except for these two settings:
network.host: 10.1.1.243   # Local IP address
node.name: pwb-node1   # The node name assigned to this node

6. Start elasticsearch

[root@es1 elasticsearch]# su - elasticsearch
[elasticsearch@es1 ~]$ cd /usr/local/elasticsearch
[elasticsearch@es1 elasticsearch]$ bin/elasticsearch &

The output shows the service starting up and finding the other nodes of the cluster through automatic discovery.

Check that the service is running normally

[root@es1 ~]# netstat -tnlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0* LISTEN 944/sshd
tcp 0 0 ::ffff:10.1.1.243:9300 :::* LISTEN 3722/java   # Communication between ES nodes
tcp 0 0 :::22 :::* LISTEN 944/sshd
tcp 0 0 ::ffff:10.1.1.243:9200 :::* LISTEN 3722/java   # Communication between the ES node and external clients
[root@es1 ~]# curl http://   # If output like the following appears, the installation and configuration succeeded
{
  "name" : "pwb-node1",
  "cluster_name" : "pwb-cluster",
  "version" : {
    "number" : "2.1.0",
    "build_hash" : "72cd1f1a3eee09505e036106146dc1949dc5dc87",
    "build_timestamp" : "2015-11-18T22:40:03Z",
    "build_snapshot" : false,
    "lucene_version" : "5.3.1"
  },
  "tagline" : "You Know, for Search"
}
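
When checking several nodes, it can be handy to reduce this response to the one field you care about. Here is a rough sketch that extracts the version number with sed; the helper name `es_version` is hypothetical, and the node address in the usage comment is a placeholder.

```shell
# Hypothetical helper: print the value of the "number" field from the
# Elasticsearch banner JSON read on stdin.
es_version() {
    sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Demo with a fragment of the response shown above.
# Against a live node you would run:  curl -s http://<node-ip>:9200 | es_version
printf '%s\n' '{' '  "version" : {' '    "number" : "2.1.0",' '  }' '}' | es_version
```

Comparing the result across nodes confirms that they all run the same Elasticsearch release.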

7. Install elasticsearch common plug-ins

[root@es1 ~]# /usr/local/elasticsearch/bin/plugin install mobz/elasticsearch-head

After the installation is complete, open the head plugin's page in a browser. Since the cluster holds no data yet, the display is empty (the five-pointed star marks the master node, and the dots mark data nodes).

Other commonly used plug-ins are installed as follows (not demonstrated here; install them yourself if you are interested):

./bin/plugin install lukas-vlcek/bigdesk   # Command for versions before 2.0
./bin/plugin install hlstudio/bigdesk   # Use this command for 2.0 or later
./bin/plugin install lmenezes/elasticsearch-head/version

2. Logstash cluster installation and configuration

Logstash also depends on Java, so the JVM must be installed here as well. The steps are the same as above and are omitted.

1. Install logstash

[root@logstash1 ~]# tar xf logstash-2.0.0.tar.gz -C /usr/local/
[root@logstash1 ~]# cd /usr/local/
[root@logstash1 local]# ln -sv logstash-2.0.0 logstash
"logstash" -> "logstash-2.0.0"
[root@logstash1 local]# cd logstash
[root@logstash1 logstash]# grep "LS_HEAP_SIZE" bin/
LS_HEAP_SIZE="${LS_HEAP_SIZE:=100m}"   # Set the heap size to use

2. Test that Logstash writes logs to Elasticsearch, using the system messages file as an example

(1) Write a logstash configuration file

[root@logstash1 logstash]# cat conf/messages.conf
input {
  file {   # Read input from the messages file using the file input plugin
    path => "/var/log/messages"
  }
}

output {
  elasticsearch {   # Send output to the ES cluster
    hosts => ["", ""]   # ES node host IPs and ports
  }
}
[root@logstash1 logstash]# /usr/local/logstash/bin/logstash -f conf/messages.conf --configtest --verbose
Configuration OK
[root@logstash1 logstash]# /usr/local/logstash/bin/logstash -f conf/messages.conf
Default settings used: Filter workers: 1
Logstash startup completed

(2) Generate some entries in the messages file by installing a package

[root@logstash1 log]# yum install httpd -y

Check the changes in the messages file

[root@logstash1 log]# tail /var/log/messages
Oct 24 13:44:25 localhost kernel: ata2.00: configured for UDMA/33
Oct 24 13:44:25 localhost kernel: ata2: EH complete
Oct 24 13:49:34 localhost rz[3229]: [root] logstash-2.0.0.tar.gz/ZMODEM: error: zgethdr returned 16
Oct 24 13:49:34 localhost rz[3229]: [root] logstash-2.0.0.tar.gz/ZMODEM: error
Oct 24 13:49:34 localhost rz[3229]: [root] got error
Nov 8 22:21:25 localhost rz[3245]: [root] logstash-2.0.0.tar.gz/ZMODEM: 80604867 Bytes, 2501800 BPS
Nov 8 22:24:27 localhost rz[3248]: [root] jdk-8u25-x64.rpm/ZMODEM: 169983496 Bytes, 1830344 BPS
Nov 8 22:50:49 localhost yum[3697]: Installed: apr-util-ldap-1.3.9-3.el6_0.1.x86_64
Nov 8 22:50:50 localhost yum[3697]: Installed: httpd-tools-2.2.15-60.el6.centos.6.x86_64
Nov 8 22:51:07 localhost yum[3697]: Installed: httpd-2.2.15-60.el6.centos.6.x86_64

Open the elasticsearch-head plugin page. It shows that Logstash can write to the Elasticsearch cluster normally, so the Logstash configuration is complete (the other nodes are configured the same way).

3. Kafka+Zookeeper cluster installation and configuration

A Kafka cluster needs a ZooKeeper cluster to be in place first. ZooKeeper can be installed separately, or you can use the copy bundled with Kafka. Here we use the bundled one.

1. Extract the installation package

[root@kafka1 ~]# tar xf kafka_2.11- -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1 local]# ln -sv kafka_2.11- kafka
"kafka" -> "kafka_2.11-"
[root@kafka1 local]# cd kafka

2. Configure the zookeeper cluster and modify the configuration file

[root@kafka1 kafka]# grep "^[a-z]" config/


tickTime: the heartbeat interval, used both between ZooKeeper servers and between a client and a server. One heartbeat is sent every tickTime.

Port 2888: the port this server uses to exchange information with the leader of the cluster.

Port 3888: the port used for leader election. If the leader goes down, the servers communicate with each other over this port to elect a new leader.
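
Each ZooKeeper node is normally declared in the configuration with a line of the form server.N=host:2888:3888, where N matches the number stored in that host's myid file. The sketch below prints such lines for a list of hosts. The function name `zk_server_lines` is hypothetical, and only two of the node addresses appear in this tutorial, so the list passed in is an assumption to adapt to your own cluster.

```shell
# Hypothetical helper: print one server.N line per host, numbering from 1.
# N must match the content of /Data/zookeeper/myid on the corresponding host.
zk_server_lines() {
    id=1
    for host in "$@"; do
        printf 'server.%d=%s:2888:3888\n' "$id" "$host"
        id=$((id + 1))
    done
}

# Demo with the two ZooKeeper addresses that appear later in the Kafka settings.
zk_server_lines 10.1.1.247 10.1.1.248
```

The same list, in the same order, goes into the ZooKeeper configuration file on every node.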

3. Create directory and myid file needed by zookeeper

[root@kafka1 kafka]# mkdir -pv /Data/zookeeper
mkdir: created directory "/Data"
mkdir: created directory "/Data/zookeeper"
[root@kafka1 kafka]# echo "1" > /Data/zookeeper/myid   # The myid file holds a number that identifies this host; without it ZooKeeper cannot start

4. Kafka configuration

[root@kafka1 kafka]# grep "^[a-z]" config/
broker.id=1   # Unique within the cluster
listeners=PLAINTEXT://   # Server IP address and port
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/Data/kafka-logs   # Must be created in advance
num.partitions=10   # Should be set fairly high; partitioning affects write and read speed
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168   # Expiration time
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.1.1.247:2181,10.1.1.248:2181   # ZooKeeper server IPs and ports

The other nodes have the same configuration except the following:

(1) ZooKeeper configuration
echo "x" > /Data/zookeeper/myid   # Unique on each node
(2) Kafka configuration: the broker id must be unique on each node, and the listener address must be the node's own IP

5. Start Zookeeper and Kafka

1) start zookeeper

[root@kafka1 kafka]# /usr/local/kafka/bin/ /usr/local/kafka/config/ &

Perform the same operation on the other two nodes. During startup, error messages like the following appear:
Connection refused
at Method)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(
at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$
[2018-06-05 23:44:36,351] INFO Resolved hostname: to address: / (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-06-05 23:44:36,490] WARN Cannot open channel to 2 at election address / (org.apache.zookeeper.server.quorum.QuorumCnxManager) Connection refused
at Method)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(
at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(

When the ZooKeeper cluster starts, each node tries to connect to the other nodes in the cluster. The nodes started first cannot reach those that have not started yet, so the exceptions in the first part of the log can be ignored. As the later part of the log shows, the cluster becomes stable once a leader has been elected.
Other nodes may show similar messages, which is normal.

Zookeeper service check

[root@kafka1 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 :::2181 :::* LISTEN 33644/java
tcp 0 0 ::ffff: :::* LISTEN 33644/java

[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 :::2181 :::* LISTEN 35016/java
tcp 0 0 ::ffff:10.1.1.248:2888 :::* LISTEN 35016/java   # Only the leader listens on port 2888
tcp 0 0 ::ffff: :::* LISTEN 35016/java

2) Start kafka

/usr/local/kafka/bin/ /usr/local/kafka/config/ &

If you see the following error:

[2018-06-05 23:52:30,323] ERROR Processor got uncaught exception. (
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
Caused by: Unknown name or service
... 10 more
Caused by: Unknown name or service
at Method)
... 12 more

Edit the hosts file and add an entry that resolves the current host name:
[root@kafka1 ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

Start zookeeper and kafka on other nodes
After the startup is complete, perform some tests

(1) Create a topic

[root@kafka1 ~]# /usr/local/kafka/bin/ --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer   # Note: the replication factor cannot exceed the number of brokers, otherwise an error occurs. The current cluster has 2 brokers
Created topic "summer".

(2) Check which topics have been created

[root@kafka1 ~]# /usr/local/kafka/bin/ --list --zookeeper 10.1.1.247:2181   # List all topics in the cluster

(3) View topic details

[root@kafka1 ~]# /usr/local/kafka/bin/ --describe --zookeeper --topic summer
Topic:summer PartitionCount:1 ReplicationFactor:2 Configs:
Topic: summer Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1

Topic name: summer

Partition: there is only one, numbered from 0

Leader: the broker with id 2

Replicas: copies of the partition exist on the brokers with ids 2 and 1

Isr: the replicas that are currently alive and in sync
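
A partition is healthy when its Isr list is as long as its Replicas list. As a rough sketch, the describe output can be filtered with awk to show only partitions where that is not the case. The function name `under_replicated` is hypothetical; it only parses text in the format shown above.

```shell
# Hypothetical helper: read `--describe` output on stdin and print the
# partition lines whose Isr list is shorter than their Replicas list.
under_replicated() {
    awk '/Replicas:/ {
        r = ""; i = ""
        for (n = 1; n < NF; n++) {
            if ($n == "Replicas:") r = $(n + 1)
            if ($n == "Isr:") i = $(n + 1)
        }
        if (split(r, a, ",") > split(i, b, ",")) print
    }'
}

# Demo: the first line is healthy, the second has lost a replica.
printf '%s\n' \
    'Topic: summer Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1' \
    'Topic: summer Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2' | under_replicated
```

Empty output means every listed partition has all of its replicas in sync.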

(4) Send a message; this is the producer role

[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/ --broker-list 10.1.1.247:9092 --topic summer

Hello,MR.John   # Type some text and press Enter

(5) Receive messages; this is the consumer role

[root@kafka2 kafka]# /usr/local/kafka/bin/ --zookeeper --topic summer --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of zookeeper.

If the consumer receives the producer's message as above, the ZooKeeper-based Kafka cluster is working.

6. Configure the client rsyslog

1. View the current rsyslog version

[root@log-client1 ~]# rsyslogd -v
rsyslogd 5.8.10, compiled with:

rsyslog has supported Kafka since the v8.7.0 release, so the system's rsyslog version must be upgraded.

2. Upgrade rsyslog

wget -O /etc/yum.repos.d/rsyslog.repo   # Download the yum repository file

yum update rsyslog -y
yum install rsyslog-kafka -y   # Install the rsyslog-kafka module
ll /lib64/rsyslog/   # Check whether the module is installed
/etc/init.d/rsyslog restart   # Restart the service

Check the upgraded version

[root@log-client1 yum.repos.d]# rsyslogd -v
rsyslogd 8.30.0, compiled with:

3. The following test sends nginx logs to Kafka through rsyslog (nginx must be installed beforehand)

(1) Edit the rsyslog configuration file

[root@log-client1 yum.repos.d]# cat /etc/rsyslog.d/nginx_kafka.conf
# Load omkafka and imfile modules
module(load="omkafka")
module(load="imfile")

# nginx template
template(name="nginxAccessTemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n")

# ruleset
# Also set broker=[...] inside the action to the host:port list of your Kafka brokers
ruleset(name="nginx-kafka") {
    # Forward logs to Kafka
    action (
        type="omkafka"
        template="nginxAccessTemplate"
        confParam=["compression.codec=snappy", "queue.buffering.max.messages=400000"]
        topic="test_nginx"
    )
}

# Define the source and settings related actions
input(type="imfile" Tag="nginx,aws" File="/var/log/nginx/access.log" Ruleset="nginx-kafka")

Check the configuration file for syntax errors

[root@log-client1 yum.repos.d]# rsyslogd -N 1
rsyslogd: version 8.30.0, config validation run (level 1), master config /etc/rsyslog.conf
rsyslogd: End of config validation run. Bye.

Restart rsyslog after the test is complete. Otherwise, the configuration does not take effect.

[root@log-client1 yum.repos.d]# /etc/init.d/rsyslog restart

Start nginx, add two test pages, and request them.

Switch to kafka cluster server and check the topic list

[root@localhost ~]# /usr/local/kafka/bin/ --list --zookeeper

In addition to the summer topic created in the earlier test, there is now a test_nginx topic.
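
Each message on test_nginx follows the layout defined by nginxAccessTemplate: host name, syslog tag and message, separated by the literal string `<-+>`. The sketch below splits such a record into its three fields with awk. The function name `split_nginx_record` and the sample host name and log line are made up for illustration.

```shell
# Hypothetical helper: split a record produced by nginxAccessTemplate into
# its three fields. The separator <-+> is matched literally ([+] escapes the plus).
split_nginx_record() {
    awk -F'<-[+]>' '{ printf "host=%s\ntag=%s\nmsg=%s\n", $1, $2, $3 }'
}

# Demo with an invented record.
printf '%s\n' 'web1<-+>nginx,aws<-+>GET /index.html' | split_nginx_record
```

Downstream consumers, such as a Logstash filter, can rely on the same separator to break the record apart before indexing.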

7. Logstash to transmit kafka data to elasticsearch

[root@logstash1 ~]# cat /usr/local/logstash/conf/test_nginx.conf
input {
  kafka {
    zk_connect => ","   # kafka
    group_id => "logstash"
    topic_id => "test_nginx"
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
  }
}
output {
  elasticsearch {
    hosts => ["",""]   # elasticsearch
    index => "test-nginx-%{+YYYY-MM}"
  }
}

Test the syntax

[root@logstash1 ~]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/test_nginx.conf -t
Configuration OK

Start the service, and do the same on the remaining nodes

[root@logstash1 log]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/test_nginx.conf
Default settings used: Filter workers: 1
Logstash startup completed

Switch to the ES cluster node to see:

[root@es1 ~]# curl -XGET ''
health status index               pri rep docs.count docs.deleted store.size pri.store.size
green  open   logstash-2017.11.08   5   1          4            0     40.7kb         20.3kb
green  open   test-nginx-2017-11    5   1          1            0     12.5kb          6.2kb

As you can see, the test-nginx index already exists.
Request the test pages again, then force a refresh of the head plugin's web interface; the two most recent access records now appear.

8. Deployment of Kibana

Install kibana on ES cluster nodes

[root@es1 ~]# tar xf kibana-4.2.1-linux-x64.tar.gz -C /usr/local/
[root@es1 ~]# cd /usr/local/
[root@es1 local]# ln -sv kibana-4.2.1-linux-x64/ kibana
"kibana" -> "kibana-4.2.1-linux-x64/"
[root@es1 local]# cd kibana
[root@es1 kibana]# grep "^[a-z]" config/kibana.yml   # Set Kibana's host and port and the Elasticsearch server's IP address and port
server.port: 5601
server.host: ""
elasticsearch.url: ""

Now open Kibana, configure an index pattern, and create a view to test the data.

Conclusion: That's all for now. Here we learned how to build a cluster using ELK, ZooKeeper, Kafka and rsyslog. Now get your hands dirty, and if you have any questions about this post, please leave them in the comment section below. Happy learning.

Author Bio: Kiran Gutha

The author has more than 6 years of corporate experience across technology platforms such as Big Data, AWS, Data Science, Machine Learning, Linux, Python, SQL, Java, Oracle and Digital Marketing. He is a technology nerd and loves contributing to various open platforms through blogging. He is currently associated with a leading professional training provider, Mindmajix Technologies INC., and strives to provide knowledge to aspirants and professionals through personal blogs, research, and innovative ideas.

Top comments (2)

Rahal Aymen

Hello Kiran, thanks for this tutorial, I am working with diffrent versions Elasticsearch 6.3.0 and I am working with spark-kafka-streaming, I need to read from csv file which are opened and every time they will right in it (CDR file) and every time I have to extract the new lines from the doc to save it to Elasticsearch. could you help me please, I just start with kafka.

rusty_sys_dev

I loved the post! It was just what I needed to help get me started with kafka + logstash integration, been using kafka for a while now, but just looking into Elasticsearch / Logstash recently..

The article is a bit hard to read though. I recommend using code blocks for your config files and shell snippets, as it would greatly improve readability.

I made this account to see if I could edit that for you, but it seems I cannot.