DEV Community

Cover image for I/O Streams in Java: Understanding Blocking and Non-blocking Paradigms
parthlaw
parthlaw

Posted on

I/O Streams in Java: Understanding Blocking and Non-blocking Paradigms

Introduction

I/O streams are the heart of Input-Output operations. These are the paths through which data flows between a source and a destination.

  • Input Stream: These are used by a program or an application to read data from a source such as a file, connection, keyboard, etc.
  • Output Stream: These are used by a program or an application to write data to the destination.

Blocking and Non-Blocking I/O

Basic I/O operations are generally blocking in nature i.e. they block the thread execution until there is some data to read.
For example in a normal HTTP request, a client makes a request to an application which is bind to a certain port (port 80 for HTTP), they first establish a socket connection between them. After the connection is made, the server waits for the client to make a request and then sends the response through the same socket.

A Socket Connection
In a normal Socket connection, where we want continuous communication between client and server without going through the expensive HTTP request process again and again, we keep the socket connection open for communication. The server waits for the client to send something to respond to it. This means the thread is blocked until the client says something. This SocketServer can be created in Java like this:

ServerSocket serverSocket = new ServerSocket(port)
while (true) {
  Socket socket = serverSocket.accept();
  System.out.println("New client connected: " + socket);

  // ClientHandler.run is the function to infinitely listen 
  //and handle the client messages until the connection closes.

  ClientHandler clientHandler = new ClientHandler(socket);
  clientHandler.run();
}
Enter fullscreen mode Exit fullscreen mode

An example to explain the situation can be: Suppose two friends are playing a game in which they have a pipe to communicate with each other. One friend says something (the client) and the other friend (the server) writes the message down and responds 'ack'. This would look like this:

Two Friends playing

This is fine for the situation when only two friends want to play the game i.e. only one friend is talking and the other is listening or waiting for him to talk continuously.
The problem arises when multiple friends want to talk and play the game i.e. multiple friends arrive with their respective pipes and want their messages written down on the other end. But since only one person is listening (i.e. friend-2), he cannot wait on the end of each pipe to listen to the messages. One obvious solution for him is to hire multiple people (one person for each pipe) and assign them to the pipes:

Multiple Threads

This concept is multithreading. In this, you spawn a new thread for each new connection.

ExecutorService executorService = Executors.newSingleThreadExecutor();
ServerSocket serverSocket = new ServerSocket(port)
while (true) {
  Socket socket = serverSocket.accept();
  System.out.println("New client connected: " + socket);

  // ClientHandler.run is the function to infinitely listen 
  and handle the client messages until the connection closes.
  ClientHandler clientHandler = new ClientHandler(socket);
  executorService.submit(clientHandler::run);
}
Enter fullscreen mode Exit fullscreen mode

However, there is a major limitation to this approach as well. Friend 2 cannot keep hiring people for his each friend playing the game. If friends keep coming, there would arrive a point when Friend-2 would be left with no money i.e. a server cannot spawn an infinite number of threads. There needs to be a better solution to this problem.

The problem here is each connection is blocking a thread. That is why we need to spawn a new thread for each new connection. To solve this problem, the non-blocking I/O comes into the picture. This means, that a connection is not blocking a thread and there is some mechanism to go and listen to the connection whenever it wants to send a message.
One of the popular implementations in Java is the use of Channels and Selectors.
In the context of our current example, the idea is to install a recorder which records messages from each pipe separately. Now Friend-2 can just iterate through all the messages and respond to them one by one and process them (write in his notebook). This would eliminate the need for hiring people and friend 2 alone can listen and process messages from his each friend.
Java NIO library provides channel classes for input streams. In our current use-case i.e. for Socket connection, it has ServerSocketChannel class. A channel is just a pathway for data to be transferred (i.e. the pipe). The ServerSocketChannel class allows us to define the connection as non-blocking and register it to a Selector (the Recorder).

Non Blocking I/O

  1. Client Side
    • The client writes the message data to its outbound buffer.
    • The client then attempts to send the data over the network to the server via the channel.
  2. Network Transmission
    • The data is taken from the client's outbound buffer and sent to the server over the network.
  3. Server Side
    • The transmitted data arrives at the server's network interface and is placed into the network input buffer associated with the server's connection.
    • The server's operating system manages this input buffer and makes the data available for the server's application to read.
  4. Server Application:
    • When the server's application is ready to read from the channel (by calling channel.read()), it reads data from the network input buffer.
    • If data is waiting in the input buffer, the server reads it into its application-level buffer for processing.

You define and register your ServerSocketChannel to a Selector like this:

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// configure non blocking nature
serverSocketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress(8000);
serverSocketChannel.bind(inetSocketAddress);
System.out.println("Socket Server started on port 8000");

// Now you register the channel with selector with the required interest key (will be explained further).
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Enter fullscreen mode Exit fullscreen mode

Interest Key is the the key for action that you are interested in, with the channel. In our current context, when we create a new socket server, we are interested in new connections. Interest Key for that action is SelectionKey.OP_ACCEPT.
After creating the ServerSocketChanel, binding it to port 8000 and registering it with a Selector, we can now start handling the connections.

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// configure non blocking nature
serverSocketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress(8000);
serverSocketChannel.bind(inetSocketAddress);
System.out.println("Socket Server started on port 8000");

// Now you register the channel with selector with the required interest key (will be explained further).
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
 // this is the keys in our selector which are ready to be 
 // processed i.e. new connections which are ready to be 
 // accepted (OP_ACCEPT), messages from connections which are 
 // ready to be read (OP_READ), etc.

 int readyCount = selector.select();
 if(readyCount==0){
  // skip when there is no key to be processed
  continue;
 }
 // get the ready keys set
 Set<SelectionKey> readyKeysSet = selector.selectedKeys();
 // iterate over the readyKeySet
 Iterator iterator = readyKeysSet.iterator();
 // checking if there are any connection requests and the server is ready to accept the connection i.e. OP_ACCEPT is registered.
 while(iterator.hasNext()) {
  SelectionKey key = (SelectionKey) iterator.next();
  iterator.remove();
  if (key.isAcceptable()) {
   System.out.println("Accepting connection");
   // Get the channel for the client connection
   ServerSocketChannel server = (ServerSocketChannel) key.channel();
   // accept the connection
   SocketChannel client = server.accept();
   // configure non blocking behavior for this channel
   client.configureBlocking(false);
   // register the client channel with the same selector. The action we are interested is only read i.e. we only want to listen to the client messages. The SelectonKey is OP_READ.
   SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ); 
  }

  // checking if there is any client wants to send the message 
  // (this would only be true when there is any message and 
  // OP_READ is registered).
  if(key.isReadable()) {
   SocketChannel client = (SocketChannel) key.channel();
   int BUFFER_SIZE = 1024;
   // create a buffer
   ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
   try {
    // read data and store it into created buffer
    int bytesRead = client.read(buffer);
    if (bytesRead == -1) {
     System.out.println("Connection close");
     // Connection closed by client
     key.cancel();
     client.close();
     continue;
    }
    // flip to change buffer to read mode from write mode
    buffer.flip();
    // define a byte array of size as the number of bytes in the buffer.
    byte[] receivedBytes = new byte[buffer.remaining()];
    buffer.get(receivedBytes);
    // print the length of byte array
    System.out.println(receivedBytes.length);
    // now immediately after receiving the message we want to write the ack to client. So we register OP_WRITE now so that our server is ready to write to outbound buffer.
    key.interestOpsOr(SelectionKey.OP_WRITE);
   } catch (SocketException e) {
     e.printStackTrace();
     key.cancel();
     client.close();
     continue;
    } catch (Exception e) {
        e.printStackTrace();
    }
  }
  if(key.isWritable()) {
   SocketChannel client = (SocketChannel) key.channel();
   // write to outbound buffer associated with this client's connection
   client.write("ack");
   // immediately remove OP_WRITE interest key, so that server becomes not ready to write.
   key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
  }
 }
}
Enter fullscreen mode Exit fullscreen mode

This way we can handle multiple client connections and their reads and writes in a single thread. As we want to scale and increase the processing time, we should offload our blocking tasks such as processing the read message (processing can include for example saving something to db) to separate threads. This way threads would be killed once these tasks are over. To further optimize it, we can create thread pools for a context of tasks.

ExecutorService executorService = Executors.newFixedThreadPool(n);
Enter fullscreen mode Exit fullscreen mode

This created a pool of maximum n available threads. If there is no thread available, the task will wait in the queue until a thread becomes available.

To further optimize it, we can also create a pool of clients to assign a separate selector to each pool and run the processing of each pool in a separate thread. This way we can increase processing and hence response time for the clients.

Multithreading with Selectors

Full Java code for the approach with all imports is as follows:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadedServer {

    private static final int PORT = 8080;
    private static final int MAX_CLIENTS_PER_POOL = 20;
    private static final int MIN_NUM_POOLS = 5; // Number of separate pools

    public static void main(String[] args) throws IOException {
        ExecutorService poolExecutor = Executors.newFixedThreadPool(MIN_NUM_POOLS);

        // Create and open a server socket channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        // Create a single selector for the server socket channel
        Selector serverSelector = Selector.open();
        serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

        // Array to hold selectors for client channels
        Selector[] clientSelectors = new Selector[MIN_NUM_POOLS];
        for (int i = 0; i < MIN_NUM_POOLS; i++) {
            clientSelectors[i] = Selector.open();
            ClientPoolHandler clientPoolHandler = new ClientPoolHandler(clientSelectors[i]);
            poolExecutor.submit(clientPoolHandler::run);
        }

        // Accept and handle client connections in separate threads for each pool
        while (true) {
            serverSelector.select();
            Set<SelectionKey> selectedKeys = serverSelector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                if (!key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    // Accept the connection
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = serverChannel.accept();
                    boolean clientAdded = false;

                    // Check if any selector has capacity, otherwise create a new one
                    for (Selector selector : clientSelectors) {
                        int numClients = selector.keys().size() - 1; // Subtract 1 for the server channel
                        if (numClients < MAX_CLIENTS_PER_POOL) {
                            clientChannel.configureBlocking(false);
                            clientChannel.register(selector, SelectionKey.OP_READ);
                            clientAdded = true;
                            break;
                        }
                    }

                    // If no selector has capacity, create a new one and spawn a new thread
                    if (!clientAdded) {
                        Selector newSelector = Selector.open();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(newSelector, SelectionKey.OP_READ);
                        ClientPoolHandler clientPoolHandler = new ClientPoolHandler(newSelector); 
                        poolExecutor.submit(clientPoolHandler::run);
                    }
                }
            }
        }
    }

    private static class ClientPoolHandler{
        private Selector selector;

        public ClientPoolHandler(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                while (true) {
                    selector.select();
                    // Handle selected keys
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Here we maintain a minimum number of pools running to keep the application available, whenever needed.

Conclusion

Basic I/O operations are typically blocking, causing threads to wait until data is available. However, as the need for scalability arises, especially in scenarios with multiple concurrent connections, blocking I/O becomes impractical due to resource limitations.

The shift towards non-blocking I/O, facilitated by technologies like Java NIO (Non-blocking I/O), offers a more scalable solution. By decoupling connections from threads and employing mechanisms such as Channels and Selectors, non-blocking I/O allows servers to handle numerous connections without exhausting system resources.

In essence, non-blocking I/O enables servers to asynchronously manage multiple connections, improving performance and scalability. By adopting this approach, we can build robust and efficient systems capable of handling high loads while maintaining responsiveness.

Please let me know in the comments, what are the other solutions in Java or in other languages for the problem 🙏.
Let's keep discussing and increasing each other's knowledge.

Thank you for reaching this far. I'm a new grad student from India, and this is my first blog post. I hope you liked the content. Please share in the comments on what I could improve.🙌

Thank you for reading 🙏.

Top comments (3)

Collapse
 
dcoder_1 profile image
Lakshay

Very knowledgeable 🙌

Collapse
 
juicylynx profile image
Juicylynx

Really helped me learn something new. Great content!

Collapse
 
harshitphoenix profile image
Harshit

Needed this one. Thanks for writing