<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Musa Atlıhan</title>
    <description>The latest articles on DEV Community by Musa Atlıhan (@musaatlihan).</description>
    <link>https://dev.to/musaatlihan</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F471319%2Faa90386f-dc89-48db-b222-12875f7983d4.jpeg</url>
      <title>DEV Community: Musa Atlıhan</title>
      <link>https://dev.to/musaatlihan</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/musaatlihan"/>
    <language>en</language>
    <item>
      <title>Simulate IoT sensor, use Kafka to process data in real-time, save to Elasticsearch</title>
      <dc:creator>Musa Atlıhan</dc:creator>
      <pubDate>Fri, 06 Nov 2020 19:36:53 +0000</pubDate>
      <link>https://dev.to/musaatlihan/simulate-iot-sensor-use-kafka-to-process-data-in-real-time-save-to-elasticsearch-13c8</link>
      <guid>https://dev.to/musaatlihan/simulate-iot-sensor-use-kafka-to-process-data-in-real-time-save-to-elasticsearch-13c8</guid>
      <description>&lt;p&gt;Previously, I worked on a project that solves a predictive maintenance problem using Apache Spark ML - the article is &lt;a href="https://dev.to/musaatlihan/predicting-machine-failures-with-distributed-computing-spark-aws-emr-and-dl-44b3"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Now I have built a follow-up project that collects that sensor data in real time using Apache Kafka.&lt;/p&gt;

&lt;p&gt;The code is in this GitHub repo &lt;a href="https://github.com/musa-atlihan/IoT-voltage" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;First, I created the &lt;a href="https://github.com/musa-atlihan/IoT-voltage/blob/main/telemetry-feed/src/main/java/feed/TelemetryPoller.java" rel="noopener noreferrer"&gt;&lt;code&gt;TelemetryPoller&lt;/code&gt;&lt;/a&gt; class, since I need to simulate and produce random voltage readings. Its &lt;a href="https://github.com/musa-atlihan/IoT-voltage/blob/50e281986def45c88aeca6a9c216503c0d777af5/telemetry-feed/src/main/java/feed/TelemetryPoller.java#L103" rel="noopener noreferrer"&gt;&lt;code&gt;TelemetryRunnable&lt;/code&gt;&lt;/a&gt; subclass handles that part.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;
&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="nf"&gt;produceSensorReading&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Random&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Random&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;leftLimit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;800L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;rightLimit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1500L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;number&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;leftLimit&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Math&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;random&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rightLimit&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;leftLimit&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="c1"&gt;// simulate reading time&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sleep&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InterruptedException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"SensorReading generation interrupted!"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;number&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It randomly generates a voltage value. I also added a delay, &lt;code&gt;Thread.sleep(random.nextInt(1000))&lt;/code&gt;, to simulate the time a real sensor reading would take.&lt;/p&gt;

&lt;p&gt;Likewise, there is the &lt;a href="https://github.com/musa-atlihan/IoT-voltage/blob/50e281986def45c88aeca6a9c216503c0d777af5/telemetry-feed/src/main/java/feed/TelemetryPoller.java#L124" rel="noopener noreferrer"&gt;&lt;code&gt;produceMachineId&lt;/code&gt;&lt;/a&gt; method for producing random machine ids (there are 100 machines in the project).&lt;/p&gt;

&lt;p&gt;Then I created the &lt;a href="https://github.com/musa-atlihan/IoT-voltage/blob/50e281986def45c88aeca6a9c216503c0d777af5/telemetry-feed/src/main/java/feed/TelemetryPoller.java#L52" rel="noopener noreferrer"&gt;&lt;code&gt;ProducerRunnable&lt;/code&gt;&lt;/a&gt; to run on a second thread - the thread-safe &lt;code&gt;LinkedBlockingQueue&lt;/code&gt; is shared between the threads, so the telemetry thread can hand readings off to the Kafka producer.&lt;/p&gt;
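
&lt;p&gt;Conceptually, the handoff between the two threads looks roughly like this - a minimal, self-contained sketch with simplified names and a console print standing in for the Kafka send, not the exact code from the repo:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the TelemetryRunnable / ProducerRunnable handoff (not the repo's exact code)
public class QueueHandoffSketch {

    public static void main(String[] args) {
        BlockingQueue&amp;lt;String&amp;gt; queue = new LinkedBlockingQueue&amp;lt;&amp;gt;();
        Random random = new Random();

        // telemetry thread: simulate a voltage reading for a random machine and queue it
        Thread telemetryThread = new Thread(() -&amp;gt; {
            try {
                while (true) {
                    long voltage = 800L + (long) (Math.random() * 700);  // 800..1500, as in produceSensorReading
                    int machineId = random.nextInt(100) + 1;             // 100 machines in the project
                    queue.put("{\"machineID\": " + machineId + ", \"voltage\": " + voltage + "}");
                    Thread.sleep(random.nextInt(1000));                  // simulate reading time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // producer thread: take readings off the shared queue; in the real project this is where
        // producer.send(new ProducerRecord&amp;lt;&amp;gt;("voltage-reading", message)) would happen
        Thread producerThread = new Thread(() -&amp;gt; {
            try {
                while (true) {
                    String message = queue.take();  // blocks until a reading is available
                    System.out.println("Sending to Kafka: " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        telemetryThread.start();
        producerThread.start();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;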

&lt;p&gt;Kafka Producer Configs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Kafka Producer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# safe producer (Kafka &amp;gt;= 0.11)
enable.idempotence=true
acks=all
retries=2147483647
# if max.in.flight.requests.per.connection=5 Kafka &amp;gt;= 1.1 else 1
max.in.flight.requests.per.connection=5
# high throughput
compression.type=snappy
linger.ms=20
# (32*1024)
batch.size=32768
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I created a topic with the name &lt;code&gt;voltage-reading&lt;/code&gt;. I am using my local machine so the bootstrap server is &lt;code&gt;127.0.0.1:9092&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;I don't want the producer to write the same message to the brokers more than once, so &lt;code&gt;enable.idempotence&lt;/code&gt; is &lt;code&gt;true&lt;/code&gt;. &lt;code&gt;acks&lt;/code&gt;, &lt;code&gt;retries&lt;/code&gt; and &lt;code&gt;max.in.flight.requests.per.connection&lt;/code&gt; are set to the values the idempotent producer expects.&lt;/p&gt;

&lt;p&gt;I add a small delay with &lt;code&gt;linger.ms=20&lt;/code&gt; and increase the &lt;code&gt;batch.size&lt;/code&gt; to get higher throughput; larger batches also compress better. Snappy is a good option for compressing JSON messages.&lt;/p&gt;
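
&lt;p&gt;For reference, loading a properties file like the one above and creating the producer could look roughly like this - a minimal sketch that assumes the properties live in a file named &lt;code&gt;producer.properties&lt;/code&gt;, which is not necessarily the repo's actual layout:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: build a producer from a properties file like the one above
public class ProducerSketch {

    public static void main(String[] args) throws IOException {
        // load the producer configuration (the file name is an assumption, not the repo's actual path)
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("producer.properties")) {
            props.load(in);
        }
        String topic = props.getProperty("topic");  // voltage-reading

        // create the idempotent, snappy-compressing producer and send one JSON message
        KafkaProducer&amp;lt;String, String&amp;gt; producer = new KafkaProducer&amp;lt;&amp;gt;(props);
        producer.send(new ProducerRecord&amp;lt;&amp;gt;(topic, "{\"machineID\": 42, \"voltage\": 1100}"));
        producer.flush();
        producer.close();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;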

&lt;p&gt;The result is a stream of voltage values along with the machine ids.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fn2k029lbgfaachyaal63.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fn2k029lbgfaachyaal63.gif" alt="Producer stream"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now I need a Kafka Consumer and Elasticsearch Client.&lt;/p&gt;

&lt;p&gt;I created them in the &lt;a href="https://github.com/musa-atlihan/IoT-voltage/blob/main/telemetry-stream/src/main/java/stream/ElasticsearchConsumer.java" rel="noopener noreferrer"&gt;&lt;code&gt;ElasticsearchConsumer&lt;/code&gt;&lt;/a&gt; class.&lt;/p&gt;

&lt;p&gt;First I want to explain the settings a little bit:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Kafka Consumer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=voltage-reading-elasticsearch
auto.offset.reset=latest
enable.auto.commit=false
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Again it is running on localhost, so the bootstrap server is &lt;code&gt;127.0.0.1:9092&lt;/code&gt;. Kafka consumers need a group id, otherwise a random one is assigned. I read from the &lt;code&gt;latest&lt;/code&gt; offset, and &lt;code&gt;enable.auto.commit&lt;/code&gt; is false because I will commit each batch manually. Manual committing gives me the &lt;code&gt;at least once&lt;/code&gt; guarantee, but I then need to be careful about document duplication in the Elasticsearch index. So, additionally, I generate a unique id from each Kafka record's topic name, partition number, and offset.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"_"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sc"&gt;'_'&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I pass this id to the Elasticsearch client, so re-processing the same record never creates a duplicate document.&lt;/p&gt;

&lt;p&gt;Finally, the while loop:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;ConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMillis&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="nc"&gt;BulkRequest&lt;/span&gt; &lt;span class="n"&gt;bulkRequest&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BulkRequest&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// kafka generic id to prevent document duplication&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"_"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;partition&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sc"&gt;'_'&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;offset&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Consumer reads: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// insert data into elasticsearch&lt;/span&gt;
            &lt;span class="nc"&gt;IndexRequest&lt;/span&gt; &lt;span class="n"&gt;indexRequest&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IndexRequest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"machine-telemetry"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;source&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="nc"&gt;XContentType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;JSON&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

            &lt;span class="n"&gt;bulkRequest&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;indexRequest&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;NullPointerException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="c1"&gt;// skip bad data&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;warn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Skipping bad data: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;count&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;BulkResponse&lt;/span&gt; &lt;span class="n"&gt;bulkItemResponses&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;bulk&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bulkRequest&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;RequestOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;DEFAULT&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commitSync&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I am using a &lt;code&gt;BulkRequest&lt;/code&gt; for better performance: the client collects an index request for each record in the for loop and sends them all in a single &lt;code&gt;bulk&lt;/code&gt; call. After the bulk operation, the offsets of the messages that were written to Elasticsearch are committed manually with &lt;code&gt;consumer.commitSync()&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;And when the &lt;code&gt;ElasticsearchConsumer&lt;/code&gt; class is running, I can see the streaming data being consumed and landing in Elasticsearch.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fayokn6kfk9vss890rskq.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fayokn6kfk9vss890rskq.gif" alt="Consumer"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Summary
&lt;/h3&gt;

&lt;p&gt;In this project, I created a simulator that produces random voltage values as if they were coming from an IoT sensor. Then I created a Kafka producer to send that stream to the Kafka brokers. Finally, I used a Kafka consumer to put the stream into Elasticsearch.&lt;/p&gt;

&lt;p&gt;I made the producer safe by making it idempotent (meaning no duplicate writes on the brokers). In addition, since I am producing and consuming JSON string data, I applied an efficient compression algorithm, Snappy. Because compression is more efficient on larger batches, I added a small linger delay and increased the batch size from the default 16KB to 32KB.&lt;/p&gt;

&lt;p&gt;On the consumer side, I disabled automatic offset commits to ensure &lt;code&gt;at least once&lt;/code&gt; behaviour and committed manually after putting the messages into Elasticsearch.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>bigdata</category>
      <category>elasticsearch</category>
      <category>datascience</category>
    </item>
    <item>
      <title>How and When to Use SQL Window Functions</title>
      <dc:creator>Musa Atlıhan</dc:creator>
      <pubDate>Thu, 29 Oct 2020 08:45:16 +0000</pubDate>
      <link>https://dev.to/musaatlihan/how-and-when-to-use-sql-window-functions-3163</link>
      <guid>https://dev.to/musaatlihan/how-and-when-to-use-sql-window-functions-3163</guid>
      <description>&lt;p&gt;You may think SQL window functions are just another form of aggregation, but a window function actually operates on a window of rows of a given length around the current row.&lt;/p&gt;

&lt;p&gt;You can do,&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;aggregate operations: SUM, COUNT, MAX, MIN&lt;/li&gt;
&lt;li&gt;ranking: RANK, DENSE_RANK, ROW_NUMBER, NTILE&lt;/li&gt;
&lt;li&gt;value: LAG, LEAD, FIRST_VALUE, LAST_VALUE&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Let's give a concrete example to understand what exactly a window function is doing.&lt;/p&gt;

&lt;p&gt;Consider you are a data scientist working on a classification task and you want to predict if an event will occur within the next three hours. In your data, the label is 1 if the event occurred at that exact time and 0 otherwise. Therefore you need a backfill operation to build the target. You can do it by using SQL window functions. Let's create the data frame first,&lt;/p&gt;

&lt;p&gt;&lt;em&gt;We don't need to create a database; we will use the &lt;code&gt;pandasql&lt;/code&gt; library to execute SQL queries over &lt;code&gt;pandas&lt;/code&gt; data frames (install via &lt;code&gt;pip install pandas&lt;/code&gt; and &lt;code&gt;pip install pandasql&lt;/code&gt;).&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# import libraries 
&lt;/span&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;

&lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 06:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 07:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 08:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 09:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 10:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 11:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 12:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 13:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 14:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt; &lt;span class="err"&gt; &lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2020-01-01 15:00:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="c1"&gt;# create data frame
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"datetime"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"event"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;head&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here is the dataframe:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--H3rEyeqJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/anxcv1k15o4jyyfp1lex.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--H3rEyeqJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/anxcv1k15o4jyyfp1lex.png" alt="dataframe head"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's solve the problem:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pandasql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sqldf&lt;/span&gt;

&lt;span class="n"&gt;pysqldf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;q&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;sqldf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;q&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;globals&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="n"&gt;solution&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pysqldf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="s"&gt;"""
SELECT datetime, event, MAX(event) OVER
(
    ORDER BY datetime ASC
    ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING
) as target
FROM df
"""&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;solution&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;head&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We ordered the rows by the &lt;code&gt;datetime&lt;/code&gt; column in ascending order and, for each row, took the MAX of &lt;code&gt;event&lt;/code&gt; over the window spanning the current row and the three following rows. This is the data frame we get with the target column:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--j8ykrU9N--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/6hl38232yn50j1obk9ns.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--j8ykrU9N--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/6hl38232yn50j1obk9ns.png" alt="solution"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As a result, the three rows before 11am are also labeled 1 in the &lt;code&gt;target&lt;/code&gt; column.&lt;/p&gt;

</description>
      <category>sql</category>
      <category>datascience</category>
      <category>postgres</category>
      <category>database</category>
    </item>
    <item>
      <title>Predicting machine failures with distributed computing (Spark, AWS EMR, and DL)</title>
      <dc:creator>Musa Atlıhan</dc:creator>
      <pubDate>Mon, 26 Oct 2020 16:35:44 +0000</pubDate>
      <link>https://dev.to/musaatlihan/predicting-machine-failures-with-distributed-computing-spark-aws-emr-and-dl-44b3</link>
      <guid>https://dev.to/musaatlihan/predicting-machine-failures-with-distributed-computing-spark-aws-emr-and-dl-44b3</guid>
      <description>&lt;p&gt;This article is about a fictional project, and the story goes like this:&lt;/p&gt;

&lt;p&gt;Imagine you are working at a tech company as a data scientist, where you develop DS solutions for industries like e-commerce, IoT, manufacturing, etc.&lt;/p&gt;

&lt;p&gt;One day, your company partners with a large manufacturing company that collects and stores gigabytes of sensor data each day. A relational database is clearly not an option for them. However, instead of maintaining a distributed file system like HDFS, they go for a cost-effective solution and use Amazon's S3.&lt;/p&gt;

&lt;p&gt;In this scenario, let's say your job is to help them by building a predictive model that can determine if a machine would fail within the next 24 hours.&lt;/p&gt;

&lt;p&gt;You built a proof of concept, tested it on a sample, and realized that the models perform better when more historical data is used for training. Hence you need to train the models on a dataset that wouldn't fit on a single machine.&lt;/p&gt;

&lt;p&gt;In this article, I will cover these topics to develop a solution for this exact problem:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Set up a cluster with Spark using AWS EMR&lt;/li&gt;
&lt;li&gt;Data wrangling with PySpark on the cluster&lt;/li&gt;
&lt;li&gt;Distributed model training with PySpark's ML library&lt;/li&gt;
&lt;li&gt;Submit a Spark job to train the models in the EMR cluster&lt;/li&gt;
&lt;li&gt;Load the predictions back to S3&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The GitHub repo is &lt;a href="https://github.com/musa-atlihan/spark-pm"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Dataset
&lt;/h2&gt;

&lt;p&gt;While &lt;a href="https://registry.opendata.aws/"&gt;Amazon's open data registry&lt;/a&gt; could be used for experimenting, I chose a specific open dataset published by &lt;a href="https://azure.microsoft.com/en-au/blog/author/fboylu/"&gt;Fidan Boylu Uz&lt;/a&gt; from Microsoft, because I believe it suits this exact predictive maintenance problem well.&lt;/p&gt;

&lt;p&gt;If you would like to replicate the steps and practice, please download the dataset first using the &lt;code&gt;download_dataset.sh&lt;/code&gt; script in this &lt;a href="https://github.com/musa-atlihan/spark-pm/tree/main/src/data"&gt;directory&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Download via:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cd src/data

sh download_dataset.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This dataset is composed of five "small" &lt;code&gt;csv&lt;/code&gt; files and fits on a single machine. However, it provides the basic structure I need to build the solution, and it lets me experiment on a small cluster (e.g. up to 4 nodes: 1 driver and 3 workers). I treat this dataset as a sample for development and testing. Because Spark is awesome, once I have a working prototype on a single machine (local mode), I can scale it up without changing my implementation.&lt;/p&gt;

&lt;p&gt;Let's assume the manufacturing company saves the data in parquet format because they know it is faster to aggregate time series data with a columnar format. In a real-life project, I would expect to see multiple files, e.g. all of a day's records saved into one file at the end of the day. But I will not split the records into multiple files, in order to avoid Amazon's &lt;code&gt;PUT&lt;/code&gt; and &lt;code&gt;GET&lt;/code&gt; request charges. I transform the &lt;code&gt;csv&lt;/code&gt; files into parquet by running &lt;code&gt;transform.py&lt;/code&gt; in the same &lt;a href="https://github.com/musa-atlihan/spark-pm/tree/main/src/data"&gt;directory&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cd src/data

python transform.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And now I see a new directory called &lt;code&gt;records&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;If you want to experiment further by running the code on a cluster, you will need to create a new S3 bucket and upload the records, as I do here:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws s3 sync records s3://your-bucket-name
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(&lt;em&gt;Install the AWS CLI with &lt;code&gt;sudo apt install awscli&lt;/code&gt; and &lt;a href="https://docs.aws.amazon.com/comprehend/latest/dg/setup-awscli.html"&gt;configure&lt;/a&gt; it.&lt;/em&gt;)&lt;/p&gt;

&lt;p&gt;Please keep in mind, you will be &lt;a href="https://aws.amazon.com/s3/pricing/"&gt;charged for file transfers&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;After the upload, I see five different parquet files in the bucket.&lt;/p&gt;

&lt;p&gt;Example records stored in S3:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pdm_telemetry:
Row(datetime='2015-01-01 06:00:00', machineID=1, volt=176.217853015625, rotate=418.504078221616, pressure=113.07793546208299, vibration=45.0876857639276)

pdm_errors:
Row(datetime='2015-01-03 07:00:00', machineID=1, errorID='error1')

pdm_failures:
Row(datetime='2015-01-05 06:00:00', machineID=1, failure='comp4')

pdm_machines:
Row(machineID=1, model='model3', age=18)

pdm_maint:
Row(datetime='2014-06-01 06:00:00', machineID=1, comp='comp2')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I created another doc and added the details about each dataset &lt;a href="https://github.com/musa-atlihan/spark-pm/blob/main/dataset.md"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating Spark Cluster with AWS EMR
&lt;/h2&gt;

&lt;p&gt;While it is possible to create a Spark cluster by launching multiple EC2 instances manually, the easier way is to use the Amazon Elastic MapReduce (EMR) service. Using the EMR console is one option; however, I prefer having a script file that I can reuse. I prepared a &lt;a href="https://github.com/musa-atlihan/spark-pm/blob/main/src/emr.sh"&gt;script file&lt;/a&gt; named &lt;code&gt;emr.sh&lt;/code&gt; that uses the &lt;code&gt;awscli&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;I can start a cluster simply by typing &lt;code&gt;sh emr.sh&lt;/code&gt; in the terminal and the &lt;code&gt;aws emr create-cluster&lt;/code&gt; command will be executed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#!/bin/bash

# Specify the cluster name
CLUSTER_NAME=any-arbitrary-name

# Insert the IAM KEYNAME
# You get this key from AWS console's IAM section
KEY_NAME=your-key-name


# If you have not previously created the default Amazon EMR service role and EC2 instance profile,
# type `aws emr create-default-roles` to create them before typing the create-cluster subcommand.
aws emr create-cluster \
--name $CLUSTER_NAME \
--use-default-roles \
--release-label emr-5.31.0 \
--instance-count 4 \
--applications Name=Spark Name=Hadoop Name=Zeppelin Name=Livy \
--ec2-attributes KeyName=$KEY_NAME \
--instance-type m5.xlarge \
--bootstrap-actions Path="s3://your-actions-bucket/bootstrap.sh", \
Args=["instance.isMaster=true","echo running on driver node"]\
--auto-terminate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I can configure the number of instances and the instance type with the &lt;code&gt;--instance-count&lt;/code&gt; and &lt;code&gt;--instance-type&lt;/code&gt; arguments. &lt;code&gt;--auto-terminate&lt;/code&gt; is an important one: it terminates the cluster after the job finishes. Since I will connect to the cluster via &lt;code&gt;ssh&lt;/code&gt; and submit the Spark job manually, I will remove it &lt;strong&gt;- but please don't forget to &lt;a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingEMR_TerminateJobFlow.html"&gt;terminate&lt;/a&gt; the cluster afterwards, otherwise Amazon will keep charging you!&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;--bootstrap-actions&lt;/code&gt; is useful when I want to run some setup commands while the cluster is being created. For example, I will upload a download script (&lt;code&gt;s3://your-actions-bucket/bootstrap.sh&lt;/code&gt;) to another S3 bucket and use it to download my PySpark script (&lt;code&gt;mlp.py&lt;/code&gt;) onto the driver node. I will create these scripts later in this article and run &lt;code&gt;emr.sh&lt;/code&gt; after that.&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Wrangling with PySpark
&lt;/h2&gt;

&lt;p&gt;I am using &lt;a href="https://github.com/musa-atlihan/spark-pm/blob/main/src/mlp.py"&gt;a single python script&lt;/a&gt; to implement the data wrangling and the model training in the same place so that I can easily download this script to the EC2 driver node and submit a Spark job with it.&lt;/p&gt;

&lt;p&gt;The first thing I need to do is create a Spark session. A Spark session is the unified entry point to Spark, wrapping the underlying Spark contexts behind a single interface.&lt;/p&gt;

&lt;p&gt;I create the session via the session builder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;

&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'PM Training'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here the &lt;code&gt;.getOrCreate()&lt;/code&gt; method returns the existing session if there is one, or creates a new one.&lt;/p&gt;

&lt;p&gt;Now that I have the session, I will load the datasets from my bucket.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;telemetry_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;'s3a://your-bucket/PdM_telemetry.parquet'&lt;/span&gt;
&lt;span class="n"&gt;failures_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;'s3a://your-bucket/PdM_failures.parquet'&lt;/span&gt;

&lt;span class="n"&gt;failures&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;failures_path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;telemetry&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;telemetry_path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;I sometimes see colleagues miss the difference and use &lt;code&gt;s3://...&lt;/code&gt; instead of &lt;code&gt;s3a://...&lt;/code&gt;. Please use &lt;code&gt;s3a&lt;/code&gt;, since it is a much faster way to load the datasets from the buckets!&lt;/strong&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;It is also very important to have the S3 bucket in the same region as the cluster nodes (EC2 instances), otherwise loading the data from a distant region will take quite some time.&lt;/strong&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With the two datasets loaded, I now merge &lt;code&gt;telemetry&lt;/code&gt; and &lt;code&gt;failures&lt;/code&gt; into &lt;code&gt;merged&lt;/code&gt;. The telemetry data contains the timestamp, the machine id (there are 100 machines), and four sensor readings (volt, rotate, pressure, vibration); the failures data is a log of four different failure types.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# create tables
&lt;/span&gt;&lt;span class="n"&gt;telemetry&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'telemetry'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;failures&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'failures'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# join telemetry and failures tables
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"""
    SELECT 
      telemetry.datetime, telemetry.machineID, telemetry.volt,
      telemetry.rotate, telemetry.pressure, telemetry.vibration, failures.failure
    FROM telemetry
    LEFT JOIN failures
      ON telemetry.machineID = failures.machineID
      AND telemetry.datetime = failures.datetime
    """&lt;/span&gt;
&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'merged'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Although I do have other datasets with additional information (like the machine types, maintenance time, errors, etc...), I won't use those features, since the aim of this article is to use Spark for distributed computing.&lt;/p&gt;

&lt;p&gt;After the join, the failure column is &lt;code&gt;null&lt;/code&gt; wherever there was no failure at that particular time. I will label-encode the failure types and assign zero to the no-failure rows.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# label encode
&lt;/span&gt;&lt;span class="n"&gt;failure_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="s"&gt;'None'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'comp1'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'comp2'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'comp3'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'comp4'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;udf&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;register&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'label_failure'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;failure_dict&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;)],&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IntegerType&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"""
    SELECT *, label_failure(failure) as failure_lbl
    FROM merged
    """&lt;/span&gt;
&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'merged'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I now have a new column, &lt;code&gt;failure_lbl&lt;/code&gt;, with integer labels indicating the different failure types. Since my aim is the classification task of predicting whether there will be a failure within the next 24 hours, I also need to backfill the non-zero labels:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# backfill any failure for the previous 24 hours
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"""
    SELECT *, MAX(failure_lbl)
    OVER 
      (
        PARTITION BY machineID
        ORDER BY datetime ASC
        ROWS BETWEEN CURRENT ROW AND 24 FOLLOWING
      ) as label
    FROM merged
    """&lt;/span&gt;
&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'merged'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After the backfilling the table looks like this (I didn't include all the fields for simplicity):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--cq3QfAuF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/6chax378rhpx5qi42baw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--cq3QfAuF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/6chax378rhpx5qi42baw.png" alt="the dataset after labeling"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the table above, the &lt;code&gt;comp4&lt;/code&gt; failure is backfilled over the previous 24 hours. Now it is time to get the table as a data frame and prepare the ML pipeline.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# get the data frame
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"""
    SELECT datetime, machineID, volt, rotate, pressure, vibration, label
    FROM merged
    """&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Dealing with Imbalanced Data
&lt;/h2&gt;

&lt;p&gt;The non-failure events (label=0) are the overwhelming majority in this task, and I can't train a useful model on such an imbalanced dataset. Here I undersample the majority class to obtain a balanced data frame:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# label 0 is majority, apply undersampling
&lt;/span&gt;&lt;span class="n"&gt;major&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;label&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;minor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;label&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;isin&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;]))&lt;/span&gt;

&lt;span class="n"&gt;ratio&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;major&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mf"&gt;4.&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;minor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;sampled_major&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;major&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sample&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;ratio&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;balanced&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sampled_major&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;unionAll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;minor&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Model Training with PySpark ML
&lt;/h2&gt;

&lt;p&gt;First, I &lt;code&gt;persist&lt;/code&gt; the balanced dataset. When persisting (caching) is applied, each node stores in memory any partitions of the dataset that it computes and reuses them in subsequent actions on that dataset, which makes those future actions much faster.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;balanced&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;persist&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, I split the dataset before the pipeline steps:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;train&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;test&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;balanced&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;randomSplit&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="mf"&gt;0.8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;0.2&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;seed&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I have two kinds of features here: the numerical features (the sensor readings) and a categorical one (&lt;code&gt;machineID&lt;/code&gt;). I start by normalizing the numerical features, then one-hot encode &lt;code&gt;machineID&lt;/code&gt;, and finally combine everything into a single feature vector using the &lt;code&gt;VectorAssembler&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# define the steps
&lt;/span&gt;&lt;span class="n"&gt;num_assembler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;VectorAssembler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;inputCols&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'volt'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'rotate'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'pressure'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'vibration'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'numericFeatures'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;scaler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Normalizer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'numericFeatures'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'numericFeaturesScaled'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;encoder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;OneHotEncoder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'machineID'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'MachineIDOneHot'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;assembler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;VectorAssembler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;inputCols&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;'MachineIDOneHot'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'numericFeaturesScaled'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'features'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now that my features are prepared, the next step is to choose the model. I chose the multi-layer perceptron (mlp), since a deep learning model can keep improving its performance as the dataset grows larger. I define the mlp structure and then combine all of these &lt;a href="https://spark.apache.org/docs/1.6.0/ml-guide.html#pipeline-components"&gt;pipeline components&lt;/a&gt; (stages) in a single pipeline.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# mlp
&lt;/span&gt;&lt;span class="n"&gt;layers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;104&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;256&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="n"&gt;mlp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MultilayerPerceptronClassifier&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;maxIter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;layers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;layers&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;seed&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# create the pipeline
&lt;/span&gt;&lt;span class="n"&gt;pipeline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stages&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;num_assembler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;scaler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;encoder&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;assembler&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mlp&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the pipeline is created, I can use it to train the model and evaluate it on the test set. Spark does the training on the worker nodes and, at inference time, appends the probabilities and predictions as new columns to the test data frame.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# train and get the model
&lt;/span&gt;&lt;span class="n"&gt;model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;fit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;train&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# test set inference
&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;test&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# eval
&lt;/span&gt;&lt;span class="n"&gt;evaluator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MulticlassClassificationEvaluator&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;f1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;evaluator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;evaluate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;f'Test score: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;f1&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Saving the Predictions to S3
&lt;/h2&gt;

&lt;p&gt;I created my model, predicted the test set, and got the predictions back in the &lt;code&gt;result&lt;/code&gt; data frame. Now I want to save the results so that my colleagues and I can access them in the future, ideally in a layout where reading the results for a particular date becomes much easier. I accomplish this by partitioning the output by year, month, and day when saving: I first derive the year, month, and day columns from the timestamp and then use them as the partition columns.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# save the results
# create year, month, and day columns for the partitioning
&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'year'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'datetime'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'month'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;month&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'datetime'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'day'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dayofmonth&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'datetime'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;span class="c1"&gt;# save the results
&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;'datetime'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'year'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'month'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'day'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="s"&gt;'machineID'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'prediction'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'overwrite'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'header'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'year'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'month'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'day'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;\
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;'s3a://your-another-bucket/data/partitions/data.csv'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
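
&lt;p&gt;Thanks to this layout, anyone who needs the predictions for a particular date can read just that partition instead of the whole output. A minimal sketch of such a read (the bucket path is the placeholder used above, and the date values are examples - use whichever dates exist in the data):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# read back a single day's predictions by pointing Spark at that partition
one_day = spark.read.csv(
    's3a://your-another-bucket/data/partitions/data.csv/year=2015/month=1/day=1',
    header=True, inferSchema=True)
one_day.show(5)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;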



&lt;p&gt;So far, I have loaded the datasets, joined them together, done some data wrangling, applied undersampling, created my model, and predicted the results! As the final step, I stop the Spark session, because this is a batch job rather than a streaming one.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stop&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Prepare the Bootstrap and Run
&lt;/h2&gt;

&lt;p&gt;I want my EMR script (&lt;code&gt;emr.sh&lt;/code&gt;) to create the cluster automatically and download my PySpark script (&lt;code&gt;mlp.py&lt;/code&gt;) onto the driver node. So I upload the &lt;code&gt;mlp.py&lt;/code&gt; file to another S3 bucket together with my &lt;a href="https://github.com/musa-atlihan/spark-pm/blob/main/src/bootstrap.sh"&gt;bootstrap script&lt;/a&gt; (&lt;code&gt;bootstrap.sh&lt;/code&gt;), which downloads the &lt;code&gt;mlp.py&lt;/code&gt; file during the cluster creation step. My bootstrap file contains a simple download command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#!/bin/bash

aws s3 cp s3://your-actions-bucket/mlp.py ~/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
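
&lt;p&gt;For completeness, the upload itself is just two copy commands from my machine (a sketch; the bucket name matches the placeholder used in &lt;code&gt;bootstrap.sh&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws s3 cp mlp.py s3://your-actions-bucket/
aws s3 cp bootstrap.sh s3://your-actions-bucket/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;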



&lt;p&gt;With the bootstrap file ready, I can now adjust the &lt;code&gt;emr.sh&lt;/code&gt; file and start the cluster with &lt;code&gt;sh emr.sh&lt;/code&gt;. Then I can connect to the driver node via &lt;code&gt;ssh&lt;/code&gt; and check that the &lt;code&gt;mlp.py&lt;/code&gt; file is in the home directory with &lt;code&gt;ls -lah ~/&lt;/code&gt;.&lt;/p&gt;
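
&lt;p&gt;Roughly, a script like &lt;code&gt;emr.sh&lt;/code&gt; wraps an &lt;code&gt;aws emr create-cluster&lt;/code&gt; call along these lines (a sketch only - the script in the repo is the reference, and the key pair, release label, and instance settings here are placeholders):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws emr create-cluster \
    --name spark-pm \
    --release-label emr-5.30.0 \
    --applications Name=Spark \
    --use-default-roles \
    --ec2-attributes KeyName=your-key-pair \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --bootstrap-actions Path=s3://your-actions-bucket/bootstrap.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;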

&lt;p&gt;I see that all the cluster nodes are running successfully and my &lt;code&gt;mlp.py&lt;/code&gt; file is there in the home directory, so I can now submit the Spark job and run my &lt;code&gt;mlp.py&lt;/code&gt; script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/usr/bin/spark-submit --master yarn ./mlp.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I monitor the steps and, after the job finishes, I see that my result data frame has been saved to the S3 bucket successfully!&lt;/p&gt;
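
&lt;p&gt;One quick way to confirm the partition layout is to list the output prefix (again using the placeholder bucket from the script):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws s3 ls --recursive s3://your-another-bucket/data/partitions/data.csv/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The listing shows the &lt;code&gt;year=.../month=.../day=...&lt;/code&gt; folder structure created by &lt;code&gt;partitionBy&lt;/code&gt;.&lt;/p&gt;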

&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;In this work, I assumed that the model performance improves with a larger training set and that the training set does not fit on a single machine for data processing and model training. Although there is great potential for feature engineering with the remaining three datasets, I didn't go into detailed feature engineering, since the main goal was to explore Spark's functionality and accomplish distributed computing.&lt;/p&gt;

&lt;p&gt;This project was a fictional one, but very close to a real-world scenario. The difference is that the dataset I used fits on a single machine, so I treated it as a sample from a much larger population. I started implementing the pipelines in local mode, then deployed and tested them on an EMR cluster; running the same computation on truly big data is now just a matter of changing the S3 URLs. When transitioning from the sample to the full population, the most common issue is imbalanced (skewed) partitions: one worker node gets a very large portion of the dataset while the others get small partitions. This slows down the whole job and can be tracked by monitoring the workers via the Spark web UI. There are multiple ways to overcome the issue (e.g. applying &lt;code&gt;repartition&lt;/code&gt;, as sketched below), which could be a good topic for another article.&lt;/p&gt;
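
&lt;p&gt;As a minimal sketch of that fix, redistributing the rows - either into a fixed number of partitions or by a well-distributed column - is usually the first thing to try:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# spread the rows evenly across a fixed number of partitions
df = df.repartition(200)

# or repartition by a well-distributed column such as machineID
df = df.repartition('machineID')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;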

</description>
      <category>spark</category>
      <category>emr</category>
      <category>bigdata</category>
      <category>deeplearning</category>
    </item>
  </channel>
</rss>
