Project Loom: the light at the end of the tunnel


100DaysOfProgramming_Day018

In this article, I’m going to write an echo server in several different ways. The server does nothing useful: it listens for connections on a port and echoes back whatever is sent, converted to upper case. Then I will compare the approaches.

Let’s first write the single-threaded echo server:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;

public class Day016 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

    while (true) {
      try (var socket = serverSocket.accept();
           var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
           var out = new PrintWriter(socket.getOutputStream(), true)) {
        String line;
        while ((line = in.readLine()) != null) {
          out.println(line.toUpperCase());
        }
      } catch (IOException e) {
        System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
      }
    }
  }
}

The above code is simple and intuitive. We use a ServerSocket that listens on a port and waits for incoming connections. Whenever a client connects, the server reads whatever the client sends and replies with the same text in upper case.
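To try the server out, a small client is handy. The following is a minimal sketch, not part of the original repository: the class name EchoClient and the helper sendLine are hypothetical names chosen for illustration, and it assumes the server is already running on the given port on localhost.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

// Hypothetical helper for exercising the echo server; not part of the original code.
class EchoClient {
  // Sends one line to the server and returns the single line it sends back.
  static String sendLine(String host, int port, String line) throws IOException {
    try (var socket = new Socket(host, port);
         var out = new PrintWriter(socket.getOutputStream(), true);
         var in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
      out.println(line);
      return in.readLine();
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2) throw new IllegalArgumentException("Usage: EchoClient <port> <message>");
    System.out.println(sendLine("localhost", Integer.parseInt(args[0]), args[1]));
  }
}
```

Running two of these clients against the single-threaded server at the same time, with the first one kept open, is a quick way to observe the behaviour described next.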

The only problem is that it cannot handle multiple connections simultaneously. While one client is connected, no other client is served; the others have to wait. Once that client disconnects, the next one gets its turn.

However, we can change this by letting a separate thread handle each connection. Let’s look at the code:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;

public class Day017 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    var executorService = Executors.newCachedThreadPool();
    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

    while (true) {
      var socket = serverSocket.accept();
      executorService.submit(() -> handle(socket));
    }
  }

  private static void handle(Socket socket) {
    try (socket;
         var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         var out = new PrintWriter(socket.getOutputStream(), true)) {
      String line;
      while ((line = in.readLine()) != null) {
        out.println(line.toUpperCase());
      }
    } catch (IOException e) {
      System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
    }
  }
}

In the above code, we haven’t done anything extraordinary. We have just created a thread pool using Executors, and whenever a client connects, we hand that connection to a thread from the pool. This server will do just fine dealing with many connections simultaneously. However, we are not happy with just the word "many". We want to know exactly how many connections it can handle.
The answer lies in how many threads we can spawn. It is certainly not limited by the sockets, because a modern OS can practically handle millions of open sockets at a time. So let’s rephrase the question: can the OS handle that many threads?

Well, the answer is no. Threads are limited and heavy. Let’s find out how many we can create with a simple Java program.

package com.bazlur;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ThreadCount {
  public static void main(String[] args) {
    AtomicInteger threadCount = new AtomicInteger();
    for (; ; ) {
      Thread thread = new Thread(() -> {
        var count = threadCount.incrementAndGet();
        System.out.println("count = " + count);
        LockSupport.park();
      });
      thread.start();
    }
  }
}

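This little program shows how many threads we can create on a machine: it keeps starting threads until the JVM can no longer create one (typically failing with an OutOfMemoryError), and the last count printed is roughly the limit. With one thread per connection, the number of threads we can create is also the number of connections we can handle simultaneously.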
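Running a program like that to exhaustion can make a machine unresponsive, so here is a gentler variant as a rough sketch. The class BoundedThreadCount and the method startThreads are hypothetical names, not part of the original repository. It stops at a caller-supplied limit, reports how many threads were actually started, and releases them afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical, capped variant of the thread-counting experiment.
class BoundedThreadCount {
  // Tries to start up to `limit` platform threads and returns how many were started.
  static int startThreads(int limit) {
    var release = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<>();
    try {
      for (int i = 0; i < limit; i++) {
        var thread = new Thread(() -> {
          try {
            release.await(); // keep the thread alive until counting is done
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        thread.setDaemon(true);
        thread.start();
        threads.add(thread);
      }
    } catch (OutOfMemoryError e) {
      // Typically reported when the OS refuses to create another native thread.
      System.out.println("Stopped early: " + e.getMessage());
    } finally {
      release.countDown(); // let every parked thread finish
    }
    return threads.size();
  }

  public static void main(String[] args) {
    int limit = args.length > 0 ? Integer.parseInt(args[0]) : 1_000;
    System.out.println("started = " + startThreads(limit));
  }
}
```

Raising the limit step by step gives a feel for where a particular machine starts to struggle without pushing it all the way to failure.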

Now the bigger question is: is there an alternative? If we cannot have more threads, what else can we do to achieve outstanding throughput?

Well, the answer is non-blocking IO. The reality of IO is that most threads spend their time waiting. A client connects and keeps communicating with its dedicated thread, but the communication is slow, so most of the time the thread sits idle, doing nothing. Why not use this idle time to serve other clients? That seems to be a fantastic idea. Let’s do this.

package com.bazlur;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Day018 {
  static Map<SocketChannel, Queue<ByteBuffer>> pendingData = new HashMap<>();

  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    ServerSocketChannel socketChannel = ServerSocketChannel.open();
    socketChannel.bind(new InetSocketAddress(port));
    socketChannel.configureBlocking(false);

    var selector = Selector.open();
    socketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      var select = selector.select();
      if (select == 0) continue;
      var selectionKeys = selector.selectedKeys();
      var iterator = selectionKeys.iterator();

      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        if (key.isValid()) {
          if (key.isAcceptable()) {
            accept(key);
          } else if (key.isReadable()) {
            read(key);
          } else if (key.isWritable()) {
            write(key);
          }
        }
        iterator.remove();
      }
    }
  }

  private static void accept(SelectionKey selectionKey) throws IOException {
    var channel = (ServerSocketChannel) selectionKey.channel();
    var sc = channel.accept(); //never null
    System.out.println("Connected: " + sc);
    pendingData.put(sc, new ConcurrentLinkedDeque<>());
    sc.configureBlocking(false);
    sc.register(selectionKey.selector(), SelectionKey.OP_READ);
  }

  private static void read(SelectionKey selectionKey) throws IOException {
    var channel = (SocketChannel) selectionKey.channel();
    var byteBuffer = ByteBuffer.allocateDirect(80);
    var read = channel.read(byteBuffer);
    if (read == -1) {
      // The client closed the connection; release its state.
      channel.close();
      pendingData.remove(channel);
      return;
    }

    if (read > 0) {
      // Queue the response and wait until the channel is writable.
      pendingData.get(channel).add(processBuffer(byteBuffer));
      selectionKey.interestOps(SelectionKey.OP_WRITE);
    }
  }

  private static void write(SelectionKey selectionKey) throws IOException {
    var channel = (SocketChannel) selectionKey.channel();
    var queue = pendingData.getOrDefault(channel, new ArrayDeque<>());
    while (!queue.isEmpty()) {
      var buff = queue.peek();
      channel.write(buff);
      // A partial write leaves data in the buffer; retry on the next OP_WRITE event.
      if (buff.hasRemaining()) return;
      queue.remove();
    }
    selectionKey.interestOps(SelectionKey.OP_READ);
  }

  // Returns a new buffer, ready for writing, with a timestamp prefix and the upper-cased input.
  private static ByteBuffer processBuffer(ByteBuffer byteBuffer) {
    byteBuffer.flip();
    StringBuilder line = new StringBuilder();
    line.append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault())))
            .append("<<server>>: ");

    // Treats each byte as one character, so this assumes ASCII input.
    for (int b = 0; b < byteBuffer.limit(); b++) {
      line.append(Character.toUpperCase((char) byteBuffer.get(b)));
    }
    System.out.println("Executing from: " + Thread.currentThread());
    // The response is longer than the input, so it gets its own buffer
    // instead of being squeezed back into the 80-byte read buffer.
    return ByteBuffer.wrap(line.toString().getBytes());
  }
}

The above code is a lot more complex than the earlier versions, and a lot longer. However, with it we can achieve what we wanted: this single-threaded server gives us better throughput, and we could improve it further by adding a few more threads.

We achieved it, but with a complex programming model, which isn’t easy. It has other problems too; one in particular is that this model is harder to debug.
Is there any other way?

Well, that’s where we see the light at the end of the tunnel: Project Loom. It lets us create millions of virtual threads with little effort while the programming model stays the same, and we get the outstanding throughput we were after. Let’s see an example:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;

public class Day018_1 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);
    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

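    // Early Loom builds called this newVirtualThreadExecutor(); Java 21 and later use the name below.
    try (var executors = Executors.newVirtualThreadPerTaskExecutor()) {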
      while (true) {
        var socket = serverSocket.accept();
        executors.submit(() -> handle(socket));
      }
    }
  }

  private static void handle(Socket socket) {
    try (socket;
         var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         var out = new PrintWriter(socket.getOutputStream(), true)) {
      String line;
      while ((line = in.readLine()) != null) {
        out.println(line.toUpperCase());
      }
    } catch (IOException e) {
      System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
    }
  }
}

The programming model is the same as in the thread-pool version. We are just using a different executor, that’s it. But it lets us have millions of threads.
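To get a feel for the scale, here is a small sketch that is not part of the original repository. The class VirtualThreadCount and the method runBlockingTasks are hypothetical names, and it assumes Java 21 or later, where virtual threads are a final feature. Each task blocks for 100 ms as a stand-in for a slow client, and every task gets its own virtual thread.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo; assumes Java 21 or later.
class VirtualThreadCount {
  // Runs `tasks` blocking tasks, one virtual thread each, and returns how many completed.
  static int runBlockingTasks(int tasks) {
    var completed = new AtomicInteger();
    // Leaving the try block calls close(), which waits for all submitted tasks to finish.
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.submit(() -> {
          Thread.sleep(Duration.ofMillis(100)); // stands in for a slow client
          return completed.incrementAndGet();
        });
      }
    }
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("completed = " + runBlockingTasks(100_000));
  }
}
```

The same number of platform threads would be far beyond what the earlier thread-counting experiment suggests most machines allow, which is the point of the comparison.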

Cheers.

for copy/paste pleasure: https://github.com/rokon12/100DaysOfJava/blob/main/src/main/java/com/bazlur/Day018_1.java


Author: A N M Bazlur Rahman

Java Champion | Software Engineer | JUG Leader | Book Author | InfoQ & Foojay.IO Editor | Jakarta EE Ambassadors| Helping Java Developers to improve their coding & collaboration skills so that they can meet great people & collaborate
