<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Cey</title>
    <description>The latest articles on DEV Community by Cey (@akarce).</description>
    <link>https://dev.to/akarce</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1701777%2F051a703d-067f-4959-add4-d5823e1f75d9.jpeg</url>
      <title>DEV Community: Cey</title>
      <link>https://dev.to/akarce</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/akarce"/>
    <language>en</language>
    <item>
      <title>ELK Stack Mastery: Building a Scalable Log Management System</title>
      <dc:creator>Cey</dc:creator>
      <pubDate>Mon, 04 Nov 2024 23:22:49 +0000</pubDate>
      <link>https://dev.to/akarce/elk-stack-mastery-building-a-scalable-log-management-system-5hcn</link>
      <guid>https://dev.to/akarce/elk-stack-mastery-building-a-scalable-log-management-system-5hcn</guid>
      <description>&lt;h2&gt;
  
  
  YouTube Tutorial
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://youtu.be/U1i5sIZzEQM" rel="noopener noreferrer"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgzo18fca6c2omrxaszsh.jpg" alt="Watch the tutorial" width="480" height="360"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;This project sets up a three-node Elastic cluster using VirtualBox virtual machines. It includes the setup of Elasticsearch, Logstash, and Kibana (the ELK stack) for log management and analysis.&lt;/p&gt;

&lt;h2&gt;
  
  
  Project Goals
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Set up an Elastic cluster with all necessary components.&lt;/li&gt;
&lt;li&gt;Create an index with a retention schedule of 10 days in the Hot tier, 10 days in the Cold tier, and 10 days in the Frozen tier.&lt;/li&gt;
&lt;li&gt;Load logs using one of the methods listed in the setup.&lt;/li&gt;
&lt;li&gt;Create a dashboard with drilldown capabilities.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;VirtualBox installed on your system&lt;/li&gt;
&lt;li&gt;Debian 12 ISO image&lt;/li&gt;
&lt;li&gt;Sufficient system resources to run 3 VMs&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  VM Configuration
&lt;/h2&gt;

&lt;p&gt;Create 3 VMs with the following specifications:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;elktest1&lt;/strong&gt; (Master + Data_Hot + Data_Content, Kibana, Logstash)

&lt;ul&gt;
&lt;li&gt;8 GB RAM, 4 CPU, 40 GB storage&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;elktest2&lt;/strong&gt; (Data_Cold, Logstash)

&lt;ul&gt;
&lt;li&gt;8 GB RAM, 4 CPU, 40 GB storage&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;elktest3&lt;/strong&gt; (Data_Frozen, Logstash)

&lt;ul&gt;
&lt;li&gt;6 GB RAM, 3 CPU, 50 GB storage&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Setup Instructions
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. VM Installation
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Download Debian 12 ISO:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-12.7.0-amd64-netinst.iso&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Install Debian on each VM.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In VM settings, change network from NAT to Bridged Adapter.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  2. SSH Setup
&lt;/h3&gt;

&lt;p&gt;Install SSH on each VM:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;su -
apt-get update
apt-get &lt;span class="nb"&gt;install &lt;/span&gt;openssh-server
systemctl start ssh
systemctl &lt;span class="nb"&gt;enable &lt;/span&gt;ssh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Reboot and get IP addresses:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;reboot now
ip addr show
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Connect from host machine:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;ssh &amp;lt;username&amp;gt;@&amp;lt;your_ip_address&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
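

&lt;p&gt;Since the configuration files below refer to the nodes by hostname, it helps to map those names to the IP addresses you just noted in &lt;code&gt;/etc/hosts&lt;/code&gt; on every VM (and optionally on the host). A minimal sketch, using placeholder addresses:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;192.168.1.101 elktest1
192.168.1.102 elktest2
192.168.1.103 elktest3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;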



&lt;h3&gt;
  
  
  3. Elasticsearch Installation
&lt;/h3&gt;

&lt;p&gt;On all VMs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;apt &lt;span class="nb"&gt;install &lt;/span&gt;curl
curl &lt;span class="nt"&gt;-fsSL&lt;/span&gt; https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg &lt;span class="nt"&gt;--dearmor&lt;/span&gt; &lt;span class="nt"&gt;-o&lt;/span&gt; /usr/share/keyrings/elastic.gpg
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"deb [signed-by=/usr/share/keyrings/elastic.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main"&lt;/span&gt; | &lt;span class="nb"&gt;tee&lt;/span&gt; &lt;span class="nt"&gt;-a&lt;/span&gt; /etc/apt/sources.list.d/elastic-8.x.list
apt update
apt &lt;span class="nb"&gt;install &lt;/span&gt;elasticsearch
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  4. Elasticsearch Configuration
&lt;/h3&gt;

&lt;h3&gt;
  
  
  On elktest1:
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Edit &lt;code&gt;/etc/elasticsearch/elasticsearch.yml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;cluster.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktestcluster&lt;/span&gt;
&lt;span class="na"&gt;node.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktest1&lt;/span&gt;
&lt;span class="na"&gt;node.roles&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;master"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_hot"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_content"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;cluster.initial_master_nodes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;elktest1"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;path.data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/lib/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;path.logs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/log/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;network.host&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0&lt;/span&gt;
&lt;span class="na"&gt;http.port&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;9200&lt;/span&gt;
&lt;span class="na"&gt;discovery.seed_hosts&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;elktest1"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;xpack.security.enabled&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
&lt;span class="na"&gt;xpack.security.enrollment.enabled&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
&lt;span class="na"&gt;xpack.security.http.ssl&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;enabled&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
  &lt;span class="na"&gt;keystore.path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;certs/http.p12&lt;/span&gt;
&lt;span class="na"&gt;xpack.security.transport.ssl&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;enabled&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
  &lt;span class="na"&gt;verification_mode&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;certificate&lt;/span&gt;
  &lt;span class="na"&gt;keystore.path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;certs/transport.p12&lt;/span&gt;
  &lt;span class="na"&gt;truststore.path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;certs/transport.p12&lt;/span&gt;
&lt;span class="na"&gt;http.host&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Start Elasticsearch:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;systemctl start elasticsearch
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Reset elastic user password:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;/usr/share/elasticsearch/bin/elasticsearch-reset-password &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-u&lt;/span&gt; elastic
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Generate enrollment tokens for other nodes:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /usr/share/elasticsearch/bin
./elasticsearch-create-enrollment-token &lt;span class="nt"&gt;-s&lt;/span&gt; node
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;
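
&lt;p&gt;Optionally, verify that the node is up before enrolling the others. A quick check, assuming the self-signed certificates generated during installation (hence &lt;code&gt;-k&lt;/code&gt;) and the elastic password you just set:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl -k -u elastic "https://localhost:9200/_cluster/health?pretty"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;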

&lt;h3&gt;
  
  
  On elktest2 and elktest3:
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Reconfigure node with enrollment token:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /usr/share/elasticsearch/bin
./elasticsearch-reconfigure-node &lt;span class="nt"&gt;--enrollment-token&lt;/span&gt; &amp;lt;your_enrollment_token&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Edit &lt;code&gt;/etc/elasticsearch/elasticsearch.yml&lt;/code&gt;:&lt;/p&gt;

&lt;p&gt;For elktest2:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;cluster.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktestcluster&lt;/span&gt;
&lt;span class="na"&gt;node.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktest2&lt;/span&gt;
&lt;span class="na"&gt;node.roles&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_cold"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;path.data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/lib/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;path.logs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/log/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;network.host&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0&lt;/span&gt;
&lt;span class="na"&gt;http.port&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;9200&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;


&lt;p&gt;For elktest3:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;cluster.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktestcluster&lt;/span&gt;
&lt;span class="na"&gt;node.name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;elktest3&lt;/span&gt;
&lt;span class="na"&gt;node.roles&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_frozen"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;path.data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/lib/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;path.logs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/var/log/elasticsearch&lt;/span&gt;
&lt;span class="na"&gt;network.host&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0&lt;/span&gt;
&lt;span class="na"&gt;http.port&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;9200&lt;/span&gt;
&lt;span class="na"&gt;xpack.searchable.snapshot.shared_cache.size&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;30%&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Start Elasticsearch on both nodes:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;systemctl start elasticsearch
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;
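
&lt;p&gt;To confirm that elktest2 and elktest3 joined the cluster with the expected roles, you can query the &lt;code&gt;_cat/nodes&lt;/code&gt; API from any node (again assuming the elastic user's password):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl -k -u elastic "https://elktest1:9200/_cat/nodes?v&amp;amp;h=name,node.role"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;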

&lt;h3&gt;
  
  
  5. Index Lifecycle Management
&lt;/h3&gt;

&lt;p&gt;Create ILM policy:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;PUT _ilm/policy/elktestcluster_logs_policy
&lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"policy"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="s2"&gt;"phases"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="s2"&gt;"hot"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"actions"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="s2"&gt;"rollover"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                        &lt;span class="s2"&gt;"max_size"&lt;/span&gt;: &lt;span class="s2"&gt;"40gb"&lt;/span&gt;,
                        &lt;span class="s2"&gt;"max_age"&lt;/span&gt;: &lt;span class="s2"&gt;"10d"&lt;/span&gt;
                    &lt;span class="o"&gt;}&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;,
            &lt;span class="s2"&gt;"warm"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"min_age"&lt;/span&gt;: &lt;span class="s2"&gt;"10d"&lt;/span&gt;,
                &lt;span class="s2"&gt;"actions"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="s2"&gt;"forcemerge"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                        &lt;span class="s2"&gt;"max_num_segments"&lt;/span&gt;: 1
                    &lt;span class="o"&gt;}&lt;/span&gt;,
                    &lt;span class="s2"&gt;"allocate"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                        &lt;span class="s2"&gt;"require"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                            &lt;span class="s2"&gt;"data"&lt;/span&gt;: &lt;span class="s2"&gt;"cold"&lt;/span&gt;
                        &lt;span class="o"&gt;}&lt;/span&gt;
                    &lt;span class="o"&gt;}&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;,
            &lt;span class="s2"&gt;"cold"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"min_age"&lt;/span&gt;: &lt;span class="s2"&gt;"20d"&lt;/span&gt;,
                &lt;span class="s2"&gt;"actions"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="s2"&gt;"freeze"&lt;/span&gt;: &lt;span class="o"&gt;{}&lt;/span&gt;,
                    &lt;span class="s2"&gt;"allocate"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                        &lt;span class="s2"&gt;"require"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                            &lt;span class="s2"&gt;"data"&lt;/span&gt;: &lt;span class="s2"&gt;"frozen"&lt;/span&gt;
                        &lt;span class="o"&gt;}&lt;/span&gt;
                    &lt;span class="o"&gt;}&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Assign policy to index template:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;PUT _index_template/elktestcluster_logs_template
&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"index_patterns"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"elktestcluster-logs-*"&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;,
  &lt;span class="s2"&gt;"template"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"settings"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="s2"&gt;"number_of_shards"&lt;/span&gt;: 1,
      &lt;span class="s2"&gt;"number_of_replicas"&lt;/span&gt;: 1,
      &lt;span class="s2"&gt;"index.lifecycle.name"&lt;/span&gt;: &lt;span class="s2"&gt;"elktestcluster_logs_policy"&lt;/span&gt;,
      &lt;span class="s2"&gt;"index.lifecycle.rollover_alias"&lt;/span&gt;: &lt;span class="s2"&gt;"elktestcluster-logs"&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
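

&lt;p&gt;Note that rollover only applies when documents are written through the rollover alias. If you choose to index through the alias instead of dated index names, the first backing index typically needs to be bootstrapped with the alias marked as the write index. A minimal sketch (the index name below is an assumption that follows the template's pattern):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;PUT elktestcluster-logs-000001
{
  "aliases": {
    "elktestcluster-logs": {
      "is_write_index": true
    }
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;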



&lt;h3&gt;
  
  
  6. Logstash Setup
&lt;/h3&gt;

&lt;p&gt;Install Logstash on all VMs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;apt &lt;span class="nb"&gt;install &lt;/span&gt;logstash &lt;span class="nt"&gt;-y&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Add logstash user to elasticsearch group:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;sudo &lt;/span&gt;usermod &lt;span class="nt"&gt;-aG&lt;/span&gt; elasticsearch logstash
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create Logstash pipeline configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;nano /etc/logstash/conf.d/elktestcluster-logs.con
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Add the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;input {
  file {
    path =&amp;gt; [
      "/var/log/elasticsearch/elktestcluster*.json"
    ]
    start_position =&amp;gt; "beginning"
    sincedb_path =&amp;gt; "/dev/null"
    codec =&amp;gt; "json"
  }
}

output {
  elasticsearch {
    hosts =&amp;gt; ["https://elktest1:9200", "https://elktest2:9200", "https://elktest3:9200"]
    index =&amp;gt; "elktestcluster-logs-%{+YYYY.MM.dd}"
    user =&amp;gt; "elastic"
    password =&amp;gt; "elastic"
    ssl =&amp;gt; true
    cacert =&amp;gt; "/etc/elasticsearch/certs/http_ca.crt"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
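

&lt;p&gt;Before starting the service, you can have Logstash validate the pipeline file (the path assumes the configuration created above):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/elktestcluster-logs.conf
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;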



&lt;p&gt;Start Logstash on all VMs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;systemctl start logstash
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  7. Kibana Setup
&lt;/h3&gt;

&lt;p&gt;Install Kibana on one VM (preferably elktest1 or elktest2):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;apt &lt;span class="nb"&gt;install &lt;/span&gt;kibana &lt;span class="nt"&gt;-y&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Reset kibana_system user password:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;/usr/share/elasticsearch/bin/elasticsearch-reset-password &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-u&lt;/span&gt; kibana_system
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Configure Kibana:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;nano /etc/kibana/kibana.yml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Add/edit the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;server.port&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;5601&lt;/span&gt;
&lt;span class="na"&gt;server.host&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0.0.0.0"&lt;/span&gt;
&lt;span class="na"&gt;elasticsearch.hosts&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;https://elktest1:9200&amp;gt;"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;https://elktest2:9200&amp;gt;"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;https://elktest3:9200&amp;gt;"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;span class="na"&gt;elasticsearch.username&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kibana_system"&lt;/span&gt;
&lt;span class="na"&gt;elasticsearch.password&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kibana"&lt;/span&gt;
&lt;span class="na"&gt;elasticsearch.ssl.verificationMode&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;none&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Start Kibana:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;systemctl start kibana
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
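

&lt;p&gt;Kibana can take a minute to come up; a quick way to confirm it is running and listening on port 5601 (run on the Kibana VM):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;systemctl status kibana
curl -I http://localhost:5601
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;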



&lt;h3&gt;
  
  
  8. Accessing Kibana
&lt;/h3&gt;

&lt;p&gt;Open a web browser and go to:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;http://&amp;lt;your_kibana_machine_ip&amp;gt;:5601
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Use the Elasticsearch credentials:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Username: elastic&lt;/li&gt;
&lt;li&gt;Password: elastic&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Final Steps
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Create a data view from cluster logs in Kibana.&lt;/li&gt;
&lt;li&gt;Create a dashboard from the data view.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/kibana_snapshot.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/kibana_snapshot.png" alt="Kibana Snapshot" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations! You have now set up a complete ELK stack for log management and analysis.&lt;/p&gt;

</description>
      <category>elasticsearch</category>
      <category>kibana</category>
      <category>logstash</category>
      <category>virtualbox</category>
    </item>
    <item>
      <title>End to End Data Engineering OTP Pipeline Project</title>
      <dc:creator>Cey</dc:creator>
      <pubDate>Mon, 04 Nov 2024 18:21:17 +0000</pubDate>
      <link>https://dev.to/akarce/end-to-end-data-engineering-otp-pipeline-project-28gf</link>
      <guid>https://dev.to/akarce/end-to-end-data-engineering-otp-pipeline-project-28gf</guid>
      <description>&lt;h1&gt;
  
  
  End to End OTP Pipeline Project using Docker, Airflow, Kafka, KafkaUI, Cassandra, MongoDB, EmailOperator, SlackWebhookOperator and DiscordWebhookOperator
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Project Overview
&lt;/h2&gt;

&lt;p&gt;The project runs 3 Kafka brokers plus UI for Apache Kafka, Zookeeper, Cassandra, Mongo, Mongo-Express, airflow-init, postgres, airflow-webserver, airflow-scheduler, airflow-triggerer, and airflow-cli containers.&lt;/p&gt;

&lt;p&gt;The pipeline performs the following steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Creates a Kafka topic named email_topic with 3 partitions and writes one email address at a time, triggered by the user (a CLI sketch for inspecting the topic follows this list).&lt;/li&gt;
&lt;li&gt;Creates a Cassandra keyspace and table named email_namespace and email_table and consumes the Kafka topic messages into Cassandra.&lt;/li&gt;
&lt;li&gt;Creates a MongoDB database and collection named email_database and email_collection and consumes the topic messages into MongoDB.&lt;/li&gt;
&lt;li&gt;Checks for the existence of the email and OTP code in these two databases and pushes email notifications, Slack channel messages, and Discord messages simultaneously.&lt;/li&gt;
&lt;/ol&gt;
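
&lt;p&gt;As a quick sanity check on the topic created in step 1, you can describe it from inside one of the broker containers. A sketch, assuming a broker container named &lt;code&gt;kafka1&lt;/code&gt; with a listener on &lt;code&gt;localhost:9092&lt;/code&gt;; adjust the names to match your docker-compose file:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;$ docker exec -it kafka1 kafka-topics --bootstrap-server localhost:9092 --describe --topic email_topic&lt;/code&gt;&lt;/p&gt;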

&lt;h2&gt;
  
  
  Configuring Email Notifications in Airflow
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Obtain an App Password for your Microsoft Account:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Go to the Security tab and select Manage how I sign in to Microsoft.&lt;/li&gt;
&lt;li&gt;Create a new app password as instructed.&lt;/li&gt;
&lt;li&gt;For detailed guidance, refer to the &lt;a href="https://support.microsoft.com/en-us/account-billing/how-to-get-and-use-app-passwords-5896ed9b-4263-e681-128a-a6f2979a7944" rel="noopener noreferrer"&gt;Microsoft support article&lt;/a&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Update the docker-compose.yaml file
&lt;/h4&gt;

&lt;p&gt;AIRFLOW__SMTP__SMTP_HOST='smtp-mail.outlook.com' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_MAIL_FROM='your_email@outlook.com' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_USER='your_email@outlook.com' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_PASSWORD='your_app_password' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_PORT='587' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_STARTTLS='True' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_SSL='False' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_TIMEOUT='5' \&lt;br&gt;
AIRFLOW__SMTP__SMTP_RETRY_LIMIT='3'&lt;/p&gt;

&lt;h3&gt;
  
  
  Setting up receiver emails using &lt;a href="https://www.mailslurp.com/" rel="noopener noreferrer"&gt;MailSlurp&lt;/a&gt;.
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Sign up for a free account.&lt;/li&gt;
&lt;li&gt;From the Inboxes section, create several temporary email addresses.&lt;/li&gt;
&lt;li&gt;In the Emails section, find and copy the email addresses into the email_list.txt file in your project directory before running the pipeline.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up Slack
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Go to the Your Apps section of the &lt;a href="https://api.slack.com/apps" rel="noopener noreferrer"&gt;Slack API website&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Create an app from scratch.&lt;/li&gt;
&lt;li&gt;Assign the app to your workspace.&lt;/li&gt;
&lt;li&gt;After creating your app, go to the Incoming Webhooks section in the dropdown menu and toggle Activate Incoming Webhooks.&lt;/li&gt;
&lt;li&gt;Go to the bottom of the page, add a new webhook to the workspace, and assign it to the data-engineering channel.&lt;/li&gt;
&lt;li&gt;From the Webhook URL section, copy the URL; it will be used in the Airflow connection setup.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up Discord
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Go to the Discord Web or Desktop &lt;a href="https://discord.com/" rel="noopener noreferrer"&gt;Application&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Create a server from the bottom of the left pane of servers (choose Create My Own, then For me and my friends) and name it whatever you want.&lt;/li&gt;
&lt;li&gt;Go to Server Settings, then Apps/Integrations/Webhooks, and select Create Webhook. This creates a default webhook; copy the Webhook URL and store it for the Airflow connection setup.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Initialize the containers using the docker_run.sh shell script:
&lt;/h2&gt;

&lt;p&gt;&lt;code&gt;$ ./docker_run.sh&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;This will create the required UID variable and a network named cassandra-kafka, initialize the Postgres database that holds the Airflow metadata, download the container images, and finally start the project containers, which are orchestrated by Airflow.&lt;/strong&gt;&lt;/p&gt;
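
&lt;p&gt;To confirm that everything came up, list the running services from the project directory:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;$ docker compose ps&lt;/code&gt;&lt;/p&gt;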

&lt;h4&gt;
  
  
  WebUI Links:
&lt;/h4&gt;

&lt;p&gt;&lt;code&gt;Airflow&lt;/code&gt;       : &lt;a href="http://localhost:8080/" rel="noopener noreferrer"&gt;http://localhost:8080/&lt;/a&gt;&lt;br&gt;
    Username: &lt;code&gt;admin&lt;/code&gt; Password: &lt;code&gt;admin&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;Kafka UI&lt;/code&gt;      : &lt;a href="http://localhost:8888/" rel="noopener noreferrer"&gt;http://localhost:8888/&lt;/a&gt; &lt;/p&gt;

&lt;p&gt;&lt;code&gt;Mongo Express&lt;/code&gt; : &lt;a href="http://localhost:8082" rel="noopener noreferrer"&gt;http://localhost:8082&lt;/a&gt;&lt;br&gt;
    Username: &lt;code&gt;admin&lt;/code&gt; Password: &lt;code&gt;pass&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Access the cassandra shell using :&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;$ docker exec -it cassandra cqlsh -u cassandra -p cassandra&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Establish Airflow connections for Slack and Discord from the Admin/Connections panel.
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Use the Webhook URL obtained from the Slack app and add a new record.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Connection Id *         : slack_webhook&lt;br&gt;
Connection Type *       : Slack Incoming Webhook&lt;br&gt;
Slack Webhook Endpoint  : hooks.slack.com/services&lt;br&gt;
Webhook Token           : T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX (found in the webhook URL, usually starts with "T")&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use the Webhook URL copied from Discord and add a new record.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Connection Id *         : discord_webhook&lt;br&gt;
Connection Type *       : Discord&lt;br&gt;
Host                    : &lt;a href="https://discord.com/api/" rel="noopener noreferrer"&gt;https://discord.com/api/&lt;/a&gt;&lt;br&gt;
Webhook Endpoint        : webhooks/{webhook.id}/{webhook.token} (found in the webhook URL)&lt;/p&gt;

&lt;h3&gt;
  
  
  Trigger the main_dag several times.
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Once you have triggered the DAG, you can track the Kafka messages created from &lt;a href="http://localhost:8888" rel="noopener noreferrer"&gt;Kafka UI&lt;/a&gt;, the data inserted into the Cassandra table using the Cassandra shell (cqlsh), and the documents inserted into MongoDB from the &lt;a href="http://localhost:8082" rel="noopener noreferrer"&gt;Mongo Express UI&lt;/a&gt;.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;For an extensive explanation of the project, feel free to visit my &lt;a href="https://medium.com/@akarce/end-to-end-data-engineering-project-airflow-kafka-cassandra-mongodb-docker-emailoperator-07d48d27bee2" rel="noopener noreferrer"&gt;Medium post&lt;/a&gt;.&lt;/strong&gt;&lt;/p&gt;

</description>
      <category>docker</category>
      <category>airflow</category>
      <category>kafka</category>
      <category>mongodb</category>
    </item>
    <item>
      <title>End-to-End Realtime Streaming Data Engineering Project</title>
      <dc:creator>Cey</dc:creator>
      <pubDate>Wed, 07 Aug 2024 13:58:38 +0000</pubDate>
      <link>https://dev.to/akarce/end-to-end-realtime-streaming-data-engineering-project-2340</link>
      <guid>https://dev.to/akarce/end-to-end-realtime-streaming-data-engineering-project-2340</guid>
      <description>&lt;p&gt;&lt;strong&gt;This repository demonstrates a data engineering pipeline using Spark Structured Streaming. It retrieves random names from an API, sends the data to Kafka topics via Airflow, and processes it with Spark Structured Streaming before storing it in Cassandra.&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  System Architecture
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3i4vt2ho90tlsz2bete4.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3i4vt2ho90tlsz2bete4.jpg" alt="Image description" width="800" height="352"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Components:
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Data Source:&lt;/strong&gt; Uses the randomuser.me API for generating user data. &lt;br&gt;
&lt;strong&gt;Apache Airflow:&lt;/strong&gt; Orchestrates the pipeline and schedules data ingestion. &lt;br&gt;
&lt;strong&gt;Apache Kafka &amp;amp; Zookeeper:&lt;/strong&gt; Stream the ingested data to Spark. &lt;br&gt;
&lt;strong&gt;Apache Spark:&lt;/strong&gt; Processes data in real time. &lt;br&gt;
&lt;strong&gt;Cassandra:&lt;/strong&gt; Stores the processed data. &lt;br&gt;
&lt;strong&gt;Scripts:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;kafka_stream.py:&lt;/strong&gt; Airflow DAG script that pushes API data to Kafka every second for 2 minutes. &lt;br&gt;
&lt;strong&gt;spark_stream.py:&lt;/strong&gt; Consumes and processes data from Kafka using Spark Structured Streaming. &lt;/p&gt;

&lt;h2&gt;
  
  
  What You'll Learn:
&lt;/h2&gt;

&lt;p&gt;Setting up and orchestrating pipelines with Apache Airflow. &lt;br&gt;
Real-time data streaming with Apache Kafka. &lt;br&gt;
Synchronization with Apache Zookeeper. &lt;br&gt;
Data processing with Apache Spark. &lt;br&gt;
Storage solutions with Cassandra and PostgreSQL. &lt;br&gt;
Containerization of the entire setup using Docker. &lt;br&gt;
&lt;strong&gt;Technologies:&lt;/strong&gt; &lt;br&gt;
Apache Airflow, Python, Apache Kafka, Apache Zookeeper, Apache Spark, Cassandra, PostgreSQL, Docker &lt;/p&gt;

&lt;h2&gt;
  
  
  Getting Started
&lt;/h2&gt;

&lt;h3&gt;
  
  
  WebUI links
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;Airflow&lt;/code&gt;  : &lt;a href="http://localhost:8080/" rel="noopener noreferrer"&gt;http://localhost:8080/&lt;/a&gt; &lt;br&gt;
&lt;code&gt;Kafka UI&lt;/code&gt; : &lt;a href="http://localhost:8085/" rel="noopener noreferrer"&gt;http://localhost:8085/&lt;/a&gt; &lt;/p&gt;

&lt;h3&gt;
  
  
  Clone the repository:
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ git clone https://github.com/akarce/e2e-structured-streaming.git&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Navigate to the project directory:
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ cd e2e-structured-streaming&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Create an .env file in the project folder and set AIRFLOW_UID
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ echo -e "AIRFLOW_UID=$(id -u)" &amp;gt; .env&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;$ echo AIRFLOW_UID=50000 &amp;gt;&amp;gt; .env&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Run Docker Compose to perform database migrations and create the first user account
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ docker-compose up airflow-init&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsm8jmxjc9ecd0nk95s4g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsm8jmxjc9ecd0nk95s4g.png" alt="Image description" width="800" height="289"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Run Docker Compose again to spin up the services:
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ docker compose up -d&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrh5melqrg4spwnqjg4w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrh5melqrg4spwnqjg4w.png" alt="Image description" width="800" height="179"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Copy the dependencies.zip and spark_stream.py files into the spark-master container
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ docker cp dependencies.zip spark-master:/dependencies.zip&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;$ docker cp spark_stream.py spark-master:/spark_stream.py&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnbqm9zfobxreraqeci2a.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnbqm9zfobxreraqeci2a.png" alt="Image description" width="800" height="63"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Run the docker exec command to access the cqlsh shell in the Cassandra container
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ docker exec -it cassandra cqlsh -u cassandra -p cassandra localhost 9042&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Run the DESCRIBE command to see that there are no user-created keyspaces in the Cassandra instance yet
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;cqlsh&amp;gt; DESCRIBE KEYSPACES;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--5FlqYzjT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/v1/img/cqlsh_no_keyspace.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--5FlqYzjT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/v1/img/cqlsh_no_keyspace.png" alt="cqlsh no keyspace" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Unpause the user_automation DAG using the Airflow UI
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Go to Airflow UI using :&lt;/strong&gt; &lt;a href="http://localhost:8080/" rel="noopener noreferrer"&gt;http://localhost:8080/&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Login using&lt;/strong&gt; Username: &lt;code&gt;admin&lt;/code&gt; Password: &lt;code&gt;admin&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--LplaHPo6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/v1/img/unpause_user_automation.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--LplaHPo6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/v1/img/unpause_user_automation.png" alt="unpause the user_automation" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;You can track the topic creation and the message queue using the open-source tool UI for Apache Kafka, which runs as a container. WebUI link:&lt;/strong&gt;  &lt;a href="http://localhost:8085/" rel="noopener noreferrer"&gt;http://localhost:8085/&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnogr2tid4vmk2q6ekxxn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnogr2tid4vmk2q6ekxxn.png" alt="Image description" width="800" height="458"&gt;&lt;/a&gt;&lt;br&gt;
&lt;strong&gt;Message schema looks like this&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg310zmsby9gxzukq9b79.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg310zmsby9gxzukq9b79.png" alt="Image description" width="800" height="269"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  In a new terminal, run the docker exec command to launch the Spark job that reads the stream from the Kafka topic:
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;$ docker exec -it spark-master spark-submit     --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1     --py-files /dependencies.zip     /spark_stream.py&lt;/code&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Now go back to the cqlsh terminal and run the following query to see that data is inserted into the Cassandra table called created_users
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;cqlsh&amp;gt; SELECT * FROM spark_streaming.created_users;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiuv1fx7uznwa4on5q22c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiuv1fx7uznwa4on5q22c.png" alt="Image description" width="800" height="335"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  and run the count query several times to confirm that data is being inserted while the user_automation DAG is running
&lt;/h4&gt;

&lt;p&gt;&lt;code&gt;cqlsh&amp;gt; SELECT count(*) FROM spark_streaming.created_users;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--JUaFhb5p--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/..." class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--JUaFhb5p--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/..." alt="Uploading image" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>airflow</category>
      <category>docker</category>
      <category>kafka</category>
      <category>spark</category>
    </item>
  </channel>
</rss>
