DEV Community

jtenner

Creating a Tokio Codec

Today, I am proud to announce that I have written a Tokio codec from scratch, using Rust nightly and async/await.

telnet_codec

A tokio telnet codec implementation written in rust.




Tokio is an asynchronous runtime for Rust that can help you get a network server up and running very quickly. One of the primary ways Tokio helps developers build servers is by giving them a framework for plugging in specialized codecs. A codec translates streamed bytes into manageable chunks, better known as "frames."

For example, the following code shows how to use the telnet codec to make an echo server.

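use telnet_codec::codec::TelnetCodec;
use tokio::codec::Decoder; // provides `framed()`
use tokio::net::TcpListener;
// `TelnetEvent` and the stream/sink extension traits that provide `split()`,
// `next()` and `send()` must also be in scope; their import paths are not shown here.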

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let mut listener = TcpListener::bind("127.0.0.1:7000").await?;
  println!("listening on port 7000");
  loop {
    let (socket, _) = listener.accept().await?;
    tokio::spawn(async move {
      // The telnet codec has an internal buffer used for each connection.
      // This codec requires the buffer size to be specified up front (4096 bytes)
      let codec = TelnetCodec::new(4096_usize);
      let (mut sink, mut input) = codec.framed(socket).split();
      // Use the sink to write telnet events to the socket.
      // use the input to iterate over telnet events.
      while let Some(Ok(event)) = input.next().await {
        println!("Event {:?}", event);
        match event {
          TelnetEvent::Message(value) => {
            println!("Echoing message: {}", value);
            if let Err(error) = sink.send(TelnetEvent::Message(value)).await {
              println!("An error occurred {}", error);
            }
          },
          _ => {
            // nop
          }
        }
      }
    });
  }
}

Essentially, you accept a connection and pass it to a codec of your choice, which takes over the responsibility of interpreting the inbound and outbound IO for you.

A custom codec is a struct that implements both the Encoder and Decoder traits. The Encoder trait defines how to turn a "frame" into bytes, whereas the Decoder trait defines how to turn bytes into frames.
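To make the relationship between the two traits concrete, here is a minimal sketch that does not depend on tokio. The `Encoder` and `Decoder` traits below are simplified, hypothetical mirrors of the real ones, `Vec<u8>` stands in for `BytesMut`, and `LengthCodec` and `FrameTooLong` are invented purely for illustration.

```rust
// Simplified, hypothetical mirrors of the codec traits.
// Vec<u8> is used in place of BytesMut to keep the sketch dependency-free.
pub trait Encoder {
    type Item;
    type Error;
    fn encode(&mut self, item: Self::Item, buf: &mut Vec<u8>) -> Result<(), Self::Error>;
}

pub trait Decoder {
    type Item;
    type Error;
    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Self::Item>, Self::Error>;
}

/// Toy codec: each frame is a one-byte length followed by that many payload bytes.
pub struct LengthCodec;

#[derive(Debug, PartialEq)]
pub struct FrameTooLong;

impl Encoder for LengthCodec {
    type Item = Vec<u8>;
    type Error = FrameTooLong;

    fn encode(&mut self, item: Vec<u8>, buf: &mut Vec<u8>) -> Result<(), FrameTooLong> {
        // The length prefix is a single byte, so payloads are capped at 255 bytes.
        if item.len() > 255 {
            return Err(FrameTooLong);
        }
        buf.push(item.len() as u8);
        buf.extend_from_slice(&item);
        Ok(())
    }
}

impl Decoder for LengthCodec {
    type Item = Vec<u8>;
    type Error = FrameTooLong; // this toy decoder never actually fails

    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Vec<u8>>, FrameTooLong> {
        // Read the length prefix, or report "not enough data yet".
        let len = match src.first() {
            Some(&len) => len as usize,
            None => return Ok(None),
        };
        if src.len() < 1 + len {
            return Ok(None);
        }
        // Remove the whole frame from the buffer and drop the length byte.
        let frame = src.drain(..1 + len).skip(1).collect();
        Ok(Some(frame))
    }
}
```

Encoding `b"hi"` with this toy codec appends the bytes 2, `h`, `i` to the buffer, and decoding that buffer returns the original payload and leaves the buffer empty.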

The first thing I did to develop the TelnetCodec struct was run cargo init --lib and modify the Cargo.toml file to include the latest tokio release (an alpha version, pinned below) and the bytes crate. Don't forget that the latest tokio needs Rust nightly!

[package]
name = "telnet_codec"
version = "0.0.0"
authors = ["Joshua Tenner <tenner.joshua@gmail.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = "=0.2.0-alpha.4"
bytes = "0.4.12"

Implementing the Encoder

Next, I implemented the Encoder trait with this definition.

use bytes::BytesMut;
use tokio::codec::Encoder;

impl Encoder for TelnetCodec {
  type Item = TelnetEvent;
  type Error = TelnetError;

  fn encode(&mut self, event: Self::Item, buf: &mut BytesMut) ->
    Result<(), Self::Error> {
    // do something here
  }
}

Remember, the job of the Encoder is to turn TelnetEvents into bytes, so the encode() function takes three parameters: a &mut self reference, the telnet event itself, and a buffer into which the encoded bytes are written.

The telnet encoding task is short and sweet. Often, a telnet server will send a DO, DONT, WILL or WONT command to the connected client. These commands only need three bytes of information to be encoded and can be implemented with a simple match expression.

  fn encode(&mut self, event: TelnetEvent, buf: &mut BytesMut) -> Result<(), Self::Error> {
    match event {
      // basic commands are IAC (COMMAND) (OPT)
      TelnetEvent::Do(opt) => {
        buf.reserve(3); // reserve room for the 3 bytes to be written
        buf.put(IAC); // send the Interpret as Command byte (255)
        buf.put(DO); // send the command (253)
        buf.put::<u8>(opt.into()); // send the option
      },
      TelnetEvent::Dont(opt) => {
        buf.reserve(3);
        buf.put(IAC);
        buf.put(DONT);
        buf.put::<u8>(opt.into());
      },
      TelnetEvent::Will(opt) => {
        buf.reserve(3);
        buf.put(IAC);
        buf.put(WILL);
        buf.put::<u8>(opt.into());
      },
      TelnetEvent::Wont(opt) => {
        buf.reserve(3);
        buf.put(IAC);
        buf.put(WONT);
        buf.put::<u8>(opt.into());
      },
      // another event
      _ => { }
    }
    Ok(())
  }

Since the pattern is the same for each command, it might be possible to combine the match arms. Sadly, I am not experienced enough with Rust to find a nice way to do this. If you can figure out a better way, please feel free to submit a pull request or comment below (please!).
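One possible way to collapse the four arms is to let the match pick only the command byte and the option, then write the shared IAC (COMMAND) (OPT) sequence once. The sketch below is an assumption-laden illustration, not the crate's code: `TelnetOption`, its two variants, the `encode_negotiation` name, and the use of `Vec<u8>` in place of `BytesMut` are all hypothetical stand-ins.

```rust
// Telnet command bytes.
const IAC: u8 = 255; // Interpret as Command
const WILL: u8 = 251;
const WONT: u8 = 252;
const DO: u8 = 253;
const DONT: u8 = 254;

// Hypothetical stand-in for the crate's option type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TelnetOption {
    Echo,
    SuppressGoAhead,
}

impl From<TelnetOption> for u8 {
    fn from(opt: TelnetOption) -> u8 {
        match opt {
            TelnetOption::Echo => 1,
            TelnetOption::SuppressGoAhead => 3,
        }
    }
}

// Hypothetical stand-in for the crate's event type.
#[derive(Debug, Clone, PartialEq)]
pub enum TelnetEvent {
    Do(TelnetOption),
    Dont(TelnetOption),
    Will(TelnetOption),
    Wont(TelnetOption),
    Message(String),
}

/// Encodes negotiation events into `buf` (a Vec<u8> standing in for BytesMut).
pub fn encode_negotiation(event: TelnetEvent, buf: &mut Vec<u8>) {
    // First, map each negotiation event to its command byte and option.
    let (command, opt) = match event {
        TelnetEvent::Do(opt) => (DO, opt),
        TelnetEvent::Dont(opt) => (DONT, opt),
        TelnetEvent::Will(opt) => (WILL, opt),
        TelnetEvent::Wont(opt) => (WONT, opt),
        // Other events are ignored in this sketch.
        _ => return,
    };
    // Then write the shared three-byte sequence in one place.
    let opt_byte: u8 = opt.into();
    buf.reserve(3);
    buf.extend_from_slice(&[IAC, command, opt_byte]);
}
```

With these stand-ins, encoding `TelnetEvent::Do(TelnetOption::Echo)` appends the bytes 255, 253, 1. The same tuple-returning match should carry over to a `BytesMut` buffer, with the three `put` calls written once after the match.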

Once the function returns Ok(()), the bytes in the BytesMut buffer will be sent to the client. There are, of course, other options and events that can be implemented, and working versions of them currently live in the telnet codec repo.

Implementing the Decoder

Next, I implemented the decoder. The decoder's job is to decode incoming bytes, which may be partitioned into many different frames.

This is quite a complicated task, and instead of writing a tutorial about how to interpret incoming telnet bytes, I chose to document how a decoder function works, and how to test it manually.

Here is an example of how to implement the Decoder trait for a codec.

use bytes::BytesMut;
use tokio::codec::Decoder;

impl Decoder for TelnetCodec {
  type Item = TelnetEvent;
  type Error = TelnetError;

  fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    // do something with the incoming src bytes
  }
}

The decode function is called repeatedly until it returns Ok(None), which lets the framework gather every complete "frame" from a given buffer. As a result, each call to decode() is responsible for returning a single frame and then advancing the BytesMut buffer to the start of the next frame, so the whole buffer is consumed one call at a time. The decoder logic follows a few rules.

  1. If the stream hasn't provided enough bytes yet, or is empty, the decoder should return Ok(None) to stop parsing frames
  2. If the frame is invalid, consume all of the error bytes and return Err(TelnetError::ErrorKind) to report an Error frame to the consumer
  3. If the bytes are valid, consume all the bytes in the current frame and return Ok(Some(TelnetEvent::EventKind))

Here is a bare bones example decode() function.

  fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    let len = src.len();
    if len == 0 {
      // there are no bytes to consume, stop querying the buffer
      return Ok(None); 
    }

    // parse out the bytes from the start of the buffer

    // then check to see if an error occurred
    if invalid {
      // src should be truncated to [next_start_index,len)
      src.split_to(next_start_index); 
      return Err(TelnetError::ErrorKind);
    }

    // truncate src here too
    src.split_to(next_start_index);
    return Ok(Some(TelnetEvent::EventKind));
  }
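For a concrete version of that skeleton, here is a minimal sketch of a decoder for "\r\n"-terminated text messages that follows the three rules. It is not the telnet_codec implementation: `decode_line` and `DecodeError` are hypothetical names, and `Vec<u8>` stands in for `BytesMut` so the example has no dependencies.

```rust
// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum DecodeError {
    InvalidUtf8,
}

/// Tries to decode one "\r\n"-terminated message from the front of `src`.
pub fn decode_line(src: &mut Vec<u8>) -> Result<Option<String>, DecodeError> {
    // Rule 1: if the buffer is empty or holds no complete frame yet,
    // return Ok(None) and leave the bytes in place for the next call.
    let end = match src.windows(2).position(|pair| pair == b"\r\n") {
        Some(index) => index,
        None => return Ok(None),
    };

    // Consume the frame and its terminator, whether or not it is valid.
    let mut frame: Vec<u8> = src.drain(..end + 2).collect();
    frame.truncate(end); // drop the trailing "\r\n"

    match String::from_utf8(frame) {
        // Rule 3: valid bytes become a frame.
        Ok(text) => Ok(Some(text)),
        // Rule 2: invalid bytes are already consumed, so report the error.
        Err(_) => Err(DecodeError::InvalidUtf8),
    }
}
```

Because the invalid frame is removed before the error is returned, the next call starts at the following frame instead of failing on the same bytes again.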

Testing the Decoder

To test a decoder, call decode() repeatedly and push each result to a Vec<Result<Option<TelnetEvent>, TelnetError>>. Here is a custom consume function I wrote to test the output of my decoder.

    fn consume(codec: &mut TelnetCodec, bytes: &mut BytesMut) -> Vec<Result<Option<TelnetEvent>, TelnetError>> {
        let mut result = Vec::new();
        loop {
            match codec.decode(bytes) {
                Ok(None) => { break; }
                output => result.push(output)
            }
        }
        return result;
    }
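Since Ok(None) only ends the loop, the collected vector never contains a None, so the element type could arguably be simplified to Result<T, E>. The following is a sketch of that variation under stated assumptions: it is generic over any decode function, `Vec<u8>` stands in for `BytesMut`, and `decode_byte` is a hypothetical one-byte-per-frame decoder invented only to exercise it.

```rust
/// Calls `decode` until it reports Ok(None), collecting each frame or error.
pub fn consume<T, E>(
    src: &mut Vec<u8>,
    mut decode: impl FnMut(&mut Vec<u8>) -> Result<Option<T>, E>,
) -> Vec<Result<T, E>> {
    let mut results = Vec::new();
    loop {
        match decode(src) {
            // Nothing more to decode: stop without recording anything.
            Ok(None) => break,
            Ok(Some(frame)) => results.push(Ok(frame)),
            Err(error) => results.push(Err(error)),
        }
    }
    results
}

/// Hypothetical decoder: every byte is a frame, and a zero byte is invalid.
pub fn decode_byte(src: &mut Vec<u8>) -> Result<Option<u8>, String> {
    if src.is_empty() {
        return Ok(None);
    }
    // Consume the byte before deciding whether it is valid.
    let byte = src.remove(0);
    if byte == 0 {
        Err(String::from("zero byte"))
    } else {
        Ok(Some(byte))
    }
}
```

Note that this loop only terminates if the decoder consumes bytes when it returns an error; a decoder that errors without advancing the buffer would spin forever.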

Now it's possible to write some tests to validate the implementation.

The testing strategy I like to use is as follows:

  1. Create some state
  2. Perform an operation
  3. Compare the output to some expected values

For example, this is the "Hello World" decoder test in the telnet_codec repo.

    #[test]
    fn finished_message() {
        // This test validates that the codec converts the byte input
        // into TelnetEvents correctly.
        // First, instantiate the codec.
        let mut codec = TelnetCodec::new(4096);
        // Then create a BytesMut buffer from some bytes to be decoded.
        let mut bytes = BytesMut::from(b"Hello world\r\n".to_vec());
        // Finally, consume the input bytes and compare the frames
        // that the decode function returns.
        let result = consume(&mut codec, &mut bytes);

        // the bytes should be completely consumed, so `bytes.len()`
        // should be 0
        assert_eq!(bytes.len(), 0_usize);

        // Since we sent a message to the decoder that ends in "\r\n",
        // it should return a single telnet frame in the form of a
        // message event that contains the expected String value.
        assert_eq!(
            result,
            vec![
                // Decode returns Result<Option<TelnetEvent>, TelnetError>
                Ok(Some(TelnetEvent::Message(String::from("Hello world"))))
            ],
        );
    }

Testing the Encoder

We can also test the encoder to verify that the output bytes are what we expect. This is a "Hello world" encoder sample test.

    #[test]
    fn message_encode() {
        // The encoder is responsible for turning TelnetEvents into
        // byte frames. First, create the codec.
        let mut codec = TelnetCodec::new(4096);
        // Next, create a buffer to be written to.
        let mut output = BytesMut::new();
        // Finally, create the message event and encode the result.
        let message = TelnetEvent::Message(String::from("Hello world!\r\n"));

        // encode the message and read the output
        codec.encode(message, &mut output).expect("Invalid encoding sequence");

        // utf8 output
        assert_eq!(
            output,
            // The output should have the following bytes in the buffer
            BytesMut::from(vec![
                0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x0d, 0x0a,
            ]),
        );
    }

Encoding and decoding are obviously no trivial tasks, but having tools to help test these functions is really worthwhile when building a codec. Please feel free to ask questions below, or fix my silly coding mistakes. I'm constantly learning how to use Rust, and I plan on maintaining the codec so that many others can use it too.

Thank you for reading!

Latest comments (2)

Nicolas Marshall

Nice write-up, definitely helping me get started with Tokio codecs.

Just one thing: in your consume() function (for testing), you're returning a Vec<Result<Option<TelnetEvent>, TelnetError>>, but since you're handling the Ok(None) case and never returning from there, you'll never end up with a None in the Option<> in the return value.
So if I'm not mistaken, this function can be simplified to return a Vec<Result<TelnetEvent, TelnetError>>?

nuxeh

I'm curious, how come you've archived the GitHub project?