In the previous blog post, we covered single-threaded client-server communication. In this post, we will cover multi-threaded server-client communication: every new client connection will be handled concurrently in its own task, which we spawn with tokio::spawn and which the multi-threaded Tokio runtime schedules across its worker threads.
We will also implement server shutdown using ctrl + c. It will close the server, but the client handlers will shut down gracefully without panicking. For this we need a way for the server to communicate with each client handler, so let's use a tokio::sync::broadcast channel to broadcast messages to them. This only works if every client handler subscribes to the broadcast sender, which means each client handler will hold at least a socket and a receiver field.
Also, since multiple clients can connect simultaneously, they read and write data simultaneously. This means we need to supply a copy of Db to each spawned task. As of now we have a single Db instance, and it needs to be shared across those tasks (and therefore across threads). For this, two things need to happen:
- We will wrap the HashMap<String, Bytes> inside our Db in Arc<Mutex<...>> to create shared references (a standalone sketch of this pattern follows this list). We could use Rc<RefCell<...>> for this, but since we are talking about multi-threading, Rc is not a great fit: it is not thread-safe.
- Since there are many values associated with a client connection, we will create a Handler struct to encapsulate all of those fields together.
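As a standalone sketch (not part of the server code) of the Arc<Mutex<...>> pattern described in the first bullet, here is a tiny program in which four spawned tasks insert into one shared map; the key and value names are made up. An Rc<RefCell<...>> version of this would be rejected by the compiler because Rc is not Send.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    // one shared map, wrapped the same way Db will be later in this post
    let db: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));

    let mut handles = Vec::new();
    for i in 0..4 {
        // each task gets its own Arc handle pointing at the same map
        let db = db.clone();
        handles.push(tokio::spawn(async move {
            db.lock().unwrap().insert(format!("key-{i}"), format!("value-{i}"));
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }

    // all four inserts landed in the same map
    println!("{} entries", db.lock().unwrap().len());
}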
Let's break our code down so that hitting ctrl + c while the server is running triggers the shutdown process. In the main function of bin/server.rs, let's capture the shutdown signal using tokio::signal.
use tokio::signal;
// in main function
let shutdown = signal::ctrl_c();
Whenever we hit ctrl + c, this shutdown future completes at its .await point (that is, Future::poll returns Ready).
We want to run two branches at this point: one handles the incoming requests from clients, and the other listens for this shutdown signal, so that whenever ctrl+c is received the shutdown process starts and the client-handling branch is dropped. This is where tokio::select! comes into the picture. It takes any number of async branches in the form of futures (similar to promises in JS) and runs them concurrently, waiting for a value from any of them. When one branch produces a value, it executes that branch's handler and drops the rest of the branches.
tokio::select! {
    res = server::run(&mut listener) => {
        if let Err(_err) = res {
            println!("failed to accept connection");
        }
    }
    _ = shutdown => {
        println!("inside shutdown loop");
    }
}
// execution reaches here only when the shutdown future completes or the server encounters an error
In the code above, both branches run, and we wait for either one to return a value. To keep the server running, server::run does not return a value unless there is an error. If we hit ctrl+c at this point, the shutdown branch runs and the server::run() future is dropped, stopping the server loop.
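As a standalone illustration of these select semantics (unrelated to the server itself, and assuming the tokio time feature is enabled): two timers race, the faster branch's handler runs, and the other branch is dropped.
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_millis(50)) => {
            // the faster future completes first, so this handler runs...
            println!("fast branch won");
        }
        _ = sleep(Duration::from_secs(5)) => {
            // ...and this branch is dropped without ever completing
            println!("slow branch won");
        }
    }
}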
At this point, let's create our Handler and Listener structs. The Listener struct is the server object holding the TcpListener and the broadcast channel.
Besides managing the socket and its Db clone, the Handler is also responsible for:
- receiving the shutdown notification from the server when the server starts shutting down;
- notifying the server when the handler goes out of scope.
Let's group the shutdown-related functionality into a separate Shutdown struct, and move the socket into a new Connection struct; both become fields of the Handler.
File src/handler.rs
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc};

use crate::Db;

pub struct Handler {
    pub connection: Connection,
    pub db: Db,
    pub shutdown: Shutdown,
    // when the handler object is dropped, this sender clone is dropped with it,
    // which is what notifies the receiver on the server side
    _shutdown_complete: mpsc::Sender<()>,
}

pub struct Connection {
    pub stream: TcpStream,
}

pub struct Shutdown {
    shutdown: bool,
    notify: broadcast::Receiver<()>,
}
The Shutdown struct holds notify, a broadcast receiver subscribed to the server's shutdown broadcast. It's time to define the Listener struct and wire the listener's broadcast sender to the handler's subscriber. Let's create a new file, src/listener.rs.
use tokio::{
    net::{TcpListener, TcpStream},
    sync::{broadcast, mpsc},
};

use crate::Db;

pub struct Listener {
    pub db: Db,
    pub listener: TcpListener,
    pub notify_shutdown: broadcast::Sender<()>,
    pub shutdown_complete_rx: mpsc::Receiver<()>,
    pub shutdown_complete_tx: mpsc::Sender<()>,
}
In the Listener struct, notify_shutdown is the sender half of a broadcast channel. When it is dropped, every subscribed receiver is notified; this is how the listener will broadcast the shutdown.
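Here is a minimal, self-contained sketch of the broadcast mechanics we rely on (nothing beyond tokio::sync::broadcast is assumed): every subscriber gets its own receiver, and dropping the sender wakes all of them with a Closed error, which is exactly how the shutdown notification will be delivered later in this post.
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

#[tokio::main]
async fn main() {
    let (notify_shutdown, _) = broadcast::channel::<()>(1);

    let mut tasks = Vec::new();
    for id in 0..3 {
        // each "handler" subscribes to the sender before it is dropped
        let mut shutdown_rx = notify_shutdown.subscribe();
        tasks.push(tokio::spawn(async move {
            // once the sender is dropped, recv() completes with Err(Closed)
            let res = shutdown_rx.recv().await;
            assert!(matches!(res, Err(RecvError::Closed)));
            println!("handler {id} observed shutdown");
        }));
    }

    // "broadcast" the shutdown simply by dropping the sender
    drop(notify_shutdown);

    for task in tasks {
        task.await.unwrap();
    }
}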
The other two fields, shutdown_complete_rx and shutdown_complete_tx, are the two ends of an mpsc channel used by the handlers to report back to the server. Imagine many clients are connected and the server decides to shut down. Calling recv() on the mpsc::Receiver<()> keeps waiting until every sender has been dropped; only then does it stop blocking. In short, all sender clones must be finished with and dropped before the receiver lets execution continue.
The flow of messages is as follows:
- The server starts shutting down and notifies all handlers.
- Each handler receives this notification through its subscription to the broadcast::Sender<()>.
- Each handler wraps up its work and drops itself.
- Once a handler object goes out of scope (is dropped), its clone of the mpsc::Sender<()> is dropped with it (explained later).
- The server observes this on shutdown_complete_rx and knows for sure that the handler object is dropped.
- shutdown_complete_rx keeps waiting until every handler has gone out of scope.
- The server closes gracefully, having shut down all the handlers.
Note that we are using an mpsc channel, which stands for multi-producer, single-consumer. It fits our use case well: we want to send signals from multiple handlers to a single server task.
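A minimal sketch of that behaviour (standalone, with made-up task bodies): recv() on the receiver only returns None after every sender clone has been dropped, which is how the server will know that all handlers are gone.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1);

    for id in 0..3 {
        // each "handler" owns a clone of the sender; it never calls send()
        let shutdown_complete = shutdown_complete_tx.clone();
        tokio::spawn(async move {
            println!("handler {id} done");
            // the clone moved into this task is dropped here, when the task ends
            drop(shutdown_complete);
        });
    }

    // drop the original sender kept by the "server"
    drop(shutdown_complete_tx);

    // recv() only returns None once every sender clone has been dropped
    assert!(shutdown_complete_rx.recv().await.is_none());
    println!("all handlers dropped, safe to exit");
}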
Now let's look at how the objects get created:
- Upon server start, we create a Listener object, which initializes the Db and the channel senders and receivers.
- Then we wait for client socket connections in a loop.
- When a new socket connection arrives, we create a Handler object, which subscribes to the broadcast::Sender to obtain a receiver. It also initializes its _shutdown_complete field with a clone of the mpsc::Sender.
- This Handler object then lives for the lifetime of the client connection.
Creating the Listener object
In bin/server.rs
use blog_redis::server;
use blog_redis::Listener;
use tokio::signal;
use tokio::{
    net::TcpListener,
    sync::{broadcast, mpsc},
};

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let listener = TcpListener::bind("127.0.0.1:8081").await?;
    let shutdown = signal::ctrl_c();
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

    let mut listener = Listener::new(
        listener,
        notify_shutdown,
        shutdown_complete_tx,
        shutdown_complete_rx,
    );

    tokio::select! {
        res = server::run(&mut listener) => {
            if let Err(_err) = res {
                println!("failed to accept connection");
            }
        }
        _ = shutdown => {
            println!("inside shutdown loop");
        }
    }

    // graceful shutdown code will go here (see the Graceful Shutdown section)

    Ok(())
}
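The code above calls Listener::new, and server::run below calls listener.accept(), but the post does not show those methods. Here is a minimal sketch consistent with how they are called; it lives below the struct in src/listener.rs and reuses the imports shown there (a real implementation might also add retry/backoff around accept errors).
impl Listener {
    pub fn new(
        listener: TcpListener,
        notify_shutdown: broadcast::Sender<()>,
        shutdown_complete_tx: mpsc::Sender<()>,
        shutdown_complete_rx: mpsc::Receiver<()>,
    ) -> Listener {
        Listener {
            db: Db::new(),
            listener,
            notify_shutdown,
            shutdown_complete_rx,
            shutdown_complete_tx,
        }
    }

    // accept a single connection and hand the socket back to server::run
    pub async fn accept(&self) -> std::io::Result<TcpStream> {
        let (socket, _addr) = self.listener.accept().await?;
        Ok(socket)
    }
}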
Let's now implement all the methods on the Handler
, Shutdown
and Connection
structs.
// these impls also use: BytesMut (bytes crate), AsyncReadExt/AsyncWriteExt (tokio::io),
// std::io::ErrorKind, and the Command / buffer_to_array helpers from the previous post
impl Connection {
    fn new(stream: TcpStream) -> Connection {
        Connection { stream }
    }

    pub async fn read_buf_data(&mut self) -> Option<(Command, Vec<String>)> {
        let mut buf = BytesMut::with_capacity(1024);
        match self.stream.read_buf(&mut buf).await {
            Ok(size) => {
                if size == 0 {
                    // an empty read means the peer closed the connection
                    return None;
                }
            }
            Err(err) => {
                println!("error {:?}", err);
                return None;
            }
        };

        let attrs = buffer_to_array(&mut buf);
        Some((Command::get_command(&attrs[0]), attrs))
    }
}
impl Shutdown {
    fn new(shutdown: bool, notify: broadcast::Receiver<()>) -> Shutdown {
        Shutdown { shutdown, notify }
    }

    pub async fn listen_recv(&mut self) -> Result<(), tokio::sync::broadcast::error::RecvError> {
        self.notify.recv().await?; // returns an error of type `tokio::sync::broadcast::error::RecvError`
        self.shutdown = true;
        Ok(())
    }

    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }
}
impl Handler {
    pub fn new(listener: &Listener, socket: TcpStream) -> Handler {
        Handler {
            connection: Connection::new(socket),
            db: listener.db.clone(),
            shutdown: Shutdown::new(false, listener.notify_shutdown.subscribe()),
            _shutdown_complete: listener.shutdown_complete_tx.clone(),
        }
    }

    pub async fn process_query(
        &mut self,
        command: Command,
        attrs: Vec<String>,
    ) -> Result<(), std::io::Error> {
        let connection = &mut self.connection;
        let db = &self.db;

        match command {
            Command::Get => {
                let result = db.read(&attrs);
                match result {
                    Ok(result) => {
                        connection.stream.write_all(&result).await?;
                    }
                    Err(_err) => {
                        connection.stream.write_all(b"").await?;
                    }
                }
                return Ok(());
            }
            Command::Set => {
                let resp = db.write(&attrs);
                match resp {
                    Ok(result) => {
                        connection.stream.write_all(result.as_bytes()).await?;
                    }
                    Err(_err) => {
                        connection.stream.write_all(b"").await?;
                    }
                }
                return Ok(());
            }
            Command::Invalid => {
                connection.stream.write_all(b"invalid command").await?;
                Err(std::io::Error::from(ErrorKind::InvalidData))
            }
        }
    }
}
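To exercise the multi-threaded path end to end, here is a hypothetical minimal client, not taken from the post: it assumes the simple space-separated command format from the previous post and the address used in bin/server.rs. Run the server, then run several copies of this client concurrently.
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // connect to the address used in bin/server.rs
    let mut stream = TcpStream::connect("127.0.0.1:8081").await?;

    for cmd in ["set foo bar", "get foo"] {
        stream.write_all(cmd.as_bytes()).await?;

        let mut buf = vec![0u8; 1024];
        let n = stream.read(&mut buf).await?;
        println!("{cmd} -> {}", String::from_utf8_lossy(&buf[..n]));
    }
    Ok(())
}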
Db changes to Arc
In the previous blog post, we defined the Db entries as a plain HashMap<String, Bytes>. That worked fine because we had a one-to-one server-client connection. With multiple handlers, however, we need to share one mutable map across all the handler objects. One way to achieve this is to wrap the HashMap in Arc<Mutex<...>>. Arc<T> provides shared ownership of the value T it wraps and is safe to share across threads. Cloning the Arc produces a new Arc handle that points to the same memory location. Mutex provides a way to lock the data so that only one holder of the lock can read or update it at a time; once that holder is done, it releases the lock for others to acquire.
File src/db.rs
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
pub struct Db {
    pub entries: Arc<Mutex<HashMap<String, Bytes>>>,
}

impl Db {
    pub fn new() -> Db {
        Db {
            entries: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Writes data to the database
    pub fn write(&self, arr: &[String]) -> Result<&str, &'static str> {
        let key = &arr[1];
        let value = &arr[2];
        // we need to clone the referenced value since Bytes::from() expects a 'static
        // lifetime, but `value` is a borrow with a shorter lifetime in this function
        let val = value.clone();
        let p = self
            .entries
            .lock()
            .unwrap()
            .insert(String::from(key), Bytes::from(val));
        match p {
            Some(_p) => Ok("r Ok"), // the key was already present
            None => Ok("Ok"),       // the key was not present
        }
    }

    /// Reads data from the database
    pub fn read(&self, arr: &[String]) -> Result<Bytes, &'static str> {
        let key = &arr[1];
        let query_result = self.entries.lock().unwrap();
        let res = query_result.get(key);
        match res {
            Some(value) => Ok(value.clone()),
            None => Err("no such key found"),
        }
    }
}
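A hypothetical usage sketch of this Db from two tasks (not from the post; it assumes Db is re-exported from the crate root the same way Listener is, and that the command vector mirrors what buffer_to_array produces):
use blog_redis::Db; // assumed re-export at the crate root, like Listener
use bytes::Bytes;

#[tokio::main]
async fn main() {
    let db = Db::new();

    // the spawned task gets its own clone; both handles point at the same HashMap
    let writer = {
        let db = db.clone();
        tokio::spawn(async move {
            // mimics what buffer_to_array would produce for "set foo bar"
            let cmd = vec!["set".to_string(), "foo".to_string(), "bar".to_string()];
            db.write(&cmd).unwrap();
        })
    };
    writer.await.unwrap();

    // read the value back through the original handle
    let cmd = vec!["get".to_string(), "foo".to_string()];
    assert_eq!(db.read(&cmd).unwrap(), Bytes::from("bar"));
}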
You might have noticed that we call the lock() method on self.entries, where self is the Db. There is no lock method on entries' own type, Arc; lock is defined on Mutex. The reason lock is still reachable through the Arc is that Arc implements the Deref trait. Deref stands for dereferencing a smart pointer: Arc's deref returns a reference to the inner data, in this case the Mutex.
Deref & Deref Coercion
Deref enables something known as deref coercion: when the type of a reference doesn't match what the method or function being called expects, the compiler inserts deref calls to convert it, in this case so that lock() can be called.
Here, the entries field of type Arc<Mutex<HashMap<..>>> is coerced to &Mutex<HashMap<..>>.
It does this through the deref method, which borrows self (the Arc) and returns a reference to the inner data (&Mutex<..>). This happens automatically when we pass a reference to a value whose type doesn't match the parameter type in the function or method definition; a sequence of deref calls converts the type we provided into the type the parameter needs.
Arc implements Deref, and MutexGuard implements both Deref and DerefMut. In the case of get we only need a shared reference, &HashMap, but in the case of set (insert) we need a mutable reference, &mut HashMap. .lock().unwrap() returns a MutexGuard<HashMap<..>>, which gets coerced to &mut HashMap<..> because the insert method expects a mutable reference to the HashMap.
One thing to notice is that we call insert in a single chained expression, whereas get is split across two lines. There is a reason for this. insert returns an Option<Bytes> (an owned value), whereas get returns an Option<&Bytes> (a reference). In the get case we need that reference to the Bytes to stay valid until we are done with it, and its lifetime is tied, under the hood, to how long we can borrow the HashMap. In the set case the return value is owned by the function, so nothing borrowed needs to outlive the statement. Let's look at the lifetime of the HashMap borrow in both cases.
The lock() method returns a LockResult, and unwrap() gives us a MutexGuard<HashMap<..>>. MutexGuard also implements Deref, so it hands out a reference to the HashMap through the deref function of the Deref trait.
pub trait Deref {
    type Target: ?Sized;

    // the returned reference borrows `self`, so it can only live as long as
    // the value being dereferenced (e.g. the MutexGuard<HashMap<..>>)
    fn deref(&self) -> &Self::Target;
}
When we call query_result.get(key), the MutexGuard is dereferenced under the hood using this deref method. That means the borrow of the wrapped HashMap lives only as long as the MutexGuard.
If we never store the MutexGuard in a variable, as in the single-line insert call, it is a temporary that gets dropped at the end of the statement; the borrow of the HashMap ends with it, and so does any reference the get method would have handed back.
However, when we store the MutexGuard in a local variable, it lives until the end of the function scope, so the borrow of the HashMap, and with it the reference returned by get, can live that long too.
So we can use query_result freely inside the function. The reason we still have to clone the value before returning is that query_result is dropped at the end of the function scope, and value, being a &Bytes borrowed through it, cannot outlive it. Hence we call clone() to return an owned copy.
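Here is a small standalone sketch (plain std, no tokio) of the same two styles on a Mutex<HashMap<...>>, with comments marking where the guard is dropped in each case:
use std::collections::HashMap;
use std::sync::Mutex;

fn main() {
    let entries: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());

    // single-line style: the MutexGuard is a temporary dropped at the end of the
    // statement; that is fine because insert() returns an owned Option<String>
    let previous = entries.lock().unwrap().insert("foo".into(), "bar".into());
    assert!(previous.is_none());

    // two-line style: the guard is named, so it (and any reference borrowed from
    // the map through it) lives until the end of this scope
    let guard = entries.lock().unwrap();
    let value: &String = guard.get("foo").unwrap();
    println!("{value}");
    // `guard` and `value` are dropped here; returning `value` past this point
    // would not compile, which is why Db::read clones the Bytes instead
}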
Creating the Handler object
Let's move the main server logic to a new file, src/server.rs. This file will hold the main execution loop of the server; we move the accept loop out of bin/server.rs and into src/server.rs.
File src/server.rs
use crate::Handler;
use crate::Listener;

pub async fn run(listener: &Listener) -> std::io::Result<()> {
    loop {
        let socket = listener.accept().await?;
        let mut handler = Handler::new(listener, socket);

        tokio::spawn(async move {
            if let Err(_err) = process_method(&mut handler).await {
                println!("Connection Error");
            }
        });
    }
}

async fn process_method(handler: &mut Handler) -> Result<(), std::io::Error> {
    while !handler.shutdown.is_shutdown() {
        let result = tokio::select! {
            _ = handler.shutdown.listen_recv() => {
                return Ok(());
            },
            res = handler.connection.read_buf_data() => res,
        };

        let (cmd, vec) = match result {
            Some((cmd, vec)) => (cmd, vec),
            None => return Ok(()),
        };

        handler.process_query(cmd, vec).await?;
    }
    Ok(())
}
One very important consideration in the code above is that we invoke handler.shutdown.listen_recv() and handler.connection.read_buf_data() inside the same tokio::select! block. Since handler is a &mut, i.e. a mutable reference, one might think this violates the borrow rule that there cannot be more than one mutable borrow of an object at a time. However, the two calls borrow two different fields, and those borrows do not overlap, so the compiler does not complain.
Initially, to keep things simple, I had not created the Connection and Shutdown structs, and I ran into exactly this borrow-checker error. Once I encapsulated the respective fields in separate structs, the compiler stopped complaining. A minimal illustration of this "disjoint field borrows" rule follows.
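Here is that minimal illustration, a standalone sketch with String fields standing in for Connection and Shutdown:
struct FakeHandler {
    // stand-ins for the Connection and Shutdown fields of the real Handler
    connection: String,
    shutdown: String,
}

fn process(handler: &mut FakeHandler) {
    // two simultaneous mutable borrows, but of *different* fields of the same
    // struct, so the borrow checker accepts them (disjoint field borrows)
    let connection = &mut handler.connection;
    let shutdown = &mut handler.shutdown;
    connection.push_str(" read");
    shutdown.push_str(" listened");
}

fn main() {
    let mut handler = FakeHandler {
        connection: String::from("connection:"),
        shutdown: String::from("shutdown:"),
    };
    process(&mut handler);
    println!("{} / {}", handler.connection, handler.shutdown);
}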
Graceful Shutdown
The handler objects listen for the shutdown signal from the listener's notify_shutdown sender through the notify field of the Shutdown struct inside each handler.
Let's revisit the process_method function inside src/server.rs. This method exits in the following cases:
- the while loop ends because handler.shutdown.is_shutdown() returns true, or a call to process_query at the bottom of the loop returns an error that is propagated with ?;
- the buffer is empty (the client closed the connection) and handler.connection.read_buf_data() returns None;
- handler.shutdown.listen_recv() completes.
In the last case, listen_recv() only completes when notify_shutdown on the listener sends a message or is dropped.
So let's trigger the notification from notify_shutdown when ctrl+c is hit in bin/server.rs. Hitting ctrl+c makes tokio::select! take the _ = shutdown => {} branch, and execution moves out of the tokio::select! block. At that point, one way to notify every handler is simply to drop notify_shutdown: dropping the sender closes the broadcast channel and wakes every subscribed receiver. All of this signaling happens on the broadcast channel.
File bin/server.rs
// tokio::select! { ... }
drop(listener.notify_shutdown);
File src/handler.rs. This is where the message is received.
pub async fn listen_recv(&mut self) -> Result<(), tokio::sync::broadcast::error::RecvError> {
    // dropping notify_shutdown closes the channel, so this recv() completes
    // (with a `Closed` error) and unblocks the control flow
    self.notify.recv().await?;
    self.shutdown = true;
    Ok(())
}
When the notify receiver is woken, process_method returns and control goes back into the run method of server.rs, inside the task spawned by tokio::spawn(). This is where the handler object goes out of scope and is dropped. At this point the handler's _shutdown_complete sender clone is dropped with it, which is the signal carried over the mpsc channel. Now the server needs to wait for this signal from every handler as it gets dropped.
// tokio::select! { ... }

drop(listener.notify_shutdown);
drop(listener.shutdown_complete_tx);
let _ = listener.shutdown_complete_rx.recv().await;
// this point is only reached when all the handlers have been dropped
In the code above, we also drop the server's own shutdown_complete_tx. This needs to be done because, if we keep that sender alive, shutdown_complete_rx.recv().await will wait for it forever. With it dropped, listener.shutdown_complete_rx.recv().await waits for all the handlers to go out of scope and then returns None once every cloned sender has been dropped.
Once that recv().await returns, we are guaranteed that all the handlers have been dropped.
Congratulations! You have successfully created a multi-threaded Redis server and client using Tokio.
The latest code can be found in the GitHub repository.
If you have any queries or questions, feel free to ask me in the comments section or on the Discord channel.