Task-Parallelität mit CompletableFutures

Obwohl viele Java-Entwickler die CompletableFuture-Klasse kennen, wird sie in Projekten immer noch selten eingesetzt. Das mag daran liegen, dass die Klasse auf den ersten Blick komplex wirkt oder dass die Einsatzmöglichkeiten nicht immer offensichtlich sind. Dabei lässt sich mit Hilfe von CompletableFutures recht einfach Task-Parallelität realisieren. Im Folgenden werden einige Best-Practices vorgestellt und gezeigt, wie durch die Verwendung von Task-Parallelität die Reaktivität einer Anwendung deutlich gesteigert wird.

Java bietet schon seit geraumer Zeit sehr viele komfortable Abstraktionen für den Umgang mit Nebenläufigkeit bzw. Parallelität. Nebenläufigkeit existiert immer dann, wenn zwei Aktionen (Tasks) keine direkten Abhängigkeiten besitzen und somit die Ausführungsreihenfolge keine Rolle spielt. Ein Task repräsentiert hier typischerweise eine Zusammenfassung von zusammengehörigen Anweisungen oder Methodenaufrufen. Die Identifikation von Nebenläufigkeiten ist die Basis für den Einsatz von Parallelität. Auf Multicore- oder Multiprozessor-Maschinen können nebenläufige Tasks dann auch wirklich parallel (zeitgleich) ausgeführt werden. Ein Task sollte allerdings immer genügend Arbeit haben, damit der mit einer Parallelisierung verbundene Overhead nicht ins Gewicht fällt. Ein weiterer wichtiger Punkt ist die Koordinierung von Wettbewerbssituationen (Race-Conditions), falls Tasks doch nicht ganz unabhängig sind und z.B. auf gemeinsam benutzte Daten gleichzeitig schreibend und lesend zugreifen. Bezüglich der Strukturierung für Nebenläufigkeit unterscheidet man zwischen Daten- und Task-Parallelität (Abb. 1). Die Datenparallelität lässt sich noch weiter in eine synchrone (blockierende) und in eine asynchrone Verarbeitung unterteilen. Bei einer synchronen Verarbeitung wird typerischweise die Datensammlung partitioniert und die Teilbereiche einzelnen Tasks zur weiteren Verarbeitung zugewiesen. Jeder Task führt dabei dieselben Aktionen auf den Daten durch. Datenparallelität wird heute bei Java gewöhnlich mit parallelen Streams realisiert.

 

 

Parallelisierungsmöglichkeiten. (Abb. 1)

Bei Task-Parallelität besitzt jeder Task eine eigene Aufgabe. Oft ist es so, dass Tasks kausal miteinander verkettet sind und somit ganze Prozessabläufe realisieren. Als Analogon kann man sich beispielsweise einen Projektplan für einen Hausbau vorstellen. Hier gibt es als Aktivitäten die Erstellung des Rohbaus, die Verlegung der elektrischen Leitungen, das Einsetzen der Fenster oder die Gestaltung der Außenanlage, etc. Einige dieser Aktivitäten besitzen Abhängigkeiten, wie z.B. die Verlegung der elektrischen Leitungen, die erst erfolgen kann, wenn der Rohbau fertig ist. Andere Aktivitäten können dagegen parallel ausgeführt werden, wie z.B. die Verlegung der elektrischen Leitungen und die Gestaltung der Außenanlage.

 

Im Gegensatz zur Datenparallelität ist eine Task-Parallelität bei der Softwareentwicklung nicht immer direkt offensichtlich identifizierbar. Der Hauptgrund dafür ist, dass bei der herkömmlichen Programmierung der sequentielle Programmcode oft dem zeitlichen Programmablauf entspricht. Wird davon abgewichen, wie etwa beim Observer-Pattern, bei dem der lineare Programmcode nicht mehr die zeitliche Ausführung wiederspiegelt (Callback-Methoden werden zu einem späteren Zeitpunkt aufgerufen), wird der Programmablauf schwerer nachvollziehbar. Um diesen Verständnishürden entgegen zu wirken, werden neue Abstraktionen in die Programmiersprachen eingeführt. So stellt die reaktive Programmierung den Datenfluss als Design-Prinzip in den Fokus (asynchrone Datenparallelität). Die Strukturierung des Programmcodes richtet sich hier nach den Datenflüssen und nicht mehr nach der zeitlichen Abarbeitungsfolge (siehe hierzu die Artikel in JAVAPRO 1/2018[1] und 2/2018[2]).

 

 

Strukturierung nebenläufiger Tasks aufgrund von vorhandenen Abhängigkeiten, die durch rote Pfeile dargestellt sind. (Abb. 2)

 

Für die Realisierung von Task-Parallelität stehen Tasks und deren Nebenläufigkeitseigenschaft im Vordergrund. Oft ist eine lineare Task-Anordnung Ausgangspunkt für die Analyse. Die Tasks werden auf Abhängigkeiten untersucht und anschließend umstrukturiert. Die parallelen Abläufe werden oft mit UML-Aktivitätsdiagrammen dargestellt (Abb. 2). Mit Hilfe der CompletableFuture-Klasse lassen sich die parallelen Ablaufstrukturen dann einfach im Programmcode abbilden. Für die meisten Fälle reichen wenige Basis-Idiome (Code-Pattern-Vorlagen) aus, von denen im nächsten Abschnitt einige vorgestellt werden.

 

Die CompletableFuture-Klasse

Die CompletableFuture-Klasse, die zwei Interfaces implementiert, erscheint auf den ersten Blick komplex, da sie eine Vielzahl von Methoden besitzt. Um die Übersicht zu behalten, lassen sich die Methoden in verschiedene Kategorien einteilen (Abb. 3). Es existieren Methoden für die Erzeugung, für die Verkettung und für Statusabfragen. Die große Anzahl der Methoden rührt daher, dass die Verkettungsmethoden aus dem Interface CompletionStage immer in drei Versionen (Überladungen) vorhanden sind. Diese Methoden dienen dazu komplexe Task-Abläufe zu realisieren, wobei hierbei wahlweise die Tasks synchron oder asynchron ausgeführt werden können. Bei einer synchronen Ausführung wird der Task vom Vorgänger-Thread oder von dem die Verknüpfung erzeugenden Thread ausgeführt. Bei einer asynchronen Ausführung wird der Task garantiert immer von einem separaten Thread verarbeitet. Bei der asynchronen Variante kann deshalb noch explizit ein Executor (Thread-Pool) angegeben werden, falls nicht der Standard-Pool benutzt werden soll.

API-Kategorien der CompletabeFuture-Klasse. (Abb. 3)

Am einfachsten lassen sich CompletableFuture-Instanzen über eine der angebotenen Fabrikmethoden erzeugen. Asynchrone Tasks mit Rückgabe werden mit supplyAsync und Tasks ohne Rückgabe mit runAsync gestartet. (Listing 1) zeigt ein Code-Pattern für das Starten eines asynchronen Tasks ohne Rückgabe. In (Abb. 4) ist der Ablauf schematisch abgebildet. Da von Task 2 keine weiteren Tasks abhängen (keine eingehenden roten Pfeile), kann dieser asynchron ausgeführt werden. Bei einer asynchronen Verarbeitung sollten immer zwei Punkte berücksichtigt werden. Das sind ein adäquates Fehlerhandling und das Arbeiten mit Timeouts. Da die Ausführung des Tasks in einem separaten Thread erfolgt, werden auftretende Exceptions nicht automatisch an den Erzeuger gemeldet. Weiter sollte ein asynchroner Aufruf nicht beliebig lange andauern, damit Ressourcen nicht unnötig blockiert werden. In dem Codebeispiel wird deshalb ein Timeout gesetzt und eine Methode für die Behandlung auftretender Fehler angegeben.

 

Listing 1

Fire-and-Forget-Pattern. Auslagern eines Tasks aus einer sequentiellen Anordnung. (Abb. 4)

Soll ein Task verzögert ausgeführt werden, wird dies durch die Angabe eines entsprechenden delayedExecutor erreicht. (Listing 2) zeigt dazu ein Beispiel, wobei hier nach einem abgelaufenen Timeout ein Default-Wert zurückgeliefert wird.

Listing 2

Bei einem ausgelösten Timeout wird der gerade ausgeführte Task in der Regel nicht gestoppt. Dies sollte man bei langlaufenden Tasks beachten und ggf. entsprechende Vorkehrungen treffen. Besteht ein Task aus mehreren Teilaufgaben, können entweder die Teilaufgaben als separate Tasks verketten  oder mit einer Kontrollvariable getrennt werden, wie dies in (Listing 3) gezeigt ist. Wird hier während der Ausführung von doPart1 ein Timeout ausgelöst, wird diese Methode in der Regel noch zu Ende laufen. Alle folgenden Aktionen, insbesondere die Methoden doPart2 und doPart3 werden aber nicht mehr ausgeführt. Für die Abbruchsteuerung wird hier ein AtomicBoolean verwendet. Dadurch wird sichergestellt, dass eine aktuelle Wertänderung von isCancelled auch innerhalb des ersten Tasks, d.h. zwischen den Aufrufen der doPart-Methoden sichtbar ist.

Listing 3

Die eigentliche Stärke von CompletableFutures zeigt sich durch die verschiedenen Möglichkeiten Tasks zu verknüpfen. So können z.B. mehrere Tasks sehr einfach sequentiell, je nachdem ob Daten an den Nachfolger weiter gegeben werden oder nicht, mit thenApplyAsync oder thenRunAsync verkettet werden. Oder es kann ein Task aufgespaltet und die Ergebnisse wieder durch ein logisches UND oder ODER zusammengeführt werden. (Listing 4) zeigt ein Code-Pattern für das Aufspalten und Zusammenführen von parallelen Tasks. In (Abb. 5) ist der zugehörige Ablauf schematisch dargestellt.

Listing 4

Die Zusammenführung der beiden Tasks futurePreis und futureRabatt wird hier durch thenCombine (UND-Verkettung) und nicht durch thenCombineAsync ausgeführt. Es lohnt sich in dem Beispiel nicht, für die Berechnung des aktuellen Preises einen separaten Thread aus dem Thread-Pool anzufordern. Der dadurch entstehende Overhead wäre nicht gerechtfertigt.

 

Split-and-Join-Pattern. Abspaltung von Tasks aus einem sequentiellen Ablauf und Abfragen des Ergebnisses. (Abb. 5)

Auch parallele IO-Aufrufe lassen sich mit Hilfe von CompletableFutures realisieren. IO-Aufrufe sind in der Regel blockierend und sollten somit bei parallelen Aufrufen mit Vorsicht eingesetzt werden. Der Code in (Listing 5) führt leicht dazu, dass alle Threads im Common-Pool längere Zeit blockiert werden und sollte deshalb so nicht verwendet werden!

Listing 5

Angenommen die URL-Liste in (Listing 5) enthält 80 Einträge. Geht man davon aus, dass eine URL-Anfrage eine Sekunde benötigt und hierbei die meiste Zeit mit Warten verbracht wird, so dauert die gesamte Verarbeitung bei einem commonPool mit 8 Threads ca. 10 Sekunden. Während dieser Zeit stehen die Threads für keine anderen Aufgaben zur Verfügung. (Listing 6) zeigt eine bessere Variante. Hier wird explizit mit einem separaten Thread-Pool gearbeitet. Um die Maschine vor einer Überlast zu schützen, wird die Maximalzahl der Pool-Threads begrenzt. Die URL-Anfragen erfolgen jetzt asynchron mit Threads aus dem neu erzeugten Pool. Wichtig ist hierbei, dass die Erzeugung der CompletableFuture-Instanzen und das Abfragen der Ergebnisse separat erfolgen. Würde das Auslesen des Ergebnisses im ersten Stream realisiert werden, so würde die Verarbeitung 80 Sekunden dauern, was einer sequentiellen Verarbeitung entsprechen würde. Bei der gezeigten Variante dauert das Lesen der URLs mit 80 Threads dagegen insgesamt nur eine Sekunde, da wartende Threads keine Last produzieren und vom Betriebssystem gut verwaltet werden können.

Listing 6

Mit dem Java-Projekt Loom[3] werden sich, wenn es zukünftig in das JDK integriert ist, neue Möglichkeiten für paralleles IO ergeben. Die von Loom eingeführten leichtgewichtigen User-Mode-Threads, sogenannte Fibers, erlauben eine bessere Ressourcenausnutzung. Der obige Code wird sich dadurch stark vereinfachen.

 

Einsatz von Task-Parallelität

Im Folgenden werden nun die Einsatzmöglichkeiten von CompletableFutures an einem konkreten Beispiel demonstriert. Hierzu wird eine JavaFX-Anwendung betrachtet, mit der die Bildkompression auf Basis einer zweidimensionalen Fourier-Transformation veranschaulicht wird (Abb. 6). Interessierte Leser finden den kompletten Code der Anwendung auf Github[4].

 

Beispielanwendung. (Abb. 6)

Über das Menü wird ein Verzeichnis mit JPG-Bildern ausgewählt. Die Bilddateien werden daraufhin eingelesen und als Vorschau in der unteren Leiste angezeigt. Weiter wird auf der UI auch die Auslastung des Prozessors über einen entsprechenden Farbbalken sichtbar gemacht. Wird ein Vorschaubild ausgewählt, wird die Graudarstellung des Bildes berechnet und oben in der linken Anzeigespalte dargestellt. Zur Bestimmung der Fourier-Koeffizienten des Bildes (zweidimensionale Matrix) werden die Grauwerte über den jeweiligen Pixeln aufgetragen. Das dadurch entstehende Gebirge (Abb. 7) wird als zweidimensionales Signal interpretiert und ist somit einer Fourier-Transformation zugänglich.

 

Farbwerte eines Bilds können als zweidimensionales Signal interpretiert werden. (Abb. 7)

Die Absolutwerte der resultierenden Fourier-Koeffizienten sind in der linken unteren Anzeige graucodiert dargestellt (Abb. 6). Der größte Wert wird mit weiß und Null-Werte mit schwarz codiert. Bei gewöhnlichen Bildern ist ein Großteil der Fourier-Koeffizienten sehr klein. Wie auf der Abbildung zu erkennen ist, sind die meisten Koeffizienten dunkelgrau und enthalten somit recht wenig Information über das Bild. Dies kann man sich nun für eine Kompression zunutze machen. Hierzu werden z.B. nur die größten 10% der Koeffizienten beibehalten, alle anderen werden auf Null gesetzt. Mit einer inversen Fourier-Transformation lässt sich dann das Bild hieraus wieder rekonstruieren. Das entstehende Ergebnis kann kaum vom Original unterschieden werden. Selbst bei einer Reduktion um 97% wird das Bild recht gut wiederhergestellt (Abb. 6). Die für die Rekonstruktion beibehaltenen Fourier-Koeffizienten und das daraus abgeleitete Bild sind in der rechten Spalte angezeigt.

Die Anwendung besitzt viele Stellen, an denen CompletableFutures für die Reaktionsfähigkeit und eine verbesserte Ablaufgeschwindigkeit gewinnbringend eingesetzt werden können. Beim Start der JavaFX-Anwendungen wird die Initialize-Methode des UI-Controllers aufgerufen. Hier werden neben den UI-Attributen (Attribute mit der @FXML Annotierung) auch der Hardware-Zugriff zum Auslesen der Prozessorlast initialisiert. Die Hardware-Zugriffe bestehen zum Teil aus lange laufenden und blockierenden Methoden. Diese führen bei einer synchronen Ausführung somit zu einer erheblichen Verzögerung. Durch die Auslagerung der Hardware-Initialisierung in einen asynchronen Task wird der Anwendungsstart deutlich beschleunigt (Listing 7).

 

Listing 7

 

Bei der Erzeugung der Vorschaubilder werden die Bilddateien gelesen und ImageView-Instanzen erstellt. Auch dieser Vorgang lässt sich durch Parallelisierung beschleunigen. Dazu werden die Dateizugriffe mit anschließender ImageView-Erzeugung in asynchrone Tasks ausgelagert. Zur Realisierung wird ein zu (Listing 6) analoges Code-Pattern benutzt.

Durch die Auswahl eines Bildes aus der Vorschau wird eine entsprechende Callback-Methode des UI-Controllers aufgerufen. In der Methode wird von dem gewählten Bild eine Graudarstellung berechnet, ein Image-Objekt erzeugt und anschließend angezeigt. Parallel zur Image-Erzeugung werden die Fourier-Koeffizienten (FFT-Matrix) der Graudarstellung berechnet und ein weiteres Image-Objekt für deren graphische Darstellung erzeugt und angezeigt. Sobald die Fourier-Koeffizienten vorliegen, wird auch schon eine sortierte Liste mit den Fourier-Koeffizienten angelegt, die später für die Bildkompression benötigt wird. Die Tasks für die Aktualisierung der UI müssen vom JavaFX-Thread durchgeführt werden. Bei diesen Tasks wird deshalb bei der Ausführung der JavaFX-Thread als Executor (Platform::runLater) angegeben. Die rechenintensiven Tasks können z.B. zur Ausführung an den Default-Executor (commonPool) übergeben werden. Die Strukturierung des kompletten Vorgangs entspricht einem erweiterten Fire-and-Forget-Pattern (Abb. 8).

 

Aufbau der Callback-Methode für die Bild- und Fourier-Koeffizientenberechnung. Die orange eingefärbten Tasks greifen auf die UI zu und müssen im JavaFX-Thread ausgeführt werden. (Abb. 8)

 

Auch bei der Bildkompression kann Task-Parallelität eingesetzt werden. Sobald die reduzierten Fourier-Koeffizienten (reduzierte FFT-Matrix) vorliegen, werden zeitgleich die Darstellungen des reduzierten Bildes und der reduzierten Fourier-Koeffizienten berechnet. Sind die Berechnungen abgeschlossen, werden die zugehörigen Image-Objekte erzeugt und angezeigt. Abschließend wird der während der Berechnung eingeblendeter Fortschrittsbalken wieder entfernt (Abb. 9).

 

 

Aufbau der Callback-Methode für Bildreduktion. Tasks (orange eingefärbt), die auf die UI zugreifen müssen im JavaFX-Thread ausgeführt werden. (Abb. 9)

Neben den hier beschriebenen Szenarien für Task-Parallelität, lässt sich für die Berechnungen der zweidimensionalen Fourier-Transformationen auch Daten-Parallelität (parallel-Streams) einsetzen, worauf hier aber nicht näher eingegangen werden soll. Weiter werden für die Anzeige der Prozessorlast Reactive-Streams eingesetzt. Die CPU-Werte entsprechen einem kontinuierlichen Datenstrom (Publisher), bei dem sich der Anzeigebalken als Subscriber registriert.

 

Fazit:

Für den Einsatz von Parallelisierung müssen bei einer Anwendung zuerst nebenläufige Aktionen bzw. Tasks identifiziert werden. Im Anschluss wird jeweils entschieden, ob es sich um Daten- oder Task-Parallelität handelt. Java stellt für die Umsetzung einer Parallelisierung verschiedene Abstraktionen bzw. Frameworks zur Verfügung. Neben parallelen Streams für die Implementierung von Datenparallelität kommt hier insbesondere die CompletableFuture-Klasse zum Einsatz. Mit ihrer Hilfe wird auf einfache Art und Weise Task-Parallelität umgesetzt, wobei ggf. auf die Koordinierung von Race-Conditions geachtet werden muss. Besitzt man erst einmal eine gewisse Erfahrung für die Identifikation und Handhabung von Nebenläufigkeit, lässt sich durch den Einsatz von CompletableFutures elegant Task-Parallelität realisieren. Durch den Einsatz einiger Best-Practices werden Rechenressourcen besser ausgenutzt und Anwendungen somit schneller und reaktiver.

 

Prof. Jörg Hettel war als Berater bei nationalen und internationalen Firmen tätig. Er begleitete zahlreiche Firmen bei der Einführung von objektorientierten Technologien und übernahm als Softwarearchitekt Projektverantwortung. Seit 2003 ist er Professor an der Hochschule Kaiserslautern am Standort Zweibrücken.

 

Links:

[1] https://java-pro.de/reaktive-programmierung-teil-1/

[2] https://java-pro.de/reaktive-programmierung-teil2/

[3] Projekt Loom: https://openjdk.java.net/projects/loom/

[4] Hettel und Tran: Nebenläufige Programmierung mit Java, dpunkt.verlag, 2016

[5] Beispielcode: https://bit.ly/2UXERIf

Victoria Krautter


Leave a Reply