Managing complex communication over raw I/O streams using async-io-typed and async-io-converse
Introduction
Over the past few months I’ve been working on a pair of Rust crates that I think might be useful to the wider world: async-io-typed and async-io-converse. Together they provide the ability to exchange typed messages between two endpoints linked by any kind of byte-based I/O channel.
This channel could be a TCP connection, a named pipe, a Unix socket, or even a file (if you don’t need the communication to be real-time).
Why two crates?
Each provides a different set of features. async-io-typed provides a unidirectional channel over which you can asynchronously send serde-compatible types. async-io-converse depends on async-io-typed and adds the ability to receive replies to your messages. It requires a duplex connection. async-io-typed can also be used in duplex mode, but it doesn’t require it.
Here’s a table to summarize.
| | Typed Messages | Duplex communication | Unidirectional communication | Receive Replies | Requires tokio |
|---|---|---|---|---|---|
| async-io-typed | Yes | Optional | Yes | No | No |
| async-io-converse | Yes | Required | No | Yes | Yes |
Assembling the puzzle pieces
The I/O channel
The I/O channel can be any byte-based I/O interface which implements futures_io::AsyncRead or futures_io::AsyncWrite. tokio::io::AsyncRead and tokio::io::AsyncWrite are okay too! You’ll just need to use the compatibility layer from tokio_util in that case.
Both async-io-typed and async-io-converse provide the ability to include checksums with each message. If you’re completely confident that your I/O channel is reliable, or 100% accuracy just isn’t important for your use case, you can turn off the checksums. TCP and most forms of IPC are examples of reliable channels; UDP is an example of an unreliable one. Note that if the first message doesn’t arrive correctly, the connection as a whole will fail, regardless of whether checksums are on or off.
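To make the idea of a per-message checksum concrete, here’s a minimal std-only sketch. It is not the wire format or the checksum algorithm these crates actually use; the frame layout (length prefix, payload, optional Fletcher-16 checksum) and the names write_frame, read_frame, and fletcher16 are assumptions made purely for illustration.

```rust
use std::io::{self, Read, Write};

/// Illustrative checksum only (Fletcher-16); the real crates may use something else.
fn fletcher16(data: &[u8]) -> u16 {
    let (mut a, mut b) = (0u16, 0u16);
    for &byte in data {
        a = (a + byte as u16) % 255;
        b = (b + a) % 255;
    }
    (b << 8) | a
}

/// Hypothetical frame layout: 4-byte little-endian length, the payload,
/// then a 2-byte checksum if checksums are enabled.
fn write_frame<W: Write>(out: &mut W, payload: &[u8], checksum: bool) -> io::Result<()> {
    out.write_all(&(payload.len() as u32).to_le_bytes())?;
    out.write_all(payload)?;
    if checksum {
        out.write_all(&fletcher16(payload).to_le_bytes())?;
    }
    Ok(())
}

/// Read one frame back, rejecting it if the checksum is enabled and doesn't match.
fn read_frame<R: Read>(input: &mut R, checksum: bool) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    input.read_exact(&mut len)?;
    let mut payload = vec![0u8; u32::from_le_bytes(len) as usize];
    input.read_exact(&mut payload)?;
    if checksum {
        let mut sum = [0u8; 2];
        input.read_exact(&mut sum)?;
        if u16::from_le_bytes(sum) != fletcher16(&payload) {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
        }
    }
    Ok(payload)
}
```

With the checksum on, a flipped payload byte is reported as an error. With it off, the same corruption would be handed to the deserializer unnoticed, which is why turning checksums off only makes sense on a channel you already trust.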
The message type
In order to exchange messages you’ll need a struct or enum which implements the traits serde::ser::Serialize and serde::de::DeserializeOwned.
Fortunately, as people who are experienced with serde know, deriving these traits is pretty easy. Here’s an example.
Cargo.toml
[dependencies]
serde = { version = "1.0", features = ["derive"] }
main.rs
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct MyMessage {
    pub field_1: bool,
    pub field_2: String,
}
You can alter MyMessage however suits your needs; just add fields for the information you want to exchange.
async-io-typed example
For this example we’re going to use the tokio TcpStream to establish a TCP loopback connection. Remember though that this can function over any I/O channel.
use std::net::{Ipv4Addr, SocketAddrV4};
// `SinkExt` provides `.send()` and `StreamExt` provides `.next()`, both used below.
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use async_io_typed::{ChecksumEnabled, DuplexStreamTyped};
// Listen for new connections on a TCP binding assigned by the operating system.
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    .await
    .unwrap();
Get the port number that the operating system assigned to us.
let port = tcp_listener.local_addr().unwrap().port();
Start accepting a new connection. We intentionally don’t await this future yet, because that would prevent us from creating the client connecting to it.
let accept_fut = tcp_listener.accept();
Connect to the newly opened TCP port, wrap the connection in a compatibility layer so that tokio::io and futures_io can work together, and use it to create a new DuplexStreamTyped which exchanges the MyMessage type.
The second parameter disables checksums. TCP already does error checking for us, so we don’t need them here.
let mut client_stream = DuplexStreamTyped::<_, MyMessage>::new(
    TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
        .await
        .unwrap()
        .compat(),
    ChecksumEnabled::No,
);
Now that a connection is established, let’s resolve our accept future from earlier.
let (server_stream, _address) = accept_fut.await.unwrap();
Wrap the connection in a DuplexStreamTyped, with checksums off.
let mut server_stream = DuplexStreamTyped::new(server_stream.compat_write(), ChecksumEnabled::No);
Let’s compose the message to send
let message = MyMessage {
    field_1: true,
    field_2: String::from("Hello World!"),
};
Send the message
server_stream.send(message).await.unwrap();
Receive the message
let received = client_stream.next().await.unwrap().unwrap();
And with that you can now do whatever you want with the received message.
async-io-converse example
Similar to the above example, we’re going to use TCP, but you don’t have to use TCP for your code.
Let’s use a different type for the message. This time we’re going to use an enum.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum MyMessage {
    /// Initial Message
    HelloThere,
    /// Reply
    GeneralKenobiYouAreABoldOne,
}
Now for the example code itself
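use std::net::{Ipv4Addr, SocketAddrV4};
// `StreamExt` provides the `.next()` call used on the receiving side below.
use futures_util::StreamExt;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use async_io_converse::new_duplex_connection;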
// Listen for new connections on a TCP binding assigned by the operating system.
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    .await
    .unwrap();
Get the port number that the operating system assigned to us.
let port = tcp_listener.local_addr().unwrap().port();
Start accepting a new connection. We intentionally don’t await this future yet, because that would prevent us from creating the client connecting to it.
let accept_fut = tcp_listener.accept();
Connect to the newly opened TCP port.
let client_stream = TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
    .await
    .unwrap();
Now that a connection is established, let’s resolve our accept future from earlier.
let (server_stream, _address) = accept_fut.await.unwrap();
Split both connections into a read half and a write half.
let (server_read, server_write) = server_stream.into_split();
let (client_read, client_write) = client_stream.into_split();
Use these pieces to form a typed duplex connection, using the tokio_util compatibility functions to adapt tokio interfaces into futures_io interfaces.
let (server_recv, mut server_send) = new_duplex_connection(ChecksumEnabled::No, server_read.compat(), server_write.compat_write());
let (mut client_recv, _client_send) = new_duplex_connection(ChecksumEnabled::No, client_read.compat(), client_write.compat_write());
In order for the reply mechanism to work, something must be driving the server_recv and client_recv values.
We don’t want to drive server_recv manually, so we’re going to use .drive_forever() to make the tokio runtime responsible for driving it. This means we can never use server_recv again, but for this example we won’t need to.
tokio::spawn(server_recv.drive_forever());
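Why does the receiver need to be driven at all? Here’s a small std-only sketch of the general idea, not the crate’s real internals. It assumes replies are matched to their senders by a conversation id; the ReplyRouter type and its register and dispatch methods are hypothetical names invented for this illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical router: remembers who is waiting for which conversation id.
struct ReplyRouter {
    next_id: u64,
    waiting: HashMap<u64, Sender<String>>,
}

impl ReplyRouter {
    fn new() -> Self {
        ReplyRouter { next_id: 0, waiting: HashMap::new() }
    }

    /// Called when a message is sent: allocate an id and hand back
    /// the receiving end on which the reply will eventually show up.
    fn register(&mut self) -> (u64, Receiver<String>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Called by whoever "drives" the read side, once per incoming reply.
    /// Returns true if someone was waiting for this id and received it.
    fn dispatch(&mut self, id: u64, reply: String) -> bool {
        match self.waiting.remove(&id) {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }
}
```

In this sketch, nothing ever arrives on a registered receiver unless something keeps calling dispatch as data comes in off the wire. That’s the role the driving task plays in the example above.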
Let’s compose the message to send
let message = MyMessage::HelloThere;
There are four ways we could send this message. The first two, the .send and .send_timeout methods, are appropriate if you don’t wish to wait for a reply. You may choose to wait for a reply with these methods, but you don’t have to. The .ask and .ask_timeout methods are simpler to use if you want a reply, but they do require you to wait for a reply (or for a timeout to occur).
This time we do want a reply. However, we want to process the client-side code before we wait for it, so we’ll use .send, as this enables us to delay waiting for the reply. Note that in a real-world use case .ask may be more convenient.
let reply_fut = server_send.send(message).await.unwrap();
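If the difference between the two styles is hard to picture, here’s a rough analogy using only std threads and channels. It is not the async-io-converse API: the Request type and the spawn_peer, send, and ask functions are hypothetical, and blocking calls stand in for awaiting a future.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// A request bundled with the channel its reply should be sent back on.
struct Request {
    body: String,
    reply_to: Sender<String>,
}

/// Hypothetical peer: a background thread that answers every request it gets.
fn spawn_peer() -> Sender<Request> {
    let (tx, rx) = channel::<Request>();
    thread::spawn(move || {
        for request in rx {
            // Ignore send errors: the other side may not be waiting for a reply.
            let _ = request.reply_to.send(format!("re: {}", request.body));
        }
    });
    tx
}

/// `send` style: hand off the message and get a handle back.
/// The caller can wait on the handle later, or simply drop it.
fn send(peer: &Sender<Request>, body: &str) -> Receiver<String> {
    let (reply_to, reply) = channel();
    peer.send(Request { body: body.to_string(), reply_to })
        .expect("peer hung up");
    reply
}

/// `ask` style: send, then block until the reply arrives.
fn ask(peer: &Sender<Request>, body: &str) -> String {
    send(peer, body).recv().expect("peer dropped the reply")
}
```

The send style leaves room to do other work between sending and waiting, which is what the example above needs. The ask style trades that flexibility for a single call.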
Receive the message
let received = client_recv.next().await.unwrap().unwrap();
Let’s formulate a reply.
let reply = match received.message() {
    MyMessage::HelloThere => MyMessage::GeneralKenobiYouAreABoldOne,
    MyMessage::GeneralKenobiYouAreABoldOne => panic!("Wait! That's my line!"),
};
Now send it.
received.reply(reply).await.unwrap();
On the server side we can now wait for a reply to arrive.
let received_reply = reply_fut.await.unwrap();
assert_eq!(received_reply, MyMessage::GeneralKenobiYouAreABoldOne);
And with that, the exchange is complete.
Can I use this protocol outside of Rust?
I don’t recommend it. Strictly speaking you could, but that’d be much harder than using one of the many alternatives out there. These crates are designed for a Rust-centered use case.
What’s next?
I consider these crates to be more or less feature complete. I’ll likely work on improving the documentation, adding more tests, and fixing any bugs reported to me on my GitHub issue trackers. By the way, if you like what you see here, I’m currently looking for a job. Here’s my LinkedIn.