Die Nebenläufige Programmierung Java ist so alt wie Java selbst. Schon in der ersten Version waren die Basiskonstrukte enthalten, auf denen unsere heutigen nebenläufigen Prozesse beruhen. Mit Threads und Runable ist es möglich, die sequenzielle Abarbeitung von Operationen zu entkoppeln und einen höheren Durchsatz zu erzielen. Über die Jahre sind einige Veränderungen an der Concurrency API von Java entstanden, um verschiedene Schwachstellen zu beheben. Die ExecutorService API um Ressourcen zu sparen oder die Future API um nicht auf lang läufige Operationen warten zu müssen. Nun ist erneut Bewegung in die Concurreny Thematik gekommen, virtuelle Threads entkoppeln die JVM-Threads vom Betriebssystem, um weitere Ressourcen zu sparen und die JVM effizienter zu machen. Darauf aufbauend ergeben sich neue Möglichkeiten, elegante APIs zu entwickeln. In diesem Beitrag möchte ich zuerst einen Blick zurück wagen und betrachten, welche Abstraktionen vorhanden sind, was die Schwierigkeiten waren und warum virtuelle Threads hier eine Lösung sind. Die darauf aufbauenden Konstrukte Structured Concurrency und Scope Values einführen und das Ganze anhand einiger Beispiele betrachten. Diese Beispiele sind auf GitHub als GIST verfügbar und können ergänzend zu diesem Artikel betrachtet werden.
Threads im Allgemeinen
Threads sind ein Konstrukt der nebenläufigen Programmierung, um Operationen zusammen zu gruppieren und diese Gruppen parallel auszuführen. Die Operationen innerhalb eines Threads laufen in einer gewissen Reihenfolge ab. Ein Thread ist damit ein kleines Programm innerhalb eines Programms. Ein Thread wird innerhalb eines Threads oder Prozesses erzeugt. Diese Operation nennt sich fork und ist in der Abbildung unten skizziert.
Beim Forken entsteht aus einem Prozess oder Thread ein neuer Thread als sein Kind, wobei ein Thread mehrere Kinder, aber nur einen Eltern-Thread haben kann. Der Eltern-Thread erzeugt zwei neue Threads, die unabhängig voneinander existieren und zur Ausführung kommen. Nachdem sie Ihre Operationen abgearbeitet haben, vereinigt sich die Ausführung wieder im Eltern-Thread. Erst nachdem alle Kind-Threads fertig sind, kann der Eltern-Thread beendet werden. Auf den Abschluss eines Kind-Threads kann explizit mit der join-Operation gewartet werden oder der Join findet implizit statt, bevor der Eltern-Thread aufgeräumt wird.
Ein Thread besteht aber nicht nur aus seinem Programmcode, sondern auch aus Metadaten, die benutzt werden, damit das Betriebssystem Threads zur Ausführung bringen kann. Zu diesen Metadaten gehört unter anderem eine eigene ID, die ID des Eltern-Threads, ein bestimmter Bereich im Speicherbereich und ein Status. Dieser Status ist notwendig, um zu verstehen, warum die direkte Kopplung zwischen einem Betriebssystemthread und einem JVM-Thread problematisch sein kann, das Statusdiagramm ist hier gegeben.
Ein Thread beginnt zuerst im Status New. In diesem Status ist er bisher nicht lauffähig und wird nicht ausgeführt. Dieser Status ist zur Initialisierung gedacht. Es werden Bereiche auf dem Speicher allokiert, die IDs gesetzt und Prioritäten konfiguriert. Erst wenn die Konfiguration abgeschlossen, geht der Thread in den Status Runable über. In diesem Status kann der Thread vom Betriebssystem zur Ausführung ausgewählt werden. Wird der Thread ausgewählt, so führt dieser eine gewisse Anzahl an Operationen aus seinem Programm aus, bis er unterbrochen wird. Im Normalfall verbleibt der Thread dann im Status Runable. Der Thread kann, aber auch nachdem er seine Arbeit abgeschlossen hat, in den Status Terminated übergehen. In diesem Status ist der Thread fertig und wird nicht mehr zur Ausführung ausgewählt, belegt aber noch weiterhin Ressourcen. Zwei weitere Zustände sind Blocked und Waiting, beide signalisieren einen Thread, der aktuell nicht lauffähig ist. Blocked bezeichnet einen Thread, von einem anderen Thread blockiert ist. Zum Beispiel durch einen syncronized-Block oder den Zugriff auf IO. Der Status Waiting signalisiert, dass der Thread gerade auf einen anderen Thread wartet. Zum Beispiel wird auf eine Antwort gewartet. Diese zwei Zustände, Blocked und Waiting sind Zustände mit Auswirkung auf die Ausführbarkeit eines Threas. Alle fünf Zustände sind in der JVM gespiegelt. Das heißt, wird ein JVM-Thread als Blocked markiert, wird der zugehörige Thread auch im Betriebssystem als Blocked markiert, da JVM-Threads eine Spiegelung von Betriebsystemthreads sind. In allen Betriebssystemen gibt es eine Obergrenze für die Anzahl an Threads. Ist diese erreicht, kann kein neuer Thread erzeugt werden und das Betriebssystem ist handlungsunfähig. Deswegen wird bei der nebenläufigen Programmierung versucht, sparsam mit dem Thread umzugehen und nicht jede Operation in einem neuen Thread auszuführen.
Java Threads API
Die Thread-API in Java existiert seit der Version 1.0 und hat lediglich kleine Veränderungen erfahren. Ein Thread in der JVM spiegelt immer einen Betriebsystemthread wider. Um einen Thread zu definieren, wird die Klasse java.lang.Thread erweitert und die Methode Run mit den benötigten Operationen implementiert. Im Beispiel unten wird eine Klasse ExampleThread definiert. Diese kann wie jede Klasse mit dem new Operator zu einer Instanz überführt werden.
class ExampleThread extends Thread {
public void run() { /* Do it */}
}
var t1 = new ExampleThread();
t1.start(); t1.join();
Diese Instanz des Threads ist die kleinste schedulbare Einheit der JVM. Doch wie bereits im letzten Absatz skizziert, verwaltet nicht die JVM, sondern das Betriebsystem die Auswahl des laufenden Threads. Nach der Erzeugung mit new befinden sich der JVM und Plattform-Thread im Status New und wurde initialisiert. In unserem Java-Programm können weitere Initialisierungen vorgenommen werden, bevor mit dem Methodenaufruf Thread#start der Thread vom Status New in den Status Runable überführt wird. Wird der Thread vom Scheduler der Plattform zur Ausführung ausgewählt, startet die Abarbeitung der Logik. Um im erzeugenden Thread auf die Terminierung eines Kind-Threads zu warten, gibt es die Methode Thread#join und eine Version mit einem expliziten Timeout.
Eine weitere Möglichkeit, Code parallel auszuführen, ist die Implementierung des Interface Runnable wie im Beispiel gezeigt.
class ExampleRunable implements Runable {
public void run() { /* Do it again */}
}
var runable = new ExampleRunable();
var t2 = new Thread(runable);
Ein Runnable ist ein Bestandteil eines Threads und die kleinste Einheit von nebenläufiger Arbeit. Ein Runnable gruppiert zusammengehörige Operationen. Der große Unterschied zwischen Runnable und Thread ist, dass Runnable nicht an einen Plattform-Thread gebunden ist. Sondern definiert, welche Operationen ausgeführt werden und so die Aufgaben von der technischen Ausführung entkoppelt.
Die Entkopplung von technischem Thread-Management, Ausführung und Ablauf der Businesslogik haben auch die beiden Concurrencymodelle in Java als Ziel. Der ExecutorService ist ein Interface und seit Java 5 verfügbar. Sein Ziel ist es: die Betriebssystemthreads so effizient wie möglich zu nutzen. Ihm können Aufgaben in Form eines Runables bereitgestellt werden, die später ausgeführt werden. Ein ExecutorService kann einen oder mehrere Threads verwenden, um die Arbeit durchzuführen. Das Future-Konstrukt ist seit Java 5 und hat den Zweck: lang läufige Operationen nebenläufig auszuführen. Dadurch muss der Businesscode nicht blockiert werden, solange auf eine Antwort einer Datenbank gewartet wird. Beide sind kombinierbar und führen zu einer effizienten Entkopplung von landläufigem Businesscode sowie Ressourcen schonender paralleler Ausführung. Zwei Probleme teilen beide Lösungen:
- Exceptions, die in den nebenläufigen Operationen geworfen werden, werden in anderen Exceptions gekapselt und machen somit die Fehlerbehandlung nicht gradlinig möglich
- Eine blockierte Ausführung blockiert auch den verwendeten Betriebssystemthread
Um die Betriebssystemthreads noch effizienter nutzen zu können, muss die direkte Verbindung zwischen Betriebssystem und JVM-Thread aufgelöst werden.
JEP 444 Virtual Threads
Mit JEP 444 Virtual Threads soll die Kopplung zwischen JVM- und Betriebssystem-Threads aufgelöst werden. Als Folge sind Virtual Threads nicht mehr an einen Betriebssystemthread gebunden. Da weniger Informationen vorgehalten werden müssen, sind Virtual Threads deutlich leichtgewichtiger als Plattform-Threads. Natürlich muss ein virtueller Thread auch durch einen Plattform-Thread und damit einen Betriebssystemthread zur Ausführung gebracht werden. Die effiziente Verwendung der verfügbaren Threads wird auf Ebene des JVM realisiert.
Die durch JEP 444 in Java 21 finalisierte API ist eher klein, aber umso gewaltiger waren die notwendigen Änderungen innerhalb der JVM. Im Kern der API-Änderungen für Softwareentwickler:innen stehet die neue Factory Methode im Listing unten.
Thread.ofVirtual()
.name("virtualThread-1")
.inheritInheritableThreadLocals(true)
.uncaughtExceptionHandler((thread, throwable) ->
System.err.println(thread.getName() + ": " + throwable))
.start(() -> IO.println("Hello from virtual thread"));
Mit Thread#ofVirtual wird eine Art Builder für einen Virtual Thread erzeugt. Im Folgenden wird der Name gesetzt sowie konfiguriert, dass die ThreadLocals geerbt werden sollen. Zusätzlich wird noch ein expliziter Fallback ExceptionHandler erstellt. Mit der start-Methode wird das auszuführende Runnable konfiguriert und der Thread gestartet. Hierbei wird kein eigener Betriebssystemthread erstellt, bei der Erzeugung eines Plattform-Threads mit der Thread#ofPlatform Methode wird auch ein Betriebssystemthread erzeugt. Nach der Erstellung verhalten sich aus Anwendungssicht die virtuellen und Plattformthreads gleich.
Zusätzlich zu den Änderungen in der Thread-Klasse wurde auch noch ein neuer ExecutorService bereitgestellt. Dieser verwendet anstelle eines Threadpools für jeden Task einen neuen virtuellen Thread.
Executors
.newVirtualThreadPerTaskExecutor()
.submit(() -> IO.println("Hello, World!"));
Die durch JEP 444 eingeführten Änderungen an der API sind nicht groß, aber die dadurch ermöglichten Features sind es. Die neuen Sprachkonstrukte Structured Concurrency von Scope Values bauen auf den seit Java 21 finalen virtuellen Threads auf.
Structured Concurrency
Die Zielsetzung des aktuell in Preview befindlichen Features Structured Concurrency ist es, verwandte Aufgaben, die in verschiedenen Thread ausgeführt werden, als eine Einheit von Arbeit zu verstehen. Damit soll das Error Handling, das Abbruchverhalten, die Lesbarkeit und Wartbarkeit verbessert werden. Ein Beispiel einer Gruppe von zusammengehörigen Aufgaben findet sich in der Abbildung.
In diesem Beispiel sind zwei exemplarische Aufrufe an einen Service dargestellt. Dieser Service lädt für jeden Aufruf eine Adresse und Namen von Datenbanken oder aus anderen Services. Wenn beide Informationen vorliegen, wird eine Antwort generiert. Damit die Ausführung nicht in die Länge gezogen wird, werden die Adresse und der Name parallel geladen. Nachdem sie vorhanden sind, wird die Antwort erzeugt. Bis zur Einführung von Structured Concurrency wäre dieses Vorgehen mit einigen CompletableFutures realisiert worden. Das Problem bei Futures ist die nicht optimale Fehlerbehandlung und die schlechte Lesbarkeit durch verstreute Interaktionen mit den Ergebnissen. Mit der StructuredTaskScope API (siehe Listing) werden alle Unzulänglichkeiten adressiert.
public class StructuredTaskScope<T> implements AutoCloseable {
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public StructuredTaskScope<T> join() throws InterruptedException;
public StructuredTaskScope<T> joinUntil(Instant deadline)
throws InterruptedException, TimeoutException;
public void shutdown();
public void close();
}
Der StructuredTaskScope implementiert AutoCloseable und signalisiert damit, dass die Verwendung mit einem Try-with-Resource-Block gedacht ist. Durch diesen Block wird klar, welche Tasks zusammengehören und die Fehlerbehandlung konsolidiert. Innerhalb eines StructuredTaskScope kann mit fork eine neue Aufgabe initialisiert werden. Diese Aufgabe wird, anders als ein Thread, mit einem Callable configuriert und liefert Subtask zurück, der das Supplier Interface implementiert. Neben dem Erzeugen von Aufgaben gibt es auch zwei join Methoden, die dafür gedacht sind, um die aktuell gestarteten Verarbeitungen zu warten. Jeder join Aufruf ist blockierend und wartet, bis alle in diesem Scope gestarteten Tasks beendet wurden. Mit der joinUntil Methode kann auch eine Frist gesetzt werden, zudem ein Timeout geworfen wird. Die beiden Methoden shutdown und close müssen durch Entwickler:innen nicht selbstständig aufgerufen werden, der Try-With-Resource Block koordiniert das Herunterfahren. Im Listing unten ist ein Auszug aus einer Concurrent RuleEngine gegeben, die mit einem StructuredConcurrency arbeitet.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()){
Supplier<String> adr = scope.fork(() -> readAddress());
scope.join().throwIfFailed();
var addres = adr.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
In der Resourcendefinition des Try-With-Resource Block wird der Scope als ein ShutdownOnFailure-Scope initialisiert. ShutdownOnFailure heißt an dieser Stelle, dass der erste fehlgeschlagene Subtask zu einem Herunterfahren und damit Beenden aller Tasks führt. In der Java-API sind weitere häufige Anwendungsfälle implementiert und eigene Implementierungen sind möglich. In der zweiten Zeile wird die Aufgabe, eine Adresse zu laden, in den Scope zur Ausführung übergeben und der Supplier für das Ergebnis gespeichert. Um sicher auf den geladenen Wert zugreifen zu können, wird mit der join Methode in Zeile 3 gewartet. Mit der Konfiguration throwIfFailed wird der Scope angehalten, einen fehlgeschlagenen Task als Exception zu melden. Sind alle Tasks des Scope abgeschlossen, kann mit der get Methode des Supplier Interfaces auf den produzierten Wert zugegriffen werden. Der Try-With-Resource-Block ermöglicht auch eine explizite Exception-Behandlung in Catch-Blöcken. Dies sind die Exceptions, die durch den Scope produziert werden. So wird eine zentrale Fehlerbehandlung ermöglicht, der Ablauf der Aufgaben klar strukturiert und insgesamt die Wartbarkeit verbessert. Dem Einsatz von Virtual Threads in unseren Anwendungen, um nebenläufige Arbeit zu koordinieren, steht nichts mehr im Wege. Um gemeinsam genutzte Daten effizient zu nutzen, wurden Scopend Values eingeführt.
Scoped Values
In vielen nebenläufigen Anwendungen müssen gewisse Instanzen oder Daten einheitlich verwendet werden. Ein Beispiel hierfür ist ein SSLContext. Die Erzeugung einer SSLContext Instanz ist aufwendig und ressourcenintensiv. In Millionen virtuellen Threads jeweils eine neue Instanz zu verwenden, ist ineffizient und pulverisiert die gewonnene Leichtigkeit von Virtual Threads. Um immutable Daten zwischen einem Thread und seinen Kindthreads zu teilen, wurden Scoped Values in Java konzipiert. Die Unterschiede zu ThreadLocal-Instanzen werden in der Abbildung verdeutlicht.
Eine ThreadLocal-Instanz stellt für jeden Thread eine neue Instanz des gespeicherten Werte bereit, zu sehen an den unterschiedlichen Adressen in T1 und T2 für sslCtx. Durch dieses Konstrukt können Verhalten und Konfiguration geteilt werden, jedoch wird durch Duplizierung der Speicherverbrauch in die Höhe getrieben. Bei Scope Values ist der Ansatz dieselbe Instanz, in den Kinde-Threads zur Verfügung stellen. Die Ziele wurden wie folgt definiert:
- Ease of Use – der Fluss der Daten soll einfach nachverfolgbar sein
- Comprehensibility – Die Lebensdauer der gemeinsam genutzten Daten sollte aus der syntaktischen Struktur des Codes ersichtlich sein
- Robustness – Von einem Anrufer freigegebene Daten sollten nur von rechtmäßigen Anrufern abrufbar sein
- Performance – Die Daten sollten effizient über eine große Anzahl von Threads gemeinsam genutzt werden können
Die für die Verwaltung und Konfiguration eines Scoped Values benötigte API ist klein gehalten, aber umso mächtiger.
final static ScopedValue<SSLContext> VAL = ScopedValue.newInstance();
ScopedValue.where(VAL, value).run(() -> VAL.get());
Mit der Factory Methode ScopedValue#newInstance wird eine neue Instanz eines ScopedValues erzeugt. Diese Instanz ist nicht der Wert selbst, sondern ein Data Carrier, der auf Anfrage den tatsächlichen Wert bereitstellt. Über die Konfigurationsmethode ScopedValue#where wird konfiguriert, welchen Wert der ScopedValue in diesem Scope haben soll. Über die in der Methode ScopedValue.Carrier#run wird ein Runnable übergeben welches in diesem Scope ausgeführt wird und über ScopedValue#get Zugriff auf die konfigurierten Werte bekommt. Im folgenden Codebeispiel werden zwei Scoped Values verwendet, um einen SSLContext und Eingabedaten als Map zu verwalten.
ScopedValue<Map<String, String>> INPUTDATA = ScopedValue.newInstance();
ScopedValue<SSLContext> SSL_CTX = ScopedValue.newInstance();
ScopedValue.Carrier executionScope = ScopedValue
.where(INPUTDATA, Map.copyOf(data))
.where(SSL_CTX, SSLContext.getDefault());
executionScope.run(() -> {
String inputName = INPUTDATA
.orElseThrow(IllegalArgumentException::new)
.getOrDefault("name", "JON");
if(SSL_CTX.isBound()) {
String lastName = getLastName(SSL_CTX.get());
inputName = "%s %s".formatted(inputName, lastName);
}
IO.println(inputName);
});
In den ersten zwei Zeilen werden die zwei ScopedValue Instanzen erzeugt. In Zeile 3 bis 5 werden für den Scope die geltenden Werte gesetzt. Es wird eine immutable Kopie der Inputdaten sowie eine Default SSLContext konfiguriert. An dieser Stelle wird nicht direkt ein Run aufgerufen, sondern der ScopedValue.Carrier zur späteren Verwendung gespeichert. Dieser repräsentiert einen Scope in welchem eine Kombination von ScopedValues definierte Werte hat. Dieses ScopedValue.Carrier werden immer dann gebrauchen, wenn zum Beispiel mehrere Tasks in einem Scope ausgeführt werden sollen. Im letzten Block wird ein solcher Task zur Ausführung übergeben. Innerhalb des Tasks wird zuerst geprüft, ob ein Wert für INPUTDATA konfiguriert wurde. Ist dies nicht der Fall, wird eine IllegalArgumentException erzeugt und geworfen. Ein Check die zu gegriffenen Werte auch tatsächlich gesetzt wurden ist notwendig, da es innerhalb eines ScopedValue.Carriers keine Garantie dafür gibt. Eine andere Möglichkeit zu prüfen, ob der Wert gesetzt ist, sehen wir beim SSLContext. Hier wird die Methode ScopedValue#isBound aufgerufen, bevor der Zugriff durchgeführt wird. Die grundsätzliche Semantik erinnert an die eines Optional. Wichtig zu bemerken ist, dass der run Aufruf das Runable in einem eigenen virtuellen Thread ausführt.
In der Tabelle unten sind die Unterschiede zwischen ScopeValue und dem ähnlichen ThreadLocal gegeben.
ScopedValue | ThreadLocal |
---|---|
Immutable Carrier | Supplier |
Same instance | Instance per Thread |
Rebindable | Mutable Value |
Bei einem ScopeValue handelt es sich um einen unveränderlichen Datentransporteur. Im Gegensatz zum ThreadLocal, der ein Supplier eines bestimmten Wertes ist. Ein ScopeValue ist es dabei immer dieselbe Instanz, wobei der bereitgestellte Wert für Kinde-Threads neu zuweisbar ist. Durch ihre Ähnlichkeit in der Anwendung sollten ThreadLocals wo immer möglich, durch ScopedValues ersetzt werden, um die effiziente Nutzung von Virtual Threads zu ermöglichen.
Abschließende Gedanken
In diesem Beitrag haben wir Virtual Threads als leichtgewichtige Alternative zu Plattformenthreads betrachtet. Wir haben Scoped Values und Structured Concurrency gesehen. Diese sind für den Einsatz mit Virtual Threads optimiert, um Daten zu teilen und asynchrone Operationen durchzuführen. Die gezeigten Beispiele stammen aus einem Open-Source-Projekt, in dem eine Concurrent Rule Engine implementiert wurde.
Zu diesen neuen Themen gibt es ein paar Hinweise, die ich euch sehr gerne mit auf den Weg geben möchte. Mit virtuellen Threads sollte nicht sparsam umgegangen werden, sie sind als Wegwurfinstanzen konzipiert. Durch die Wiederverwendung werden in der JVM implementierte Optimierungen ausgehebelt und der gewünschte Effekt stellt sich nicht ein. Deswegen empfehle ich Euch, wo immer möglich, von Plattformthreads auf Virtual Threads umzusteigen. Im einfachsten Fall ist es nur das Austauschen einer ExecutorService-Instanz.
Achtet bei eurer Migration auch direkt darauf, ob ihr ThreadLocal-Instanzen identifizieren könnten, die niemals verändert werden. Dies sind Kandidaten für eine direkte Migration zu ScopeValues. Wenn ihr IO-Operationen in nebenläufigem Code verwendet, ist dies der Ort für Structured Concurrency. Ihr benötigt Unterstützung bei der Erstellung eines Migrationsplans oder habt weitere Fragen zum Verständnis? Dann stehe ich gerne auf Konferenzen und über gängige Social-Media-Kanäle zur Verfügung.