DEV Community

Bidhan Khatri
Bidhan Khatri

Posted on • Originally published at bidhankhatri.com.np

Network Traffic Analysis using ElastiFlow

Introduction (NetFlow, IPFIX, sFLOW)

Network monitoring is a systematic effort to monitor parameters of a computer network in order to detect issues that degrade network performance. Network Traffic Analysis is used to deduce information from patterns collected during network monitoring.

Packet analysis and flow analysis are two technologies that we can choose from when we perform traffic analysis on the network.

Packet Analysis uses packet capturing technologies such as SPAN, RSPAN, ERSPAN to get raw copies of traffic. Packet analysis is suitable for the in depth analysis of a specific conversation as the full packet header along with the payload is collected. In contrast to packet analysis, flow analysis is about collecting the metadata from network traffic used for statistical purposes (e.g. top talkers, traffic by protocols, bandwidth usage etc.).

What is a Flow?
A flow is a sequence of packets sharing the same properties that are sent between a sending and a receiving host. For instance, when we watch live streaming video, packets sent from server to PC create a flow as they are part of the same conversation.

What is NetFlow?
NetFlow is a Cisco proprietary network protocol used for flow analysis. NetFlow collects and aggregates information about network traffic flowing through a device with an enabled NetFlow feature. The output of NetFlow is flow records that are sent to a centralized place in a network (flow collector) as NetFlow messages.

Why do we need NetFlow?
Flow statistics collected by the NetFlow protocol are typically used for:

  • bandwidth monitoring
  • network threat detection (DoS attacks) and forensic analysis
  • accounting or billing based on usage
  • investigation of network problems that cause congestion and slowness of applications.

What are the NetFlow Infrastructure Elements?

  • NetFlow exporters
  • NetFlow collector
  • Analysis application

How are the Flows Created?
NetFlow enabled devices (NetFlow exporters) create NetFlow records aggregating packets into flows based on the criteria below:

IP Source Address, IP Destination Address, Source Layer 4 port, Destination Layer 4 port, Class of Service, IP Protocol, Source Interface.

Each packet that is going to be forwarded is examined for the above parameters. The first unique packet creates a flow as an entry in the NetFlow cache (flow record). The packet is then forwarded out of the router. The other packets matching the same parameters are aggregated to this flow and the bytes counter for the flow increases. If any of the parameters is not matched, a new flow is created in the cache.

NetFlow Record is Created:
There are hundreds of thousands flows recorded in the NetFlow cache. Obviously, flows do not live in cache forever, instead they are exported from the cache to a flow collector on a regular basis. A flow is exported when it is inactive for a certain time e. g. no new packets are received for the flow. By default, the inactive flow timer is set to 15 seconds. The flow is also exported when it is long-lived (active) and lasts longer than the active timer. By default, the active timer is set to 30 minutes. For instance, a large file download that lasts longer than 30 minutes may be broken into multiple flows. It is the role of the flow collector to combine these flows showing the total download.

NetFlow Collector:
Multiple NetFlow exporters send records periodically to one or more NetFlow collectors using the User Datagram Protocol (UDP) or Stream Control Transmission Protocol (SCTP) when reliable transport is required. The role of a collector is to gather, record, combine or aggregate the exported flows to produce the reports used for traffic and security analysis.

The Analysis Applications:
Such applications like ElastiFlow analyze the received flow data for the purpose of intrusion detection or traffic profiling. They are also responsible for the presentation of data and the creation of reports.

NetFlow variations by other Vendors:
Vendors other than Cisco have their own versions of Netflow, such as jFlow (Juniper), rFow (Ericcson), sFlow (HP) etc.

Sampled Flow aka sFlow

Although sFlow stands for Sampled Flow, actual packets are being sampled here instead of flows. Sampling involves either copying headers of packets or extracting features from packets. The goal of packet sampling and filtering is to forward only certain packets. Based on a defined sampling rate, an average of 1 out of n packets is randomly sampled. sFlow agent running within a router or a switch, packages interface counters and flow samples into sFlow datagrams. The datagrams are continuously sent using UDP to the sFlow Collector where they are further analyzed.

NetFlow versus sFlow

SFlow has an ability to monitor L2-L7 headers, the ability to monitor L2 headers (MAC, VLAN ID) has been added to NetFlow v9.

Packet sampling is hardware-based and is performed by switching ASICs, achieving wire-speed performance. It makes sFlow a scalable technology that is able to monitor the links with a speed of up to 10 Gbps.

sFlow datagrams are continuously sent across the network in real-time, while the export of NetFlow records depends on active/inactive timers. It may take up to 30 minutes to export flow when NetFlow is used. Obviously, sFlow is better in traffic visibility than NetFlow. It makes sFlow good at massive DoS attacks detection, as the sampled network patterns are sent on the fly to the sFlow collector.

Nevertheless, measurements provided by sFlow are only an approximation of the real traffic because sampled packets do not reflect all network traffic. As a result, sFlow lacks accuracy provided by NetFlow as it cannot track every network communication. The accuracy, however, is highly required in digital forensics so sFlow cannot fully qualify for forensic investigation.

IPFIX

Internet Protocol Flow Information Export (IPFIX) is a standard for exporting the information about network flows from devices. It is derived from Cisco’s proprietary NetFlow v9. A metering process generates flow records collecting data packets at an Observation Point, filters them and aggregates information about these packets. Flow records are sent by the Exporting process running on exporter as IPFIX messages encapsulated by layer 4 protocols (SCTP, UDP or TCP) to a collector. The messages are pushed to the collector without any interaction by the collector.

IPFIX can be used to export any traffic information from L2-L7 to flow collector. It is a flexible protocol that supports variable length fields. It allows collecting information such as HTTP URL or host (e.g. facebook.com) as well as the user-defined data types. For instance, Syslog or SNMP data or even room temperature values can be continuously exported to the collector inside the IPFIX messages.

All above information collected from here.

Introduction about ElastiFlow

Alt Text
ElastiFlow is a NetFlow analyzer that works with ELK Stack. It provides network flow data collection and visualization using Elastic Stack. To install and configure ElastiFlow, you must first have a working Elastic Stack environment.

It supports Netflow v5/v9, sFlow and IPFIX flow types (1.x versions support only Netflow v5/v9).

Refer to the following compatibility chart to choose a release of ElastiFlow™ that is compatible with the version of the Elastic Stack you are using.

Elastic Stack ElastiFlow 3.x ElastiFlow 4.x
7.8+ v4.0.x
7.5.-7.7 ✓ v4.0.0-beta
7.0-7.4 ✓ v3.5.x
6.7 ✓ v3.4.2
6.6 ✓ v3.4.1
6.5
6.4
6.3

Use the following table as a guide to understanding the volume of flow data your network will produce and tune your logstash accordingly.

Flows/Sec (v)CPUs Memory Disk (30-days) ES JVM Heap LS JVM Heap
250 4 32 GB 512 GB 12 GB 4 GB
500 6 48 GB 1 TB 16 GB 4 GB
1000 8 64 GB 2 TB 24 GB 6 GB
1500 12 96 GB 3 TB 31 GB 6 GB

I have ELasti Stack version 7.5.0 in a cluster environment. Therefore, I will be using ElastiFlow v4.0.0-beta.

Install and update required Elastiflow plugins for logstash.

sudo /usr/share/logstash/bin/logstash-plugin install logstash-codec-sflow
sudo /usr/share/logstash/bin/logstash-plugin update logstash-codec-netflow
sudo /usr/share/logstash/bin/logstash-plugin update logstash-input-udp
sudo /usr/share/logstash/bin/logstash-plugin update logstash-input-tcp
sudo /usr/share/logstash/bin/logstash-plugin update logstash-filter-dns
sudo /usr/share/logstash/bin/logstash-plugin update logstash-filter-geoip
sudo /usr/share/logstash/bin/logstash-plugin update logstash-filter-translate
Enter fullscreen mode Exit fullscreen mode

Install and configure ElastiFLow

For ElastiStack 7.5.0, download ElastiFlow v4.0.0-beta.

cd /usr/local/src/
wget https://github.com/robcowart/elastiflow/archive/v4.0.0-beta1.tar.gz
tar xvf v4.0.0-beta1.tar.gz
Enter fullscreen mode Exit fullscreen mode

Copy elastiflow configuration to logstash directory.

cp -a elastiflow-4.0.0-beta1 /etc/logstash/
Enter fullscreen mode Exit fullscreen mode

There are five sets of configuration files provided within the /etc/logstash/elastiflow folder:

logstash
  `- elastiflow
       |- conf.d  (contains the logstash pipeline)
       |- definitions  (custom Netflow and IPFIX field definitions)
       |- dictionaries (yaml files used to enrich raw flow data)
       |- geoipdbs  (contains GeoIP databases)
       |- templates  (contains index templates)
       `- user_settings (yaml files intended for user customization)
Enter fullscreen mode Exit fullscreen mode

Now Add the ElastiFlow pipeline to pipelines.yml.

vim /etc/logstash/pipelines.yaml

- pipeline.id: elastiflow
  path.config: "/etc/logstash/elastiflow/conf.d/*.conf"
Enter fullscreen mode Exit fullscreen mode

Elastiflow variables

Rather than directly editing the pipeline configuration files which is in /etc/logstash/elastiflow/conf.d directory, environment variables are used to provide a single location for most configuration options.

Remember that for your changes to take effect, you must issue the command

systemctl daemon-reload.

Here I have defined credential for ElasticSearch and enabled multiple ES nodes. In Logstash Port 2055 will be listenting for IPFIX flow data. You can change to different port.

cp -a /usr/local/src/elastiflow-4.0.0-beta1/logstash.service.d /etc/systemd/system/
Enter fullscreen mode Exit fullscreen mode
[Service]
# ElastiFlow global configuration
Environment="ELASTIFLOW_DICT_PATH=/etc/logstash/elastiflow/dictionaries"
Environment="ELASTIFLOW_USER_SETTINGS_PATH=/etc/logstash/elastiflow/user_settings"
Environment="ELASTIFLOW_DEFINITION_PATH=/etc/logstash/elastiflow/definitions"
Environment="ELASTIFLOW_TEMPLATE_PATH=/etc/logstash/elastiflow/templates"
Environment="ELASTIFLOW_GEOIP_DB_PATH=/etc/logstash/elastiflow/geoipdbs"
Environment="ELASTIFLOW_GEOIP_CACHE_SIZE=8192"
Environment="ELASTIFLOW_GEOIP_LOOKUP=true"
Environment="ELASTIFLOW_ASN_LOOKUP=true"
Environment="ELASTIFLOW_OUI_LOOKUP=false"
Environment="ELASTIFLOW_POPULATE_LOGS=true"
Environment="ELASTIFLOW_KEEP_ORIG_DATA=true"
Environment="ELASTIFLOW_DEFAULT_APPID_SRCTYPE=__UNKNOWN"

# Elasticsearch connection settings
Environment="ELASTIFLOW_ES_USER=elastic"
Environment="ELASTIFLOW_ES_PASSWD=password"

# It is also necessary to rename the output files to disable single node output, and enable multi-node.
Environment="ELASTIFLOW_ES_HOST_1=xxx.xxx.xxx.xxx:9200"
Environment="ELASTIFLOW_ES_HOST_2=xxx.xxx.xxx.xxx:9200"
Environment="ELASTIFLOW_ES_HOST_3=xxx.xxx.xxx.xxx:9200"

# IPFIX - IPv4
Environment="ELASTIFLOW_IPFIX_UDP_IPV4_HOST=0.0.0.0"
Environment="ELASTIFLOW_IPFIX_UDP_IPV4_PORT=2055"

# IPFIX - UDP input options
Environment="ELASTIFLOW_IPFIX_UDP_WORKERS=4"
Environment="ELASTIFLOW_IPFIX_UDP_QUEUE_SIZE=4096"
Environment="ELASTIFLOW_IPFIX_UDP_RCV_BUFF=33554432"
Enter fullscreen mode Exit fullscreen mode

I am going to enable only IPFIX flow and disabling all other elastiflow configuration.

cd /etc/logstash/elastiflow/conf.d/
mv 10_input_ipfix_ipv4.logstash.conf.disabled 10_input_ipfix_ipv4.logstash.conf
mv 30_output_10_single.logstash.conf 30_output_10_single.logstash.conf.disabled
mv 30_output_20_multi.logstash.conf.disabled 30_output_20_multi.logstash.conf
Enter fullscreen mode Exit fullscreen mode

That is all that needs to be modified so save the file and we are ready to create the Logstash startup script which will take the variables we just edited and use them in the actual config files for the Elastiflow pipeline. If you make more changes to elastiflow.conf later, make sure to rerun this script and restart Logstash.

Restart logstash service

/usr/share/logstash/bin/system-install
systemctl daemon-reload
systemctl start logstash
Enter fullscreen mode Exit fullscreen mode

In logstash version 7.5.1 you might hit by the bug *"Exception in inputworker"** seen in logstash log file and unable to listen in UDP port 2055.*

[2020-12-31T15:42:21,436][ERROR][logstash.inputs.udp ][elastiflow] Exception in inputworker {"exception"=>java.lang.NullPointerException, "backtrace"=>["org.jruby.runtime.invokedynamic.InvokeDynamicSupport.callMethodMissing(InvokeDynamicSupport.java:86)", "org.jruby.runtime.invokedynamic.MathLinker.callMethod(MathLinker.java:418)", "org.jruby.runtime.invokedynamic.MathLinker.fixnumOperatorFail(MathLinker.java:209)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$uint_field$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:391)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$netflow_field_for$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:443)", "usr.share.logstash.vendor.bundle.jruby.
[2020-12-31T15:42:45,436][ERROR][logstash.inputs.udp ][elastiflow] Exception in inputworker {"exception"=>java.lang.NullPointerException, "backtrace"=>["org.jruby.runtime.invokedynamic.InvokeDynamicSupport.callMethodMissing(InvokeDynamicSupport.java:86)", "org.jruby.runtime.invokedynamic.MathLinker.callMethod(MathLinker.java:418)", "org.jruby.runtime.invokedynamic.MathLinker.fixnumOperatorFail(MathLinker.java:209)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$uint_field$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:391)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$netflow_field_for$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:443)", "usr.share.logstash.vendor.bundle.jruby.
[2020-12-31T15:43:18,436][ERROR][logstash.inputs.udp ][elastiflow] Exception in inputworker {"exception"=>java.lang.NullPointerException, "backtrace"=>["org.jruby.runtime.invokedynamic.InvokeDynamicSupport.callMethodMissing(InvokeDynamicSupport.java:86)", "org.jruby.runtime.invokedynamic.MathLinker.callMethod(MathLinker.java:418)", "org.jruby.runtime.invokedynamic.MathLinker.fixnumOperatorFail(MathLinker.java:209)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$uint_field$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:391)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$netflow_field_for$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:443)", "usr.share.logstash.vendor.bundle.jruby.
[2020-12-31T15:43:58,436][ERROR][logstash.inputs.udp ][elastiflow] Exception in inputworker {"exception"=>java.lang.NullPointerException, "backtrace"=>["org.jruby.runtime.invokedynamic.InvokeDynamicSupport.callMethodMissing(InvokeDynamicSupport.java:86)", "org.jruby.runtime.invokedynamic.MathLinker.callMethod(MathLinker.java:418)", "org.jruby.runtime.invokedynamic.MathLinker.fixnumOperatorFail(MathLinker.java:209)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$uint_field$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:391)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_netflow_minus_4_dot_2_dot_1.lib.logstash.codecs.netflow.RUBY$method$netflow_field_for$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-netflow-4.2.1/lib/logstash/codecs/netflow.rb:443)", "usr.share.logstash.vendor.bundle.jruby.

Enter fullscreen mode Exit fullscreen mode

java.lang.NullPointerException was coming from JRuby and is resolved in version 9.2.9. JRuby 9.2.9 is only reached to LS 7.6 so 7.4/7.5 users will experience this bug.
To resolve this issue you have 2 choices. Either rollback to previous version 7.3 or update to latest logstash. Logstash doesn't need to be the same version as Elasticsearch and Kibana. I upgraded logstash to v7.10.1 and the error was resolved.

yum update logstash
systemctl restart logstash
Enter fullscreen mode Exit fullscreen mode
rpm -qa | grep logstash
    logstash-7.10.1-1.x86_64
Enter fullscreen mode Exit fullscreen mode

Also, you may also see this warning in /var/log/logstash/logstash-plain.log

[2020-12-31T16:43:05,836][WARN ][logstash.codecs.netflow ][elastiflow] Can't (yet) decode flowset id 260 from observation domain id 56756, because no template to decode it with has been received. This message will usually go away after 1 minute.

This means the flow collecter is not been able to decode incoming Netflow v9 or IPFIX flow records until it has received a template from the exporter which describes the structure of those records. For this reason, you may see a log message similar to that described above, for a few minutes after Logstash has been started with the ElastiFlow pipeline.

If the template received from the exporter refers to an Information Element (IE)/field that is not known to the Logstash Netflow CODEC, it will silently discard the template and these messages will persist.

Logstash takes a little time to start... BE PATIENT!

If using Netflow v9 or IPFIX you will likely see warning messages related to the flow templates not yet being received. They will disappear after templates are received from the network devices, which should happen every few minutes. Some devices can take a bit longer to send templates. Fortinet in particular sends templates rather infrequently.

That's it. Logstash setup is now complete. If you are receiving flow data, you should have an elastiflow-4.0.0-YY.MM.DD index in Elasticsearch.

Now Tune Linux for improved UDP Throughput

Add below line at the end of file in /etc/sysctl.conf

vim /etc/sysctl.conf

net.core.rmem_max=33554432
Enter fullscreen mode Exit fullscreen mode
sysctl -p
Enter fullscreen mode Exit fullscreen mode

.

Import ElastiFlow Dashboard in Kibana

The visualizations and dashboards can be loaded into Kibana by importing the kibana/elastiflow.kibana..ndjson file from within the Kibana UI.

Management > Kibana Section > Saved Objects > Import

In my case I will be importing file "elastiflow.kibana.7.5.x.ndjson" from the Elastiflow folder which I downloaded before.

Reverse proxy (or small Kibana max payload size) can prevent Kibana dashboard import.

You may face dashboard import issue if your kibana is behind the proxy. For that you may need to tweak some parameters in Nginx.

proxy_read_timeout 900s;

client_max_body_size 8388608;

you may also need to define that in /etc/kibana/kibana.yaml file.

server.maxPayloadBytes: 8388608
Enter fullscreen mode Exit fullscreen mode
systemctl restart kibana

nginx -t
service nginx reload
Enter fullscreen mode Exit fullscreen mode

Now you can send IPFIX flow data from network devices.

ElastiFlow Kibana Dashboard
Alt Text

Alt Text

Oldest comments (0)