<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Neil Buesing</title>
    <description>The latest articles on DEV Community by Neil Buesing (@nbuesing).</description>
    <link>https://dev.to/nbuesing</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F362748%2Fe0b26630-bf2e-4e78-95a1-39c24a107730.jpg</url>
      <title>DEV Community: Neil Buesing</title>
      <link>https://dev.to/nbuesing</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/nbuesing"/>
    <language>en</language>
    <item>
      <title>Apache Kafka without Zookeeper and Dedicated Controllers</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Thu, 26 Jan 2023 04:17:33 +0000</pubDate>
      <link>https://dev.to/nbuesing/apache-kafka-without-zookeeper-and-dedicated-controllers-4d80</link>
      <guid>https://dev.to/nbuesing/apache-kafka-without-zookeeper-and-dedicated-controllers-4d80</guid>
      <description>&lt;p&gt;Are you interested in setting up Kafka without Zookeeper and with a dedicated controller quorum? Here are the steps and reference project showcasing how to do this using the Confluent community-licensed container images. A Grafana dashboard to observe the new metrics is also provided.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Kafka Raft (KRaft) moves the consensus protocol out of Zookeeper and into the control plane of Apache Kafka itself. With this change, the role of a Kafka instance can be that of a controller, a broker, or both. Standing up this configuration requires some tweaks to the Confluent &lt;a href="https://hub.docker.com/r/confluentinc/cp-kafka" rel="noopener noreferrer"&gt;cp-kafka&lt;/a&gt; image.&lt;/p&gt;

&lt;p&gt;If you want a deeper understanding of the design and implementation details, check out Jun Rao's course on Kafka Internals. Specifically, the &lt;a href="https://developer.confluent.io/learn-kafka/architecture/control-plane/" rel="noopener noreferrer"&gt;control-plane&lt;/a&gt; section.&lt;/p&gt;

&lt;h2&gt;
  
  
  Configuration
&lt;/h2&gt;

&lt;p&gt;There are many configuration parameters with Apache Kafka; highlighted here are the ones necessary to build out the cluster with KRaft.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs_node.id" rel="noopener noreferrer"&gt;&lt;code&gt;node.id&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The property &lt;code&gt;node.id&lt;/code&gt; replaces &lt;code&gt;broker.id&lt;/code&gt;. Be sure that all identifiers in the cluster are unique across brokers and controllers.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs_process.roles" rel="noopener noreferrer"&gt;&lt;code&gt;process.roles&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A node can be a &lt;code&gt;broker&lt;/code&gt;, a &lt;code&gt;controller&lt;/code&gt;, or both. Set this to &lt;code&gt;broker,controller&lt;/code&gt; to enable both the control and data planes on a node.&lt;/p&gt;
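
&lt;p&gt;As a minimal sketch using the cp-kafka environment-variable convention (the values shown are illustrative, not taken from the reference project):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;# dedicated controller
KAFKA_PROCESS_ROLES: controller
# dedicated broker
KAFKA_PROCESS_ROLES: broker
# combined node (control and data planes)
KAFKA_PROCESS_ROLES: broker,controller
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;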

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs_controller.listener.names" rel="noopener noreferrer"&gt;&lt;code&gt;controller.listener.names&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;List the listener names used for the controller; this tells a node which listener to use for controller communication. While the property is a list, just like &lt;code&gt;advertised.listeners&lt;/code&gt;, only the first entry is used for controller communication.&lt;/p&gt;
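
&lt;p&gt;A minimal sketch of how this might look for a dedicated controller; the listener name, hostname, and port here are illustrative assumptions:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;KAFKA_LISTENERS: CONTROLLER://controller-0:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;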

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs_controller.quorum.voters" rel="noopener noreferrer"&gt;&lt;code&gt;controller.quorum.voters&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A comma-delimited list of voters in the control plane, where each controller is identified as &lt;code&gt;node_id@hostname:port&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;KAFKA_CONTROLLER_QUORUM_VOTERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;10@controller-0:9093,11@controller-1:9093,12@controller-2:9093&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Additional Configuration
&lt;/h3&gt;

&lt;p&gt;There are other &lt;a href="https://kafka.apache.org/documentation/#brokerconfigs" rel="noopener noreferrer"&gt;tuning parameters&lt;/a&gt; for the control plane; see the documentation for details.&lt;/p&gt;

&lt;h3&gt;
  
  
  Lesson Learned
&lt;/h3&gt;

&lt;p&gt;Do not remove cluster settings from the dedicated controllers, since a controller is the node that performs administration operations, such as creating a topic.&lt;/p&gt;

&lt;p&gt;Incorrectly removing these settings from the controllers caused topics to be created without the cluster's configured defaults.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;KAFKA_DEFAULT_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;3&lt;/span&gt;
&lt;span class="na"&gt;KAFKA_NUM_PARTITIONS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;4&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Storage
&lt;/h3&gt;

&lt;p&gt;Another change when setting up Apache Kafka with KRaft is storage: the storage on each node must be formatted before starting the JVM. This can be done with the &lt;code&gt;kafka-storage&lt;/code&gt; command provided as part of Apache Kafka.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Generate a unique UUID for the cluster; you can use &lt;code&gt;kafka-storage random-uuid&lt;/code&gt; or another means.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Before starting the cluster, format the metadata storage with &lt;code&gt;kafka-storage format&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
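
&lt;p&gt;For the first step, a minimal sketch; capturing the id in an environment variable (the variable name is an assumption) makes it easy to reuse when formatting each node:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# generate one cluster id and share it across every node
KAFKA_CLUSTER_ID=$(kafka-storage random-uuid)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;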

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-storage format &lt;span class="nt"&gt;-t&lt;/span&gt; &lt;span class="nv"&gt;$KAFKA_CLUSTER_ID&lt;/span&gt; &lt;span class="nt"&gt;-c&lt;/span&gt; &amp;lt;server.properties&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Container Images
&lt;/h3&gt;

&lt;p&gt;With these details in mind, applying them to Confluent's &lt;a href="https://hub.docker.com/r/confluentinc/cp-kafka" rel="noopener noreferrer"&gt;cp-kafka&lt;/a&gt; image takes a little finesse, at least with version &lt;code&gt;7.3.0&lt;/code&gt;. The &lt;code&gt;cp-kafka&lt;/code&gt; container's entry point, &lt;code&gt;/etc/confluent/docker/run&lt;/code&gt;, builds the Apache Kafka configuration from environment variables following conventions, and it includes validation steps to catch misconfiguration. These validations, however, need to change, since certain assumptions no longer apply in a zookeeper-less setup; for example, a node that is only a &lt;code&gt;controller&lt;/code&gt; does not define &lt;code&gt;advertised.listeners&lt;/code&gt;, so that validation needs to be removed.&lt;/p&gt;

&lt;h4&gt;
  
  
  Script Modifications
&lt;/h4&gt;

&lt;p&gt;The following tasks need to be done to start these images with KRaft:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Remove &lt;code&gt;KAFKA_ZOOKEEPER_CONNECT&lt;/code&gt; validation for all nodes.&lt;/li&gt;
&lt;li&gt;Remove checking for zookeeper ready state for all nodes.&lt;/li&gt;
&lt;li&gt;Remove &lt;code&gt;KAFKA_ADVERTISED_LISTENERS&lt;/code&gt; validation for dedicated controller nodes.&lt;/li&gt;
&lt;li&gt;Create the metadata store on all nodes by running &lt;code&gt;kafka-storage format&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;
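
&lt;p&gt;As a rough sketch only (the actual &lt;code&gt;broker.sh&lt;/code&gt; and &lt;code&gt;controller.sh&lt;/code&gt; in the reference project may differ; the paths and patterns below are assumptions), such a script could relax the zookeeper-era checks in the image's configure script before &lt;code&gt;/etc/confluent/docker/run&lt;/code&gt; executes it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;#!/bin/bash
# Hypothetical sketch: disable the zookeeper connect validation in the
# cp-kafka configure script; the file path and pattern are assumptions.
sed -i 's/dub ensure KAFKA_ZOOKEEPER_CONNECT/echo "skipping zookeeper check"/' \
  /etc/confluent/docker/configure
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;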

&lt;h3&gt;
  
  
  Command
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;cp-kafka&lt;/code&gt; image's command is &lt;code&gt;/etc/confluent/docker/run&lt;/code&gt;; the following scripts, combined with the docker-compose &lt;code&gt;command&lt;/code&gt; setting, allow these containers to start with the Raft consensus protocol.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/kineticedge/dev-local/blob/main/kafka-raft/broker.sh" rel="noopener noreferrer"&gt;broker.sh&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./broker.sh:/tmp/broker.sh&lt;/span&gt;
&lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bash -c '/tmp/broker.sh &amp;amp;&amp;amp; /etc/confluent/docker/run'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/kineticedge/dev-local/blob/main/kafka-raft/controller.sh" rel="noopener noreferrer"&gt;controller.sh&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./controller.sh:/tmp/controller.sh&lt;/span&gt;
&lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bash -c '/tmp/controller.sh &amp;amp;&amp;amp; /etc/confluent/docker/run'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h3&gt;
  
  
  Seeing It In Action
&lt;/h3&gt;

&lt;p&gt;If you are interested in seeing all this in action, check out the &lt;a href="https://github.com/kineticedge/dev-local/tree/main/kafka-raft" rel="noopener noreferrer"&gt;kafka-raft&lt;/a&gt; docker-compose&lt;br&gt;
setup in the &lt;a href="https://github.com/kineticedge/dev-local" rel="noopener noreferrer"&gt;dev-local&lt;/a&gt; project. It is a fully working example with 3 controllers and 4 brokers.&lt;/p&gt;
&lt;h3&gt;
  
  
  Metrics
&lt;/h3&gt;

&lt;p&gt;If you are going to deploy Kafka with Raft to production, having visibility into metrics is important. Adding that visibility is just as important (if not more so) as having dedicated controllers. The key with dashboards is to ensure they report correctly whether the data and control planes are separated or combined.&lt;/p&gt;
&lt;h4&gt;
  
  
  Grafana
&lt;/h4&gt;

&lt;p&gt;Building a Grafana dashboard is a multi-step process: extract the metrics, store them in a time-series database (e.g., Prometheus), and then visualize the collected metrics in a Grafana dashboard.&lt;/p&gt;
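
&lt;p&gt;For the collection step, a minimal Prometheus scrape configuration might look like the following; the hostnames and the exporter port are assumptions, not values from the reference project:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;scrape_configs:
  - job_name: kafka
    static_configs:
      - targets:
          - broker-0:7071       # JMX Prometheus Exporter port (assumed)
          - controller-0:7071
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;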
&lt;h5&gt;
  
  
  JMX Prometheus Exporter
&lt;/h5&gt;

&lt;p&gt;The &lt;a href="https://kafka.apache.org/documentation/#kraft_monitoring" rel="noopener noreferrer"&gt;KRaft Monitor&lt;/a&gt; metrics are defined in the documentation, with an MBean name, such as &lt;code&gt;kafka.server:type=raft-metrics,name=current-state&lt;/code&gt;. Using a JMX client, such as &lt;code&gt;jmxterm&lt;/code&gt;, shows the MBean is name is just &lt;code&gt;kafka.server:type=raft-metrics&lt;/code&gt; with each metric an attribute in that bean. This is different from the current documentation.&lt;/p&gt;

&lt;p&gt;If you deploy Java applications with JMX Metrics in containers, I highly recommend &lt;a href="https://github.com/jiaqi/jmxterm/releases" rel="noopener noreferrer"&gt;jmxterm&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;java &lt;span class="nt"&gt;-jar&lt;/span&gt; jmxterm.jar
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the cp containers, the Java process is process &lt;code&gt;1&lt;/code&gt;, but use the &lt;code&gt;jvms&lt;/code&gt; command to list all available processes and verify the JVM's process-id is indeed &lt;code&gt;1&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$&amp;gt;&lt;/span&gt;open 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Show all the MBeans in the JVM, with &lt;code&gt;beans&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$&amp;gt;&lt;/span&gt;beans
...
kafka.server:type&lt;span class="o"&gt;=&lt;/span&gt;raft-metrics
...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Select a bean, then use &lt;code&gt;info&lt;/code&gt; to explore its attributes and &lt;code&gt;get&lt;/code&gt; to read the current value of an attribute.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$&amp;gt;&lt;/span&gt;bean kafka.server:type&lt;span class="o"&gt;=&lt;/span&gt;raft-metrics
&lt;span class="c"&gt;#bean is set to kafka.server:type=raft-metrics&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$&amp;gt;&lt;/span&gt;info
&lt;span class="c"&gt;# attributes&lt;/span&gt;
%0   - append-records-rate &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%1   - commit-latency-avg &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%2   - commit-latency-max &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%3   - current-epoch &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%4   - current-leader &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%5   - current-state &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%6   - current-vote &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%7   - election-latency-avg &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%8   - election-latency-max &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%9   - fetch-records-rate &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%10  - high-watermark &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%11  - log-end-epoch &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%12  - log-end-offset &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%13  - number-unknown-voter-connections &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
%14  - poll-idle-ratio-avg &lt;span class="o"&gt;(&lt;/span&gt;double, r&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$&amp;gt;&lt;/span&gt;get current-leader
current-leader &lt;span class="o"&gt;=&lt;/span&gt; 10.0&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Leveraging the above, the following rules properly expose these metrics from the JMX Prometheus Exporter. Since the &lt;code&gt;current-state&lt;/code&gt; attribute's value is a string, it needs to be captured as a label in Prometheus.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;rules&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;pattern&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kafka.server&amp;lt;type=raft-metrics&amp;gt;&amp;lt;&amp;gt;(current-state):&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;(.+)"&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka_server_raft_metrics&lt;/span&gt;
  &lt;span class="na"&gt;labels&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;$1&lt;/span&gt;
  &lt;span class="na"&gt;state&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;$2&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
&lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;pattern&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kafka.server&amp;lt;type=raft-metrics&amp;gt;&amp;lt;&amp;gt;(.+):&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;(.+)"&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka_server_raft_metrics&lt;/span&gt;
  &lt;span class="na"&gt;labels&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;$1&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h5&gt;
  
  
  Grafana Dashboard
&lt;/h5&gt;

&lt;p&gt;Grafana is great for custom configuration, but that means time and effort are needed to build dashboards. Here is one built around some of those Raft metrics; it is not a complete dashboard.&lt;/p&gt;

&lt;h6&gt;
  
  
  &lt;strong&gt;Node Information&lt;/strong&gt;
&lt;/h6&gt;

&lt;p&gt;With the metrics now being emitted, they can be put into a Grafana dashboard.&lt;/p&gt;

&lt;p&gt;By using &lt;code&gt;current-state&lt;/code&gt; we can see which node is the leader, in addition to capturing the &lt;code&gt;node.id&lt;/code&gt;. The dashboard component also joins in other data to provide additional information on each node.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fnode-info_hu2d6d4c439fe0020661a8ea561be89976_260827_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fnode-info_hu2d6d4c439fe0020661a8ea561be89976_260827_1600x0_resize_box_3.png" alt="node info" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h6&gt;
  
  
  &lt;strong&gt;Node Counts&lt;/strong&gt;
&lt;/h6&gt;

&lt;p&gt;Counts of nodes and the active controller are always reassuring, and this leverages an existing metric, &lt;code&gt;kafka_controller_kafkacontroller_value{name="ActiveControllerCount",}&lt;/code&gt;. This metric is only emitted from a controller, so counting its existence gives the number of controllers in the cluster, and summing its value gives the actual number of active controllers; alert if this is ever not equal to 1.&lt;/p&gt;
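
&lt;p&gt;The two calculations described above can be sketched as PromQL queries (the panel wiring is left out):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# number of controllers in the cluster
count(kafka_controller_kafkacontroller_value{name="ActiveControllerCount"})

# number of active controllers; alert if this is ever != 1
sum(kafka_controller_kafkacontroller_value{name="ActiveControllerCount"})
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;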

&lt;p&gt;Check out the dashboard to see how the other values are calculated, as it is the same as in the zookeeper-based installations.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fnodes_huf1deff26c8139713de2df050e44f19c1_91796_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fnodes_huf1deff26c8139713de2df050e44f19c1_91796_1600x0_resize_box_3.png" alt="nodes" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h6&gt;
  
  
  &lt;strong&gt;Active Controller&lt;/strong&gt;
&lt;/h6&gt;

&lt;p&gt;To get the &lt;code&gt;node.id&lt;/code&gt; of the quorum leader, just find &lt;code&gt;max(kafka_server_raft_metrics{name="current-leader",})&lt;/code&gt;. Because each node is scraped at a slightly different time, different values are possible at the moment of a change; &lt;code&gt;max&lt;/code&gt; makes the single-value display easy to build.&lt;/p&gt;

&lt;p&gt;If a new leader is being voted on, that will show up in the voted-leader metric. In a single-value panel I do not expect this to be very useful, but there would be more value in recording this metric as a time-series history.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fleader_hue7d9dd872581b91b852092ddaee702f1_25835_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Fleader_hue7d9dd872581b91b852092ddaee702f1_25835_1600x0_resize_box_3.png" alt="leader" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h5&gt;
  
  
  &lt;strong&gt;Full Dashboard&lt;/strong&gt;
&lt;/h5&gt;

&lt;p&gt;Here is an example dashboard that also captures the fetch rate of the controller metadata.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Ffull_hu98b119011914a8d9ffa92f80b5651829_1116307_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fwww.kineticedge.io%2Fimages%2Fblog%2Fkafka-raft%2Ffull_hu98b119011914a8d9ffa92f80b5651829_1116307_1600x0_resize_box_3.png" alt="full" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Open-Source Tools
&lt;/h4&gt;

&lt;p&gt;I have checked a variety of open-source tools and have been unsuccessful in seeing Raft metrics; active controller information is displayed incorrectly. None of the tools I tried supported Raft yet, so it is important that you have a proper monitoring and alerting strategy in place before you upgrade.&lt;/p&gt;

&lt;h4&gt;
  
  
  Summary
&lt;/h4&gt;

&lt;p&gt;Be sure to properly test your monitoring and Apache Kafka support infrastructure as part of your move to the Kafka Raft consensus protocol. Also validate that KRaft metrics are captured, and ensure that dashboards and tools work when brokers and controllers are on separate nodes.&lt;/p&gt;

</description>
      <category>productivity</category>
    </item>
    <item>
      <title>Console Consumer - BytesDeserializer for the Win</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Thu, 15 Sep 2022 19:10:57 +0000</pubDate>
      <link>https://dev.to/nbuesing/console-consumer-bytesdeserializer-for-the-win-4i7b</link>
      <guid>https://dev.to/nbuesing/console-consumer-bytesdeserializer-for-the-win-4i7b</guid>
      <description>&lt;p&gt;If you want to make sure your expected String key is what you think it is, using BytesDeserializer with your console consumers is better than StringDeserializer.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;The Confluent Avro serializer and deserializer store the unique ID of the schema in the message. When unexpected characters show up in a string, a type mismatch would be more obvious. But what about non-printable characters? How do they show up? Will the issue be obvious?&lt;/p&gt;
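
&lt;p&gt;The Confluent wire format makes this concrete: every serialized key or value starts with a magic byte and the schema id, followed by the Avro-encoded payload.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;byte 0      magic byte (0x00)
bytes 1-4   schema id (big-endian 32-bit integer)
bytes 5..   avro-encoded data
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;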

&lt;h2&gt;
  
  
  Demonstration
&lt;/h2&gt;

&lt;p&gt;A simple demonstration can be done with the Datagen Source Connector. Create a connector with Avro as the key. The data type for the Datagen's quickstart &lt;code&gt;users&lt;/code&gt; is a string. The Avro serializer will write this as an &lt;strong&gt;Avro primitive&lt;/strong&gt;. Typically, when Avro is used, the top-level object is a Record, but the serializer has custom code for supporting primitives.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Configuration
&lt;/h3&gt;

&lt;p&gt;The Datagen connector is configured with the key represented as Avro.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.kafka.connect.datagen.DatagenConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"kafka.topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"quickstart"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"users"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.url"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"http://schema-registry:8081"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schemas.enable"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.url"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"http://schema-registry:8081"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schemas.enable"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"max.interval"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"iterations"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;10000000&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Scenario
&lt;/h3&gt;

&lt;p&gt;You write a Kafka Streams application that reads the key as &lt;code&gt;Serdes.String()&lt;/code&gt;, the default serde for your application. You forget to change the serde for reading &lt;code&gt;users&lt;/code&gt; from the default to an Avro serde. You now join your stream of orders with users, and none of the joins succeeds.&lt;/p&gt;

&lt;h3&gt;
  
  
  Investigation...
&lt;/h3&gt;

&lt;p&gt;If you are me, the first thing you do is use &lt;code&gt;kafka-avro-console-consumer&lt;/code&gt; to see what is going on.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-avro-console-consumer &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:19092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; schema.registry.url&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"http://localhost:8081"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; print.key&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; key.separator&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"|"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--from-beginning&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--skip-message-on-error&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--key-deserializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;org.apache.kafka.common.serialization.StringDeserializer &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;users&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The result has content that looks pretty normal and expected:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;User_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
User_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Extra blank lines could show up if the non-printable bytes trigger them, but that doesn't always stick out as an obvious issue (at least not obvious to me).&lt;/p&gt;

&lt;p&gt;What if your key deserializer was &lt;code&gt;BytesDeserializer&lt;/code&gt;, what would you have seen?&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-avro-console-consumer &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:19092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; schema.registry.url&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"http://localhost:8081"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; print.key&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--property&lt;/span&gt; key.separator&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"|"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--from-beginning&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--skip-message-on-error&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--key-deserializer&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;org.apache.kafka.common.serialization.BytesDeserializer &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;users&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The serializer's magic byte (0x00) and the bytes for the schema id show up as printable hex escapes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;\x00\x00\x00\x00\x03\x0CUser_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
\x00\x00\x00\x00\x03\x0CUser_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now it is easy to see the issue: the key is Avro (a primitive Avro string, as defined by the serializer). &lt;strong&gt;Solution&lt;/strong&gt;: update the connector to use a String key, or update the streams application to re-key the data.&lt;/p&gt;
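&lt;p&gt;A quick way to confirm what those leading bytes mean, using only standard shell tools (a sketch; the byte values are taken from the hex output above):&lt;/p&gt;

```shell
# Confluent's wire format is: magic byte 0x00, then a 4-byte big-endian schema id,
# then the Avro-encoded payload. The octal escapes below are the same five bytes
# printed above as \x00\x00\x00\x00\x03.
header='\000\000\000\000\003'
schema_id=$(printf "$header" | tail -c 4 | od -An -tu4 --endian=big | tr -d ' ')
echo "schema id: $schema_id"
```

&lt;p&gt;With the schema id in hand, you can look the schema up in Schema Registry (e.g. &lt;code&gt;curl http://localhost:8081/schemas/ids/3&lt;/code&gt;) to see exactly what the producer registered.&lt;/p&gt;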




&lt;p&gt;&lt;strong&gt;NOTE&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Running containers for demonstrations is great, but the mismatch of URLs can be confusing. &lt;code&gt;localhost:port&lt;/code&gt; is used when connecting to services from the host machine (your laptop) via port mapping. The actual hostname is used when accessing the service from another container. Therefore, you will see &lt;code&gt;http://schema-registry:8081&lt;/code&gt; within the connect configuration, and &lt;code&gt;http://localhost:8081&lt;/code&gt; for commands run from the host machine. I do not translate between them here, as these scripts align with the demo code.&lt;/p&gt;
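&lt;p&gt;To illustrate that port mapping with a hypothetical compose fragment (not the demo's actual file; service name and image tag are examples): inside the Docker network, other containers reach the service by its service name, while the &lt;code&gt;ports&lt;/code&gt; mapping is what exposes it to the host as &lt;code&gt;localhost&lt;/code&gt;.&lt;/p&gt;

```yaml
# Hypothetical docker-compose fragment; service name and image tag are examples.
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    ports:
      # host:container -- other containers use http://schema-registry:8081,
      # the host machine uses http://localhost:8081
      - "8081:8081"
```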




&lt;h3&gt;
  
  
  Useful Shell Aliases
&lt;/h3&gt;

&lt;p&gt;I have these defined in my &lt;code&gt;.zshrc&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;alias &lt;/span&gt;&lt;span class="nv"&gt;kcc&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'kafka-console-consumer \
        --bootstrap-server localhost:19092 \
        --key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer  \
        --property print.key=true \
        --property key.separator="|" \
        --from-beginning \
        --topic'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;alias &lt;/span&gt;&lt;span class="nv"&gt;kacc&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'kafka-avro-console-consumer \
        --bootstrap-server localhost:19092 \
        --property schema.registry.url="http://localhost:8081" \
        --property print.key=true \
        --property key.separator="|" \
        --from-beginning \
        --skip-message-on-error \
        --key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
        --topic'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Takeaways
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;While this may seem obvious to you, and you would immediately inspect the connector configuration and uncover the problem, you want to make things easy for everyone on your team. Letting them troubleshoot and find issues easily is a win for you and a win for them.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This demonstration is available as the &lt;code&gt;key-mismatch&lt;/code&gt; demo within &lt;a href="https://www.github.com/kineticedge/dev-local-demos"&gt;dev-local-demos&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Reach out
&lt;/h2&gt;

&lt;p&gt;I would enjoy hearing more about the development improvements you use.&lt;br&gt;
Reach out via our &lt;a href="https://www.github.com/kineticedge/contact"&gt;contact us&lt;/a&gt; page.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Apache Kafka Monitoring and Management</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Thu, 08 Sep 2022 16:33:03 +0000</pubDate>
      <link>https://dev.to/nbuesing/apache-kafka-monitoring-and-management-3lbf</link>
      <guid>https://dev.to/nbuesing/apache-kafka-monitoring-and-management-3lbf</guid>
<description>&lt;p&gt;Setting up multiple Kafka cluster configurations to explore the nuances of various tools that monitor Apache Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  tl;dr
&lt;/h2&gt;

&lt;p&gt;I have created a new project for testing Kafka tools; check it out on GitHub at &lt;a href="https://www.github.com/kineticedge/kafka-toolage"&gt;kineticedge/kafka-toolage&lt;/a&gt;. It stands up 4 different Kafka clusters. As I add new tools, I will add their configurations to this project and then write about the experience, covering their configuration and functionality. I spent a lot of time on web searches and trial and error getting these configurations working; I hope I can save you some of that time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;When it comes to the configuration of Apache Kafka, the integration can get complicated. The number of configuration options across Apache Kafka, Kafka Connect, and Confluent Schema Registry is extensive. When you add in third-party tools, each built by an independent developer team, it's challenging to figure out all the configurations.&lt;/p&gt;

&lt;p&gt;When you want to evaluate a tool, you want to focus on exploring its features and determining if those features meet your needs; you do not want to worry about setup, and you especially do not want to finish an evaluation only to find out you cannot easily integrate the tool with your production environment.&lt;/p&gt;

&lt;p&gt;Future articles will walk through the setup of tools against these 4 different Apache Kafka cluster configurations. This will accelerate your integration time and leave your developers focused on writing Kafka applications, minimizing time spent on infrastructure configuration.&lt;/p&gt;

&lt;p&gt;In addition to seeing how to configure open-source tooling against clusters of each configuration, here are some of the questions explored along the way:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;If you want to use a &lt;strong&gt;JAAS configuration file&lt;/strong&gt;, why does the Confluent Container of Schema Registry require you to add it to both KAFKA_OPTS and SCHEMA_REGISTRY_OPTS?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Basic Auth configuration for Schema Registry and Kafka Connect is not the same; what exactly are the differences?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Why is Confluent Schema Registry configuration in Kafka-UI different from Kafka Connect and Confluent’s Java Serializer and Deserializer configurations?&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Why Multiple Clusters?
&lt;/h2&gt;

&lt;p&gt;The 4 clusters showcase 5 different Apache Kafka protocols and multiple options for Kafka Connect and Schema Registry.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;non-authenticated&lt;/strong&gt; cluster is typical of an internal developer cluster or a POC cluster. It is usually the environment used when trying out Kafka for the first time, or when someone is learning more about Kafka. The cluster demonstrated here has two listeners, &lt;strong&gt;PLAINTEXT&lt;/strong&gt; and &lt;strong&gt;SSL&lt;/strong&gt;, but typically if &lt;strong&gt;SSL&lt;/strong&gt; is desired, the &lt;strong&gt;PLAINTEXT&lt;/strong&gt; listener is omitted.&lt;/p&gt;

&lt;p&gt;A &lt;strong&gt;SASL&lt;/strong&gt; authenticated cluster gives the ability to use both &lt;strong&gt;PLAINTEXT&lt;/strong&gt; and &lt;strong&gt;SSL&lt;/strong&gt; connections. Why would anyone want to do this? Well, if you have internal IPs only accessible from within the network, specific tools and connections have the option to leverage zero-copy transfers and improve the performance of inter-broker communication. You can also leverage &lt;strong&gt;PLAIN&lt;/strong&gt; authentication instead of &lt;strong&gt;SCRAM&lt;/strong&gt; authentication for setting up super-user access. Using &lt;strong&gt;SCRAM&lt;/strong&gt;-based users as admin users is a little more complicated, in that you have to create those users while Zookeeper is up and available (before the brokers are ever started) by leveraging tooling directly against Zookeeper. This project gives insight into how to do that.&lt;/p&gt;
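&lt;p&gt;As a sketch of what that Zookeeper-direct user creation looks like (the Zookeeper hostname, user, and password below are placeholders, and the command assumes a reachable Zookeeper, so it is illustrative rather than standalone-runnable):&lt;/p&gt;

```shell
# Create a SCRAM admin user directly in Zookeeper, before any broker starts.
kafka-configs --zookeeper zookeeper:2181 \
  --alter \
  --entity-type users \
  --entity-name admin \
  --add-config 'SCRAM-SHA-512=[password=admin-secret]'
```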

&lt;p&gt;An &lt;strong&gt;SSL&lt;/strong&gt; authenticated cluster shows how you can leverage certificates for client authentication. A self-signed certificate process is not something that is part of most distributions; this project provides one, showcasing how to leverage openssl to create certificates with the proper X509 extensions.&lt;/p&gt;
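&lt;p&gt;To give a feel for what that certificate tooling does, here is a minimal openssl sketch (not the project's actual script; all names and filenames are illustrative) of a self-signed CA signing a broker certificate with the extensions that matter for Kafka:&lt;/p&gt;

```shell
# Create a self-signed CA (demo only: short-lived, no intermediate CA).
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -subj "/CN=demo-ca" -keyout ca.key -out ca.crt

# Create a key and certificate signing request for a broker.
openssl req -newkey rsa:2048 -nodes \
  -subj "/CN=broker-1" -keyout broker.key -out broker.csr

# subjectAltName must match the advertised listener hostname, and
# extendedKeyUsage=clientAuth is what allows the same certificate to be
# presented for SSL client authentication (e.g. inter-broker connections).
printf 'subjectAltName=DNS:broker-1,DNS:localhost\nextendedKeyUsage=serverAuth,clientAuth\n' > broker.ext
openssl x509 -req -in broker.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -days 1 -out broker.crt -extfile broker.ext

# Confirm the broker certificate chains back to the CA.
openssl verify -CAfile ca.crt broker.crt
```

&lt;p&gt;The &lt;code&gt;extendedKeyUsage=clientAuth&lt;/code&gt; extension is the piece that is easy to miss: a certificate may work for the broker's server side yet be rejected when presented for SSL client authentication.&lt;/p&gt;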

&lt;p&gt;Finally, an &lt;strong&gt;OAUTH&lt;/strong&gt; authenticated cluster showcases how extensible a tool is for integration into your environment, and will help you identify whether supporting multiple authentication mechanisms on your cluster is necessary to leverage a particular tool.&lt;/p&gt;

&lt;p&gt;All certificates are created internally by this project: a single turnkey generation from CA to broker and client certificates, including intermediate CA certificates.&lt;/p&gt;

&lt;p&gt;The goal here is to verify that the open-source tool being considered will work with your environment. While the clusters presented here do not handle all scenarios, they cover many of them, and I hope they can shorten the path to getting tools integrated with your clusters. I have spent hours on incorrect understandings of configuration, on which certificates can be used for client authentication, and on the unique configuration settings of many third-party tools.&lt;/p&gt;

&lt;p&gt;Reading the documentation is typically the answer, but juggling many documents just to verify that something meets your needs, before you spend all those hours getting it integrated, is not ideal; let us help you with that by showcasing it here.&lt;/p&gt;

&lt;p&gt;Years of Apache Kafka experience is behind this setup.&lt;br&gt;
Such setups can easily become 2-4 months of effort within your organization. This project's goal is to help you get up and running in a fraction of that time.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Clusters
&lt;/h2&gt;

&lt;p&gt;There are far more scenarios than what I have here.&lt;br&gt;
What this provides is a way to test out &lt;strong&gt;SASL&lt;/strong&gt; and &lt;strong&gt;SSL&lt;/strong&gt; authentication, &lt;strong&gt;PLAINTEXT&lt;/strong&gt; and &lt;strong&gt;SSL&lt;/strong&gt; encryption, and basic-auth and no-auth access to the RESTful endpoints of Confluent’s Schema Registry and Kafka Connect. Also included is a configuration I had yet to try: a custom &lt;strong&gt;OAUTH&lt;/strong&gt; implementation. From a tooling standpoint, I also wanted to see if a tool can handle multiple connect clusters associated with a given Kafka cluster. This is why one cluster has two separate connect clusters: to verify that tools exposing connect-cluster information support having multiple of them (with different Kafka listeners).&lt;/p&gt;

&lt;h3&gt;
  
  
  Cluster 1: Non-Authenticated Cluster
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Connectivity

&lt;ul&gt;
&lt;li&gt;Kafka Brokers
&lt;ul&gt;
&lt;li&gt;PLAINTEXT protocol on port 9092&lt;/li&gt;
&lt;li&gt;SSL protocol on port 9093&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry
&lt;ul&gt;
&lt;li&gt;HTTP&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect
&lt;ul&gt;
&lt;li&gt;HTTP&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;




&lt;ul&gt;
&lt;li&gt;Configuration

&lt;ul&gt;
&lt;li&gt;1 Zookeeper&lt;/li&gt;
&lt;li&gt;3 Brokers
&lt;ul&gt;
&lt;li&gt;Inter-Broker Protocol PLAINTEXT&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;1 Schema Registry
&lt;ul&gt;
&lt;li&gt;Broker Protocol PLAINTEXT&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect Cluster A
&lt;ul&gt;
&lt;li&gt;Broker Protocol PLAINTEXT&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect Cluster B
&lt;ul&gt;
&lt;li&gt;Broker Protocol SSL&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--AUT2-_Yy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-1_hu66c3dd654c6c16438e4fe01a8ee31496_229654_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--AUT2-_Yy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-1_hu66c3dd654c6c16438e4fe01a8ee31496_229654_1600x0_resize_box_3.png" alt="noauth cluster" title="Cluster 1 : PLAINTEXT and SSL   Cluster without authentication" width="880" height="864"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Cluster 2: SASL Authenticated Cluster
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Connectivity

&lt;ul&gt;
&lt;li&gt;Kafka Brokers
&lt;ul&gt;
&lt;li&gt;SASL_PLAINTEXT protocol on port 9092 (scram-sha-512 &amp;amp; plain)&lt;/li&gt;
&lt;li&gt;SASL_SSL protocol on port 9093 (scram-sha-512)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry
&lt;ul&gt;
&lt;li&gt;HTTPS with Basic Authentication&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect
&lt;ul&gt;
&lt;li&gt;HTTPS with Basic Authentication&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;




&lt;ul&gt;
&lt;li&gt;Configuration

&lt;ul&gt;
&lt;li&gt;1 Zookeeper&lt;/li&gt;
&lt;li&gt;3 Brokers
&lt;ul&gt;
&lt;li&gt;Inter-Broker Protocol SASL_PLAINTEXT (plain)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry
&lt;ul&gt;
&lt;li&gt;Broker Protocol SASL_PLAINTEXT (plain)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect
&lt;ul&gt;
&lt;li&gt;Two Workers&lt;/li&gt;
&lt;li&gt;Broker Protocol SASL_PLAINTEXT (plain)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KWOshuSP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-2_hua3b3329d41b230d638a85f1815584d46_202720_800x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KWOshuSP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-2_hua3b3329d41b230d638a85f1815584d46_202720_800x0_resize_box_3.png" alt="ssl cluster" title="Cluster 2 : Cluster with SASL authentication (plain and scram)" width="800" height="535"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Cluster 3: SSL Authenticated Cluster
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Connectivity

&lt;ul&gt;
&lt;li&gt;Kafka Brokers
&lt;ul&gt;
&lt;li&gt;SSL protocol on port 9093 (SSL Client Authentication required)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry
&lt;ul&gt;
&lt;li&gt;HTTPS with Basic Authentication&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect
&lt;ul&gt;
&lt;li&gt;HTTPS with Basic Authentication&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;




&lt;ul&gt;
&lt;li&gt;Configuration

&lt;ul&gt;
&lt;li&gt;1 Zookeeper&lt;/li&gt;
&lt;li&gt;3 Brokers
&lt;ul&gt;
&lt;li&gt;Inter-Broker Protocol SSL&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry
&lt;ul&gt;
&lt;li&gt;Broker Protocol SSL&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Kafka Connect
&lt;ul&gt;
&lt;li&gt;Broker Protocol SSL&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--fX3n8yKK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-3_hu55b370b7cab96d17bdb8b3077b7426f3_187341_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--fX3n8yKK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-3_hu55b370b7cab96d17bdb8b3077b7426f3_187341_1600x0_resize_box_3.png" alt="noauth cluster" title="Cluster 3 : cluster with SSL encryption and authentication" width="880" height="589"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Cluster 4: OAUTH Authenticated Cluster
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Connectivity

&lt;ul&gt;
&lt;li&gt;Kafka Brokers
&lt;ul&gt;
&lt;li&gt;SASL_PLAINTEXT protocol on port 9092 (plain)&lt;/li&gt;
&lt;li&gt;SASL_SSL protocol on port 9093 (oauthbearer)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;




&lt;ul&gt;
&lt;li&gt;Configuration

&lt;ul&gt;
&lt;li&gt;1 Zookeeper&lt;/li&gt;
&lt;li&gt;3 Brokers
&lt;ul&gt;
&lt;li&gt;Inter-Broker Protocol SASL_PLAINTEXT (plain)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;No Schema Registry&lt;/li&gt;
&lt;li&gt;No Connect Cluster&lt;/li&gt;
&lt;li&gt;Open Source Hydra OAuth Server
&lt;ul&gt;
&lt;li&gt;Image extended to pre-configure the database and create users&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;OAuth Client Java library&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--45b4MgsL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-4_hu555f17e6b2283fa0550b35423fa32dd5_157093_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--45b4MgsL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/kafka-toolage-setup/cluster-4_hu555f17e6b2283fa0550b35423fa32dd5_157093_1600x0_resize_box_3.png" alt="noauth cluster" title="Cluster 4 : cluster with custom oauth authentication" width="880" height="519"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Demonstration Project
&lt;/h4&gt;

&lt;p&gt;The above clusters and configurations are freely available in an Apache 2.0 Licensed project on &lt;a href="https://www.github.com/kineticedge/kafka-toolage"&gt;GitHub&lt;/a&gt;. This licensing does not apply to the components being evaluated, and you need to validate and understand their licensing before bringing them into your organization.&lt;/p&gt;

&lt;h5&gt;
  
  
  Project Notes
&lt;/h5&gt;

&lt;ul&gt;
&lt;li&gt;If clusters fail to start, the first thing to check is whether you created the certificates. If the certificates don’t exist, the brokers will fail to start, and the services that depend on them will hang.&lt;/li&gt;
&lt;li&gt;Because multiple clusters run at once, port mapping is not done. Use the &lt;code&gt;jumphost&lt;/code&gt; container to run the Kafka command-line tools against the clusters.&lt;/li&gt;
&lt;li&gt;This project is being developed on a Mac M1 Max with 64GB of memory, 32GB of which is dedicated to Docker. On a machine with fewer resources, consider testing against one cluster at a time.&lt;/li&gt;
&lt;li&gt;As I add new tools, I expect I will have to go back and make some changes. Hopefully not, but given all the nuances between tools and as-yet-uncovered copy-and-paste issues, I expect I will find a few more.&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Classifications
&lt;/h4&gt;

&lt;p&gt;Tool classifications, for the sake of these reviews, are &lt;strong&gt;monitoring&lt;/strong&gt;, &lt;strong&gt;observation&lt;/strong&gt;, and &lt;strong&gt;administration&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Example operations for each:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Monitoring

&lt;ul&gt;
&lt;li&gt;Bytes in and out on a given topic&lt;/li&gt;
&lt;li&gt;Disk utilization of brokers&lt;/li&gt;
&lt;li&gt;Number of partitions on a broker&lt;/li&gt;
&lt;li&gt;Connector status and health&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Observation

&lt;ul&gt;
&lt;li&gt;Inspect Messages on a topic&lt;/li&gt;
&lt;li&gt;Inspect a Schema&lt;/li&gt;
&lt;li&gt;Consumer Lag&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Administration

&lt;ul&gt;
&lt;li&gt;Move partitions of a topic&lt;/li&gt;
&lt;li&gt;Change the retention time of a topic&lt;/li&gt;
&lt;li&gt;Adjust offsets of a consumer group&lt;/li&gt;
&lt;li&gt;Pause and resume a connector&lt;/li&gt;
&lt;li&gt;Delete a schema&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Tools can support multiple operations, but also know that tools are developed with a specific feature set in mind. I expect this list to change as I learn more about each tool as I attempt to integrate them into these clusters.&lt;/p&gt;

&lt;h4&gt;
  
  
  Tools
&lt;/h4&gt;

&lt;p&gt;These are the tools currently planned for evaluation against these clusters. When a review is completed, a link to it will be added here. Each review is more than an overview of the tool's features and how they work and integrate with Apache Kafka; it is also a place to see all the cluster integrations together, in the hope that at least one of these configurations is close to your setup, making integration and evaluation quicker.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AKQH&lt;/li&gt;
&lt;li&gt;CMAK&lt;/li&gt;
&lt;li&gt;Grafana&lt;/li&gt;
&lt;li&gt;Kafdrop&lt;/li&gt;
&lt;li&gt;Kafka-UI&lt;/li&gt;
&lt;li&gt;kowl&lt;/li&gt;
&lt;li&gt;Lensesio&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;The order listed does not indicate the order of review.&lt;/em&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Coming Soon
&lt;/h4&gt;

&lt;p&gt;First I will explore &lt;strong&gt;Grafana&lt;/strong&gt;, which leverages Prometheus and the Prometheus JMX Exporter. Then I will explore &lt;strong&gt;Kafka-UI&lt;/strong&gt;.&lt;/p&gt;

&lt;h4&gt;
  
  
  Questions You May Have
&lt;/h4&gt;

&lt;p&gt;&lt;strong&gt;Why is a tool missing from the list?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This list is currently based on tools that are open source with free-to-use licensing. Please feel free to suggest additional tools for evaluation. If a tool is commercial, a free developer license is required for my review. That license should be renewable, as I hope to reevaluate when major changes are released to Apache Kafka (e.g. KRaft) and to the respective tool.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Will you succeed?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I do not yet know the configuration of each tool. I may "give up" and say something is not possible, only to learn later that I just didn’t know how to do it. If that happens, I will be more than happy to update the process, admit my mistake, and make it easier for others to integrate the tool with a given setup.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;When will the results of each tool be posted?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;My timeline will not be consistent. I have other areas of interest to write about, so I expect other articles to be intermixed with this process. Also, I expect a fair amount of work on my part to understand and test things out well before I even write about them. I can say I have two integrations nearly completed, but I still have to write them up.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;What about all the interesting stuff you learned along the way?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The technology questions I have uncovered during this process (like the 3 I mentioned above) will be covered throughout the set of articles.&lt;/p&gt;

&lt;h2&gt;
  
  
  Reach out
&lt;/h2&gt;

&lt;p&gt;Please &lt;a href="https://www.kineticedge.io/contact"&gt;contact us&lt;/a&gt; if you have ideas, suggestions, want clarification, or just want to talk Apache Kafka administration.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Building an arm64 container for Apache Druid for your Apple Silicon</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Thu, 08 Sep 2022 13:35:02 +0000</pubDate>
      <link>https://dev.to/nbuesing/building-an-arm64-container-for-apache-druid-for-your-apple-silicon-4l4d</link>
      <guid>https://dev.to/nbuesing/building-an-arm64-container-for-apache-druid-for-your-apple-silicon-4l4d</guid>
<description>&lt;p&gt;The published Apache Druid container on &lt;a href="https://hub.docker.com/r/apache/druid"&gt;Docker Hub&lt;/a&gt; is a linux/amd64-only image. Running it on Apple Silicon (M1 or M2 chipset) is slow. &lt;/p&gt;

&lt;p&gt;Fortunately, it is super easy to build your own by leveraging the binary distribution and the existing &lt;a href="https://github.com/apache/druid/blob/master/distribution/docker/druid.sh"&gt;druid.sh&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;All of this is available as a Dockerfile and build script in the &lt;a href="https://github.com/kineticedge/druid-m1"&gt;druid-m1&lt;/a&gt; repository. The &lt;code&gt;build.sh&lt;/code&gt; script builds an arm64 image based on the version specified in the &lt;code&gt;Dockerfile&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;A &lt;code&gt;linux/amd64&lt;/code&gt; container-based deployment of Apache Druid on Apple M1 silicon takes 2 minutes (1:58.58, tested on an Apple M1 Max with 64GB of memory and 32GB allocated to Docker) to start and become available for processing. &lt;br&gt;
An image built from &lt;code&gt;linux/arm64&lt;/code&gt; base images takes only 18 seconds (0:17.79) to become available.&lt;/p&gt;

&lt;p&gt;If you just need an arm64/v8 image, download the &lt;a href="https://github.com/kineticedge/druid-m1"&gt;druid-m1&lt;/a&gt; project and run the &lt;code&gt;build.sh&lt;/code&gt; script. Want to know a little about how it was put together? Continue on.&lt;/p&gt;
&lt;h2&gt;
  
  
  Image
&lt;/h2&gt;

&lt;p&gt;The process of creating this image isn't complicated. Three major pieces went into its creation.&lt;/p&gt;
&lt;h3&gt;
  
  
  OS Architecture
&lt;/h3&gt;

&lt;p&gt;First, find and use base images that have an arm64/v8 variant. Both &lt;code&gt;openjdk:11-jre-slim&lt;/code&gt; and &lt;code&gt;busybox&lt;/code&gt; have arm64/v8&lt;br&gt;
images.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--NFt0elLX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/druid-m1/openjdk-image_hu1db27e9891049f36aa0389e2cc901d26_114467_1600x0_resize_box_3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--NFt0elLX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/druid-m1/openjdk-image_hu1db27e9891049f36aa0389e2cc901d26_114467_1600x0_resize_box_3.png" alt="openjdk container image" title="Docker Container Image for OpenJDK 11" width="880" height="264"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Software Installation
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;Dockerfile&lt;/code&gt; downloads and installs Druid, and downloads and uses the &lt;code&gt;druid.sh&lt;/code&gt; script maintained by Apache Druid.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;ARG&lt;/span&gt;&lt;span class="s"&gt; DRUID_VERSION=0.23.0&lt;/span&gt;
&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; https://dlcdn.apache.org/druid/${DRUID_VERSION}/apache-druid-${DRUID_VERSION}-bin.tar.gz /tmp&lt;/span&gt;
&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; https://raw.githubusercontent.com/apache/druid/${DRUID_VERSION}/distribution/docker/druid.sh /druid.sh&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Druid Extensions
&lt;/h3&gt;

&lt;p&gt;Druid extensions are added with the &lt;code&gt;pull-deps&lt;/code&gt; tool that ships with Druid. For this build, the &lt;code&gt;kafka-emitter&lt;/code&gt; extension is included, but others are easy to add.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;    java &lt;span class="nt"&gt;-cp&lt;/span&gt; &lt;span class="s2"&gt;"/opt/druid/lib/*"&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;        &lt;span class="nt"&gt;-Ddruid&lt;/span&gt;.extensions.directory&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/opt/druid/extensions/"&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;        &lt;span class="nt"&gt;-Ddruid&lt;/span&gt;.extensions.hadoopDependenciesDir&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/opt/druid/hadoop-dependencies/"&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;        org.apache.druid.cli.Main tools pull-deps &lt;span class="nt"&gt;--no-default-hadoop&lt;/span&gt; &lt;span class="se"&gt;\
&lt;/span&gt;        &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="s2"&gt;"org.apache.druid.extensions.contrib:kafka-emitter"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Why The Difference?
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;Dockerfile&lt;/code&gt; that is part of Apache Druid is all about building the software from source. Since this image is built after a release, its approach is instead to use the fact that the binaries&lt;br&gt;
are available for download.&lt;/p&gt;

&lt;h2&gt;
  
  
  New To Druid?
&lt;/h2&gt;

&lt;p&gt;If you are new to Druid and want to see what it can do, check out the &lt;code&gt;druid-late&lt;/code&gt; demonstration within &lt;a href="https://github.com/kineticedge/dev-local-demos"&gt;dev-local-demos&lt;/a&gt;. It leverages a container-based ecosystem provided at &lt;a href="https://github.com/kineticedge/dev-local"&gt;dev-local&lt;/a&gt;. Update the &lt;code&gt;.env&lt;/code&gt; file within the druid folder to point to your locally built &lt;code&gt;arm64&lt;/code&gt; image.&lt;/p&gt;

&lt;h2&gt;
  
  
  Reach Out
&lt;/h2&gt;

&lt;p&gt;Please &lt;a href="https://www.kineticedge.io/contact"&gt;contact us&lt;/a&gt; if you would like to talk about online analytic processing or event-streaming.&lt;/p&gt;

</description>
      <category>apachedruid</category>
      <category>docker</category>
      <category>containers</category>
      <category>arm64</category>
    </item>
    <item>
      <title>Showcasing Change Data Capture with Debezium and Kafka</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Wed, 07 Sep 2022 12:57:23 +0000</pubDate>
      <link>https://dev.to/nbuesing/showcasing-change-data-capture-with-debezium-and-kafka-30i2</link>
      <guid>https://dev.to/nbuesing/showcasing-change-data-capture-with-debezium-and-kafka-30i2</guid>
      <description>&lt;p&gt;Setting up Change Data Capture with Databases, Apache Kafka, Kafka Connect, and Debezium takes time; with tricky configurations along the way.&lt;/p&gt;

&lt;p&gt;Here we will walk through a setup of all the components, to showcase what is possible and to give you the complete picture, with the pieces needed to bring this into your project.&lt;/p&gt;

&lt;h2&gt;
  
  
  Real-Time Ecosystem
&lt;/h2&gt;

&lt;p&gt;This demo is available in the &lt;strong&gt;rdbms-cdc-nosql&lt;/strong&gt; folder in &lt;a href="https://github.com/kineticedge/dev-local-demos"&gt;dev-local-demos&lt;/a&gt; project. It leverages applications available through containers in &lt;a href="https://github.com/kineticedge/dev-local"&gt;dev-local&lt;/a&gt;. Within a few minutes, you can see change data captures from relational databases (Postgres, MySQL v8, and MySQL v5) into NoSQL data stores (Mongo, Cassandra, and Elastic).&lt;/p&gt;

&lt;p&gt;Stream enrichment processing is done with ksqlDB.&lt;/p&gt;

&lt;h2&gt;
  
  
  Challenges
&lt;/h2&gt;

&lt;p&gt;There are many nuances in setting up change-data-capture, and doing this within an enterprise organization takes effort and time. &lt;/p&gt;

&lt;p&gt;The challenges addressed here are an attempt to make this a little easier, by providing a complete POC demonstration of the steps needed to build a change-data-capture solution.&lt;/p&gt;

&lt;p&gt;The specific touch-points covered here:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Enabling logging within a database&lt;/li&gt;
&lt;li&gt;Connector setting nuances&lt;/li&gt;
&lt;li&gt;Management of Kafka Connect Secrets&lt;/li&gt;
&lt;li&gt;Logical Data-types&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Database logging
&lt;/h2&gt;

&lt;p&gt;Each database has its own nuances in setting up logging. This is critical for any change-data-capture process, and something that needs to be well understood for success. Here are the settings and issues in configuring Postgres and MySQL with Debezium for change data capture. This is not a complete overview of all the settings; rather, it provides insight into the complexities that need to be worked out between your database operations and development teams. Work with your database administrators to enable database logging on the tables that are needed, along with any snapshotting or other specific configurations.&lt;/p&gt;

&lt;h3&gt;
  
  
  Postgres and Debezium
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;TL;dr;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;wal_level&lt;/code&gt; must be set to &lt;code&gt;logical&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;If you are running Postgres with Docker Compose, override the command as follows:
&lt;/li&gt;
&lt;/ul&gt;

&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;postgres&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;-c&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;wal_level=logical"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;



&lt;ul&gt;
&lt;li&gt;The connector needs to enable the &lt;code&gt;pgoutput&lt;/code&gt; plugin.&lt;/li&gt;
&lt;li&gt;Add the following to the connector configuration:
&lt;/li&gt;
&lt;/ul&gt;

&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="nl"&gt;"plugin.path"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"pgoutput"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Details&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Postgres needs to be able to capture changes; this is done through the &lt;em&gt;write-ahead log&lt;/em&gt; (WAL).
The amount of data captured is based on the &lt;code&gt;wal_level&lt;/code&gt; setting.
The default setting is &lt;code&gt;replica&lt;/code&gt;, but this is an insufficient level of data for Debezium.
The &lt;code&gt;logical&lt;/code&gt; setting includes replica information plus additional logical change sets.&lt;/li&gt;
&lt;li&gt;Debezium has to be configured to use the &lt;code&gt;pgoutput&lt;/code&gt; plugin.
Use the configuration property &lt;code&gt;plugin.name&lt;/code&gt; to set this.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Troubleshooting&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This is a set of errors seen when using the Debezium Postgres Source Connector.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;Attempting to start Debezium with Postgres w/out &lt;code&gt;wal_level&lt;/code&gt; properly defined.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Connector configuration is invalid and contains the following 1 error(s):  
Postgres server wal_level property must be \"logical\" but is: replica
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Restarting Postgres w/out &lt;code&gt;-c wal_level=logical&lt;/code&gt; will result in Postgres failing to start with the following error:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;FATAL:  logical replication slot "debezium" exists, but wal_level &amp;lt; logical
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Starting a connector w/out &lt;code&gt;pgoutput&lt;/code&gt; plugin enabled.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;io.debezium.DebeziumException: Creation of replication slot failed
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If (I should say when) you uncover an error, take the time to document it, and document it well. Having a deja-vu moment when an error resurfaces is not fun.&lt;/p&gt;

&lt;h3&gt;
  
  
  MySql (v8) and Debezium
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;TL;dr;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Version 8 of MySQL has binary logging enabled by default.
In production, however, you do need to verify this with operations.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Troubleshooting&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You delete and recreate your MySQL database, but reuse the connector (and the state it persists in Kafka)
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: io.debezium.DebeziumException: 
Client requested master to start replication from position &amp;gt; file size Error code: 1236; SQLSTATE: HY000
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  MySql (v5) and Debezium
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;TL;dr;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Binlog is not enabled by default, but it is rather simple to enable: set the &lt;code&gt;log-bin&lt;/code&gt; name and configure &lt;code&gt;binlog_format&lt;/code&gt; to &lt;code&gt;row&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;You need to ensure logs are retained for longer than any window of reprocessing.&lt;/li&gt;
&lt;li&gt;Debezium expects to resume where it left off; modifying the logs after Debezium has started can lead to unexpected errors.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Details&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Add the following properties to your database's &lt;code&gt;mysql.cnf&lt;/code&gt; file.
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;  &lt;span class="s"&gt;server-id         = &lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;
  &lt;span class="s"&gt;log_bin           = mysql-bin&lt;/span&gt;
  &lt;span class="s"&gt;expire_logs_days  = &lt;/span&gt;&lt;span class="m"&gt;99&lt;/span&gt;
  &lt;span class="s"&gt;binlog_format     = row&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;Troubleshooting&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You recreate your database but use the same instance of the connector (v5 has the same error as v8).
&lt;/li&gt;
&lt;/ul&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: io.debezium.DebeziumException: 
Client requested master to start replication from position &amp;gt; file size Error code: 1236; SQLSTATE: HY000.
&lt;/code&gt;&lt;/pre&gt;



&lt;ul&gt;
&lt;li&gt;MySQL was shut down or became inaccessible. This is a pretty easy fix, just not intuitive.
&lt;/li&gt;
&lt;/ul&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: io.debezium.DebeziumException: Failed to read next byte from position XXXXX
&lt;/code&gt;&lt;/pre&gt;




&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Debezium
&lt;/h3&gt;

&lt;p&gt;Debezium is an excellent open-source change-data-capture product. It provides a lot of features that make it a powerful tool. I find that having a few working examples makes it a lot easier to understand. Here are a few things that, once understood, made it easier to configure connectors and quickly see the rewards of change data capture.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;database.server.name&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;This property is not a connection property to the database, but rather the name used to keep this connection uniquely identified. It is used in the topic names generated for the CDC process against the source database. My suggestion is to not pick the type of database as the name (e.g. &lt;em&gt;postgres&lt;/em&gt; or &lt;em&gt;mysql&lt;/em&gt;). Picking a name like this could cause those maintaining the code to believe the name needs to align with the type of the database.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;io.debezium.transforms.ExtractNewRecordState&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;By default, Debezium provides nested elements of before, after, and operation. For most use-cases, extracting just the &lt;code&gt;after&lt;/code&gt; state is sufficient, and Debezium provides a Single Message Transform (SMT) to do just that. Nested elements can be tricky if you are not writing stream applications; so allowing the data to be flattened with one simple SMT is very helpful. Using this SMT makes it easier to pull data into &lt;code&gt;ksqlDB&lt;/code&gt; for enrichment.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;Predicates  (Apache Kafka 2.6)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Prior to Apache Kafka 2.6, transformations were unconditional, making it more difficult for a single connector to process multiple tables. By using predicates, a single connector can have different rules for extracting the key from the message.&lt;/li&gt;
&lt;li&gt;A common use of SMTs in Debezium connectors is to pull the primary-key element out of the value; this ensures that the events for a given row (primary key) in the database are processed in order.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;database.history&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Many connectors allow for metadata related to the connector to be sourced to a different Kafka cluster. This flexibility leads to confusion, especially for developers new to Kafka Connect and to the specific connector.&lt;/li&gt;
&lt;li&gt;Debezium's database history is designed this way.
You need to set up the bootstrap servers, protocol, and any other connection settings for the Kafka cluster that maintains this information, even if it is the same cluster. For enterprise deployments, this flexibility is critical. For proof-of-concepts, development, and trying to get something up and running quickly, it is a lot of duplicate configuration.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;decimal.handling.mode&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Setting this to &lt;code&gt;string&lt;/code&gt; can address sink connector issues that cannot handle the decimal logical type. The demo code uses &lt;code&gt;ksqlDB&lt;/code&gt; to cast decimal to string, for downstream sinks that need the help, but this is an alternative approach.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
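&lt;p&gt;Pulling the notes above together, the SMT and predicate wiring can be sketched as a connector configuration fragment. This is an illustration only: the server name &lt;code&gt;orders-db&lt;/code&gt;, the topic pattern, and the key field &lt;code&gt;order_id&lt;/code&gt; are hypothetical and would need to match your own &lt;code&gt;database.server.name&lt;/code&gt; and schema.&lt;/p&gt;

```python
import json

# Hypothetical Debezium connector fragment combining the
# ExtractNewRecordState SMT with a predicate (Apache Kafka 2.6+).
# "orders-db" and "order_id" are made-up names for illustration.
config = {
    "database.server.name": "orders-db",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "order_id",
    # apply the key extraction only to the orders table's topic
    "transforms.key.predicate": "isOrders",
    "predicates": "isOrders",
    "predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isOrders.pattern": "orders-db\\..*\\.orders",
}

print(json.dumps(config, indent=2))
```

&lt;p&gt;With this shape, the &lt;code&gt;unwrap&lt;/code&gt; transform flattens every topic, while the key extraction runs only where the predicate matches.&lt;/p&gt;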

&lt;h3&gt;
  
  
  Connector Secrets
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Apache Kafka provides a config provider interface that allows secrets to be stored separately from configuration. This is also available to a distributed Connect cluster, and accessible from the connectors.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Configure a file config provider, &lt;code&gt;org.apache.kafka.common.config.provider.FileConfigProvider&lt;/code&gt;, in the settings of the distributed Connect cluster.&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;config.providers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;file&lt;/span&gt;
&lt;span class="py"&gt;config.providers.file.class&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.config.provider.FileConfigProvider&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition to leveraging these from your Kafka component configurations, they are also accessible from connectors, such as:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"${file:/etc/kafka-connect/secrets/mysql.properties:USERNAME}"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Put more than secrets in this file: store the database connection URL and other settings that vary between deployment environments here as well. By keeping those out of the configuration, a single artifact can be published and maintained with your source code.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="nl"&gt;"connection.url"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_URL}"&lt;/span&gt;&lt;span class="err"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="nl"&gt;"connection.user"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_USER}"&lt;/span&gt;&lt;span class="err"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="nl"&gt;"connection.password"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"${file:/etc/kafka-connect/secrets/mysql.properties:CONNECTION_PASSWORD}"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
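&lt;p&gt;To make the placeholder mechanics concrete, here is a minimal sketch that emulates how a &lt;code&gt;${file:path:KEY}&lt;/code&gt; placeholder is resolved against a properties file. This only mimics the behavior of &lt;code&gt;FileConfigProvider&lt;/code&gt; for illustration; Kafka Connect performs the substitution itself, and the file path and keys here are stand-ins.&lt;/p&gt;

```python
import re
import tempfile

# Emulate FileConfigProvider resolution of ${file:<path>:<KEY>} placeholders.
# This mimics the mechanism only; Kafka Connect performs this internally.

def resolve(value: str) -> str:
    def lookup(match: re.Match) -> str:
        path, key = match.group(1), match.group(2)
        with open(path) as f:
            # parse simple KEY=VALUE properties lines
            props = dict(
                line.strip().split("=", 1)
                for line in f
                if "=" in line and not line.lstrip().startswith("#")
            )
        return props[key]
    return re.sub(r"\$\{file:([^:}]+):([^}]+)\}", lookup, value)

# a stand-in for /etc/kafka-connect/secrets/mysql.properties
with tempfile.NamedTemporaryFile("w", suffix=".properties", delete=False) as f:
    f.write("USERNAME=cdc_user\nPASSWORD=cdc_secret\n")
    secrets = f.name

print(resolve("${file:%s:USERNAME}" % secrets))  # cdc_user
```

&lt;p&gt;The secret value never appears in the connector configuration; only the placeholder does.&lt;/p&gt;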



&lt;h3&gt;
  
  
  Schemas and Data-types
&lt;/h3&gt;

&lt;p&gt;When it comes to data types, especially those considered logical data types in the Connect API, not all connectors are the same. If you are doing change data capture, odds are you will have decimals and timestamps. Fortunately, timestamps are stored as a long epoch, which will usually translate into a database even if the logical type is not properly handled. Decimals, however, are stored as byte arrays. If the connector doesn't properly invoke the logical converter, the value will not be properly converted for the end system. What makes matters worse, connectors are not consistent in how they handle the errors.&lt;/p&gt;

&lt;h4&gt;
  
  
  Specific Connector Observations
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Mongo&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In this demonstration, MongoDB Sink Connector properly handles the logical type, and data is stored correctly. The &lt;a href="https://github.com/mongodb/mongo-kafka/blob/1.5.x/src/main/java/com/mongodb/kafka/connect/sink/converter/SchemaRecordConverter.java#L67"&gt;SchemaRecordConverter&lt;/a&gt; properly handles the conversion; but as you can see, the converter has to account for and handle logical-types; it is not being done within the Connect API.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Data shown in MongoDB.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--mF2MhFe6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/mongo.d71e9dcbe5716af75c0f3762127f45b8772d70c429423d474d2608f5d53bc00b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--mF2MhFe6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/mongo.d71e9dcbe5716af75c0f3762127f45b8772d70c429423d474d2608f5d53bc00b.png" alt="all_orders" title="All Orders Shown in Mongo" width="880" height="534"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Elasticsearch&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If the Elasticsearch sink connector creates the index in Elastic (&lt;code&gt;schema.ignore=false&lt;/code&gt;), logical types are handled properly. If the sink connector doesn't create the index (&lt;code&gt;schema.ignore=true&lt;/code&gt;), logical converters are not processed and logical-type decimals will end up as an array of bytes in Elasticsearch.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Index generated by the connector. 
In each record the amount is a decimal value. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--i5-owfHw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/elastic_all_orders_table.4cce65579f0304d42e6ec7a6fb07bd33da24d5583a6bbe5f8939232b86192ab8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--i5-owfHw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/elastic_all_orders_table.4cce65579f0304d42e6ec7a6fb07bd33da24d5583a6bbe5f8939232b86192ab8.png" alt="elastic-index-connector-created" title="Elasticsearch connector created index" width="880" height="192"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Index is built manually through the Elastic API (the connector does not create it).
The decimal bytes are passed as-is to the index, yielding an undesired result.
Each record shows the byte-array value (the physical representation of the logical type).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--vlbeOXUv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/elastic_all_orders_si_table.bbb273397f89adc07d8d240eb72b800349160697db123b8718767bf3fbe7b30c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--vlbeOXUv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://www.kineticedge.io/images/blog/cdc/elastic_all_orders_si_table.bbb273397f89adc07d8d240eb72b800349160697db123b8718767bf3fbe7b30c.png" alt="elastic-index-elastisearch-created" title="Elasticsearch index created by elastic" width="880" height="190"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The amounts shown in the Kibana screenshot of &lt;code&gt;HUM=&lt;/code&gt;, &lt;code&gt;G2Q=&lt;/code&gt;, and &lt;code&gt;FIA=&lt;/code&gt; are actually the physical byte arrays converted to strings.&lt;/p&gt;
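&lt;p&gt;Those strings are the physical form of the Connect decimal logical type: a base64-encoded, big-endian, two's-complement unscaled integer, with the scale carried in the schema. A small sketch decodes them, assuming a scale of 2 (typical for a currency amount):&lt;/p&gt;

```python
import base64
from decimal import Decimal

def decode_connect_decimal(b64: str, scale: int) -> Decimal:
    # Connect's Decimal logical type stores the unscaled value as
    # big-endian two's-complement bytes; the scale comes from the schema.
    unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

for raw in ("HUM=", "G2Q=", "FIA="):
    print(raw, "->", decode_connect_decimal(raw, scale=2))
# HUM= -> 74.91, G2Q= -> 70.12, FIA= -> 52.48
```

&lt;p&gt;This is also a handy way to sanity-check what a sink wrote when a logical type was mishandled.&lt;/p&gt;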

&lt;ul&gt;
&lt;li&gt;Cassandra&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The &lt;strong&gt;Datastax Cassandra Sink Connector&lt;/strong&gt; does not handle the decimal logical-type correctly. What makes matters worse is that the failed conversion is only a warning that shows up in the connect cluster log. No data is written, and the connector keeps running. When checking the log, you can see:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WARN Error decoding/mapping Kafka record ... : Codec not found for requested operation: 
[DECIMAL &amp;lt;-&amp;gt; java.nio.ByteBuffer] (com.datastax.oss.kafka.sink.CassandraSinkTask) 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  For every sink connector you plan to use:
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Be sure to test with decimals, timestamps, &amp;amp; dates, unless those truly aren't in your use case.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Don't simplify your POC. For example, don't let elasticsearch sink connector create your indexes in your POC, if that is not possible in production.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Watch the logs and check for &lt;code&gt;WARN&lt;/code&gt; or even &lt;code&gt;INFO&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If you have issues with logical types, you can have Debezium use strings for decimals (&lt;code&gt;decimal.handling.mode=string&lt;/code&gt;), or you can leverage a stream processor (e.g. ksqlDB) to cast a decimal to a string.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Takeaways
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Enabling database logging can be tricky, and each database has its own way of configuring and enabling it.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Having an end-to-end proof of concept showcasing change-data-capture is a great way to get developer buy-in and involvement, but plan appropriate time working with your database operational team to get it enabled in the enterprise.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Leverage Apache Kafka's config provider for secrets and environment-specific differences, such as the database connection string. The ability to check a single configuration artifact into revision control, without specifics of a given environment, is a great benefit.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Validate decimals, dates, and timestamps, as not all connectors handle them correctly.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Check out the &lt;a href="https://github.com/kineticedge/dev-local"&gt;dev-local&lt;/a&gt; project and the demo in &lt;code&gt;rdbms-cdc-nosql&lt;/code&gt; in &lt;a href="https://github.com/kineticedge/dev-local-demos"&gt;dev-local-demos&lt;/a&gt;. The specifics discussed here are based on this demonstration, and scripts are there to get you observing change-data-capture with Debezium and Kafka within minutes.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Reach out
&lt;/h2&gt;

&lt;p&gt;Please &lt;a href="https://www.kineticedge.io/contact"&gt;contact us&lt;/a&gt; if you have improvements, want clarification, or just want to talk streaming.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>debezium</category>
      <category>changedatacapture</category>
      <category>kafkaconnect</category>
    </item>
    <item>
      <title>Building a Mix Protocol Apache Kafka Cluster</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Mon, 01 Jun 2020 21:51:59 +0000</pubDate>
      <link>https://dev.to/nbuesing/building-a-mix-protocol-apache-kafka-cluster-506m</link>
      <guid>https://dev.to/nbuesing/building-a-mix-protocol-apache-kafka-cluster-506m</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Goal: build a multi-protocol Apache Kafka Clusters for &lt;strong&gt;SSL Client Authentication&lt;/strong&gt; for all clients while leveraging &lt;strong&gt;PLAINTEXT&lt;/strong&gt; for inter broker communication.&lt;/p&gt;

&lt;p&gt;There are many tutorials and articles on setting up Apache Kafka Clusters with different security options. However, I have not seen a mix-protocol cluster with &lt;strong&gt;SSL&lt;/strong&gt; for encryption and client authentication and &lt;strong&gt;PLAINTEXT&lt;/strong&gt; for broker communication; so I decided to build one.&lt;/p&gt;

&lt;h2&gt;
  
  
  tl;dr
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Setup broker configuration of &lt;code&gt;super.users&lt;/code&gt; for &lt;strong&gt;SSL&lt;/strong&gt; authenticated users, not for &lt;strong&gt;PLAINTEXT&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Start brokers with &lt;strong&gt;SSL&lt;/strong&gt; inter broker communication.&lt;/li&gt;
&lt;li&gt;Create full-access ACLs for the user &lt;code&gt;User:ANONYMOUS&lt;/code&gt;, but &lt;em&gt;only&lt;/em&gt; for the IPs of your brokers.&lt;/li&gt;
&lt;li&gt;Change brokers to use &lt;strong&gt;PLAINTEXT&lt;/strong&gt; inter broker communication.&lt;/li&gt;
&lt;li&gt;Do a rolling restart.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Challenges
&lt;/h2&gt;

&lt;p&gt;The main challenges in setting up the cluster were: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Kafka broker authorizers are not protocol specific.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Unlike &lt;strong&gt;SASL&lt;/strong&gt;, &lt;strong&gt;SSL&lt;/strong&gt; authentication cannot be done over &lt;strong&gt;PLAINTEXT&lt;/strong&gt;; it has to be over &lt;strong&gt;SSL&lt;/strong&gt; encryption.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;SSL Certificates are tricky, and those were addressed in &lt;a href="https://www.buesing.dev/post/kafka-ssl-certificates/"&gt;What I learned about SSL Certificates when building a Secured Kafka Cluster&lt;/a&gt;. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;While ACLs can allow resource access to a given principal from a given set of hosts, the broker's &lt;code&gt;super.users&lt;/code&gt; is only a principal&lt;br&gt;
setting.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Configuration
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;SSL&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Each broker needs to be configured with the truststore and keystore referencing the appropriate jks files.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    ssl.key.credentials = kafka.key
    ssl.keystore.credentials = kafka.key
    ssl.keystore.filename = broker-{ID}.keystore.jks
    ssl.truststore.credentials = kafka.key
    ssl.truststore.filename = kafka.server.truststore.jks
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: &lt;a href="https://github.com/confluentinc/cp-docker-images"&gt;Confluent platform docker images&lt;/a&gt; have minor changes over a standard&lt;br&gt;
  deployment, such as the use of &lt;code&gt;ssl.*.credentials&lt;/code&gt; and files with those credentials instead of &lt;code&gt;ssl.*.passwords&lt;/code&gt;.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Protocols and Listeners&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The settings for protocols and listeners are standard. Both &lt;strong&gt;SSL&lt;/strong&gt; and &lt;strong&gt;PLAINTEXT&lt;/strong&gt; are needed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=listener%2Esecurity%2Eprotocol%2Emap,-%3A"&gt;listener.security.protocol.map&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The default value for this broker property is just fine, but if you prefer to configure only what is needed,&lt;br&gt;
  consider setting it to:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;```
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL
```
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=advertised%2Elisteners,-%3A"&gt;advertised.listeners&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Each broker needs to establish its list of listeners and advertise them in a way that the clients and other brokers can &lt;br&gt;
  directly access them. The standard ports 9092 and 9093 are used for &lt;code&gt;PLAINTEXT&lt;/code&gt; and &lt;code&gt;SSL&lt;/code&gt;, respectively.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  advertised.listeners = PLAINTEXT://broker-{ID}:9092,SSL://broker-{ID}:9093
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Authorizer&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=authorizer%2Eclass%2Ename,-%3A"&gt;authorizer.class.name&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;To enable &lt;a href="https://kafka.apache.org/documentation/#security_authz"&gt;client authorization&lt;/a&gt;, set &lt;code&gt;authorizer.class.name&lt;/code&gt;&lt;br&gt;
  to &lt;code&gt;kafka.security.authorizer.AclAuthorizer&lt;/code&gt;, the provided implementation of &lt;code&gt;org.apache.kafka.server.authorizer.Authorizer&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  authorizer.class.name = kafka.security.authorizer.AclAuthorizer
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;SSL Client Authentication&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=ssl%2Eclient%2Eauth,-%3A"&gt;ssl.client.auth&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Set &lt;code&gt;ssl.client.auth&lt;/code&gt; to &lt;code&gt;required&lt;/code&gt; or &lt;code&gt;requested&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  ssl.client.auth = required
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;While you could set this to &lt;code&gt;requested&lt;/code&gt;, that would still allow SSL clients that do not support client authentication. Unless&lt;br&gt;
  you have a specific reason to do this, make sure you set this to &lt;code&gt;required&lt;/code&gt;.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;No ACL Found&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://kafka.apache.org/documentation/#:~:text=allow%2Eeveryone%2Eif%2Eno%2Eacl%2Efound"&gt;allow.everyone.if.no.acl.found&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Be sure to exclude the property &lt;code&gt;allow.everyone.if.no.acl.found&lt;/code&gt; or set to &lt;code&gt;false&lt;/code&gt;.  If you set this to &lt;code&gt;true&lt;/code&gt;,&lt;br&gt;
  it allows full access to a resource if no ACLs are found.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  allow.everyone.if.no.acl.found = false
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Super Users&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=super%2Eusers"&gt;super.users&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Do not add &lt;code&gt;User:ANONYMOUS&lt;/code&gt; to &lt;code&gt;super.users&lt;/code&gt;.  There is no way to restrict privileges to a given host. Instead, only add the &lt;br&gt;
  users of the &lt;code&gt;SSL&lt;/code&gt; certificates for your brokers and the user you will use to create ACLs. This ensures that the cluster &lt;br&gt;
  is always secure. In addition to each broker's certificate, add the root certificate so &lt;code&gt;kafka-acls&lt;/code&gt; can be used to create the proper ACLs.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;super.users&lt;/code&gt; configuration should not be used for&lt;br&gt;
  authorization over &lt;strong&gt;PLAINTEXT&lt;/strong&gt;. When SSL authentication is enabled, the user presented over &lt;strong&gt;PLAINTEXT&lt;/strong&gt; will always be &lt;code&gt;User:ANONYMOUS&lt;/code&gt;.&lt;br&gt;
  If you add this user to &lt;code&gt;super.users&lt;/code&gt;, anyone will be able to access your cluster (unless network restrictions prevent it).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  super.users = User:CN=root;User:CN=broker-1;User:CN=broker-2;User:CN=broker-3;User:CN=broker-4
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Inter Broker Protocol&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;a href="https://kafka.apache.org/documentation/#brokerconfigs#:~:text=security%2Einter%2Ebroker%2Eprotocol,-%3A"&gt;security.inter.broker.protocol&lt;/a&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This is a tricky setting. First, start the cluster with &lt;strong&gt;SSL&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;security.inter.broker.protocol = SSL&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Once the ACL for the brokers is created, change it to &lt;strong&gt;PLAINTEXT&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;security.inter.broker.protocol = PLAINTEXT&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;The cluster will need to be restarted (following a proper rolling restart) for the change to take effect.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating the Secure Cluster
&lt;/h2&gt;

&lt;p&gt;With all these pieces in place, the process is straightforward.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Start Cluster&lt;/li&gt;
&lt;li&gt;Create ACLs for a given superuser, &lt;code&gt;User:ANONYMOUS&lt;/code&gt; only from IPs of brokers.&lt;/li&gt;
&lt;li&gt;Change inter broker protocol on all the brokers from &lt;code&gt;SSL&lt;/code&gt; to &lt;code&gt;PLAINTEXT&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Perform a rolling restart of the cluster&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;An alternate approach considered was temporarily adding &lt;strong&gt;User:ANONYMOUS&lt;/strong&gt; to &lt;code&gt;super.users&lt;/code&gt; and removing it after the ACLs&lt;br&gt;
are configured; to use this approach, ensure the cluster is not accessible while &lt;code&gt;User:ANONYMOUS&lt;/code&gt; is part of &lt;code&gt;super.users&lt;/code&gt;. &lt;/p&gt;
&lt;h2&gt;
  
  
  Example Cluster
&lt;/h2&gt;

&lt;p&gt;The &lt;a href="https://github.com/nbuesing/kafka-ssl-cluster"&gt;Kafka SSL Cluster&lt;/a&gt; repository provides an example cluster. It deploys on &lt;br&gt;
a single machine leveraging &lt;code&gt;docker-compose&lt;/code&gt;. It showcases this configuration; it is not a production-ready cluster.&lt;/p&gt;

&lt;p&gt;See Confluent's &lt;a href="https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html"&gt;quickstart documentation&lt;/a&gt; to understand&lt;br&gt;
the differences in some of the configurations leveraging the Confluent Platform docker images.&lt;/p&gt;

&lt;p&gt;Use the &lt;code&gt;jumphost&lt;/code&gt; container to access the cluster.  Scripts to easily create the ACLs and test the cluster are mounted to this &lt;br&gt;
container.&lt;/p&gt;

&lt;p&gt;Connect to that cluster using &lt;code&gt;docker exec&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it kafka_jumphost bash
cd /opt/bin
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Example Code
&lt;/h4&gt;

&lt;p&gt;An example configuration is available at the GitHub repository &lt;a href="https://github.com/nbuesing/kafka-ssl-cluster"&gt;Kafka SSL Cluster&lt;/a&gt;. It includes scripts to create certificates, a script to switch the protocol from &lt;strong&gt;SSL&lt;/strong&gt; to &lt;strong&gt;PLAINTEXT&lt;/strong&gt;, a rolling restart script, and a dashboard. Please see its documentation for additional details.&lt;/p&gt;

&lt;p&gt;To run commands to create ACLs, you will need a configuration like the following.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;security.protocol=SSL
ssl.key.password=dev_cluster_secret
ssl.keystore.location=/certs/root.keystore.jks
ssl.keystore.password=dev_cluster_secret
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=dev_cluster_secret
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Super User ACL
&lt;/h4&gt;

&lt;p&gt;Once the cluster is up and running with the inter-broker security protocol of &lt;strong&gt;SSL&lt;/strong&gt;, create an ACL-based &lt;strong&gt;superuser&lt;/strong&gt; that can only be used from the broker hosts. With this, the unauthenticated user &lt;code&gt;User:ANONYMOUS&lt;/code&gt; has full operational access to the cluster, but only from the hosts running the brokers.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-acls \
 --bootstrap-server broker-1:9093,broker-2:9093,broker-3:9093,broker-4:9093 \
 --command-config ./config/adminclient-config.conf \
 --add \
 --force \
 --allow-principal User:ANONYMOUS \
 --allow-host 10.5.0.101 \
 --allow-host 10.5.0.102 \
 --allow-host 10.5.0.103 \
 --allow-host 10.5.0.104 \
 --operation All \
 --topic '*' \
 --cluster
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Performance Testing
&lt;/h2&gt;

&lt;p&gt;Now that we know it is feasible, is a mixed-protocol cluster worth it? While using &lt;code&gt;docker-compose&lt;/code&gt; to run a cluster on one machine is not the best way to properly test, it does give insight into where it could lead.&lt;/p&gt;

&lt;h4&gt;
  
  
  acks=1
&lt;/h4&gt;

&lt;p&gt;With &lt;code&gt;acks=1&lt;/code&gt;, theoretically, I would expect no change. However, with less encryption going on between brokers, the CPU load should go down, so a small improvement is expected. My tests show a &lt;strong&gt;7%&lt;/strong&gt; improvement.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;security.inter.broker.protocol&lt;/th&gt;
&lt;th&gt;records/sec&lt;/th&gt;
&lt;th&gt;MB/sec&lt;/th&gt;
&lt;th&gt;ms avg latency&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;SSL&lt;/td&gt;
&lt;td&gt;29,338.56&lt;/td&gt;
&lt;td&gt;27.98&lt;/td&gt;
&lt;td&gt;970.06&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;PLAINTEXT&lt;/td&gt;
&lt;td&gt;31,512.17&lt;/td&gt;
&lt;td&gt;30.06&lt;/td&gt;
&lt;td&gt;926.85&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;h4&gt;
  
  
  acks=all
&lt;/h4&gt;

&lt;p&gt;With &lt;code&gt;acks=all&lt;/code&gt;, a larger improvement is expected, because the time for the in-sync replicas to replicate should be shorter. The test shows a &lt;strong&gt;44%&lt;/strong&gt; improvement, which is greater than what I was expecting. This result is promising and justifies additional testing on a more realistic cluster configuration.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;security.inter.broker.protocol&lt;/th&gt;
&lt;th&gt;records/sec&lt;/th&gt;
&lt;th&gt;MB/sec&lt;/th&gt;
&lt;th&gt;ms avg latency&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;SSL&lt;/td&gt;
&lt;td&gt;12,097.04&lt;/td&gt;
&lt;td&gt;11.54&lt;/td&gt;
&lt;td&gt;2,557.13&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;PLAINTEXT&lt;/td&gt;
&lt;td&gt;17,418.82&lt;/td&gt;
&lt;td&gt;16.62&lt;/td&gt;
&lt;td&gt;1,732.00&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;h3&gt;
  
  
  The Test
&lt;/h3&gt;

&lt;p&gt;Here is the performance script I ran with the ACLs generated for the topic.&lt;/p&gt;

&lt;h4&gt;
  
  
  performance script
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  export BOOTSTRAP_SERVERS=broker-1:9093,broker-2:9093,broker-3:9093,broker-4:9093

  kafka-producer-perf-test \
    --num-records 500000 \
    --record-size 1000 \
    --throughput -1 \
    --producer.config ./config/producer-config.conf \
    --producer-props \
      bootstrap.servers=${BOOTSTRAP_SERVERS} \
      acks=all \
      request.timeout.ms=60000 \
      retries=2147483647 \
    --topic $1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  ACLs
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  kafka-acls \
   --bootstrap-server broker-1:9093,broker-2:9093,broker-3:9093,broker-4:9093 \
   --command-config ./config/adminclient-config.conf \
   --add \
   --allow-principal User:CN=jumphost \
   --allow-host '*' \
   --operation READ \
   --operation WRITE \
   --topic $1

  kafka-acls \
   --bootstrap-server broker-1:9093,broker-2:9093,broker-3:9093,broker-4:9093 \
   --command-config ./config/adminclient-config.conf \
   --add \
   --allow-principal User:CN=jumphost \
   --allow-host '*' \
   --operation ALL \
   --group '*' 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;I hope this gives you some insights into Kafka security and possible configurations you could explore for performance improvements. For me, the journey of creating a cluster configuration I hadn't seen before gave me the opportunity to fully understand the security configurations of Apache Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  Additional Information
&lt;/h2&gt;

&lt;p&gt;This article was originally published on my professional blog at &lt;a href="https://www.buesing.dev/post/kafka-ssl-mix-protocol/"&gt;buesing.dev&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>security</category>
      <category>ssl</category>
      <category>acls</category>
    </item>
    <item>
      <title>What I learned about SSL Certificates when building a Secured Kafka Cluster</title>
      <dc:creator>Neil Buesing</dc:creator>
      <pubDate>Wed, 08 Apr 2020 11:31:06 +0000</pubDate>
      <link>https://dev.to/nbuesing/what-i-learned-about-ssl-certificates-when-building-a-secured-kafka-cluster-1g9</link>
      <guid>https://dev.to/nbuesing/what-i-learned-about-ssl-certificates-when-building-a-secured-kafka-cluster-1g9</guid>
      <description>&lt;p&gt;To build a multi-protocol Apache Kafka Clusters to allow for &lt;strong&gt;SSL Client Authentication&lt;/strong&gt; with&lt;br&gt;
&lt;strong&gt;PLAINTEXT&lt;/strong&gt; for inter broker communication, I needed to generate both broker and client SSL certificates.&lt;br&gt;
There were many interesting things I learned in this process and wanted to share them.&lt;/p&gt;

&lt;p&gt;An upcoming article will be using these certificates for setting up my Secured Apache Kafka Cluster.&lt;/p&gt;
&lt;h3&gt;
  
  
  Disclaimer
&lt;/h3&gt;

&lt;p&gt;I build certificates to explore the security available in various systems; in this particular scenario, it is&lt;br&gt;
to explore a mixed protocol Kafka Cluster. Please seek advice from your security teams when creating certificates. &lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Introduction&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;If you only need to create certificates as part of an enterprise setup, consider actively supported&lt;br&gt;
projects such as Confluent's &lt;a href="https://docs.confluent.io/current/installation/cp-ansible/index.html"&gt;Ansible Playbooks&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;There are excellent articles on SSL Certificates, including Confluent's &lt;a href="https://docs.confluent.io/current/kafka/authentication_ssl.html"&gt;documentation&lt;/a&gt; and an excellent &lt;a href="https://docs.confluent.io/current/security/security_tutorial.html#generating-keys-certs"&gt;tutorial&lt;/a&gt;. The documentation and tutorial focus on &lt;strong&gt;SASL&lt;/strong&gt; client-side authentication (&lt;strong&gt;SASL_SSL&lt;/strong&gt; protocol), and I uncovered some unique challenges with client-side &lt;strong&gt;SSL&lt;/strong&gt; authentication.&lt;/p&gt;

&lt;p&gt;My experience in certificate generation is based on using &lt;strong&gt;OpenSSL&lt;/strong&gt;. If you use another means of generating certificates, some of these highlights may not apply to you.&lt;/p&gt;

&lt;p&gt;My full scripts are in the repository &lt;a href="https://github.com/nbuesing/kafka-ssl-cluster"&gt;Kafka SSL Cluster&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Naming&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Your certificate &lt;strong&gt;subject&lt;/strong&gt; must start with /, and should contain at minimum a &lt;strong&gt;CN&lt;/strong&gt; element, as in &lt;strong&gt;/CN=${hostname}&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;For broker certificates, have the &lt;strong&gt;CN&lt;/strong&gt; match the hostname, and then leverage Subject Alternate Names to ensure the certificate can be used with and without the domain name.&lt;/p&gt;

&lt;p&gt;There are no wildcard options with the standard Apache Kafka Authorizer, &lt;code&gt;kafka.security.authorizer.AclAuthorizer&lt;/code&gt;. So if you use a complete subject of common name (CN), organizational unit (OU), organization (O), locality (L), state (ST), and country (C), the full name is what will be used as the User in ACLs.&lt;/p&gt;

&lt;p&gt;This: &lt;code&gt;User:CN=broker-1,OU=KAFKA,O=COMPANY,L=CITY,ST=MN,C=US&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Not This: &lt;code&gt;User:CN=broker-1&lt;/code&gt;&lt;/p&gt;
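&lt;p&gt;To see the exact subject a certificate will present, and therefore the principal your ACLs must name, you can inspect it with &lt;code&gt;openssl&lt;/code&gt;; the filename here is a placeholder.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# print only the subject line of the certificate
openssl x509 -in broker-1.crt -noout -subject
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;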
&lt;h2&gt;
  
  
  &lt;strong&gt;JKS key and keystore passwords should be the same&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;When creating Java keystores (.jks files) for your JVMs, make sure the keystore password is the same as the key password. This is due to a limitation of the &lt;strong&gt;SunX509 KeyManagerFactory&lt;/strong&gt;; see the Java Secure Socket Extension (JSSE) &lt;a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-65A7A023-AE02-4A95-8210-386AE6F18EB5"&gt;Reference Guide&lt;/a&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;All keys in the KeyStore must be protected by the same password &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;I have found a few other documented cases of this with other systems; the most recent is with &lt;a href="https://confluence.atlassian.com/bitbucketserverkb/bitbucket-server-fails-to-start-with-ssl-java-security-unrecoverablekeyexception-cannot-recover-key-814205872.html"&gt;BitBucket&lt;/a&gt;.&lt;/p&gt;
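&lt;p&gt;For example, when importing a PKCS12 file into a JKS keystore with &lt;code&gt;keytool&lt;/code&gt;, pass the same value for both the destination store password and the destination key password; the filenames and password below are placeholders.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;keytool -importkeystore \
  -srckeystore broker-1.p12 -srcstoretype PKCS12 \
  -srcstorepass dev_cluster_secret \
  -destkeystore broker-1.keystore.jks -deststoretype JKS \
  -deststorepass dev_cluster_secret \
  -destkeypass dev_cluster_secret
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;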
&lt;h2&gt;
  
  
  &lt;strong&gt;Extensions&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Be sure the certificates have the proper x509 extensions.&lt;/p&gt;
&lt;h3&gt;
  
  
&lt;strong&gt;Basic Constraints Extension for CA Certificate&lt;/strong&gt;
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;basicConstraints=CA:TRUE,pathlen:0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;If you set up SSL certificates for client authentication, the CA certificate needs the above x509 extension indicating it is indeed a CA certificate. Without this extension, the brokers will not authenticate client certificates.&lt;/p&gt;

&lt;p&gt;For additional information, see &lt;a href="https://www.openssl.org/docs/man1.0.2/man5/x509v3_config.html"&gt;x509v3 config&lt;/a&gt;.&lt;/p&gt;
&lt;h3&gt;
  
  
  &lt;strong&gt;Subject Alternate Name Extension for Brokers&lt;/strong&gt;
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;subjectAltName=DNS:${i}, DNS:${i}.${DOMAIN}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;When it comes to SSL certificates for encryption, it is important that broker certificates carry Subject Alternate Name entries so they can be used with and without domain names.&lt;/p&gt;
&lt;h3&gt;
  
  
  &lt;strong&gt;Extended Key Usage Extension (for all)&lt;/strong&gt;
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;extendedKeyUsage=serverAuth,clientAuth
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;The public key of a certificate must be marked for how it can be used: server authentication, client authentication, or both. For my cluster, I configured all certificates so they can be used for both server and client authentication.&lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Extensions are not Copied from Request to Certificate&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;By default, extensions added to a request are not propagated to the certificate. For extensions to be copied, you need &lt;code&gt;copy_extensions = copy&lt;/code&gt; added to your &lt;code&gt;openssl.cnf&lt;/code&gt;. I was unsuccessful in getting this to work on macOS, so I updated my scripts to explicitly add extensions when certificate requests are signed.&lt;/p&gt;

&lt;p&gt;It is a good reminder that if you create certificate requests for another group to sign, you should indicate the extensions you need in your certificates.&lt;/p&gt;
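&lt;p&gt;As a sketch of the workaround, extensions can be supplied at signing time with &lt;code&gt;-extfile&lt;/code&gt;; the filenames and hostnames here are placeholders.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# sign the request and add the extensions explicitly,
# rather than relying on copy_extensions
openssl x509 -req -in broker-1.csr \
  -CA ca.crt -CAkey ca.key -CAcreateserial \
  -days 365 -out broker-1.crt \
  -extfile &amp;lt;(printf "subjectAltName=DNS:broker-1,DNS:broker-1.example.com\nextendedKeyUsage=serverAuth,clientAuth")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;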
&lt;h2&gt;
  
  
  &lt;strong&gt;Check Status after every call to OpenSSL&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;If you write scripts, make sure you check return status codes after each command and fail on error.  I have spent many hours&lt;br&gt;
troubleshooting script errors only to find out that something failed much earlier in the process.&lt;/p&gt;

&lt;p&gt;After each command, add the following. Scripts are finicky, especially with loops. So double-check the error handling &lt;br&gt;
works as well. This is the type of check I do after every &lt;code&gt;openssl&lt;/code&gt; and every other command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[ $? -eq 1 ] &amp;amp;&amp;amp; echo "failure" &amp;amp;&amp;amp; exit
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
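&lt;p&gt;An alternative to checking after every command is to make the whole script fail fast; this is a sketch, and note that &lt;code&gt;set -e&lt;/code&gt; has its own caveats inside conditionals and command substitutions.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#!/usr/bin/env bash
# exit on any command error, on unset variables, and on pipeline failures
set -euo pipefail
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;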



&lt;h2&gt;
  
  
  &lt;strong&gt;Intermediate Certificates&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Most enterprises do not sign machine or application certificates with their top-level CA certificate; they use an intermediate certificate. Using an intermediate certificate requires a certificate chain when creating the trust-store. Personally, I use intermediate certificates as a reminder of how to properly chain them when creating the pkcs12 file.&lt;/p&gt;
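&lt;p&gt;As a sketch of that chaining with placeholder filenames: concatenate the leaf and intermediate certificates before exporting the PKCS12 file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# build the chain: leaf certificate first, then the intermediate
cat broker-1.crt intermediate-ca.crt &amp;gt; broker-1-chain.crt

# export the private key with the full chain as a PKCS12 file
openssl pkcs12 -export -in broker-1-chain.crt -inkey broker-1.key \
  -name broker-1 -passout pass:dev_cluster_secret -out broker-1.p12
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;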
&lt;h2&gt;
  
  
  &lt;strong&gt;The openSSL configuration file&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The configuration file of OpenSSL, &lt;strong&gt;openssl.cnf&lt;/strong&gt; can vary greatly between Unix systems and even between Linux distributions.&lt;/p&gt;

&lt;p&gt;For every &lt;strong&gt;openssl&lt;/strong&gt; command that uses &lt;code&gt;openssl.cnf&lt;/code&gt;, provide a configuration file; otherwise, a default file is used, and it varies greatly between OS distributions. To avoid confusion about how &lt;strong&gt;OpenSSL&lt;/strong&gt; executes your request, I explicitly provide the &lt;code&gt;openssl.cnf&lt;/code&gt; to use and keep this file with my scripts. Furthermore, if you are scripting your certificate process, leverage inline files to provide a custom configuration file to each specific command execution; it is then self-documenting too.&lt;/p&gt;

&lt;p&gt;An example of inlining a file concatenation.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-config &amp;lt;(cat ./openssl.cnf &amp;lt;(printf "\n[ext]\nbasicConstraints=CA:TRUE,pathlen:0"))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;Check your OpenSSL documentation&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;OpenSSL varies between Unix variants and even between Linux distributions; please read the manual pages for your specific version and release of &lt;code&gt;openssl&lt;/code&gt;. Also, there could be a bug or limitation with the instance of &lt;code&gt;openssl&lt;/code&gt; you are using; for example, I was not successful leveraging &lt;code&gt;copy_extensions&lt;/code&gt; on macOS.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Conclusion&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;What I have documented here are the major pieces I had to figure out when building certificates for client authentication for my Kafka Cluster. It wouldn't surprise me if there are other things I could uncover, especially limiting the extensions used on broker certificates versus client certificates.&lt;/p&gt;

&lt;p&gt;If you want a glimpse of the scripts that I will use in part 2 of this blog, where I configure my Secured (Mix-Protocol) Kafka Cluster, you can find them all at &lt;a href="https://github.com/nbuesing/kafka-ssl-cluster"&gt;Kafka SSL Cluster&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>security</category>
      <category>ssl</category>
    </item>
  </channel>
</rss>
