<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Paulius</title>
    <description>The latest articles on DEV Community by Paulius (@pdambrauskas).</description>
    <link>https://dev.to/pdambrauskas</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F7303%2F2736175e-4a5e-4dd9-9e4c-ed9aaa021e83.jpg</url>
      <title>DEV Community: Paulius</title>
      <link>https://dev.to/pdambrauskas</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/pdambrauskas"/>
    <language>en</language>
    <item>
      <title>Lightweight HTTP API for Big Data on S3</title>
      <dc:creator>Paulius</dc:creator>
      <pubDate>Wed, 15 Mar 2023 15:50:29 +0000</pubDate>
      <link>https://dev.to/exacaster/lightweight-http-api-for-big-data-on-s3-3fnb</link>
      <guid>https://dev.to/exacaster/lightweight-http-api-for-big-data-on-s3-3fnb</guid>
      <description>&lt;p&gt;We are happy to announce our third opensource project - &lt;a href="https://github.com/exacaster/delta-fetch"&gt;Delta Fetch&lt;/a&gt;.&lt;br&gt;
Delta Fetch is a configurable HTTP API service for accessing &lt;a href="https://delta.io/"&gt;Delta Lake&lt;/a&gt; tables. Service is highly configurable, with possibility to filter your Delta tables by selected columns.&lt;/p&gt;
&lt;h2&gt;
  
  
  How does it work?
&lt;/h2&gt;

&lt;p&gt;Delta Fetch heavily relies on Delta table metadata, which contains statistics about each Parquet file. The same metadata that is used for &lt;a href="https://docs.delta.io/latest/optimizations-oss.html#data-skipping"&gt;data skipping&lt;/a&gt; - in particular, the minimum and maximum values of each column in each file - is used here to read only the relevant files. The Delta table metadata is cached for better performance and can be refreshed by enabling auto cache update or by making API requests with the &lt;code&gt;...?exact=true&lt;/code&gt; query parameter.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Request handling flow:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The user makes an API request to one of the configured API resources.&lt;/li&gt;
&lt;li&gt;Delta Fetch reads Delta table metadata from file storage and stores it in memory.&lt;/li&gt;
&lt;li&gt;Delta Fetch finds the relevant file paths in the stored metadata and starts reading them.&lt;/li&gt;
&lt;li&gt;Delta Fetch uses the Hadoop Parquet Reader implementation, which supports filter push down to avoid reading the entire file.&lt;/li&gt;
&lt;li&gt;Delta Fetch continues reading Parquet files one by one until the requested or configured limit is reached.&lt;/li&gt;
&lt;/ul&gt;
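
&lt;p&gt;To make the file-skipping step concrete, here is a minimal sketch of pruning files by their per-file min/max column statistics. This is an illustration only, not Delta Fetch's actual code; the field names are assumptions:&lt;/p&gt;

```python
# Illustrative sketch of metadata-based file skipping (not Delta Fetch's code).
# Each entry mimics the per-file column statistics kept in Delta metadata;
# the field names ("min_id", "max_id") are made up for this example.
# Values are same-length strings, so lexicographic comparison matches numeric.
file_stats = [
    {"path": "part-000.parquet", "min_id": "100", "max_id": "499"},
    {"path": "part-001.parquet", "min_id": "500", "max_id": "899"},
]

def relevant_files(stats, value):
    """Keep only files whose [min, max] range can contain the value."""
    # A file is skipped when the requested value lies outside its range.
    return [f["path"] for f in stats
            if not (f["min_id"] > value or value > f["max_id"])]

print(relevant_files(file_stats, "512"))  # ['part-001.parquet']
```

Only the files that survive this pruning step are actually opened and read.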
&lt;h2&gt;
  
  
  Configuration
&lt;/h2&gt;

&lt;p&gt;Resources can be configured in the following way:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;app&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;resources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/api/data/{table}/{identifier}&lt;/span&gt;
      &lt;span class="na"&gt;schema-path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/api/schemas/{table}/{identifier}&lt;/span&gt;
      &lt;span class="na"&gt;delta-path&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3a://bucket/delta/{table}/&lt;/span&gt;
      &lt;span class="na"&gt;response-type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;SINGLE&lt;/span&gt;
      &lt;span class="na"&gt;filter-variables&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;column&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;id&lt;/span&gt;
          &lt;span class="na"&gt;path-variable&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;identifier&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;path&lt;/code&gt; defines the API path used to query your Delta tables. Path variables can be defined using curly braces, as shown in the example.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;schema-path&lt;/code&gt; (optional) defines the API path for retrieving the Delta table schema.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;delta-path&lt;/code&gt; defines the S3 path of your Delta table. Path variables in this path are filled in with the variables provided in the API path.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;response-type&lt;/code&gt; (optional, default: &lt;code&gt;SINGLE&lt;/code&gt;) defines whether to search for multiple resources or a single one. Use the &lt;code&gt;LIST&lt;/code&gt; type for multiple resources.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;max-results&lt;/code&gt; (optional, default: &lt;code&gt;100&lt;/code&gt;) is the maximum number of rows that can be returned when &lt;code&gt;response-type&lt;/code&gt; is &lt;code&gt;LIST&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;filter-variables&lt;/code&gt; (optional) defines additional filters applied to the Delta table.&lt;/li&gt;
&lt;/ul&gt;
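
&lt;p&gt;With the example configuration above, a request to &lt;code&gt;/api/data/users/42&lt;/code&gt; would be resolved against &lt;code&gt;s3a://bucket/delta/users/&lt;/code&gt;. A rough sketch of that path-variable substitution (an illustration, not Delta Fetch's actual resolution code):&lt;/p&gt;

```python
# Sketch of how API path variables could fill the delta-path template
# (illustrative only; not Delta Fetch's actual resolution code).
def resolve(path_template, delta_template, request_path):
    """Match request segments against the template and fill delta-path."""
    variables = {}
    for tmpl, actual in zip(path_template.strip("/").split("/"),
                            request_path.strip("/").split("/")):
        # segments wrapped in curly braces are path variables
        if tmpl.startswith("{") and tmpl.endswith("}"):
            variables[tmpl[1:-1]] = actual
    resolved = delta_template
    for name, value in variables.items():
        resolved = resolved.replace("{" + name + "}", value)
    return resolved, variables

print(resolve("/api/data/{table}/{identifier}",
              "s3a://bucket/delta/{table}/",
              "/api/data/users/42"))
# ('s3a://bucket/delta/users/', {'table': 'users', 'identifier': '42'})
```

The leftover variables (here &lt;code&gt;identifier&lt;/code&gt;) are then available to &lt;code&gt;filter-variables&lt;/code&gt; for row filtering.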

&lt;p&gt;You can also configure one of two security mechanisms - Basic Auth or OAuth2 - and some caching parameters for better performance. Refer to the &lt;a href="https://github.com/exacaster/delta-fetch"&gt;Delta Fetch&lt;/a&gt; GitHub repo for more information.&lt;/p&gt;

&lt;h2&gt;
  
  
  Recommendations
&lt;/h2&gt;

&lt;p&gt;To be able to access the data in Parquet files quickly, you need to configure the block size to a smaller value than you normally would. We got acceptable results by setting &lt;code&gt;parquet.block.size&lt;/code&gt; to &lt;code&gt;1048576&lt;/code&gt; (1 MB).&lt;/p&gt;
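
&lt;p&gt;If your tables are written with Spark, one way to apply this (a sketch; adapt it to however you configure your jobs) is through the &lt;code&gt;spark.hadoop.*&lt;/code&gt; configuration passthrough, e.g. in &lt;code&gt;spark-defaults.conf&lt;/code&gt;:&lt;/p&gt;

```
spark.hadoop.parquet.block.size  1048576
```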

&lt;p&gt;We also highly recommend &lt;strong&gt;not&lt;/strong&gt; using &lt;code&gt;OPTIMIZE ... ZORDER ...&lt;/code&gt; on tables that are exposed through Delta Fetch, since this command usually writes data in roughly 1 GB chunks. Instead, we suggest relying on simple data ordering by the columns you plan to use as "keys" in the Delta Fetch API.&lt;/p&gt;

&lt;p&gt;More recommendations and considerations can be found on our &lt;a href="https://github.com/exacaster/delta-fetch/doces/recommendations.md"&gt;recommendations page&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;With those recommendations applied, we managed to get a ~1s response time when requesting a single row by a single column value:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;time &lt;/span&gt;curl http://localhost:8080/api/data/disable_optimize_ordered/872480210503_234678
&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"version"&lt;/span&gt;:5,&lt;span class="s2"&gt;"data"&lt;/span&gt;:&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"user_id"&lt;/span&gt;:&lt;span class="s2"&gt;"872480210503_234678"&lt;/span&gt;,&lt;span class="s2"&gt;"sub_type"&lt;/span&gt;:&lt;span class="s2"&gt;"PREPAID"&lt;/span&gt;,&lt;span class="s2"&gt;"activation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"2018-09-01"&lt;/span&gt;,&lt;span class="s2"&gt;"status"&lt;/span&gt;:&lt;span class="s2"&gt;"ACTIVE"&lt;/span&gt;,&lt;span class="s2"&gt;"deactivation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"9999-01-01"&lt;/span&gt;&lt;span class="o"&gt;}}&lt;/span&gt;
curl   0.00s user 0.01s system 1% cpu 0.982 total
&lt;span class="nt"&gt;---&lt;/span&gt;
&lt;span class="nb"&gt;time &lt;/span&gt;curl http://localhost:8080/api/data/disable_optimize_ordered/579520210231_237911
&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"version"&lt;/span&gt;:5,&lt;span class="s2"&gt;"data"&lt;/span&gt;:&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"user_id"&lt;/span&gt;:&lt;span class="s2"&gt;"579520210231_237911"&lt;/span&gt;,&lt;span class="s2"&gt;"sub_type"&lt;/span&gt;:&lt;span class="s2"&gt;"PREPAID"&lt;/span&gt;,&lt;span class="s2"&gt;"activation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"2018-06-24"&lt;/span&gt;,&lt;span class="s2"&gt;"status"&lt;/span&gt;:&lt;span class="s2"&gt;"ACTIVE"&lt;/span&gt;,&lt;span class="s2"&gt;"deactivation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"9999-01-01"&lt;/span&gt;&lt;span class="o"&gt;}}&lt;/span&gt;
curl   0.00s user 0.01s system 0% cpu 1.250 total
&lt;span class="nt"&gt;---&lt;/span&gt;
&lt;span class="nb"&gt;time &lt;/span&gt;curl http://localhost:8080/api/data/disable_optimize_ordered/875540210000_245810
&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"version"&lt;/span&gt;:2,&lt;span class="s2"&gt;"data"&lt;/span&gt;:&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"user_id"&lt;/span&gt;:&lt;span class="s2"&gt;"875540210000_245810"&lt;/span&gt;,&lt;span class="s2"&gt;"sub_type"&lt;/span&gt;:&lt;span class="s2"&gt;"PREPAID"&lt;/span&gt;,&lt;span class="s2"&gt;"activation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"2018-09-01"&lt;/span&gt;,&lt;span class="s2"&gt;"status"&lt;/span&gt;:&lt;span class="s2"&gt;"ACTIVE"&lt;/span&gt;,&lt;span class="s2"&gt;"deactivation_date"&lt;/span&gt;:&lt;span class="s2"&gt;"9999-01-01"&lt;/span&gt;&lt;span class="o"&gt;}}&lt;/span&gt;
curl   0.00s user 0.01s system 1% cpu 0.870 total
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We consider this API service experimental and hope to get feedback and contributions from the open-source (and dev.to :)) community. Let us know what you think about our new project.&lt;/p&gt;

</description>
      <category>deltalake</category>
      <category>bigdata</category>
      <category>opensource</category>
      <category>s3</category>
    </item>
    <item>
      <title>Testing PySpark &amp; Pandas in style</title>
      <dc:creator>Paulius</dc:creator>
      <pubDate>Thu, 10 Feb 2022 07:49:05 +0000</pubDate>
      <link>https://dev.to/exacaster/testing-pyspark-pandas-in-style-31cg</link>
      <guid>https://dev.to/exacaster/testing-pyspark-pandas-in-style-31cg</guid>
      <description>&lt;p&gt;Today we'd like to share a small utility package for testing Dataframes on PySpark and Pandas.&lt;/p&gt;

&lt;p&gt;If you are a fan of test-driven development and have had a chance to work on PySpark (or Pandas) projects, you've probably written tests similar to this one:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark_test&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;assert_pyspark_df_equal&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;your_module&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;calculate_result&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_event_aggregation&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"user_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"even_type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"item_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"event_time"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"country"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"dt"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;expected_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;createDataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;[&lt;/span&gt;
            &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;123456&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'page_view'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2017&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="s"&gt;"uk"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2017-12-31"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;123456&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;'item_view'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;68471513&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2017&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;55&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="s"&gt;"uk"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2017-12-31"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;],&lt;/span&gt; 
        &lt;span class="n"&gt;schema&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;result_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;calculate_result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;assert_pyspark_df_equal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expected_df&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;result_df&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It works OK for small applications, but when your project gets bigger, the data gets more complicated, and the number of tests starts to grow, you might want a less tedious way to define test data.&lt;/p&gt;

&lt;p&gt;Exacaster alumnus &lt;a href="https://www.linkedin.com/in/vaidasarmonas"&gt;Vaidas Armonas&lt;/a&gt; came up with the idea of representing Spark DataFrames as markdown tables. This idea materialized into the testing package &lt;a href="https://pypi.org/project/markdown-frames/"&gt;markdown-frames&lt;/a&gt;. With this package, the test shown above can be replaced with this one:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark_test&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;assert_pyspark_df_equal&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;markdown_frames.spark_dataframe&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spark_df&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;your_module&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;calculate_result&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_event_aggregation&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;input_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;""" 
        |  user_id   |  even_type  | item_id  |    event_time       | country  |     dt      |
        |   bigint   |   string    |  bigint  |    timestamp        |  string  |   string    |
        | ---------- | ----------- | -------- | ------------------- | -------- | ----------- |
        |   123456   |  page_view  |   None   | 2017-12-31 23:50:50 |   uk     | 2017-12-31  |
        |   123456   |  item_view  | 68471513 | 2017-12-31 23:50:55 |   uk     | 2017-12-31  |
    """&lt;/span&gt;
    &lt;span class="n"&gt;expected_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark_df&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;result_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;calculate_result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;assert_pyspark_df_equal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expected_df&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;result_df&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It makes tests more readable and self-explanatory.&lt;/p&gt;

&lt;p&gt;Everything looks almost the same when you need to build a DataFrame for Pandas; you just need to use a different function:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;from markdown_frames.pandas_dataframe import pandas_df&lt;/code&gt;&lt;/p&gt;
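
&lt;p&gt;The idea behind the package is straightforward: parse the markdown table into column names, types, and data rows. A minimal stdlib-only sketch of that parsing (not the actual markdown-frames implementation):&lt;/p&gt;

```python
def parse_markdown_table(table):
    """Parse a markdown-frames style table into (columns, types, rows)."""
    lines = [l.strip().strip("|") for l in table.strip().splitlines()]
    split = lambda line: [cell.strip() for cell in line.split("|")]
    columns, types = split(lines[0]), split(lines[1])
    # lines[2] is the "----" separator row; the rest are data rows
    rows = [split(line) for line in lines[3:]]
    return columns, types, rows

cols, types, rows = parse_markdown_table("""
    | user_id | country |
    |  bigint | string  |
    | ------- | ------- |
    |  123456 |   uk    |
""")
print(cols, rows)  # ['user_id', 'country'] [['123456', 'uk']]
```

The real package additionally casts each cell to the declared type before building the DataFrame.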

&lt;p&gt;Share in the comments if you know any other convenient tips &amp;amp; tricks for writing PySpark (and Pandas) unit tests.&lt;/p&gt;

</description>
      <category>spark</category>
      <category>pandas</category>
      <category>testing</category>
      <category>opensource</category>
    </item>
    <item>
      <title>Monitoring Apartment temperature &amp; humidity with Raspberry Pi, Prometheus &amp; Grafana</title>
      <dc:creator>Paulius</dc:creator>
      <pubDate>Sat, 29 Feb 2020 19:29:05 +0000</pubDate>
      <link>https://dev.to/pdambrauskas/monitoring-apartment-temperature-humidity-with-raspberry-pi-prometheus-grafana-1i48</link>
      <guid>https://dev.to/pdambrauskas/monitoring-apartment-temperature-humidity-with-raspberry-pi-prometheus-grafana-1i48</guid>
      <description>&lt;p&gt;For quite some time, I had a spare Raspberry Pi lying around in my place. And one weekend I came up with idea to make my apartment "smarter". What I mean by saying "smarter" is tracking some metrics of my surroundings.&lt;/p&gt;

&lt;p&gt;I have some experience working with &lt;a href="https://prometheus.io/"&gt;Prometheus&lt;/a&gt; and &lt;a href="https://grafana.com/"&gt;Grafana&lt;/a&gt;, so I decided to incorporate those tools into my solution. Yes, it does sound like overengineering a simple task, and you could probably get the same results in a much simpler way : ). But it was a fun weekend project for me.&lt;/p&gt;

&lt;p&gt;In this post I'll describe my setup for monitoring room temperature &amp;amp; humidity.&lt;/p&gt;

&lt;h2&gt;
  
  
  Hardware components
&lt;/h2&gt;

&lt;p&gt;These are all the components I used in my project:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.raspberrypi.org/products/raspberry-pi-3-model-b/"&gt;Raspberry Pi 3 Model B&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;16 GB microSD card&lt;/li&gt;
&lt;li&gt;&lt;a href="https://components101.com/dht11-temperature-sensor"&gt;DHT11 Temperature And Humidity Sensor&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;Mobile phone charger, for powering Raspberry Pi&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Connecting Sensor to Raspberry Pi
&lt;/h2&gt;

&lt;p&gt;I connected the sensor's Ground pin to a ground pin on the Raspberry Pi, the Data pin to the GPIO 14 pin, and the Vcc pin to a 3.3V power supply pin.&lt;/p&gt;

&lt;h2&gt;
  
  
  Reading sensor data
&lt;/h2&gt;

&lt;p&gt;For reading sensor data and feeding it to Prometheus, I chose the &lt;a href="https://github.com/szazo/DHT11_Python"&gt;DHT11_Python&lt;/a&gt; library. It is quite unstable and sometimes does not return valid results, so you might get some gaps in your graphs.&lt;/p&gt;

&lt;p&gt;I've also created a simple Flask API to serve the metrics to Prometheus:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;flask&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Flask&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;dht11&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;RPi.GPIO&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;GPIO&lt;/span&gt;

&lt;span class="n"&gt;GPIO&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;setwarnings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;GPIO&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;setmode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;GPIO&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BCM&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;instance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dht11&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DHT11&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pin&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Flask&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;route&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/metrics"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;metrics&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="n"&gt;dht11_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;
    &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;is_valid&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="n"&gt;dht11_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;f"""pihome_temperature &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;temperature&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;
pihome_humidity &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;humidity&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="s"&gt;f"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;dht11_data&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;'Content-Type'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;'text/plain; charset=utf-8'&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"__main__"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s"&gt;'0.0.0.0'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
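
&lt;p&gt;When scraped, the endpoint returns the two gauges in Prometheus' plain-text exposition format, something like this (the values here are made up):&lt;/p&gt;

```
pihome_temperature 21
pihome_humidity 40
```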



&lt;h2&gt;
  
  
  Prometheus configuration
&lt;/h2&gt;

&lt;p&gt;To scrape metrics from my Flask API, I've added configuration to &lt;code&gt;prometheus.yml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;global&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;scrape_interval&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;30s&lt;/span&gt;
&lt;span class="na"&gt;scrape_configs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;job_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;pihome'&lt;/span&gt;
      &lt;span class="na"&gt;static_configs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;targets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;pihome&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;&lt;span class="nv"&gt;5000&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Grafana Configuration
&lt;/h2&gt;

&lt;p&gt;Then, in &lt;code&gt;/etc/grafana/provisioning&lt;/code&gt;, I've added datasource configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;apiVersion&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
&lt;span class="na"&gt;datasources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Prometheus&lt;/span&gt;
    &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;prometheus&lt;/span&gt;
    &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;http://prometheus:9090/&lt;/span&gt;
    &lt;span class="na"&gt;access&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;proxy&lt;/span&gt;
    &lt;span class="na"&gt;isDefault&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;It is also possible to add Grafana dashboards to the provisioning folder as JSON files, so that you don't need to create a new dashboard each time you re-deploy Grafana.&lt;/p&gt;
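
&lt;p&gt;For example, a minimal dashboard provider file (the file name and paths here are assumptions, e.g. &lt;code&gt;/etc/grafana/provisioning/dashboards/default.yaml&lt;/code&gt;) could look like this:&lt;/p&gt;

```yaml
apiVersion: 1
providers:
  - name: 'default'
    type: file
    options:
      # folder from which Grafana loads dashboard JSON files on startup
      path: /etc/grafana/provisioning/dashboards
```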

&lt;h2&gt;
  
  
  Connecting everything together
&lt;/h2&gt;

&lt;p&gt;To make everything portable and easy to install, I packed my Flask API into a Docker image and configured all the services in &lt;code&gt;docker-compose.yaml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight yaml"&gt;&lt;code&gt;
&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;3'&lt;/span&gt;

&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;pihome&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;pihome&lt;/span&gt;
    &lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;.&lt;/span&gt;
    &lt;span class="na"&gt;restart&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;always&lt;/span&gt;
    &lt;span class="na"&gt;devices&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/dev/mem:/dev/mem"&lt;/span&gt;
    &lt;span class="na"&gt;privileged&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;5000:5000&lt;/span&gt;

  &lt;span class="na"&gt;prometheus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;prom/prometheus:v2.16.0&lt;/span&gt;
    &lt;span class="na"&gt;user&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;root&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./prometheus/:/etc/prometheus/&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;/var/prometheus:/prometheus&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--config.file=/etc/prometheus/prometheus.yml'&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--storage.tsdb.path=/prometheus'&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--storage.tsdb.retention.time=30d'&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--web.console.libraries=/usr/share/prometheus/console_libraries'&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--web.console.templates=/usr/share/prometheus/consoles'&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9090:9090&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;pihome&lt;/span&gt;
    &lt;span class="na"&gt;restart&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;always&lt;/span&gt;

  &lt;span class="na"&gt;grafana&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;grafana/grafana:6.6.2&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;prometheus&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;80:3000&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./grafana/:/etc/grafana&lt;/span&gt;
    &lt;span class="na"&gt;restart&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;always&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Results
&lt;/h2&gt;

&lt;p&gt;I left my stack running for a while to collect some historical data, and the dashboard looked like this:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Quc9QKnk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/http://tinklas.xz.lt/todo/dashboard.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Quc9QKnk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/http://tinklas.xz.lt/todo/dashboard.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Git project
&lt;/h2&gt;

&lt;p&gt;You can find my full configuration and code on Github: &lt;a href="https://github.com/pdambrauskas/pihome"&gt;https://github.com/pdambrauskas/pihome&lt;/a&gt; &lt;/p&gt;

</description>
      <category>prometheus</category>
      <category>grafana</category>
      <category>diy</category>
      <category>smarthome</category>
    </item>
    <item>
      <title>Kafka Connect: How it let us down?</title>
      <dc:creator>Paulius</dc:creator>
      <pubDate>Mon, 03 Feb 2020 06:18:55 +0000</pubDate>
      <link>https://dev.to/pdambrauskas/kafka-connect-how-it-let-us-down-2nnc</link>
      <guid>https://dev.to/pdambrauskas/kafka-connect-how-it-let-us-down-2nnc</guid>
      <description>&lt;p&gt;About a year ago me and &lt;a href="https://github.com/minutis"&gt;@minutis&lt;/a&gt; had a chance to try out &lt;a href="https://docs.confluent.io/3.0.0/connect/"&gt;Kafka Connect&lt;/a&gt;. We used it as the backbone of one of our &lt;a href="https://www.webopedia.com/TERM/E/ETL.html"&gt;ETL&lt;/a&gt; processes but eventually, we chose a different approach. In this post, I'll try to remember what problems we met, and why Kafka Connect didn't fit our needs.&lt;/p&gt;

&lt;p&gt;For those of you who do not know, Kafka Connect is a framework for connecting Apache Kafka to external systems such as databases, search indexes and file systems.&lt;br&gt;
Kafka Connect works in both directions: it can write data from an external source system to a Kafka topic, and export data from a Kafka topic to an external system.&lt;/p&gt;

&lt;h2&gt;
  
  
  Main Kafka Connect concepts
&lt;/h2&gt;

&lt;p&gt;I'm not going to cover each and every Kafka Connect component in depth; there is plenty of information online on how Kafka Connect is designed and how it works. However, I'll describe the components briefly, to give you enough context for the rest of this post.&lt;/p&gt;

&lt;p&gt;The main Kafka Connect components are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Connector - a logical implementation of an integration with an external system. There are two types of connectors: Source Connectors, responsible for reading from external systems into Kafka, and Sink Connectors, responsible for writing data from Kafka to external systems. Confluent Inc., the main contributor to Kafka Connect, has quite detailed &lt;a href="https://docs.confluent.io/current/connect/devguide.html#"&gt;documentation&lt;/a&gt; on how to implement your own Source and Sink connectors.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Task - a unit of work. When you configure a connector for an external system, you can define the maximum number of tasks. This number defines how many processes should read from your external system (or write to it) in parallel, so the work done by a connector is parallelized across its tasks.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Worker - the component responsible for task execution. Kafka Connect can work in two modes: standalone and distributed. In standalone mode, you have one worker process responsible for executing your connector tasks, configured in a properties file. In distributed mode, you can start many worker processes and distribute them across your cluster. Also, in distributed mode, all connector configuration is done through the &lt;a href="https://docs.confluent.io/current/connect/references/restapi.html"&gt;Kafka Connect REST API&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Transform - a transformation applied to each message after the connector ingests the data, but before it is written to a Kafka topic. There are many &lt;a href="https://docs.confluent.io/current/connect/transforms/index.html"&gt;Transform implementations&lt;/a&gt;, and it is also very easy to implement custom transforms.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
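&lt;p&gt;To make the distributed-mode workflow concrete: registering a connector boils down to POSTing a JSON document like the one below to a worker's &lt;code&gt;/connectors&lt;/code&gt; REST endpoint. This is a minimal sketch using the stock FileStreamSink connector that ships with Kafka; the connector name, topic and file path are made up for illustration:&lt;/p&gt;

```json
{
  "name": "demo-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "demo-topic",
    "file": "/tmp/demo-sink.txt"
  }
}
```

&lt;p&gt;The same API removes a connector again via &lt;code&gt;DELETE /connectors/demo-file-sink&lt;/code&gt;.&lt;/p&gt;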

&lt;h2&gt;
  
  
  Why it failed us?
&lt;/h2&gt;

&lt;p&gt;The first time I found out about Kafka Connect, I was excited. It looked like a really nice, well-thought-through solution for managing different ETL pipelines. It has a REST API for adding/removing connectors, starting, stopping and scaling tasks, and monitoring task statuses. Extensibility looked really promising too: you can add your own connector and transform implementations without forking the Kafka source code, and scale through as many worker processes as you need.&lt;br&gt;
Without further investigation, we decided to try it out. What we experienced wasn't as nice as we had hoped :).&lt;/p&gt;

&lt;h3&gt;
  
  
  Too early to use in production
&lt;/h3&gt;

&lt;p&gt;At the time we were experimenting with Kafka Connect, it wasn't stable enough. It had some bugs, the quality of the open-source connectors was quite poor, and there were a few architectural flaws which were deal-breakers for us.&lt;/p&gt;

&lt;h4&gt;
  
  
  Bugs
&lt;/h4&gt;

&lt;p&gt;In our use case, we wanted to write data to &lt;a href="https://www.ibm.com/analytics/hadoop/hdfs"&gt;HDFS&lt;/a&gt;. For that we decided to use the open-source &lt;a href="https://github.com/confluentinc/kafka-connect-hdfs"&gt;kafka-connect-hdfs&lt;/a&gt; connector implementation. At the time we used it, it was pretty much unusable:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We had corrupted files after Kafka rebalance &lt;a href="https://github.com/confluentinc/kafka-connect-hdfs/issues/268"&gt;#268(open)&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;We had limited datatypes available &lt;a href="https://github.com/confluentinc/kafka-connect-hdfs/issues/49"&gt;#49(open)&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;We were not able to partition data by multiple fields (we had implemented our own solution for this one) &lt;a href="https://github.com/confluentinc/kafka-connect-storage-common/issues/53"&gt;#commons-53(fixed)&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;We had tasks failing to resume after a pause &lt;a href="https://github.com/confluentinc/kafka-connect-hdfs/issues/53"&gt;#53(fixed)&lt;/a&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;After this experience we saw that the open-source connectors were not only buggy but also lacked the features we needed. We decided to write our own connector implementations and didn't stop believing in Kafka Connect. We encountered some Kafka bugs too, but most of them were fixed quickly enough (&lt;a href="https://issues.apache.org/jira/browse/KAFKA-6252"&gt;KAFKA-6252&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;Some minor Kafka Connect bugs are unfixed even today. One worth mentioning is &lt;a href="https://issues.apache.org/jira/browse/KAFKA-4107"&gt;KAFKA-4107&lt;/a&gt;. During testing, we had some cases where we needed to delete and recreate some of the connectors. Kafka Connect provides a REST API endpoint for connector deletion; however, when you delete a connector through this API, the old task offsets remain undeleted, so you cannot safely create a connector with the same name. We found a workaround for this problem: we added connector versioning (appending a version number to the connector name) to avoid conflicts with offsets from deleted tasks.&lt;/p&gt;

&lt;h4&gt;
  
  
  Rebalance all the time
&lt;/h4&gt;

&lt;p&gt;This one was a Kafka Connect design flaw. Kafka Connect rebalanced &lt;em&gt;all&lt;/em&gt; of the tasks on its cluster every time the task set changed (a task or a connector was added or deleted, etc.). That meant all running tasks had to be stopped and restarted. The time needed to rebalance all your tasks grows significantly with each new connector and becomes unacceptable at around ~100 tasks. &lt;br&gt;
This was the biggest roadblock for us, since we had a dynamic environment where the task set was changing rapidly, so rebalancing was happening all the time.&lt;br&gt;
Today this &lt;strong&gt;is not a problem anymore&lt;/strong&gt;: the flaw was fixed in Kafka 2.3.0. You can read more about that &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;We dropped the idea of using Kafka Connect (sometime in 2018) because it wasn't production-ready yet and didn't fully cover our use cases. Today, many of the problems we met are fixed (some are not, but you can find workarounds). I'm still somewhat skeptical about Kafka Connect; however, trying and experimenting with it was really fun. I'd say you should consider Kafka Connect only if you are willing to invest time in implementing your own connectors.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkaconnect</category>
      <category>etl</category>
    </item>
    <item>
      <title>Event Sourcing with Redis</title>
      <dc:creator>Paulius</dc:creator>
      <pubDate>Sat, 25 Jan 2020 11:34:12 +0000</pubDate>
      <link>https://dev.to/pdambrauskas/event-sourcing-with-redis-45ha</link>
      <guid>https://dev.to/pdambrauskas/event-sourcing-with-redis-45ha</guid>
      <description>&lt;p&gt;It is a common practice to use asynchronous messaging technologies to implement communication between multiple microservices. Kafka is a go-to solution when it comes to streaming pipelines and publish/subscribe systems for async communication. It has many advantages over traditional, synchronous HTTP communication, some of them are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You can easily scale your microservice architecture by running multiple instances of a microservice in the same consumer group.&lt;/li&gt;
&lt;li&gt;You can add new consumers without modifying existing communication-related code.&lt;/li&gt;
&lt;li&gt;You can take advantage of the persistence of Kafka topics by re-consuming them any time you need.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I have implemented asynchronous communication using Kafka several times, both at work and in my personal projects. But there are many alternatives. One of the latest is the newest Redis data type - Redis Streams, which came with &lt;a href="https://redislabs.com/blog/redis-5-0-is-here/"&gt;Redis 5.0&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://redis.io/topics/streams-intro"&gt;Redis Streams&lt;/a&gt; may look very similar to traditional Redis Pub/Sub concept, however it is quite different. It shares main conceptual ideas with Apache Kafka:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A stream can have multiple consumers, and every new entry on a stream is delivered to every consumer (unless the consumers belong to the same consumer group).&lt;/li&gt;
&lt;li&gt;Consumed messages do not disappear: Redis stores the streamed data and the last consumed &lt;em&gt;id&lt;/em&gt; for each consumer group,
so new consumer groups can consume the stream from the beginning.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;So, as you see, in theory Redis Streams work very similarly to Kafka topics, which means they can be used for the same use cases. You can find many resources and examples on how to use Kafka for Event Sourcing (&lt;a href="https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/"&gt;example&lt;/a&gt;), but when you look at Redis Streams, the variety of examples is very limited. So I decided to contribute to filling in this resource gap :).&lt;br&gt;
In this post I will try to describe how to use Redis Streams for &lt;a href="https://martinfowler.com/eaaDev/EventSourcing.html"&gt;Event Sourcing&lt;/a&gt; &amp;amp; &lt;a href="https://www.martinfowler.com/bliki/CQRS.html"&gt;CQRS&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Redis Streams API theoretical example
&lt;/h2&gt;

&lt;p&gt;Let's say we have a REST API endpoint for GETting a user's social media profile, with personal information, liked pages and a list of the user's friends. For that we'll have three different Redis Streams:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;user_updates&lt;/code&gt; - a stream of user profile updates. Every time the user changes their birth date, name or other personal information, an entry is published to this stream.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;user_activity&lt;/code&gt; - every time the user writes a comment, reacts to a post, or performs any other action (as on a typical social network), an entry is published to this stream.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;user_friends&lt;/code&gt; - every time the user gains or loses a friend, an entry is published to this stream.&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Filling up event streams
&lt;/h3&gt;

&lt;p&gt;So, we would publish our events using the &lt;code&gt;XADD&lt;/code&gt; command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;XADD user_updates * user_id 1 command register name Duffy surename Duck
XADD user_updates * user_id 2 command register name Bugs surename Bunny
XADD user_activity * user_id 1 command kick object Bugs
XADD user_activity * user_id 1 command dislike object Bugs face
XADD user_friends * user_id 1 command remove friend_id 2
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h3&gt;
  
  
  Reading messages from event streams
&lt;/h3&gt;

&lt;p&gt;Now we can read all three streams using the &lt;code&gt;XREAD&lt;/code&gt; command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;XREAD STREAMS user_updates user_activity user_friends 0 0 0
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Your output should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;1) 1) "user_updates"
   2) 1) 1) "1577650357114-0"
         2) 1) "user_id"
            2) "1"
            3) "action"
            4) "register"
            5) "name"
            6) "Duffy"
            7) "surename"
            8) "Duck"
      2) 1) "1577650371803-0"
         2) 1) "user_id"
            2) "2"
            3) "action"
            4) "register"
            5) "name"
            6) "Bugs"
            7) "surename"
            8) "Bunny"
2) 1) "user_activity"
   2) 1) 1) "1577650378926-0"
         2) 1) "user_id"
            2) "1"
            3) "action"
            4) "kick"
            5) "object"
            6) "Bugs"
      2) 1) "1577650384649-0"
         2) 1) "user_id"
            2) "1"
            3) "action"
            4) "dislike"
            5) "object"
            6) "Bugs face"
3) 1) "user_friends"
   2) 1) 1) "1577650389616-0"
         2) 1) "user_id"
            2) "1"
            3) "action"
            4) "remove"
            5) "friend_id"
            6) "2"
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h3&gt;
  
  
  Building snapshots
&lt;/h3&gt;

&lt;p&gt;When building the profile representation, you can transform and reflect it in any structure you need. Notice the three zeroes at the end of the &lt;code&gt;XREAD&lt;/code&gt; command: those are starting entry IDs, one per stream (0 means "from the beginning"), and consumed IDs are likewise tracked as consumer offsets when you use the &lt;code&gt;XREADGROUP&lt;/code&gt; command (we'll use this command later). Entry IDs by default are millisecond timestamps followed by a dash and a sequence number, so it is easy to read a stream from any point in time you need. &lt;/p&gt;
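&lt;p&gt;As a quick illustration, the first ID from the output above decodes like this (a Python sketch; the ID format is &lt;code&gt;&amp;lt;milliseconds&amp;gt;-&amp;lt;sequence&amp;gt;&lt;/code&gt;):&lt;/p&gt;

```python
from datetime import datetime, timezone

# "1577650357114-0" is the first entry ID from the XREAD output above:
# a Unix timestamp in milliseconds, a dash, and a sequence number that
# disambiguates entries created within the same millisecond.
entry_id = "1577650357114-0"
ms, seq = entry_id.split("-")

ts = datetime.fromtimestamp(int(ms) // 1000, tz=timezone.utc)
print(ts.isoformat())  # 2019-12-29T20:12:37+00:00
print(seq)             # 0
```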

&lt;p&gt;In most cases, it is inefficient to consume the whole stream every time you need the data, so it is a common practice to save user profile snapshots in one way or another. For snapshotting you can use the Redis Hash data structure (or another Redis structure, if that makes more sense for you). A Hash can be created using the &lt;code&gt;HMSET&lt;/code&gt; command (you can set multiple hash fields for the aggregated data; most are skipped in this example); the suffix of the Hash key is the user id:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;HMSET user_snapshot_1 name Duffy ... [field value]
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
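&lt;p&gt;Serving the API request is then a single lookup against the snapshot key (&lt;code&gt;HGETALL&lt;/code&gt; returns all fields of the hash):&lt;/p&gt;

```plaintext
HGETALL user_snapshot_1
```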



&lt;h3&gt;
  
  
  Continuous consumption
&lt;/h3&gt;

&lt;p&gt;But you don't want to keep track of the offsets you have already consumed yourself, right? Redis has a solution to this problem too: use &lt;code&gt;XREADGROUP&lt;/code&gt; along with &lt;code&gt;XACK&lt;/code&gt;. Pseudocode for the whole snapshotting process would look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;XGROUP CREATE user_updates snapshotter 0
XGROUP CREATE user_activity snapshotter 0
XGROUP CREATE user_friends snapshotter 0
WHILE true
    entries = XREADGROUP GROUP snapshotter event_consumer BLOCK 2000 COUNT 10 STREAMS user_updates user_activity user_friends &amp;gt; &amp;gt; &amp;gt;
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message)
            XACK message.stream snapshotter message.id
        END
    END
END
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
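&lt;p&gt;Setting the Redis plumbing aside, the &lt;code&gt;process_message&lt;/code&gt; step in the pseudocode above is just a fold of events into a snapshot. Here is a minimal in-memory Python sketch of that fold (the event fields mirror the XADD examples earlier; a plain dict stands in for the Redis Hash snapshots, and the handler logic is my own illustration):&lt;/p&gt;

```python
# In-memory sketch of the snapshotting fold. In the real setup, events
# come from XREADGROUP and snapshots live in Redis Hashes
# (user_snapshot_<id>); plain dicts stand in for both here.
SNAPSHOTS = {}

def process_message(stream, event):
    """Fold a single event into the user's snapshot."""
    user_id = event["user_id"]
    snapshot = SNAPSHOTS.setdefault(user_id, {"friends": set(), "activity": []})
    if stream == "user_updates":
        # Profile updates overwrite previous values (last write wins).
        snapshot.update(
            {k: v for k, v in event.items() if k not in ("user_id", "action")}
        )
    elif stream == "user_activity":
        snapshot["activity"].append((event["action"], event.get("object")))
    elif stream == "user_friends":
        if event["action"] == "remove":
            snapshot["friends"].discard(event["friend_id"])
        else:
            snapshot["friends"].add(event["friend_id"])

events = [
    ("user_updates", {"user_id": "1", "action": "register",
                      "name": "Duffy", "surename": "Duck"}),
    ("user_activity", {"user_id": "1", "action": "kick", "object": "Bugs"}),
    ("user_friends", {"user_id": "1", "action": "remove", "friend_id": "2"}),
]
for stream, event in events:
    process_message(stream, event)

print(SNAPSHOTS["1"]["name"])      # Duffy
print(SNAPSHOTS["1"]["activity"])  # [('kick', 'Bugs')]
```

&lt;p&gt;In the real loop, each &lt;code&gt;process_message&lt;/code&gt; call would be followed by an &lt;code&gt;XACK&lt;/code&gt;, and the updated snapshot would be written back with &lt;code&gt;HMSET&lt;/code&gt;.&lt;/p&gt;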



&lt;h2&gt;
  
  
  Java example
&lt;/h2&gt;

&lt;p&gt;I've also implemented a Java app to illustrate how to use Redis Streams for Event Sourcing. I chose the &lt;a href="https://github.com/lettuce-io/lettuce-core"&gt;lettuce&lt;/a&gt; library for communication with the Redis server. The source code of my implementation can be found on &lt;a href="https://github.com/pdambrauskas/event-sourcing/tree/master/event-sourcing-redis"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;There are three main classes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/pdambrauskas/event-sourcing/blob/master/event-sourcing-redis/src/main/java/com/github/pdambrauskas/eventsourcing/redis/EventStore.java"&gt;EventStore&lt;/a&gt; - this class can be used for event publishing, and subscribing to Redis Streams.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/pdambrauskas/event-sourcing/blob/master/event-sourcing-redis/src/main/java/com/github/pdambrauskas/eventsourcing/redis/SnapshotStore.java"&gt;SnapshotStore&lt;/a&gt; - this class can be used to store and retrieve snapshot objects from Redis.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/pdambrauskas/event-sourcing/blob/master/event-sourcing-redis/src/main/java/com/github/pdambrauskas/eventsourcing/redis/processing/StreamProcessor.java"&gt;StreamProcessor&lt;/a&gt; - combines EventStore and Snapshot store. You can supply multiple event handlers, which are used for building snapshots.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;All of these classes working together can be seen in the &lt;a href="https://github.com/pdambrauskas/event-sourcing/blob/master/event-sourcing-redis/src/test/java/com/github/pdambrauskas/eventsourcing/redis/RedisEventSourcingTest.java"&gt;RedisEventSourcingTest&lt;/a&gt; unit test class.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;While Redis Streams is a relatively new concept, it is heavily inspired by Apache Kafka and has many overlapping features. We also mustn't forget that Redis has many more data structures and features which can be used alongside Redis Streams (one of which we used for snapshotting in this post). If you already have Redis in your technology stack and are looking into streaming solutions, consider using Redis Streams. Not only is Redis feature-rich, it is also very easy to learn and use.&lt;/p&gt;

</description>
      <category>redis</category>
      <category>streaming</category>
      <category>cqrs</category>
      <category>eventsourcing</category>
    </item>
  </channel>
</rss>
