<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Balázs Czoma</title>
    <description>The latest articles on DEV Community by Balázs Czoma (@bczoma).</description>
    <link>https://dev.to/bczoma</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F404744%2F7247fd7c-9468-49e6-9e86-2a15c70ce548.png</url>
      <title>DEV Community: Balázs Czoma</title>
      <link>https://dev.to/bczoma</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/bczoma"/>
    <language>en</language>
    <item>
      <title>How the PubSub+ Connector for Kafka Enables Fine-Grained Filtering, Protocol Conversion, and Scalable Delivery</title>
      <dc:creator>Balázs Czoma</dc:creator>
      <pubDate>Wed, 17 Jun 2020 16:02:48 +0000</pubDate>
      <link>https://dev.to/bczoma/how-the-pubsub-connector-for-kafka-enables-fine-grained-filtering-protocol-conversion-and-scalable-delivery-dc</link>
      <guid>https://dev.to/bczoma/how-the-pubsub-connector-for-kafka-enables-fine-grained-filtering-protocol-conversion-and-scalable-delivery-dc</guid>
      <description>&lt;p&gt;Kafka is often used to aggregate massive amounts of log data and stream it to analytics engines and big data repositories. When large amounts of collected data are processed, the analysis may produce actionable events that need to be delivered to specific destinations. Using only Kafka, it’s hard to achieve scalable, targeted distribution to many individual consumers. For more information on this, see &lt;a href="https://solace.com/blog/beyond-kafka-2-flexible-event-filtering/"&gt;part 2&lt;/a&gt; of Ken Barr’s &lt;em&gt;“Why You Need to Look Beyond Kafka for Operational Use Cases”&lt;/em&gt; series.&lt;/p&gt;

&lt;p&gt;Especially in IoT, consumers and the network infrastructure may lack the throughput and processing capability to pick out relevant events from a stream, so delivery must be filtered and targeted for efficiency. A good example is collecting and analyzing real-time vehicle mechanical &lt;a href="https://solace.com/blog/public-transport-in-smart-cities/"&gt;IoT data from a fleet of city buses&lt;/a&gt; and reacting to the results by delivering commands back to individual buses.&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://solace.com/products/platform/"&gt;Solace PubSub+ Platform&lt;/a&gt; inherently helps to overcome this limitation. The goal of this blog is to present a practical solution and to demonstrate how to integrate the Kafka platform with the PubSub+ &lt;a href="https://solace.com/what-is-an-event-mesh/"&gt;event mesh&lt;/a&gt; (a network of PubSub+ &lt;a href="https://solace.com/what-is-an-event-broker/"&gt;event brokers&lt;/a&gt;) using the &lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink"&gt;PubSub+ Connector for Kafka: Sink&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In this solution, Kafka topics are coarse-grained and cannot be used to address individual vehicles, but PubSub+ allows fine-grained filtering because topics aren’t actually configured on the broker – they are defined by the publishing application. PubSub+ topics can be thought of as a property of a message, or metadata, and not like “writing to a file” or “sending to a destination.” Each and every message could be published with a unique topic. For more information on Solace topics vs. Kafka topics, read &lt;a href="https://solace.com/blog/solace-topics-vs-kafka-topics/"&gt;this blog post&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-source"&gt;Solace PubSub+ Kafka Source Connector&lt;/a&gt; and &lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink"&gt;PubSub+ Kafka Sink Connector&lt;/a&gt; provide easy integration between events to/from the PubSub+ event mesh and Kafka records.&lt;/p&gt;

&lt;h1&gt;
  
  
  Customizing the Data Conversion Code in “Processors”
&lt;/h1&gt;

&lt;p&gt;As the &lt;a href="https://docs.confluent.io/current/connect/devguide.html"&gt;Kafka Connector Developer Guide&lt;/a&gt; suggests, PubSub+ event messages must be modeled as a partitioned stream of Kafka records, and vice versa. The PubSub+ Connectors have been architected so that this mapping is located in Processors (MessageProcessors in the Source Connector and RecordProcessors in the Sink Connector). This approach lets developers customize the data conversion code to their specific application needs, while the rest is taken care of by the PubSub+ Connector code, which makes use of the tunable, high-performance Solace Java messaging API.&lt;/p&gt;

&lt;p&gt;The Source and Sink Connector projects include sample Processors: simple ones that just map PubSub+ message contents to Kafka record values, and more advanced ones that can additionally map message properties to Kafka record keys.&lt;/p&gt;

&lt;p&gt;As an example, the following scenario is easy to implement with the Kafka Sink Connector using the DynamicDestinations record processor. DynamicDestinations makes use of a unique Solace connector feature that supports dynamic generation of topics that override the static topics in the connector configuration. The source code is available from the &lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink/blob/master/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolDynamicDestinationRecordProcessor.java"&gt;Sink Connector GitHub location&lt;/a&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Example: IoT Data from Public Transport
&lt;/h1&gt;

&lt;p&gt;In our city bus IoT scenario – similar to &lt;a href="https://solace.com/resources/solace-with-kafka/solace-kafka-iot-demo"&gt;this demo&lt;/a&gt; – analytics produce Kafka records that may represent commands to a fleet of buses, each listening to a PubSub+ topic structured as &lt;code&gt;ctrl/bus/&amp;lt;bus-id&amp;gt;/&amp;lt;command&amp;gt;&lt;/code&gt;. In this case, a command of &lt;em&gt;“stop”&lt;/em&gt; may mean an emergency request to take the bus out of service and &lt;em&gt;“start”&lt;/em&gt; may mean allowing the bus back into service.&lt;/p&gt;

&lt;p&gt;Below is an example of fine-grained filtered delivery to consumers. With the Kafka Sink Connector deployed (1.), records from a single Kafka topic can be given dynamically generated destination topics, set as PubSub+ event message attributes, so that each event is routed individually to the correct physical destination (2.).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SNzl_wlR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/yvda4khb94m7nlfxvxky.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SNzl_wlR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/yvda4khb94m7nlfxvxky.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this scenario, any number of new destinations (i.e., buses) can be added at any time without adding a new Kafka topic or making any other change to the systems. Note that once the message is in the PubSub+ event mesh, it is converted to MQTT for sending to the appropriate bus without the need for an additional adapter.&lt;/p&gt;
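
&lt;p&gt;The mapping performed by a dynamic-destination record processor can be pictured with a minimal sketch. It assumes record values of the form &lt;code&gt;&amp;lt;bus-id&amp;gt; &amp;lt;command&amp;gt;&lt;/code&gt;, as in the demo records sent later in this post. The class and method names are hypothetical, the code uses plain Java rather than the Solace API, and it is not the source of &lt;code&gt;SolDynamicDestinationRecordProcessor&lt;/code&gt;, which should be consulted for the actual logic.&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;public class BusTopicMapper {
    // Hypothetical helper: turns a record value such as "1234 start"
    // into the PubSub+ topic "ctrl/bus/1234/start".
    static String toTopic(String recordValue) {
        // Split into at most two parts: the bus ID and the command.
        String[] parts = recordValue.trim().split("\\s+", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'bus-id command': " + recordValue);
        }
        return "ctrl/bus/" + parts[0] + "/" + parts[1].toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(toTopic("1234 start"));
        System.out.println(toTopic("3456 stop"));
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;With this sketch, &lt;code&gt;1234 start&lt;/code&gt; becomes &lt;code&gt;ctrl/bus/1234/start&lt;/code&gt;, so a new bus needs no new Kafka topic, only records that carry its ID.&lt;/p&gt;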

&lt;h1&gt;
  
  
  Build a Kafka Source/Sink Connector Demo
&lt;/h1&gt;

&lt;p&gt;The following main components will be required for this demo (all networked):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;PubSub+ Event Broker&lt;/li&gt;
&lt;li&gt;Kafka server&lt;/li&gt;
&lt;li&gt;PubSub+ Connector for Kafka: Sink deployed&lt;/li&gt;
&lt;li&gt;MQTT consumers, simulating individual buses in a fleet&lt;/li&gt;
&lt;li&gt;Kafka console producer, simulating an analytics engine writing command records&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For this demo we will use the free tier of the &lt;a href="https://solace.com/products/event-broker/cloud/"&gt;PubSub+ Event Broker: Cloud service&lt;/a&gt;, with all other components locally deployed.&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 1: Get access to a PubSub+ Event Broker
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://console.solace.cloud/login/new-account"&gt;Sign up&lt;/a&gt; for a free service PubSub+ Event Broker: Cloud – &lt;a href="https://solace.com/resources/videos/getting-started-with-pubsub-cloud-2"&gt;this video&lt;/a&gt; provides a quick walkthrough, if needed.&lt;/p&gt;

&lt;p&gt;Once the service is active, obtain connection details for the connecting clients:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;For one, the Kafka Sink Connector needs to connect to PubSub+ using the Solace Java API. The connection details include Username, Password, Message VPN, and Secured SMF Host (we are going to use TLS).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--73EkktrI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/h6is1cclusslgqmbt69b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--73EkktrI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/h6is1cclusslgqmbt69b.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;On the other end, the MQTT clients will connect using MQTT connections. The connection details will include Username, Password, and Secured MQTT Host.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--73EkktrI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/h6is1cclusslgqmbt69b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--73EkktrI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/h6is1cclusslgqmbt69b.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 2: Set up a Kafka server
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;Note: the following assumes Linux is running locally; adjust the commands to reflect your environment.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Kafka requires Java 8 or later, so ensure a suitable JRE is already installed locally.&lt;/p&gt;

&lt;p&gt;Follow the &lt;a href="https://kafka.apache.org/quickstart"&gt;Apache Kafka quickstart&lt;/a&gt; to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Download the code for your platform. This demo assumes the following location: &lt;code&gt;/opt/apache/kafka_2.12-2.5.0&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Start the server – it is recommended to use a dedicated command-line session for each component, as they all log to the console; and&lt;/li&gt;
&lt;li&gt;Create a topic (named “test”).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;Tip: if you see issues with starting the server, try clearing the ZooKeeper data (default /tmp/zookeeper) and Kafka logs (default /tmp/kafka-logs), then try again.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 3: Deploy PubSub+ Connector for Kafka: Sink
&lt;/h2&gt;

&lt;p&gt;Obtain the &lt;a href="https://solaceproducts.github.io/pubsubplus-connector-kafka-sink/downloads/"&gt;download link&lt;/a&gt;, then download and expand the connector to a location accessible by Kafka. In this example we will place the connector into the &lt;code&gt;/opt/apache/kafka_2.12-2.5.0/connectors&lt;/code&gt; directory.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0/connectors
 2 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0/connectors
 3 &lt;span class="nv"&gt;$ &lt;/span&gt;wget https://solaceproducts.github.io/pubsubplus-connector-kafka-sink/downloads/pubsubplus-connector-kafka-sink-&amp;lt;version&amp;gt;.tar
 4 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;tar&lt;/span&gt; &lt;span class="nt"&gt;-xvf&lt;/span&gt; pubsubplus-connector-kafka-sink-&amp;lt;version&amp;gt;.tar &lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="nb"&gt;rm&lt;/span&gt; ./&lt;span class="k"&gt;*&lt;/span&gt;.tar
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Add the connector’s location to the Kafka plugin search path. We will use Kafka Connect in standalone mode, so we will edit this properties file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0
 2 &lt;span class="nv"&gt;$ &lt;/span&gt;vi config/connect-standalone.properties
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Edit the last line and ensure it is not commented out:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="c"&gt;# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins:&lt;/span&gt;
 2 plugin.path&lt;span class="o"&gt;=&lt;/span&gt;/opt/apache/kafka_2.12-2.5.0/connectors
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Make the necessary configurations in the PubSub+ Sink Connector’s properties:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0
 2 &lt;span class="nv"&gt;$ &lt;/span&gt;vi connectors/pubsubplus-connector-kafka-sink-&amp;lt;version&amp;gt;/etc/solace_sink.properties
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Set the following correctly in the properties file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt; 1 :
 2 :
 3 
 4 # Kafka topics to read from
 5 topics=test
 6 
 7 # PubSub+ connection information
 8 sol.host=tcps://mrbkvuibog5lt.messaging.solace.cloud:55443
 9 sol.username=solace-cloud-client
10 sol.password=vkghqm3aobegnmn6r2eu3manem
11 sol.vpn_name=kafkatest
12  
13 
14 # PubSub+ Kafka Sink connector record processor
15 sol.record_processor_class=com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor
16 
17 
18 # Set to true only if using SolDynamicDestinationRecordProcessor and dynamic destinations
19 sol.dynamic_destination=true
20 
21 # Connector TLS session to PubSub+ message broker properties
22 sol.ssl_trust_store=/usr/lib/jvm/java-8-openjdk-amd64/lib/security/cacerts
23 sol.ssl_trust_store_password=changeit
24 
25 
26 :
27 :
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In the code snippet above:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Line 5 is the Kafka topic created in &lt;em&gt;Step 2&lt;/em&gt;,&lt;/li&gt;
&lt;li&gt;The values in lines 8-11 are taken from the Solace Java API connection details from &lt;em&gt;Step 1&lt;/em&gt;,&lt;/li&gt;
&lt;li&gt;The sample in line 15 is using the SolDynamicDestinationRecordProcessor,&lt;/li&gt;
&lt;li&gt;The “true” in line 19 enables destinations to be generated dynamically, and&lt;/li&gt;
&lt;li&gt;Because you’re using TLS, provide the file location of your JRE trust store (depends on your install) and password (“changeit” is the JRE default) in lines 22-23.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Start the connector deployment in standalone mode, providing the property files just edited:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0
 2 &lt;span class="nv"&gt;$ &lt;/span&gt;bin/connect-standalone.sh &lt;span class="se"&gt;\&lt;/span&gt;
 3 config/connect-standalone.properties &lt;span class="se"&gt;\&lt;/span&gt;
 4 connectors/pubsubplus-connector-kafka-sink-&amp;lt;version&amp;gt;/etc/solace_sink.properties
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In the console logs you should see a message similar to:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;INFO ================ JCSMPSession Connected
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;At this point, PubSub+ Connector for Kafka: Sink is up and running, ready to interpret, convert, and forward new records from the test Kafka topic to PubSub+ as events.&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 4: Set up the MQTT consumers
&lt;/h2&gt;

&lt;p&gt;There are many MQTT clients around that can be used, or one could easily build one following the &lt;a href="https://github.com/SolaceSamples/solace-samples-mqtt"&gt;MQTT tutorial from Solace Samples&lt;/a&gt;. Here we will use the ready-to-go, open-source, third-party client: &lt;a href="http://mqtt-explorer.com/"&gt;MQTT Explorer&lt;/a&gt;.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Download MQTT Explorer for your OS platform, install and start it.&lt;/li&gt;
&lt;li&gt;Create a new connection and configure the MQTT connection details, obtained from &lt;em&gt;Step 1&lt;/em&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--R23LzuRv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/9o8f8r9rq9j50wyxhg5r.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--R23LzuRv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/9o8f8r9rq9j50wyxhg5r.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Configure the following Advanced settings:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Set a unique MQTT Client ID – this client will represent Bus &lt;em&gt;“1234”&lt;/em&gt;, so set it to &lt;code&gt;Bus-Nr-1234&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;We will send control messages like &lt;code&gt;ctrl/bus/1234/start&lt;/code&gt; so set it to listen to any messages sent to the topic starting with &lt;code&gt;ctrl/bus/1234&lt;/code&gt; – add wildcard topic &lt;code&gt;ctrl/bus/1234/#&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Remove the standard “listen-to-all” topic rule &lt;code&gt;#&lt;/code&gt; and only keep the ones seen below.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1L8QnSph--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/r6bv0qjp1g4i9paqgtd5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1L8QnSph--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/r6bv0qjp1g4i9paqgtd5.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Hit BACK, then SAVE.&lt;/p&gt;

&lt;p&gt;Initiate “CONNECT” and the connection indicator should turn green:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hDCdZB7B--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/pcbayfn3brabo29ij9le.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hDCdZB7B--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/pcbayfn3brabo29ij9le.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We have one of our “buses”, bus 1234, listening!&lt;/p&gt;

&lt;p&gt;Let’s set up a second “bus” – start a second instance of MQTT Explorer.&lt;/p&gt;

&lt;p&gt;It should already offer the saved connection to your PubSub+ Cloud broker.&lt;/p&gt;

&lt;p&gt;Go to the advanced settings and configure:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Set a unique MQTT Client ID – this client will represent Bus &lt;em&gt;“3456”&lt;/em&gt; so set it to &lt;code&gt;Bus-Nr-3456&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Add wildcard topic &lt;code&gt;ctrl/bus/3456/#&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Remove anything else, except &lt;code&gt;$SYS/#&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3bzfFgSD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/x183zb6lqifr6fki0lca.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3bzfFgSD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/x183zb6lqifr6fki0lca.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Hit BACK, then SAVE, and CONNECT.&lt;/p&gt;

&lt;p&gt;Now we have our second “bus” 3456 listening!&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 5: Test the system
&lt;/h2&gt;

&lt;p&gt;You are now ready to run the scenario!&lt;/p&gt;

&lt;p&gt;From a command-line session, &lt;a href="https://kafka.apache.org/quickstart#quickstart_send"&gt;start the Kafka console producer tool&lt;/a&gt; and manually emulate an analytics engine creating Kafka control records to be delivered to the buses:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt; 1 &lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cd&lt;/span&gt; /opt/apache/kafka_2.12-2.5.0
 2 &lt;span class="nv"&gt;$ &lt;/span&gt;bin/kafka-console-producer.sh &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; &lt;span class="nb"&gt;test
 &lt;/span&gt;3 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;1234 start
 4 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;3456 start
 5 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;3456 stop
 6 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Checking the MQTT clients, you can see that the Connector has converted these records to PubSub+ events and set the event destinations to the topics the buses are listening to. PubSub+ Event Broker has delivered them to the consumer using the MQTT protocol:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--LEQK0hMi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/p06yn8cznru2egoxov7q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--LEQK0hMi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/p06yn8cznru2egoxov7q.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;The demonstration above showed how the Kafka Sink Connector enabled fine-grained filtering, protocol conversion, and scalable delivery to endpoints by connecting Kafka to PubSub+. This would have been difficult to achieve by only using Kafka.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;DynamicDestinations&lt;/code&gt; record processor is an example of how a PubSub+ Kafka Connector can be easily extended to implement protocol conversion and business logic, tailored to a specific use case.&lt;/p&gt;

&lt;h1&gt;
  
  
  Additional Reading and References
&lt;/h1&gt;

&lt;p&gt;Check out these additional resources to discover the possibilities unleashed by PubSub+ Platform and Kafka integration:&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://solace.com/resources/solace-with-kafka"&gt;Kafka page&lt;/a&gt; in the Solace Resource Hub.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-source"&gt;Solace PubSub+ Connector for Kafka: Source&lt;/a&gt; and &lt;a href="https://github.com/SolaceProducts/pubsubplus-connector-kafka-source"&gt;Solace PubSub+ Connector for Kafka: Sink&lt;/a&gt; are available as GitHub projects. They also include Quick Start instructions.&lt;/p&gt;

</description>
      <category>iot</category>
      <category>solace</category>
      <category>kafka</category>
      <category>integrate</category>
    </item>
  </channel>
</rss>
