From Reactive Streams to Virtual Threads

Adam Warski

Virtual Threads deliver a fast, cheap (in terms of memory & switching speed) threading solution for the JVM. They promise the return of a direct-style, synchronous programming model. But is that enough to challenge the status quo regarding data streaming? Can we have the best of both worlds: the simplicity that Virtual Threads promise and the resilience and safety of Reactive Streams? Let’s find out!

Virtual threads: the story so far and what’s ahead

First, a little bit of history. Virtual Threads were released with Java 21 in September 2023, after 6 years of development. Some final limitations of that implementation will be fully lifted with the release of Java 24 in March 2025. 

Virtual Threads are part of a broader initiative called Project Loom. This initiative also covers related features, specifically the Structured Concurrency and Scoped Values APIs, which are currently in the preview phase.

Virtual Threads are predominantly a feature of the JVM (the virtual machine); however, there are also some API changes in the standard library. Hence, all JVM-based languages can use the new lightweight concurrency model, starting with Java but also Kotlin, Scala, Clojure, and others.

Why introduce Virtual Threads at all?

Let’s explore the motivation for introducing Virtual Threads to understand their relation to reactive streams better.

A long time ago, Java had a synchronous programming model. For example, when implementing an HTTP server, each request was assigned a thread, which read the data, processed it, and wrote the response.

In older Java versions, every java.lang.Thread corresponded to an operating-system-level thread. These “platform threads,” as they are called in Virtual Threads nomenclature, are a heavy-weight resource. First, they take a non-trivial amount of memory; second, creating a thread and switching the CPU’s execution context to it takes a non-trivial amount of time. Hence, a limited number of threads can be created and used (in the order of thousands).

With the rise of the web, increased traffic and vast amounts of data to process caused this model to run into scaling issues. With threads being a scarce, expensive resource, it just didn’t make sense for them to sit idle and wait synchronously for responses from the database or other web services.

That’s how thread pools, ExecutionContexts and the like came to life, with the aim to utilize threads better. Instead of writing code in a direct style, we shifted to working with various flavors of Futures. For example, a blocking, synchronous version of some car-selling business logic might look as follows:

var person = db.findById(id);
if (person.hasLicense()) {
  bankingService.transferFunds(person, dealership, amount);
  dealerService.reserveCar(person);
}

The same code, but utilizing threads more efficiently, using CompletableFutures, looks rather different:

db.findById(id).thenCompose(person -> {
  if (person.hasLicense()) {
    return bankingService.transferFunds(person, dealership, amount)
      .thenCompose(transferResult -> dealerService.reserveCar(person));
  }
  return CompletableFuture.completedFuture(null);
});

Quite clearly, technical concerns (thread utilization) have taken priority over the code’s readability. Project Loom set out to maintain the better thread utilization we achieved with Futures while retaining the readability and simplicity of the direct-style code.

More specifically, Project Loom aims to solve three problems that have been introduced with the shift to asynchronous code as above:

  • Syntax: use direct-style, readable, synchronous syntax as the default way of using Java; this includes using Java’s built-in control flow constructs instead of library code
  • Virality: if we call a method that returns a Future, our method should probably return a Future as well (as blocking until the future is ready defeats its purpose!); this is also known as the function coloring problem
  • Lost context: exceptions reported when using Futures often capture only part of the full stack trace (typically just the frames since the last future transformation); this makes debugging difficult, if not impossible at times

And it did so successfully!

Futures and Virtual Threads: under the covers

It might also be beneficial to understand how Futures and Virtual Threads achieve better thread utilization.

Behind each executor is a pool of threads, which are created upfront. These threads run tasks from a task queue. When code is submitted to the executor service, a Future value is returned to the user, which will be completed once the submitted code is (eventually) run.

Whichever thread is free picks up and executes a task from the queue. Running such code might involve submitting more tasks, transforming other futures, and returning the futures created that way.
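As a minimal illustration (not part of the original example), this is the model in code: a fixed pool of platform threads executes submitted tasks, and submit returns a Future that completes once the task has run:

// a fixed pool of platform threads, created upfront, executing tasks from a queue
var executor = Executors.newFixedThreadPool(4);

// submit() returns immediately; the Future completes once a free pool thread runs the task
Future<Integer> future = executor.submit(() -> 6 * 7);

System.out.println(future.get()); // blocks until the result is available
executor.shutdown();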

That’s also precisely how Virtual Threads work under the covers. However, while the executors above are implemented at the library level, Project Loom shifts that and implements it at the VM level.

The JVM transparently manages a pool of threads for submitting and executing tasks. At the same time, the Thread API is kept intact: an instance might now represent a platform or a virtual thread.
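For example (a minimal sketch), the same Thread API now also creates virtual threads, and Executors gained a virtual-thread-per-task executor:

// the familiar Thread API; this instance just happens to be a virtual thread
Thread vt = Thread.ofVirtual().start(() -> System.out.println("hello from a virtual thread"));
vt.join();

// alternatively: an executor that starts a new virtual thread for each submitted task
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  executor.submit(() -> System.out.println("another virtual thread"));
} // close() waits for submitted tasks to finish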

However, the Virtual-to-Platform thread scheduler is not Project Loom’s only contribution to the JVM. The second crucial component is retrofitting all blocking APIs to be Virtual-Thread aware. Any time you call a blocking method, such as acquiring a semaphore or a lock, sending data over a socket, or reading from an input stream, the underlying platform thread is not blocked.

Instead, only the virtual thread is blocked and put aside until a permit/lock is available or the operation’s result is ready. The platform thread can then run other virtual threads that are ready for execution.

Some exceptions to the above retrofitting (specifically, blocking inside synchronized methods and blocks, which pins the carrier thread) will be lifted in Java 24.

How did we arrive at Reactive Streams?

Let’s now shift our attention to Reactive Streams. The initiative to develop a standard for asynchronous, non-blocking stream processing started in late 2013. The final version was released in 2015 and included as part of Java 9 in 2017.

Reactive streams aim to “govern the exchange of stream data across an asynchronous boundary.” This exchange—or rather, a series of exchanges—should be done within bounded memory.

The JVM interfaces that are part of reactive streams (Publisher, Subscriber, and Subscription) are quite low-level and are not meant to be used directly by end-users. Instead, libraries that implement the reactive streams standard should be used.

Such libraries provide high-level interfaces for stream processing and tools for declaratively managing concurrency, interfacing with I/O operations, and safely handling errors.

Examples of reactive streams libraries include Akka Streams, Vert.x, Helidon, RxJava, Reactor (used in Spring) and others.

What problems do Reactive Streams solve?

As mentioned above, Reactive Streams allow the processing of large amounts of data in a streaming way. Libraries implementing reactive streams are often based on Futures or similar concepts, leveraging thread pooling and efficiently utilizing operating system resources as described above.

There are two classes of problems that are inherently hard, and reactive streams excel at providing robust and readable solutions for them.

The first area is concurrency. Conventional wisdom says that writing concurrent code using locks is hard and error-prone. Declarative concurrency helps, as the end-user only has to describe the “what,” not the “how.” Details of the (concurrent) execution are delegated to the runtime. 

The second area is error management. Errors can happen in the most unexpected places, and ensuring proper resource management in case of an exception is often the centerpiece of many system implementations (or a source of bugs!). Reactive streams also help here, providing facilities for describing how errors should be handled, managing the lifecycle of resources safely, and recovering from errors, if possible.

Finally, a problem that reactive streams set out to address is ensuring that data is processed within bounded memory. This is implemented using backpressure. An upstream component can only send data for further processing once it receives demand from downstream components. This bi-directional flow, data from upstream to downstream and demand from downstream to upstream, ensures that there is always a bounded number of elements being processed at any given time.
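To illustrate the demand protocol, here’s a minimal sketch using the JDK’s java.util.concurrent.Flow interfaces (unrelated to the Flow class we’ll define later): a subscriber that requests a single element at a time, so that the publisher never has more than one element in flight for it:

import java.util.concurrent.Flow;

class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
  private Flow.Subscription subscription;

  @Override public void onSubscribe(Flow.Subscription s) {
    subscription = s;
    s.request(1); // initial demand: a single element
  }

  @Override public void onNext(T item) {
    System.out.println(item); // process the element...
    subscription.request(1);  // ...then signal demand for the next one
  }

  @Override public void onError(Throwable t) { t.printStackTrace(); }

  @Override public void onComplete() { System.out.println("done"); }
}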

Implementing simple streams

While delivering a fast and powerful solution for data streaming, reactive streams implementations also suffer from the problems we have identified before: virality of Futures, lost context in case of errors, and high syntax overhead due to the need to compose asynchronous computations, not simple statements.

Can we leverage what Project Loom delivered, keeping the benefits of Reactive Streams? It turns out that yes, it is possible. We’ve set out to create such an implementation, to verify our approach, taking inspiration from Go and Kotlin.

The code lives in a project called Jox, and is available under the Apache2 OSS license. We’ll briefly describe its architecture, but if you’d like to find out how direct-style “reactive” streaming feels in practice, I encourage you to try the library!

To implement a “reactive” data processing library in the spirit of Project Loom, we want to use as many constructs that are built into Java as possible. That is, ifs, fors, whiles, and try-catch-finally for error handling.

We also want to use direct syntax to describe the logic in the data processing stages. Operations should be composed using ;, without needing to be wrapped in a container.

A single data transformation pipeline processing stage is represented using a FlowStage class. A stage can be run, provided a sink to which data can be emitted:

public interface FlowStage<T> {
  void run(FlowEmit<T> emit) throws Exception;
}

public interface FlowEmit<T> {
  void apply(T t) throws Exception;
}

We also introduce a Flow wrapper class, which contains the last stage defined so far and a number of helper operations that we’ll use to build the data processing pipeline:

public class Flow<T> {
  final FlowStage<T> last;
  // ...
}

It turns out that’s all we need to describe quite complex flows! For example, here’s a factory method that produces an infinite stream of values:

public class Flows {
  public static <T> Flow<T> iterate(T zero, Function<T, T> nextFn) {
    return new Flow<>(emit -> {
      T current = zero;
      while (true) {
        emit.apply(current);
        current = nextFn.apply(current);
      }
    });
  }
}

The Flow instance created by the method above only describes what should happen—no data is processed at its invocation. To start processing, we need to provide a FlowEmit instance to the last stage, which might vary depending on how we want to consume this stream.

But before we get to consumption, you might notice that we’re already following some of Loom’s principles: using Java’s built-in control flow while construct and direct-style syntax.

Once we have that, here’s a method that runs the stream and creates a list of all elements processed:

public class Flow<T> {
  public List<T> runToList() throws Exception {
    List<T> result = new ArrayList<>();
    last.run(result::add);
    return result;
  }
}

Looks simple enough! We also need some stream-transformation methods. map is a popular one:

public class Flow<T> {
  public <U> Flow<U> map(ThrowingFunction<T, U> mappingFunction) {
    return new Flow<>(emit -> {
      last.run(t -> emit.apply(mappingFunction.apply(t)));
    });
  }
}

We simply emit the result of applying the mapping function to whatever the previous (then-last) stage produced! Similarly, we can implement methods such as filter, take, mapStateful, sliding, runFold, runDrain, etc.; a sketch of filter in the same style is shown below.
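For example, filter (a sketch, not the exact Jox code, so the signature may differ slightly) runs the previous stage with a sink that only forwards elements matching the predicate:

public class Flow<T> {
  public Flow<T> filter(ThrowingFunction<T, Boolean> predicate) {
    return new Flow<>(emit -> {
      // run the previous stage with a sink that forwards only matching elements
      last.run(t -> {
        if (predicate.apply(t)) {
          emit.apply(t);
        }
      });
    });
  }
}

With these tools in place, we can now build synchronous, single-threaded data processing pipelines: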

List<Integer> result = Flows
  .iterate(1, i -> i + 1)
  .map(i -> i*2)
  .filter(i -> i%3 == 2)
  .take(10)
  .runToList();

How does this work? As mentioned before, when we invoke the iterate, map, filter, and take methods, we only create a description of the flow to run. Flow is a lazily-evaluated description of how to process a stream. Each newly constructed Flow contains a reference to a FlowStage, which closes over the previous stage, and so on.

Only when we invoke .runToList is the data actually processed. This method creates a sink—a FlowEmit instance—and passes it to the last stage. Here, the sink simply adds items to a list. That stage then passes a modified sink—another FlowEmit instance—to the run method of the stage before it, and so on.

Ultimately, the producing stage calls FlowEmit.apply. Each invocation of apply sends an element through the entire pipeline. Hence, each apply call only completes once that element has been fully processed (or discarded). We thus limit processing to at most one element at a time (for now)—trivially meeting the memory-boundedness requirement.

We do have a nice stream-processing API, but so far, we could have just used Java’s streams to a similar effect:

List<Integer> result = Stream
  .iterate(1, i -> i + 1)
  .map(i -> i * 2)
  .filter(i -> i % 3 == 2)
  .limit(10)
  .collect(Collectors.toList());

It’s when we want to introduce concurrency, multi-Flow operations, and I/O that things get interesting.

Implementing asynchronous streams

As we already mentioned, concurrency is inherently hard. Using locks all too easily leads to deadlocks, and trying to run things in parallel often ends with a once-in-a-while race condition. That’s why the best concurrency is when somebody else manages it for us. Then, it’s no longer our concern.

Reactive streams have proven great in providing tools to avoid explicit concurrency by including a number of declarative operators. Can we implement something similar as well?

It turns out that—yes, we can! First, we’ll need low-level primitives to communicate and transfer data between concurrently running virtual threads. 

We might try using Java’s blocking queues; however, we’ll quickly run into some limitations. Luckily, prior art is available, giving us just the tools we need. Specifically, the Go programming language has shown that using channels to communicate between goroutines (which resemble Virtual Threads in many aspects!) is a convenient and powerful way to solve concurrency problems.

Moreover, Kotlin has successfully implemented performant Go-like channels on the JVM, albeit using coroutines. Luckily, they published their algorithm in a paper, paving the way for a Virtual-Thread-based implementation, now available as part of Jox.

Compared to a blocking queue, a Channel has two major differences:

  • It can be marked as done (meaning no more elements will be sent) or put in an “error” state, which discards any buffered elements and short-circuits any send/receive operations.
  • A select operation is available, which allows selecting exactly one value to be received from (or sent to) a list of channels.
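As a quick taste of the channel API (a sketch only; the exact way of constructing a buffered channel may differ between Jox versions), a producer on one virtual thread sends elements that a consumer receives on another:

var channel = new Channel<Integer>(16); // assumption: a buffered channel with capacity 16

// producer: runs on its own virtual thread
Thread.ofVirtual().start(() -> {
  try {
    for (int i = 0; i < 5; i++) {
      channel.send(i); // blocks (only the virtual thread) when the buffer is full
    }
    channel.done();    // no more elements will be sent
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
});

// consumer: receive() blocks until an element is available
for (int i = 0; i < 5; i++) {
  System.out.println(channel.receive());
}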

The second infrastructural component, which will be crucial when discussing error handling, is structured concurrency scopes. Discussing structured concurrency would be an article in itself; hence, we’ll provide a very brief overview.

There are several ways to implement structured concurrency, some of which are explored in JEP-453. This Java feature is currently in preview and has undergone a significant revision between Java 22 and Java 23. The community hopes it will be finalized in Java 25—we’ll see!

Jox provides an alternative structured concurrency API (although, behind the scenes, it uses the one from the JEP), which focuses on safety and simplicity of use. The base unit is a scope within which forks—Virtual Threads—can be spawned:

var result = supervised(scope -> {

  var f1 = scope.fork(() -> {
    Thread.sleep(500);
    return 5;
  });

  var f2 = scope.fork(() -> {
    Thread.sleep(1000);
    return 6;
  });

  return f1.join() + f2.join();
});

The core property of a structured concurrency scope is a guarantee that once the supervised scope completes, all forks started within it have finished either successfully, with an exception, or due to an interruption. Hence, from the outside world’s point of view, threading and concurrency become an implementation detail!

The fact that no threads can outlive the scope is crucial in case of an error in one of the forks. The default behavior is for the scope to shut down, interrupting any other forks still running and waiting until they are complete.

We can implement concurrent streaming operators now that we have those two tools. Let’s start by merging two streams. Just as before, we are creating a lazily evaluated flow description. Once the merged flow is run, it creates a structured concurrency scope and, within that scope, two forks that run the two child flows. Each flow produces its output to a channel.

We can then use select to receive values from the channel that has data available (some simplifications are in place regarding handling stream completion):

public class Flow<T> {
  public Flow<T> merge(Flow<T> other) {
    return new Flow<>(emit -> {
      supervised(scope -> {
        Channel<T> c1 = this.runToChannel(scope);
        Channel<T> c2 = other.runToChannel(scope);

        boolean continueLoop = true;
        while (continueLoop) {
          switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) {
            case ChannelDone _ -> continueLoop = false;
            case ChannelError error -> throw error.toException();
            case Object r -> emit.apply((T) r);
          }
        }

        return null;
      });
    });
  }
}

Note that we are once again using Java’s built-in control flow constructs (while), Java’s built-in pattern matching (switch), and Java’s built-in error handling (throw).

In a similar way (albeit with more code), we can implement streaming operators such as mapPar, zip, buffer, flatMap, grouped, etc. These operators take inspiration from the features available in Reactive Streams libraries such as Akka Streams or Reactor.

Moreover, we now have the tools to interface with the external world, creating byte streams from files, parsing them as lines, or running a flow to obtain an InputStream.

To conclude, here’s a longer example using some of the functionalities mentioned above:

Flows.unfold(0, i -> Optional.of(Map.entry(i + 1, i + 1)))
  .throttle(1, Duration.ofSeconds(1))
  .mapPar(4, i -> {
    Thread.sleep(5000);
    var j = i * 3;
    return j + 1;
  })
  .filter(i -> i % 2 == 0)
  .zip(Flows.repeat("x"))
  .runForeach(System.out::println);

Backpressure

This all sounds great, but is our implementation on par when it comes to the resiliency qualities of reactive streams, such as guaranteeing that memory usage is bounded? After all, in the implementations that we’ve inspected, backpressure propagation is nowhere to be seen.

The truth is that backpressure is there; however, it’s implicit; there’s no need to create explicit code paths to manage it.

Backpressure arises from limited buffers (channels, queues) and virtual thread blocking. That is, if some stage runs in a background fork (thus asynchronously), and the downstream can’t keep up with processing the data, eventually the channel’s buffer will fill up, and the .send invocation will block the thread—naturally pausing production in the upstream stage.

Hence, backpressure is provided by the fact that we use thread-blocking operations.
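For illustration (a sketch only; slowOperation stands in for any time-consuming processing, and the exact buffer signature may differ), introducing an explicit buffer runs the upstream stages in a background fork, connected to the downstream through a bounded channel:

Flows.iterate(1, i -> i + 1)
  .buffer(16)                  // upstream runs in a background fork, connected via a channel with capacity 16
  .map(i -> slowOperation(i))  // a slow consumer
  .runDrain();

Once the 16-element buffer fills up, the upstream fork blocks on send until the slow consumer catches up; no explicit demand signalling is required.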

Error handling

The final facet of our streaming implementation we must consider is error handling. In the case of a simple, synchronous stream, error handling is pretty straightforward: any exception thrown by a processing stage will propagate through the chain of FlowEmit.apply calls and be rethrown by whatever consumption method we use (such as .runForeach or .runDrain).

Java’s error-handling mechanisms are sufficient here. Convenience methods such as Flow.onError or Flow.onComplete simply create stages that use try-catch or try-finally.
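For instance, onError could be sketched as follows (not the exact Jox code; the handler’s type is an assumption): run the previous stage and, if it throws, invoke the handler before rethrowing:

public class Flow<T> {
  public Flow<T> onError(Consumer<Throwable> errorHandler) {
    return new Flow<>(emit -> {
      try {
        last.run(emit);
      } catch (Exception e) {
        errorHandler.accept(e); // let the user react to the error...
        throw e;                // ...then propagate it further
      }
    });
  }
}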

The situation is only slightly different for asynchronous streams, which spawn multiple forks (threads) and run flows in the background. That is thanks to the crucial structured concurrency principle we’ve discussed: if any fork throws an exception, the whole scope is shut down, and the exception (appropriately wrapped) is rethrown, but only when all forks are complete after being interrupted.

Hence, from the point of view of other flow stages, it makes no difference whether some other processing stage is asynchronous or not. Threading becomes an implementation detail (although an important one).

Performance

Functionality-wise, we can replicate at least large parts of reactive stream libraries. But what about performance?

We have yet to design benchmarks for both approaches to streaming, so we have only partial data at our disposal. First of all, we’ve benchmarked, profiled, and optimized the implementation of the channel that we’re using for communicating between asynchronously running stages; as a result, it’s on par with Java’s concurrent data structures and Kotlin’s implementation, slightly lagging behind Go’s.

Secondly, we’ve looked at the performance of Loom-based and Reactive-libraries-based HTTP servers. In our tests, we didn’t notice any significant differences.

That shouldn’t come as a surprise: the core idea of using a pool of OS-level platform threads to execute multiple “virtual” threads (coroutines, futures, etc.) is the same in the “reactive” and “Loom” approaches.

Summary

Can you implement a Virtual-Thread-based library with feature parity with Reactive Streams implementations? Yes. Jox is a case in point.

Will Virtual Threads replace Reactive Streams? Partially. The Reactive Streams specification is still very useful as an interoperability standard. I suspect that it will continue to play an essential role in implementing low-level, performant integration code. For example, Jox implements the reactive streams Publisher interface to integrate with any third-party library that is reactive-streams-aware.

However, when it comes to writing data processing pipelines, the popularity of reactive stream libraries will most likely diminish. The Virtual Threads model is simpler, both when writing and reading code. It allows using Java’s control flow constructs and error model (exceptions) and avoiding function coloring (Future virality). It also doesn’t need a library-level runtime, which manages the pool of threads on which execution happens, instead shifting that concern to the JVM.

Libraries such as Akka Streams or Reactor still have an edge when it comes to maturity, API versatility, and integration availability. But the Virtual Threads ecosystem will eventually catch up and, if my predictions are correct, take over.

Virtual Threads provide a robust, performant platform that allows us to build applications leveraging the knowledge gained from operating “reactive” systems and expanding upon it, providing a more programmer-friendly way of writing resilient systems. 

In a way, the emergence of Virtual Threads is the ultimate validation of the ideas behind Reactive Streams, which proved so valuable that they’ve been incorporated into the JVM itself.
