<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Michael van Niekerk</title>
    <description>The latest articles on DEV Community by Michael van Niekerk (@mvniekerk).</description>
    <link>https://dev.to/mvniekerk</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F360312%2F90a5b201-52ae-45e3-8074-cd9562261a4b.png</url>
      <title>DEV Community: Michael van Niekerk</title>
      <link>https://dev.to/mvniekerk</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/mvniekerk"/>
    <language>en</language>
    <item>
      <title>Write a Tokio 0.2 decoder for a TCP stream and push its events to Apache Kafka</title>
      <dc:creator>Michael van Niekerk</dc:creator>
      <pubDate>Sat, 04 Apr 2020 07:04:58 +0000</pubDate>
      <link>https://dev.to/mvniekerk/write-a-tokio-2-decoder-for-a-tcp-stream-and-push-its-events-to-apache-kafka-2mo0</link>
      <guid>https://dev.to/mvniekerk/write-a-tokio-2-decoder-for-a-tcp-stream-and-push-its-events-to-apache-kafka-2mo0</guid>
      <description>&lt;h2&gt;
  
  
  What we'll be making
&lt;/h2&gt;

&lt;p&gt;We'll be listening to a port. This port is streaming out some XML events. Only caveat is is that it is first padded (sometimes) with a 32bit number to tell you how much bytes are on its way.&lt;/p&gt;

&lt;p&gt;Now, we'll be parsing this stream of events and make it fit into our model. After converting these events into JSON we'll push it onto an Apache Kafka stream. As an added bonus these Kafka messages will be mimicking the messages generated by Spring Cloud Stream.&lt;/p&gt;

&lt;p&gt;We'll be using futures and async code for most of the time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Some dependencies
&lt;/h2&gt;

&lt;p&gt;Over to your Cargo.toml file, add the following&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight toml"&gt;&lt;code&gt;&lt;span class="nn"&gt;[dependencies]&lt;/span&gt;
&lt;span class="py"&gt;futures&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.3.4"&lt;/span&gt;
&lt;span class="c"&gt;# Bytes - we'll be using them&lt;/span&gt;
&lt;span class="py"&gt;bytes&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.5.4"&lt;/span&gt;
&lt;span class="c"&gt;# Pull in the tokio framework&lt;/span&gt;
&lt;span class="nn"&gt;tokio&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="py"&gt;version&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.2.15"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="py"&gt;features&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;["full"]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c"&gt;# Using this to make the tokio codec&lt;/span&gt;
&lt;span class="nn"&gt;tokio-util&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="py"&gt;version&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.3.1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="py"&gt;features&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;["codec"]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c"&gt;# Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.&lt;/span&gt;
&lt;span class="py"&gt;serde_derive&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"1.0.105"&lt;/span&gt;
&lt;span class="py"&gt;serde&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"1.0.105"&lt;/span&gt;
&lt;span class="c"&gt;# Derive JSON from your data &lt;/span&gt;
&lt;span class="py"&gt;serde_json&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"1.0.50"&lt;/span&gt;
&lt;span class="c"&gt;# We'll be adding UUIDs to our Kafka messages&lt;/span&gt;
&lt;span class="nn"&gt;uuid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="py"&gt;version&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.8.1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="py"&gt;features&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;["v4"]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c"&gt;# Rust bindings for Apache Kafka&lt;/span&gt;
&lt;span class="nn"&gt;rdkafka&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="py"&gt;version&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"0.23"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="py"&gt;features&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;["cmake-build"]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A quick &lt;code&gt;cargo build&lt;/code&gt; will show you how bad everything is going to blow up in your face. The rdkafka crate is a beast. It compiles the C client library statically (or dynamically), links to it and its dependencies statically. That is, of course, if you've got everything installed. Best head over to &lt;a href="https://github.com/fede1024/rust-rdkafka"&gt;https://github.com/fede1024/rust-rdkafka&lt;/a&gt; to check what you'll need.&lt;br&gt;
I've used this crate for almost over a year, and I must say the build system isn't as near as brittle as it was a year back. Its build system is actually a work of (Makefile and other build scripts) art.&lt;/p&gt;
&lt;h2&gt;
  
  
  On with the decoder
&lt;/h2&gt;
&lt;h3&gt;
  
  
  But first the domain model
&lt;/h3&gt;

&lt;p&gt;Our port we'll be listening to is sending out a report every 1 second, if it doesn't get an event. When it gets an event it'll send the packet out earlier.&lt;/p&gt;

&lt;p&gt;In plain text (if you would do a &lt;code&gt;nc 192.168.1.2 10000&lt;/code&gt; to listen to this port) the packets can look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt; 🎃🎃&lt;span class="cp"&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;ReceiverReport&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;NetAddress&lt;/span&gt; &lt;span class="na"&gt;value=&lt;/span&gt;&lt;span class="s"&gt;"192.168.1.2:10000"&lt;/span&gt;&lt;span class="nt"&gt;/&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/ReceiverReport&amp;gt;&lt;/span&gt;
🎃🎃&lt;span class="cp"&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;ReceiverReport&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;NetAddress&lt;/span&gt; &lt;span class="na"&gt;value=&lt;/span&gt;&lt;span class="s"&gt;"192.168.1.2:10000"&lt;/span&gt;&lt;span class="nt"&gt;/&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;Transmission&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;Message&lt;/span&gt; &lt;span class="na"&gt;value=&lt;/span&gt;&lt;span class="s"&gt;"000000010011011001001000000100"&lt;/span&gt; &lt;span class="na"&gt;units=&lt;/span&gt;&lt;span class="s"&gt;"bits"&lt;/span&gt;&lt;span class="nt"&gt;/&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/Transmission&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/ReceiverReport&amp;gt;&lt;/span&gt;
...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note the 🎃🎃 - they are the padding (that is also optional) that tells us how big the packet is after the 🎃🎃.&lt;/p&gt;

&lt;p&gt;So let's make a model out of the above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;extern&lt;/span&gt; &lt;span class="n"&gt;crate&lt;/span&gt; &lt;span class="n"&gt;serde_xml_rs&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;serde_xml_rs&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;from_reader&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;#[derive(Deserialize)]&lt;/span&gt; &lt;span class="c"&gt;// Deserialize using serde&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;NetAddress&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;#[derive(Deserialize)]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;units&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// (2)&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;String&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;#[derive(Deserialize)]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;Transmission&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;#[serde(rename&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Message"&lt;/span&gt;&lt;span class="nd"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;default)]&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="c"&gt;// (1)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nd"&gt;#[derive(Deserialize)]&lt;/span&gt;
&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;ReceiverReport&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;#[serde(rename&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Transmission"&lt;/span&gt;&lt;span class="nd"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;default)]&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;transmissions&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Transmission&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;

  &lt;span class="nd"&gt;#[serde(rename&lt;/span&gt; &lt;span class="nd"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"NetAddress"&lt;/span&gt;&lt;span class="nd"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;default)]&lt;/span&gt;
  &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="n"&gt;net_addresses&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;NetAddress&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;str&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;FromStr&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;ReceiverReport&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="c"&gt;// (3)&lt;/span&gt;
  &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="nb"&gt;Err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;serde_xml_rs&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;from_str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;xml&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nn"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nb"&gt;Err&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nf"&gt;from_reader&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;xml&lt;/span&gt;&lt;span class="nf"&gt;.as_bytes&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(1). Remember in XML you can have more than 1 of the same type of element as a child node, hence these are vectors. Here we also see how to rename a field in the XML to an element in the deserialized struct.&lt;br&gt;
(2). Note that these are properties of the XML node and thus aren't vectors&lt;br&gt;
(3). Parse an XML string and convert it to a ReceiverReport struct&lt;/p&gt;
&lt;h3&gt;
  
  
  Now for the actual decoder
&lt;/h3&gt;

&lt;p&gt;The Decoder trait comes from the tokio-util crate.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;tokio_util&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;codec&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Decoder&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;bytes&lt;/span&gt;&lt;span class="p"&gt;::{&lt;/span&gt;&lt;span class="n"&gt;BufMut&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;BytesMut&lt;/span&gt;&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;Decoder&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Item&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;  &lt;span class="c"&gt;// (1)&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="c"&gt;// (2)&lt;/span&gt;
    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;BytesMut&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nn"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Item&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="c"&gt;// (3)&lt;/span&gt;
      &lt;span class="c"&gt;// &amp;lt;- Insert magic here -&amp;gt;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(1). Item is the return type. We'll be returning a vector of Message structs&lt;br&gt;
(2). We can return an io::Error&lt;br&gt;
(3). The function of the Decoder trait you need to implement.&lt;/p&gt;

&lt;p&gt;So lets discuss the Decoder trait. We are being fed a buffer of bytes. &lt;br&gt;
Scenario 1: The whole buffer constitutes exactly one ReceiverReport.&lt;br&gt;
Scenario 2: The first part of the buffer constitutes one ReceiverReport.&lt;br&gt;
Scenario 3: We don't have enough bytes to have something that can be one ReceiverReport.&lt;br&gt;
Scenario 4: We're off our trolley and none of the data makes any sense.&lt;/p&gt;

&lt;p&gt;Scenario 3's return is an easy &lt;code&gt;Ok(None)&lt;/code&gt; return value.&lt;br&gt;
Scenario 1 and 2 is exactly the same in how it is handled. We'll be getting the piece of the buffer that's the XML and forward the buffer to after the point we've read (after &lt;code&gt;"&amp;lt;/ReceiverReport&amp;gt;\r\n"&lt;/code&gt;). Then we'll parse the XML, fish out the Transmission vector and return that to tokio using an &lt;code&gt;Ok(Some(messages_from_transmissions(ReceiverReport::from_str(&amp;amp;xml).unwrap())))&lt;/code&gt;.&lt;br&gt;
Scenario 4 would return a io::Err().&lt;/p&gt;
&lt;h3&gt;
  
  
  Adding codec magic
&lt;/h3&gt;

&lt;p&gt;Below is code to pull out a data frame out the buffer.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;
&lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;messages_from_transmissions&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rp&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ReceiverReport&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;rp&lt;/span&gt;&lt;span class="py"&gt;.transmissions&lt;/span&gt;&lt;span class="nf"&gt;.is_empty&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;rp&lt;/span&gt;&lt;span class="py"&gt;.transmissions&lt;/span&gt;&lt;span class="nf"&gt;.iter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="nf"&gt;.filter&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="py"&gt;.messages&lt;/span&gt;&lt;span class="nf"&gt;.is_empty&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
            &lt;span class="nf"&gt;.filter&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="py"&gt;.messages&lt;/span&gt;&lt;span class="nf"&gt;.get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="py"&gt;.units&lt;/span&gt;&lt;span class="nf"&gt;.eq&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"bits"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="py"&gt;.messages&lt;/span&gt;&lt;span class="nf"&gt;.get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
            &lt;span class="nf"&gt;.collect&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nd"&gt;vec!&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;Decoder&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Item&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; 
  &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;BytesMut&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nn"&gt;Self&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Item&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="c"&gt;// We don't even have enough data to get the length. Scenario 3&lt;/span&gt;
          &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;None&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="p"&gt;};&lt;/span&gt;

      &lt;span class="c"&gt;// Read the length of the packet&lt;/span&gt;
      &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;..&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;=&lt;/span&gt; &lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
          &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;usize&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;

      &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0x6D7_83F3C&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="c"&gt;// We are starting with &amp;lt;xml and not the padding&lt;/span&gt;
          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;end_tx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"&amp;lt;/ReceiverReport&amp;gt;&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;end_tx_b&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;u8&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;end_tx&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;end_tx_len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;end_tx&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

          &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;end_tx_len&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
              &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
          &lt;span class="p"&gt;}&lt;/span&gt;

          &lt;span class="c"&gt;// We don't have a frame length to work with&lt;/span&gt;
          &lt;span class="c"&gt;// Search through the buffer and try to find the end of the ReceiverReport&lt;/span&gt;
          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;pos&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.windows&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;end_tx_len&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.position&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;window&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="n"&gt;window&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;end_tx_b&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
          &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;pos&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
              &lt;span class="c"&gt;// We've found the end of it. Scenario 1 or 2&lt;/span&gt;
              &lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                  &lt;span class="c"&gt;// Forward the buffer onwards and copy that which you've forwarded past from&lt;/span&gt;
                  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.split_to&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;end_tx_len&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                  &lt;span class="c"&gt;// Get the XML string&lt;/span&gt;
                  &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;xml&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;String&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_utf8&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="nf"&gt;.to_vec&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="nf"&gt;.expect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"invalid utf8 data"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                  &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;messages_from_transmissions&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;ReceiverReport&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;xml&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;())))&lt;/span&gt;
              &lt;span class="p"&gt;},&lt;/span&gt;
              &lt;span class="c"&gt;// Scenario 3&lt;/span&gt;
              &lt;span class="nb"&gt;None&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
          &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;usize&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

          &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
              &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
          &lt;span class="p"&gt;}&lt;/span&gt;

          &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;buf_len&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

          &lt;span class="c"&gt;// We still need some bytes&lt;/span&gt;
          &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;buf_len&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="c"&gt;// Scenario 3&lt;/span&gt;
                &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
          &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
              &lt;span class="c"&gt;// Forward the buffer&lt;/span&gt;
              &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.split_to&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;len&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
              &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;xml&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;..&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;
              &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;xml&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;String&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_utf8&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;xml&lt;/span&gt;&lt;span class="nf"&gt;.to_vec&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="nf"&gt;.expect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"invalid utf8 data"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
              &lt;span class="c"&gt;// Scenario 1 or 2&lt;/span&gt;
              &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;Some&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;messages_from_transmissions&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;ReceiverReport&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;xml&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;())))&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  A framed TCP stream
&lt;/h3&gt;

&lt;p&gt;So we've got our swanky new decoder. Now we want to connect to a socket and get notified one by one as these messages are sent.&lt;/p&gt;

&lt;p&gt;First we need an encoder - because the Framed trait says so (even though we're not going to use the encoder). This one below gets a vector of bytes and stuffs a buffer with them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;tokio_util&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;codec&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Encoder&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;Encoder&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;u8&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;u8&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;BytesMut&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;buf&lt;/span&gt;&lt;span class="nf"&gt;.put&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;..&lt;/span&gt;&lt;span class="p"&gt;]);&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we create a Framed TcpStream:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;MyCodecConnection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Framed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TcpStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="c"&gt;// (1)&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;MyCodec&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;addr&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;SocketAddr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;MyCodecConnection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;tcp_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;TcpStream&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;addr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="o"&gt;?&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;MyCodec&lt;/span&gt;&lt;span class="nf"&gt;.framed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tcp_stream&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="c"&gt;// (2)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(1). MyCodecConnection type&lt;br&gt;
(2). The .framed function comes from the implemented Decoder trait.&lt;/p&gt;
&lt;h2&gt;
  
  
  Sending something to Kafka
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Base
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;FutureProducer&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="n"&gt;Sender&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;FutureProducer&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h3&gt;
  
  
  Connect
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;config&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;ClientConfig&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;error&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;KafkaResult&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;FutureProducer&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;Sender&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;create_and_connect&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Sender&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;brokers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"kafkabroker1.kaas.com,kafkabroker2.kaas.com"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;KafkaResult&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;FutureProducer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;ClientConfig&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="nf"&gt;.set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"bootstrap.servers"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;brokers&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"message.timeout.ms"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"5000"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.create&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Sender&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="p"&gt;}),&lt;/span&gt;
            &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(())&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Above sets up a connection to 2 Kafka brokers and makes up the producer (that'll be used to send messages later on).&lt;/p&gt;
&lt;h3&gt;
  
  
  Send
&lt;/h3&gt;

&lt;p&gt;Given a topic, a key and a message, send it to a list of Kafka brokers. But also try to mimic Spring Cloud Stream's packets.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;impl&lt;/span&gt; &lt;span class="n"&gt;Sender&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;FutureRecord&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Uuid&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;DeliveryFuture&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;rdkafka&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;message&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;OwnedHeaders&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;time&lt;/span&gt;&lt;span class="p"&gt;::{&lt;/span&gt;&lt;span class="n"&gt;SystemTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;UNIX_EPOCH&lt;/span&gt;&lt;span class="p"&gt;};&lt;/span&gt;

    &lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;DeliveryFuture&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c"&gt;// Get the timestamp&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;SystemTime&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;since_the_epoch&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="nf"&gt;.duration_since&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UNIX_EPOCH&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="nf"&gt;.expect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Time went backwards"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;in_ms&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;since_the_epoch&lt;/span&gt;&lt;span class="nf"&gt;.as_secs&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
            &lt;span class="n"&gt;since_the_epoch&lt;/span&gt;&lt;span class="nf"&gt;.subsec_nanos&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;u64&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;1_000_000&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;format!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"{}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;in_ms&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="c"&gt;// Generate a UUID&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Uuid&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new_v4&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;format!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"{}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="c"&gt;// Content type - some Spring Cloud Stream extras&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;serde_json&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;json!&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="s"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s"&gt;"application"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"subtype"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s"&gt;"json"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"parameters"&lt;/span&gt;&lt;span class="p"&gt;:{},&lt;/span&gt;
            &lt;span class="s"&gt;"concrete"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"wildCardType"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"wildCardSubtype"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;

        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="c"&gt;// More Spring Cloud Stream copying&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nd"&gt;json!&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="s"&gt;"contentType"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"org.springframework.util.MimeType"&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt;&lt;span class="nf"&gt;.to_string&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt;&lt;span class="nf"&gt;.as_ref&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="c"&gt;// Actually make the record that we're sending&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;FutureRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
            &lt;span class="nn"&gt;FutureRecord&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;to&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="nf"&gt;.payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="nf"&gt;.key&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="nf"&gt;.headers&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;OwnedHeaders&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                    &lt;span class="nf"&gt;.add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"content-type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"application/json"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="nf"&gt;.add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="nf"&gt;.add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;my_uuid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="nf"&gt;.add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"contentType"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="nf"&gt;.add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"spring_json_header_types"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spring_json_header_types&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;self&lt;/span&gt;&lt;span class="py"&gt;.producer&lt;/span&gt;&lt;span class="nf"&gt;.send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Enter Tokio
&lt;/h2&gt;

&lt;p&gt;Now to glue everything up together.&lt;br&gt;
Below is some very hairy stuff. In short, we'll be listening for these messages that come from our TcpStream. When we do get a list of messages, we set up an MPSC (multiple producer single consumer) channel - the receiving end of the channel sends the json produced by the producer to a Kafka stream.&lt;br&gt;
The resulting futures of the sending of the Kafka messages is then mapped to be either a boolean true or false depending on their delivery state.&lt;br&gt;
In between all of this, these small tasks are spawned using the Tokio runtime.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight rust"&gt;&lt;code&gt;&lt;span class="k"&gt;pub&lt;/span&gt; &lt;span class="k"&gt;fn&lt;/span&gt; &lt;span class="nf"&gt;connect_to_xml_stream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;futures&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;prelude&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;runtime&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Runtime&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;std&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;net&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;SocketAddr&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;use&lt;/span&gt; &lt;span class="nn"&gt;futures&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;mpsc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="c"&gt;// Setup a Tokio runtime&lt;/span&gt;
    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;rt&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Runtime&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;loop&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rt&lt;/span&gt;&lt;span class="nf"&gt;.block_on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="c"&gt;// Convert a string to a socket address&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;addr&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"192.168.2.100"&lt;/span&gt;&lt;span class="py"&gt;.parse&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;SocketAddr&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="c"&gt;// Connect to the TCP stream&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;streams&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;MyCodec&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;addr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;streams&lt;/span&gt;&lt;span class="nf"&gt;.is_err&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;streams&lt;/span&gt;&lt;span class="nf"&gt;.err&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                &lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error with opening up connection {:?}, waiting 10s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                &lt;span class="c"&gt;// Wait for 10 seconds before retrying the loop&lt;/span&gt;
                &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;time&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;delay_for&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nn"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;from_secs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;_&lt;/span&gt;&lt;span class="n"&gt;sink&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ins&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;streams&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.split&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Connected"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

            &lt;span class="c"&gt;// Map the streamed messages to futures&lt;/span&gt;
            &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;fut&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ins&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;|&lt;/span&gt;&lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Result&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Vec&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nn"&gt;io&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;chunk&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="nf"&gt;.len&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                            &lt;span class="c"&gt;// Only send to Kafka if we have at least 1 message&lt;/span&gt;
                            &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;sender&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;Sender&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;create_and_connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;sender&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sender&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                                &lt;span class="c"&gt;// Open up a channel for the sender&lt;/span&gt;
                                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                                &lt;span class="c"&gt;// Iterate over the list of events&lt;/span&gt;
                                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="nf"&gt;.iter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nn"&gt;futures&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;mpsc&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;Sender&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

                                    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="nf"&gt;.clone&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                                &lt;span class="p"&gt;})&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;mut&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;)|&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                    &lt;span class="c"&gt;// Send the event's json to the channel&lt;/span&gt;
                                    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;serde_json&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;to_string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="nf"&gt;.unwrap&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                                    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="nf"&gt;.try_send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                                    &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="nf"&gt;.is_ok&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                                &lt;span class="p"&gt;});&lt;/span&gt;

                                &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nn"&gt;future&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;join_all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                                &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;task&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

                                &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;task&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                    &lt;span class="c"&gt;// On the receiving end of the channel, send the received json to the Kafka sender&lt;/span&gt;
                                    &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="n"&gt;l&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;bool&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rx&lt;/span&gt;&lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;|&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                        &lt;span class="n"&gt;sender&lt;/span&gt;&lt;span class="nf"&gt;.send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"kafka_topic"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka_key"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="o"&gt;..&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
                                            &lt;span class="nf"&gt;.map&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                                &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                                    &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                                    &lt;span class="mi"&gt;_&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;false&lt;/span&gt;
                                                &lt;span class="p"&gt;}&lt;/span&gt;
                                            &lt;span class="p"&gt;})&lt;/span&gt;
                                    &lt;span class="p"&gt;})&lt;/span&gt;
                                        &lt;span class="nf"&gt;.fold&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;|&lt;/span&gt;&lt;span class="n"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;acc&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;
                                        &lt;span class="k"&gt;.await&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                                    &lt;span class="n"&gt;l&lt;/span&gt;
                                &lt;span class="p"&gt;});&lt;/span&gt;
                                &lt;span class="nn"&gt;future&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;ready&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                            &lt;span class="p"&gt;},&lt;/span&gt;
                            &lt;span class="mi"&gt;_&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                                &lt;span class="nn"&gt;future&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;ready&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                            &lt;span class="p"&gt;}&lt;/span&gt;
                        &lt;span class="p"&gt;}&lt;/span&gt;
                    &lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="nf"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;_&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="nn"&gt;future&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;ready&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;})&lt;/span&gt;
            &lt;span class="nf"&gt;.then&lt;/span&gt;&lt;span class="p"&gt;(|&lt;/span&gt;&lt;span class="n"&gt;v&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="c"&gt;// The mapped futures should be spawned&lt;/span&gt;
                &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;task&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;v&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;})&lt;/span&gt;&lt;span class="nf"&gt;.fold&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;|&lt;/span&gt;&lt;span class="n"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;|&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;move&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;acc&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="k"&gt;let&lt;/span&gt; &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;false&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;});&lt;/span&gt;
            &lt;span class="c"&gt;// Await until the connection is closed&lt;/span&gt;
            &lt;span class="nn"&gt;tokio&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nn"&gt;task&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="nf"&gt;spawn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fut&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="k"&gt;.await&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
        &lt;span class="k"&gt;match&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="nf"&gt;Ok&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Finished with connection"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="mi"&gt;_&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nd"&gt;println!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Exited connection"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;I've written this in hope that it helps someone. Please send me a message if this was you, or buy me a beer &lt;/p&gt;

&lt;p&gt;&lt;a href="https://liberapay.com/mvniekerk/donate"&gt;&lt;img alt="Donate using Liberapay" src="https://res.cloudinary.com/practicaldev/image/fetch/s--D0JWHkxh--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://liberapay.com/assets/widgets/donate.svg"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Photo by &lt;br&gt;
&lt;a href="https://unsplash.com/@neptunian?utm_medium=referral&amp;amp;utm_campaign=photographer-credit&amp;amp;utm_content=creditBadge" rel="noopener noreferrer" title="Download free do whatever you want high-resolution photos from Torsten Muller"&gt;&lt;span&gt;&lt;/span&gt;&lt;span&gt;Torsten Muller&lt;/span&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>rust</category>
      <category>tokio</category>
      <category>kafka</category>
      <category>futures</category>
    </item>
  </channel>
</rss>
