Von Reactive Streams zu Virtual Threads

Adam Warski

Virtual Threads bieten eine schnelle und ressourcenschonende Threading-Lösung für die JVM – sowohl im Hinblick auf Speicherverbrauch als auch auf Umschaltgeschwindigkeit. Sie versprechen die Rückkehr zu einem direkten, synchronen Programmiermodell. Doch reicht das aus, um den Status quo im Bereich Daten-Streaming infrage zu stellen? Können wir das Beste aus beiden Welten haben: die Einfachheit, die Virtual Threads versprechen, und gleichzeitig die Robustheit und Sicherheit von Reactive Streams? Finden wir es heraus!

Virtual Threads: Der bisherige Weg und ein Blick in die Zukunft

Zunächst ein kurzer Rückblick: Virtual Threads wurden im September 2023 mit Java 21 veröffentlicht – nach sechs Jahren Entwicklungszeit. Einige verbleibende Einschränkungen dieser Implementierung werden mit dem Release von Java 24 im März 2025 vollständig aufgehoben.

Virtual Threads sind Teil einer größeren Initiative namens Project Loom. Diese Initiative umfasst auch verwandte Funktionen, insbesondere die APIs für Structured Concurrency und Scoped Values, die sich derzeit in der Vorschauphase befinden.

Virtual Threads sind in erster Linie ein Feature der JVM (Java Virtual Machine). Dennoch gibt es auch einige Änderungen an der Standardbibliothek. Dadurch können alle JVM-basierten Sprachen dieses neue, leichtgewichtige Nebenläufigkeitsmodell nutzen – angefangen bei Java, aber auch Kotlin, Scala, Clojure und andere.

Warum wurden Virtual Threads überhaupt eingeführt?

Um die Beziehung zwischen Virtual Threads und Reactive Streams besser zu verstehen, lohnt sich ein Blick auf die Motivation hinter ihrer Einführung.

Früher folgte Java einem synchronen Programmiermodell. Zum Beispiel erhielt bei der Implementierung eines HTTP-Servers jede eingehende Anfrage ihren eigenen Thread, der die Daten las, verarbeitete und anschließend die Antwort zurückschickte.

In älteren Java-Versionen entsprach jedes Objekt vom Typ java.lang.Thread einem Betriebssystem-Thread. Diese sogenannten Plattform-Threads (in der Terminologie der Virtual Threads) sind jedoch schwergewichtige Ressourcen. Zum einen benötigen sie eine nicht unerhebliche Menge an Speicher, zum anderen ist sowohl das Erzeugen eines Threads als auch das Umschalten des CPU-Kontexts zeitintensiv. Daher konnten nur eine begrenzte Anzahl solcher Threads erstellt und genutzt werden – meist nur einige Tausend.

Mit dem Aufstieg des Webs, zunehmendem Datenverkehr und riesigen Datenmengen stieß dieses Modell schnell an seine Skalierungsgrenzen. Threads wurden zu einer knappen und teuren Ressource – und es ergab schlicht keinen Sinn mehr, sie untätig auf Antworten von Datenbanken oder Webdiensten warten zu lassen.

Aus diesem Grund entstanden Thread-Pools, ExecutionContexts und ähnliche Mechanismen, um Threads effizienter zu nutzen. Statt in direktem Stil zu programmieren, wechselte man zu verschiedenen Varianten von Futures. Zum Beispiel könnte eine blockierende, synchrone Implementierung einer Auto-Verkauf-Logik folgendermaßen aussehen:

var person = db.findById(id);
if (person.hasLicense()) {
  bankingService.transferFunds(person, dealership, amount);
  dealerService.reserveCar(person);
}

Die gleiche Logik, jedoch effizienter mit Threads durch den Einsatz von CompletableFutures, sieht deutlich anders aus:

db.findById(id).thenCompose(person -> {
  if (person.hasLicense()) {
    return bankingService.transferFunds(person, dealership, amount)
      .thenCompose(transferResult -> dealerService.reserveCar(person));
  }
  return CompletableFuture.completedFuture(null);
})

Ganz offensichtlich haben technische Überlegungen – insbesondere die effiziente Nutzung von Threads – Vorrang vor der Lesbarkeit des Codes erhalten. Project Loom verfolgt das Ziel, die durch den Einsatz von Futures erzielte bessere Thread-Ausnutzung beizubehalten und gleichzeitig die Lesbarkeit und Einfachheit eines direkten Programmierstils zurückzubringen.

Konkret adressiert Project Loom drei Probleme, die mit dem Übergang zur asynchronen Programmierung entstanden sind:

  • Syntax: Java soll wieder bevorzugt im direkt lesbaren, synchronen Stil genutzt werden – inklusive der eingebauten Kontrollflusskonstrukte statt zusätzlicher Bibliotheken.
  • Viralität: Wird eine Methode aufgerufen, die ein Future zurückliefert, muss oft auch die eigene Methode ein Future zurückgeben (denn blockierendes Warten würde den Vorteil des Futures zunichtemachen); dieses Problem ist auch als Function Coloring bekannt.
  • Verlorener Kontext: Bei Fehlern im Zusammenspiel mit Futures enthält der Stacktrace häufig nur den letzten Teil der Kette, was das Debugging erschwert oder gar unmöglich macht.

Und genau diese Probleme konnte Project Loom erfolgreich lösen!

Futures und Virtual Threads: ein Blick unter die Haube

Es lohnt sich auch, einen genaueren Blick darauf zu werfen, wie sowohl Futures als auch Virtual Threads eine bessere Ausnutzung von Threads ermöglichen.

Hinter jedem Executor verbirgt sich ein Thread-Pool, dessen Threads vorab erstellt werden. Diese Threads holen sich Aufgaben aus einer Warteschlange (Task Queue) und führen sie aus. Wenn Code an einen Executor übergeben wird, erhält man ein Future-Objekt zurück – dieses wird abgeschlossen, sobald die übergebene Aufgabe ausgeführt wurde.

Der nächste freie Thread übernimmt also eine Aufgabe aus der Warteschlange. Beim Ausführen dieses Codes können weitere Aufgaben entstehen, neue Futures erzeugt und miteinander verkettet werden.

Genau so funktionieren Virtual Threads – allerdings mit einem wichtigen Unterschied: Während klassische Executor-Services auf Bibliotheksebene implementiert sind, wird das Scheduling der Virtual Threads auf VM-Ebene realisiert – durch Project Loom.

Die JVM verwaltet also im Hintergrund automatisch einen Pool von Plattform-Threads, die die Aufgaben ausführen. Gleichzeitig bleibt die Thread-API erhalten: Eine Instanz kann entweder einen Plattform-Thread oder einen Virtual Thread darstellen.

Allerdings ist der Scheduler, der Virtual Threads auf Plattform-Threads abbildet, nicht der einzige Beitrag von Project Loom zur JVM. Ein weiterer entscheidender Bestandteil ist die nachträgliche Anpassung aller blockierenden APIs, damit sie Virtual-Thread-fähig sind.

Immer wenn Sie eine blockierende Methode aufrufen – etwa beim Erwerb eines Semaphors oder Locks, beim Senden von Daten über einen Socket oder beim Lesen eines InputStreams – wird nicht der zugrunde liegende Plattform-Thread blockiert.

Stattdessen wird nur der Virtual Thread blockiert und solange „beiseitegelegt“, bis z. B. ein Permit oder Lock verfügbar ist oder das Ergebnis der Operation bereitsteht. Der Plattform-Thread kann währenddessen andere Virtual Threads weiter ausführen, die bereit zur Ausführung sind.

Einige Ausnahmen von dieser Anpassung (insbesondere synchronized-Methoden) werden mit dem Release von Java 24 aufgehoben.

Wie kamen wir zu Reactive Streams?

Wenden wir uns nun den Reactive Streams zu. Die Initiative zur Entwicklung eines Standards für asynchrone, nicht-blockierende Stream-Verarbeitung begann Ende 2013. Die finale Version wurde 2015 veröffentlicht und 2017 mit Java 9 offiziell in die Sprache aufgenommen.

Reactive Streams verfolgen das Ziel, den Austausch von Datenströmen über asynchrone Grenzen hinweg zu regeln. Dieser Austausch – oder vielmehr eine Reihe von Exchanges – soll dabei stets innerhalb begrenzter Speicherressourcen erfolgen.

Die JVM-Interfaces, die Teil des Reactive-Streams-Standards sind (Publisher, Subscriber und Subscription), befinden sich auf sehr niedriger Abstraktionsebene und sind nicht für die direkte Nutzung durch Endanwender vorgesehen. Stattdessen sollten Bibliotheken verwendet werden, die den Standard implementieren.

Solche Bibliotheken bieten hochgradig abstrahierte Schnittstellen zur Stream-Verarbeitung sowie Werkzeuge für deklaratives Concurrency-Management, die Anbindung an I/O-Operationen und eine sichere Fehlerbehandlung.

Beispiele für Reactive-Streams-Bibliotheken sind Akka Streams, Vert.x, Helidon, RxJava, Reactor (used in Spring) und andere.

Welche Probleme lösen Reactive Streams?

Wie bereits erwähnt, ermöglichen Reactive Streams die Verarbeitung großer Datenmengen im Streaming-Modus. Bibliotheken, die den Standard umsetzen, basieren häufig auf Futures oder ähnlichen Konzepten und nutzen Thread-Pooling sowie Betriebssystemressourcen besonders effizient – wie zuvor beschrieben.

Grundsätzlich gibt es zwei Problemfelder, die als besonders komplex gelten – und genau hier bieten Reactive Streams robuste und gleichzeitig lesbare Lösungen:

  1. Nebenläufigkeit (Concurrency):
    Es gilt als allgemein bekannt, dass das Schreiben nebenläufiger Programme mit Locks schwierig und fehleranfällig ist. Der deklarative Ansatz von Reactive Streams hilft dabei, da Entwickler*innen lediglich beschreiben müssen, was passieren soll – wie es geschieht, übernimmt die Runtime. Die Details der Ausführung (einschließlich Thread-Management) werden vollständig abstrahiert.
  2. Fehlerbehandlung:
    Fehler können an den unerwartetsten Stellen auftreten – und eine korrekte Ressourcenverwaltung im Ausnahmefall ist oft das Herzstück komplexer Systeme (oder die Quelle zahlreicher Bugs). Reactive Streams bieten gezielte Mechanismen zur Beschreibung des Fehlerverhaltens, zur sicheren Verwaltung von Ressourcenlebenszyklen und – wo möglich – zur Wiederherstellung nach einem Fehler.

Ein weiteres zentrales Problem, das Reactive Streams adressieren, ist die Verarbeitung von Daten innerhalb begrenzter Speicherressourcen. Dies wird durch das Konzept des Backpressure umgesetzt. Eine Upstream-Komponente darf Daten nur dann zur Weiterverarbeitung senden, wenn sie von der Downstream-Komponente eine entsprechende Anforderung (Demand) erhält. Dieser bidirektionale Datenfluss – Daten von Upstream nach Downstream und Demand von Downstream nach Upstream – stellt sicher, dass stets nur eine begrenzte Anzahl an Elementen gleichzeitig verarbeitet wird.

Einfache Streams implementieren

Trotz ihrer leistungsstarken Streaming-Fähigkeiten bringen Reactive-Streams-Implementierungen auch die zuvor genannten Herausforderungen mit sich: die Viralität von Futures, der verlorene Kontext bei Fehlern und ein hoher Syntax-Overhead durch die Komposition asynchroner Operationen statt einfacher Anweisungen.

Doch können wir die Vorteile von Project Loom nutzen, ohne die Stärken von Reactive Streams aufzugeben? Die Antwort lautet: Ja, das ist möglich! Genau das war unser Ziel bei der Entwicklung einer passenden Implementierung – inspiriert von Go und Kotlin.

Der Quellcode befindet sich im Projekt Jox, das unter der Apache2 Open-Source-Lizenz veröffentlicht wurde. Im Folgenden geben wir einen kurzen Überblick über die Architektur. Wer jedoch herausfinden möchte, wie sich “reaktives” Streaming im Direct-Style anfühlt, sollte die Bibliothek unbedingt selbst ausprobieren!

Ziel war es, eine Datenverarbeitungsbibliothek im Geiste von Project Loom zu entwickeln, die möglichst viele Java-eigene Konstrukte verwendet – etwa if, for, while sowie try-catch-finally zur Fehlerbehandlung.

Auch die Beschreibung der Logik innerhalb der einzelnen Verarbeitungsstufen soll in direkter Syntax erfolgen. Die Operationen werden einfach mit ; verknüpft, ohne dass sie in zusätzliche Container eingeschlossen werden müssen.

Eine einzelne Verarbeitungsstufe innerhalb einer Datenpipeline wird durch die Klasse FlowStage dargestellt. Eine solche Stage kann mit einem Ziel (Sink), an das Daten übergeben werden, ausgeführt werden:

public interface FlowStage<T> {
  void run(FlowEmit<T> emit) throws Exception;
}

public interface FlowEmit<T> {
  void apply(T t) throws Exception;
}

Zusätzlich führen wir eine Flow-Wrapper-Klasse ein, die die zuletzt definierte Verarbeitungsstufe sowie eine Reihe von Hilfsmethoden enthält, mit denen wir unsere Datenverarbeitungspipeline aufbauen können:

public class Flow<T> {
  final FlowStage<T> last;
  // ...
}

Es zeigt sich, dass genau diese einfachen Bausteine ausreichen, um selbst komplexe Datenflüsse zu beschreiben! Hier zum Beispiel eine Factory-Methode, die einen unendlichen Werte-Stream erzeugt:

public class Flows {
  public static <T> Flow<T> iterate(T zero, Function<T, T> nextFn) {
    return new Flow(emit -> {
      T current = zero;
      while (true) {
        emit.apply(current);
        current = nextFn.apply(current);
      }
    });
  }
}

Die durch diese Methode erzeugte Flow-Instanz beschreibt lediglich, was geschehen soll – beim Aufruf wird noch keine Verarbeitung ausgeführt. Um die Verarbeitung zu starten, muss eine FlowEmit-Instanz an die letzte Stage übergeben werden – je nach Anwendungsfall und gewünschter Art der Konsumierung.

Bevor wir zur konkreten Nutzung kommen, fällt bereits auf, dass wir einige zentrale Prinzipien von Project Loom umsetzen: wir verwenden Java’s eingebaute Kontrollflussstruktur while sowie eine direkte, lesbare Syntax zur Beschreibung der Logik.

Sobald dies steht, kann eine Methode wie die folgende verwendet werden, um den Stream auszuführen und alle verarbeiteten Elemente in einer Liste zu sammeln:

public class Flow<T> {
  public List<T> runToList() throws Exception {
    List<T> result = new ArrayList<>();
    last.run(result::add);
    return result;
  }
}

Sieht ziemlich einfach aus, oder? Natürlich benötigen wir auch Methoden zur Transformation von Streams, eine der beliebtesten ist map:

public class Flow<T> {
  public <U> Flow<U> map(ThrowingFunction<T, U> mappingFunction) {
    return new Flow<>(emit -> {
      last.run(t -> emit.apply(mappingFunction.apply(t)));
    });
  }
}

Hier wird einfach das Ergebnis der Mapping-Funktion ausgegeben – basierend auf den Werten, die von der vorherigen (nun “letzten”) Verarbeitungsstufe geliefert wurden. Auf ähnliche Weise lassen sich auch Methoden wie filter, take, mapStateful, sliding, runFold, runDrain usw. implementieren. Mit diesen Bausteinen können wir nun synchrone, single-threaded Datenverarbeitungspipelines aufbauen:

List<Integer> result = Flows
  .iterate(1, i -> i + 1)
  .map(i -> i*2)
  .filter(i -> i%3 == 2)
  .take(10)
  .runToList();

Wie funktioniert das? Wie bereits erwähnt, erstellen Aufrufe von Methoden wie iterate, take und map lediglich eine Beschreibung des Datenflusses. Flow ist eine lazy-evaluierte Beschreibung, wie ein Stream verarbeitet werden soll.

Jede neu erzeugte Flow-Instanz enthält eine Referenz auf eine FlowStage, die wiederum auf die vorherige Stufe verweist – so entsteht eine Kette.

Erst beim Aufruf von .runToList() wird die tatsächliche Verarbeitung gestartet. Diese Methode erstellt ein Ziel (Sink) – eine FlowEmit-Instanz – und übergibt sie an die vorherige Stufe. In diesem Fall fügt das Sink die Elemente einfach einer Liste hinzu. Die vorherige Stufe übergibt wiederum ein angepasstes Sink an ihre eigene run-Methode – und so weiter, bis die gesamte Pipeline durchlaufen ist.

Letztlich ruft die produzierende Stufe FlowEmit.emit auf. Jeder einzelne emit-Aufruf schleust die Daten durch die gesamte Pipeline. Das bedeutet: Ein emit-Aufruf wird erst dann abgeschlossen, wenn das jeweilige Element vollständig verarbeitet (oder verworfen) wurde. Dadurch ist garantiert, dass immer nur ein Element gleichzeitig verarbeitet wird – zumindest vorerst – und die Anforderung der Speicherbegrenzung (memory-boundedness) trivial erfüllt ist.

Wir verfügen damit über ein elegantes, streambasiertes API – doch bis zu diesem Punkt hätten wir ein ähnliches Ergebnis auch mit den Stream-Gatherern von Java erzielen können:

List<Integer> result = Stream
  .iterate(1, i -> i + 1)
  .map(i -> i * 2)
  .filter(i -> i % 3 == 2)
  .limit(10)
  .collect(Collectors.toList());

Spannend wird es aber erst dann, wenn wir Nebenläufigkeit, mehrere parallele Flows oder I/O-Verarbeitung ins Spiel bringen.

Asynchrone Streams implementieren

Wie bereits erwähnt, ist Nebenläufigkeit von Natur aus komplex. Der Einsatz von Locks führt nur allzu leicht zu Deadlocks, und Versuche, Abläufe parallel auszuführen, enden oft in schwer reproduzierbaren Race Conditions. Aus diesem Grund ist die beste Art der Nebenläufigkeit die, bei der sich jemand anderes darum kümmert und dann müssen Sie sich nicht mehr damit beschäftigen.

Reactive Streams haben sich als äußerst nützlich erwiesen, wenn es darum geht, explizite Nebenläufigkeit zu vermeiden – unter anderem durch eine Vielzahl deklarativer Operatoren. Können wir etwas Ähnliches auch selbst umsetzen?

Die Antwort lautet: Ja, das ist möglich! Zunächst benötigen wir jedoch Low-Level-Primitiven, um Daten zwischen gleichzeitig laufenden Virtual Threads zu übertragen.

Man könnte versucht sein, Java’s blockierende Queues zu verwenden, stößt dabei aber schnell auf gewisse Einschränkungen. Zum Glück gibt es bereits bewährte Konzepte, die genau die Werkzeuge liefern, die wir brauchen. Besonders die Programmiersprache Go hat gezeigt, dass Channels ein äußerst praktischer und leistungsfähiger Weg sind, um zwischen Goroutines (die Virtual Threads in vielerlei Hinsicht ähneln!) zu kommunizieren und Nebenläufigkeit elegant zu lösen.

Auch Kotlin hat erfolgreich performante, Go-ähnliche Channels auf der JVM implementiert – basierend auf Coroutines. Glücklicherweise haben sie den Algorithmus in einem Fachartikel veröffentlicht – das ebnete den Weg für eine Virtual-Thread-basierte Implementierung, die nun Teil von Jox ist.

Im Vergleich zu blockierenden Queues bieten Channels zwei entscheidende Vorteile:

  • Ein Channel kann als done markiert werden – also als abgeschlossen, sodass keine weiteren Elemente mehr gesendet werden. Alternativ kann er in einen Fehlerzustand versetzt werden, wodurch alle gepufferten Elemente verworfen und laufende Sende-/Empfangsoperationen abgebrochen werden.
  • Eine select-Operation erlaubt es, genau einen Wert aus einer Liste von Channels zu empfangen (oder zu senden) – eine Funktionalität, die sich in komplexen parallelen Workflows als äußerst hilfreich erweist.

Der zweite grundlegende Baustein – besonders wichtig im Zusammenhang mit Fehlerbehandlung – ist das Konzept der Structured Concurrency. Eine vollständige Einführung würde den Rahmen dieses Artikels sprengen, daher hier nur ein kurzer Überblick.

Es gibt mehrere Möglichkeiten, strukturierte Nebenläufigkeit umzusetzen – einige davon werden in JEP 453 diskutiert. Dieses Java-Feature befindet sich derzeit in der Vorschauphase und wurde zwischen Java 22 und Java 23 grundlegend überarbeitet. Die Community hofft auf eine finale Version in Java 25 – wir dürfen gespannt sein!

Jox bietet eine alternative API für strukturierte Nebenläufigkeit (die intern allerdings auf der JEP-Implementierung basiert), mit besonderem Fokus auf Sicherheit und einfache Anwendbarkeit. Die zentrale Einheit ist ein Scope, innerhalb dessen parallele Tasks – also Virtual Threads – gestartet werden können:

var result = supervised(scope -> {

  var f1 = scope.fork(() -> {
    Thread.sleep(500);
    return 5;
  });

  var f2 = scope.fork(() -> {
    Thread.sleep(1000);
    return 6;
  });

  return f1.join() + f2.join();
});

Die zentrale Eigenschaft eines Structured-Concurrency-Scopes ist die Garantie, dass bei Abschluss des supervised-Scopes alle gestarteten Forks beendet sind – entweder erfolgreich, mit einer Ausnahme oder durch Unterbrechung. Aus Sicht der Außenwelt werden Threads und Nebenläufigkeit so zu einem Implementation Detail.

Die Tatsache, dass kein Thread den Scope überleben kann, ist insbesondere im Fehlerfall entscheidend. Tritt in einem der Forks ein Fehler auf, wird der gesamte Scope standardmäßig heruntergefahren, verbleibende Forks werden unterbrochen, und der Scope wartet, bis alle beendet sind.

Mit diesen beiden Werkzeugen – Channels und Structured Concurrency – können wir jetzt auch konkurrente Streaming-Operatoren implementieren. Beginnen wir mit dem Mergen zweier Streams. Genau wie zuvor erstellen wir eine lazy evaluierte Flow-Beschreibung. Beim Ausführen des kombinierten Flows wird ein Structured-Concurrency-Scope erzeugt, in dem zwei Forks starten, die jeweils die untergeordneten Flows ausführen. Jeder Flow gibt seine Daten an einen Channel weiter.

Über die select-Funktion können wir dann jeweils den Wert aus dem Channel empfangen, der gerade Daten bereitstellt (die Behandlung von abgeschlossenen Streams wurde hier vereinfacht dargestellt):

public class Flow<T> {
  public Flow<T> merge(Flow<T> other) {
    return new Flow<>(emit -> {
      supervised(scope -> {
        Channel<T> c1 = this.runToChannel(scope);
        Channel<T> c2 = other.runToChannel(scope);

        boolean continueLoop = true;
        while (continueLoop) {
          switch (selectOrClosed(c1.receiveClause(), c2.receiveClause())) {
            case ChannelDone _ -> continueLoop = false;
            case ChannelError error -> throw error.toException();
            case Object r -> emit.apply((T) r);
          }
        }

      return null;
      });
    });
  }
}

Beachten Sie, dass wir auch hier erneut auf die in Java integrierten Kontrollfluss-Konstrukte zurückgreifen: das while-Konstrukt zur Wiederholung, switch mit Pattern Matching zur Auswahl von Channel-Antworten und die eingebaute Fehlerbehandlung mittels throw. So entsteht ein leistungsstarkes, aber dennoch natürlich lesbares API – ganz im Sinne von Project Loom.

In ähnlicher Weise – auch wenn mit etwas mehr Code – lassen sich weitere Streaming-Operatoren implementieren, etwa mapPar, zip, buffer, flatMap, grouped und viele mehr. Diese Operatoren orientieren sich an den bekannten Funktionalitäten aus Reactive-Streams-Bibliotheken wie Akka Streams oder Reactor.

Darüber hinaus verfügen wir nun über die Werkzeuge, um mit der Außenwelt zu interagieren: Wir können beispielsweise Bytestreams aus Dateien erzeugen, diese zeilenweise parsen oder ein Ergebnis direkt als InputStream ausgeben lassen.

Zum Abschluss folgt ein etwas umfangreicheres Beispiel, das einige der oben beschriebenen Funktionalitäten kombiniert:

Flows.unfold(0, i -> Optional.of(Map.entry(i + 1, i + 1)))
  .throttle(1, Duration.ofSeconds(1))
  .mapPar(4, i -> {
    Thread.sleep(5000);
    var j = i * 3;
    return j + 1;
  })
  .filter(i -> i % 2 == 0)
  .zip(Flows.repeat("x"))
  .runForeach(System.out::println);

Backpressure

Das klingt alles vielversprechend – doch wie sieht es mit der Resilienz unserer Implementierung aus? Können wir, wie bei Reactive Streams, garantieren, dass der Speicherverbrauch begrenzt bleibt? Schließlich haben wir in den bisherigen Beispielen keine explizite Backpressure-Weitergabe gesehen.

Die Wahrheit ist: Backpressure ist vorhanden, allerdings implizit – es ist nicht nötig, eigene Codepfade für deren Management zu definieren.

Backpressure entsteht durch begrenzte Puffer (Channels, Queues) und Blockierung von Virtual Threads. Wenn eine Verarbeitungsstufe z. B. in einem Fork im Hintergrund läuft (also asynchron), die nachgelagerte Stufe jedoch nicht schnell genug Daten verarbeiten kann, füllt sich der Puffer des Channels irgendwann. Der .send-Aufruf blockiert dann den entsprechenden Thread – und pausiert so ganz natürlich die Produktion in der vorgelagerten Stufe. Backpressure ergibt sich hier also durch den Einsatz von thread-blockierenden Operationen – ein eleganter, natürlicher Mechanismus.

Fehlerbehandlung

Der letzte Aspekt, den wir im Zusammenhang mit Virtual Threads betrachten müssen, ist die Fehlerbehandlung.

Im Fall eines einfachen, synchronen Streams ist die Fehlerbehandlung relativ unkompliziert: Jede Ausnahme, die in einer Verarbeitungsstufe auftritt, wird über die .emit-Kette weitergereicht und schließlich in der jeweiligen Konsummethode (z. B. .runForeach oder .runDiscard) erneut ausgelöst.

Java’s eigene Fehlerbehandlungsmechanismen reichen in diesem Fall völlig aus. Komfortmethoden wie Flow.onError oder Flow.onComplete erstellen lediglich zusätzliche Stages, die mit try-catch bzw. try-finally arbeiten.

Bei asynchronen Streams – also solchen, die mehrere Forks (Threads) starten und Flows im Hintergrund ausführen – ist die Situation nur leicht komplexer. Und das nur dank strukturierter Nebenläufigkeit, wie wir sie zuvor besprochen haben: Wenn ein Fork eine Ausnahme wirft, wird der gesamte Scope heruntergefahren. Die Ausnahme (gegebenenfalls verpackt) wird jedoch erst dann erneut ausgelöst, wenn alle anderen Forks beendet bzw. unterbrochen wurden.

Daher macht es aus Sicht anderer Verarbeitungsstufen keinen Unterschied, ob eine bestimmte Stage asynchron ausgeführt wird oder nicht. Threading wird zu einem Implementierungsdetail – wenn auch zu einem wichtigen.

Performance

Was die Funktionalität betrifft, so lässt sich mit unserer Lösung ein Großteil der Features von Reactive-Streams-Bibliotheken nachbilden. Aber wie sieht es mit der Performance aus?

Ein vollständiges Benchmarking beider Streaming-Ansätze steht noch aus, daher verfügen wir bislang nur über Teildaten. Immerhin haben wir den Channel, den wir zur Kommunikation zwischen asynchron laufenden Stages verwenden, bereits benchmarkt, profiliert und optimiert. Das Ergebnis: Die Performance ist vergleichbar mit den Concurrent-Datenstrukturen von Java sowie mit Kotlin’s Channel-Implementierung – und liegt nur leicht hinter Go zurück.

Zweitens haben wir die Performance von HTTP-Servern, die entweder auf Loom oder auf Reactive-Bibliotheken basieren, miteinander verglichen (Benchmark hier). In unseren Tests konnten wir keine signifikanten Unterschiede feststellen.

Das sollte kaum überraschen: Die Grundidee – nämlich ein Pool von Betriebssystem-Threads, auf dem viele „virtuelle“ Threads (Coroutines, Futures etc.) ausgeführt werden – ist sowohl bei „reactiven“ als auch bei „Loom“-Ansätzen dieselbe.

Fazit

Kann man eine Virtual-Thread-basierte Bibliothek mit Funktionsumfang vergleichbar zu bestehenden Reactive-Streams-Implementierungen entwickeln? Ja. Das Projekt Jox ist der Beweis.

Werden Virtual Threads Reactive Streams vollständig ersetzen? Teilweise. Die Reactive-Streams-Spezifikation bleibt weiterhin ein nützlicher Standard zur Interoperabilität. Sie wird vermutlich auch in Zukunft eine wichtige Rolle bei der Umsetzung von performanter Low-Level-Integrationslogik spielen. So implementiert auch Jox das Publisher-Interface von Reactive Streams, um die Integration mit Drittanbieter-Bibliotheken zu ermöglichen, die diesen Standard unterstützen.

Wenn es jedoch um das Schreiben von Datenverarbeitungspipelines geht, wird die Popularität klassischer Reactive-Streams-Bibliotheken wahrscheinlich zurückgehen. Das Modell der Virtual Threads ist einfacher, sowohl beim Schreiben als auch beim Lesen von Code. Es ermöglicht die Nutzung der nativen Java-Kontrollstrukturen und des Exception-Handling-Modells – ganz ohne Function Coloring (Viralität von Futures). Außerdem ist keine zusätzliche Runtime auf Bibliotheksebene notwendig, da die Thread-Verwaltung direkt von der JVM übernommen wird.

Bibliotheken wie Akka Streams oder Reactor haben aktuell noch die Nase vorn, was Reife, API-Vielfalt und Integrationen betrifft. Doch das Virtual-Threads-Ökosystem wird diese Lücke schließen – und wenn meine Einschätzung richtig ist, es langfristig sogar ablösen.

Virtual Threads bieten eine robuste und performante Plattform, auf deren Basis sich Systeme entwickeln lassen, die von den Prinzipien reaktiver Architekturen profitieren – und sie gleichzeitig weiterentwickeln. Sie eröffnen einen deutlich entwicklerfreundlicheren Weg, resiliente Systeme zu bauen.

In gewisser Weise ist das Aufkommen der Virtual Threads die ultimative Bestätigung der Grundideen hinter Reactive Streams, deren Wert so groß war, dass sie schließlich direkt in die JVM aufgenommen wurden.

Total
0
Shares
Previous Post

IBM-Architekt Mark Stoodley enthüllt JVM-Optimierungstechniken

Next Post

30 Jahre Java – Wie sich die Sprache entwickelt hat

Related Posts