Managing complex communication over raw I/O streams using async-io-typed and async-io-converse

Introduction

Over the past few months I’ve been working on a pair of Rust crates that I think might be useful to the wider world. async-io-typed and async-io-converse which provide the ability to exchange typed messages between two endpoints linked by any kind of byte based I/O channel. This channel could be a TCP connection, a named pipe, a unix socket, even a file (if you don’t need the communication to be real time).

Why two crates?

Each provides a different set of features. async-io-typed provides a unidirectional channel over which you can asynchronously send serde compatible types. async-io-converse depends on async-io-typed, and adds the ability to receive replies to your messages. It requires a duplex connection. async-io-typed can also be used duplex, but it doesn’t require it.

Here’s a table to summarize.

  Typed Messages Duplex communication Unidirectional communication Receive Replies Requires tokio
async-io-typed Yes Optional Yes No No
async-io-converse Yes Required No Yes Yes

Assembling the puzzle pieces

The I/O channel

The I/O channel can be any byte based I/O interface which implements futures_io::AsyncRead or futures_io::AsyncWrite. tokio::io::AsyncRead and tokio::io::AsyncWrite are okay too! You’ll just need to use the compatibility layer from tokio_util in that case. Both async-io-typed and async-io-converse provide the ability to include checksums with each message. If you’re completely confident that your I/O channel is reliable, or 100% accuracy just isn’t important for your use case, you can turn off the checksums. Some examples of reliable channels are TCP, and most forms of IPC. UDP would be an example of an unreliable channel. If the first message doesn’t arrive correctly then the connection as a whole will fail, regardless of whether checksums are on or off.

The message type

In order to exchange messages you’ll need a struct or enum which implements the traits serde::se::Serialize and serde::de::DeserializeOwned. Fortunately, as people who are experienced with serde know, deriving these traits is pretty easy. Here’s an example.

Cargo.toml

[dependencies]
serde = { version = "1.0", features = ["derive"] }

main.rs

use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct MyMessage {
  pub field_1: bool,
  pub field_2: String,
}

You can alter MyMessage however suits your needs, just add fields for the information you want to exchange.

async-io-typed example

For this example we’re going to use the tokio TcpStream to establish a TCP loopback connection. Remember though that this can function over any I/O channel.

use std::net::{Ipv4Addr, SocketAddrV4};
use futures_util::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use async_io_typed::DuplexStreamTyped;

// Listen for new connections on a TCP binding assigned by the operating system.
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    .await
    .unwrap();

Get the port number that the operating system assigned to us.

let port = tcp_listener.local_addr().unwrap().port();

Start accepting a new connection. We intentionally don’t await this future yet, because that would prevent us from creating the client connecting to it.

let accept_fut = tcp_listener.accept();

Create a new DuplexStreamTyped which exchanges MyMessage type, by connecting to the newly opened TCP port. We then wrap the connection in a compatibility layer so that tokio::io and futures_io can work together. The second parameter here disables checksums. TCP already does error checking for us, so we can disable the checksum.

let mut client_stream = DuplexStreamTyped::<_, MyMessage>::new(
    TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
        .await
        .unwrap()
        .compat(),
    ChecksumEnabled::No,
);

Now that a connection is established, let’s resolve our accept future from earlier.

let (server_stream, _address) = accept_fut.await.unwrap();

Wrap the connection in a DuplexStreamTyped, with checksums off.

let mut server_stream = DuplexStreamTyped::new(server_stream.compat_write(), ChecksumEnabled::No);

Let’s compose the message to send

let message = MyMessage {
    field_1: true,
    field_2: String::from("Hello World!"),
};

Send the message

server_stream.send(message).await.unwrap();

Receive the message

let received = client_stream.next().await.unwrap().unwrap();

And with that you can now do whatever you want with the received message.

async-io-converse example

Similar to the above example, we’re going to use TCP, but you don’t have to use TCP for your code.

Let’s use a different type for the message. This time we’re going to use an enum.

#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum MyMessage {
    /// Initial Message
    HelloThere,
    /// Reply
    GeneralKenobiYouAreABoldOne,
}

Now for the example code itself

use std::net::{Ipv4Addr, SocketAddrV4};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use async_io_converse::new_duplex_connection;

// Listen for new connections on a TCP binding assigned by the operating system.
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    .await
    .unwrap();

Get the port number that the operating system assigned to us.

let port = tcp_listener.local_addr().unwrap().port();

Start accepting a new connection. We intentionally don’t await this future yet, because that would prevent us from creating the client connecting to it.

let accept_fut = tcp_listener.accept();

Connect to the newly opened TCP port.

let mut client_stream = TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
    .await
    .unwrap();

Now that a connection is established, let’s resolve our accept future from earlier.

let (server_stream, _address) = accept_fut.await.unwrap();

Split both connections into a read half and a write half.

let (server_read, server_write) = server_stream.into_split();
let (client_read, client_write) = client_stream.into_split();

Use these pieces to form a typed duplex connection, using the tokio_util compatibility functions to adapt tokio interfaces into futures_io interfaces.

let (server_recv, mut server_send) = new_duplex_connection(ChecksumEnabled::No, server_read.compat(),  server_write.compat_write());
let (mut client_recv, _client_send) = new_duplex_connection(ChecksumEnabled::No, client_read.compat(),  client_write.compat_write());

In order for the reply mechanism to work something must be driving the server_recv and client_recv values. We don’t want to drive the server_recv manually, so we’re going to use .drive_forever() to make the tokio runtime responsible for driving it. This means we can never use server_recv again, but for this example we won’t need to.

tokio::spawn(server_recv.drive_forever());

Let’s compose the message to send

let message = MyMessage::HelloThere;

There’s four ways we could send this message, the first two ways, the .send and .send_timeout methods are appropriate if you don’t wish to wait for a reply. You may choose to wait for a reply with these methods, but you don’t have to. The .ask and .ask_timeout methods are simpler to use if you want a reply, but they do require you to wait for a reply (or for a timeout to occur).

This time we do want a reply, however we want to process the client side code before we wait for a reply, so we’ll use .send as this enables us to delay waiting for the reply. Note that in a real world use case .ask may be more convenient.

let reply_fut = server_send.send(message).await.unwrap();

Receive the message

let received = client_recv.next().await.unwrap().unwrap();

Let’s formulate a reply.

let reply = match received.message() {
   MyMessage::HelloThere => MyMessage::GeneralKenobiYouAreABoldOne,
   MyMessage::GeneralKenobiYouAreABoldOne => panic!("Wait! That's my line!"),
};

Now send it.

received.reply(reply).await.unwrap();

On the server side we can now wait for a reply to arrive.

let received_reply = reply_fut.await.unwrap();
assert_eq!(received_reply, MyMessage::GeneralKenobiYouAreABoldOne);

And with that, the exchange is complete.

Can I use this protocol outside of Rust?

I don’t recommend it. Strictly speaking you could, but that’d be much harder than many of the alternatives out there. This library is designed for a Rust centered use case.

What’s next?

I consider these crates to be more or less feature complete. I’ll likely work on improving the documentation, adding more tests, and fixing any bugs reported to me on my GitHub issue trackers. By the way, if you like what you see here, I’m currently looking for a job. Here’s my LinkedIn.

Written on January 24, 2023