Java Blog

Asnychrones Programmieren mit CompletableFuture (CompletionStage)

So schöne an Future-Objekten ist die Abarbeitung im Hintergrund und die Möglichkeit, später abzufragen, ob das Ergebnis schon da ist. Allerdings fehlt der Future-Schnittstelle eine Methode, automatisch nach der Fertigstellung einen Folgeauftrag abzuarbeiten. Dafür bietet die Java-Bibliothek eine spezielle Unterklasse CompletableFuture. Die Klasse implementiert die Schnittstelle CompletionStage, die vermutlich die größte Anzahl Operationen in der gesamten Java SE hat. Der Typname drückt aus, das es um die Fertigstellung (engl. completion) von Abschnitten (engl. stage) geht.

Ein Beispiel für einen trinkfesten mutigen Piraten:

package com.tutego.insel.concurrent;




import java.time.LocalTime;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;




class Pirate {




  public static void main( String[] arg ) throws Throwable {




    String result = CompletableFuture.supplyAsync( Pirate::newName )

                                     .thenApply( Pirate::swears )

                                     .thenCombine( drinkRum(), Pirate::combinePiratAndDrinks )

                                     .thenCombine( drinkRum(), Pirate::combinePiratAndDrinks )

                                     .get();

    System.out.println( result ); // Pirat Guybrush flucht und trinkt dann 10 Flaschen Rum und trinkt dann 11 Flaschen Rum

  }




  static String newName() {

    Logger.getGlobal().info( "" + Thread.currentThread() );

    return "Pirat Guybrush";

  }




  static String swears( String pirate ) {

    Logger.getGlobal().info( "" + Thread.currentThread() );

    return pirate + " flucht";

  }




  static CompletableFuture<Integer> drinkRum() {

    Logger.getGlobal().info( "" + Thread.currentThread() );

    try { TimeUnit.SECONDS.sleep( 1 ); } catch ( Exception e ) { }

    return CompletableFuture.supplyAsync( () -> LocalTime.now().getSecond() );

  }




  static String combinePiratAndDrinks( String pirat, int bottlesOfRum ) {

    Logger.getGlobal().info( "" + Thread.currentThread() );

    return pirat + " und trinkt dann " + bottlesOfRum + " Flaschen Rum";

  }

}

Die Ausgabe ist:

Juni 15, 2017 10:41:56 NACHM. com.tutego.insel.thread.concurrent.Pirate drinkRum

INFORMATION: Thread[main,5,main]

Juni 15, 2017 10:41:56 NACHM. com.tutego.insel.thread.concurrent.Pirate newName

INFORMATION: Thread[ForkJoinPool.commonPool-worker-1,5,main]

Juni 15, 2017 10:41:56 NACHM. com.tutego.insel.thread.concurrent.Pirate swears

INFORMATION: Thread[ForkJoinPool.commonPool-worker-1,5,main]

Juni 15, 2017 10:41:57 NACHM. com.tutego.insel.thread.concurrent.Pirate combinePiratAndDrinks

INFORMATION: Thread[main,5,main]

Juni 15, 2017 10:41:57 NACHM. com.tutego.insel.thread.concurrent.Pirate drinkRum

INFORMATION: Thread[main,5,main]

Juni 15, 2017 10:41:58 NACHM. com.tutego.insel.thread.concurrent.Pirate combinePiratAndDrinks

INFORMATION: Thread[main,5,main]

Pirat Guybrush flucht und trinkt dann 57 Flaschen Rum und trinkt dann 58 Flaschen Rum

Zum Programm: Zunächst muss die Kette von Abschnitten aufgebaut werden. Das kann entweder mit dem Standardkonstruktor geschehen, oder mit statischen Methoden. In unserem Fall nutzen wir supplyAsync(Supplier<U> supplier). Die Methode nimmt sich einen freien Thread aus dem ForkJoinPool.commonPool() und lässt den Thread den supplier abarbeiten. Das Ergebnis ist über die Rückgabe, einem CompletableFuture, abrufbar. Als nächsten wenden wir thenApply(Function<? super T,? extends U> fn) an, die vergleichbar ist mit einer map(…)-Operation eines Streams. Interessant wird es bei thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); sie verbindet das Ergebnis der eigenen CompletionStage über eine Funktion mit einer anderen CompletionStage, die wir in unserem Fall auch wieder mit supplyAsync(…) aufbauen. So kombinieren wir zwei unabhängige CompletionStage miteinander und synchronisieren das Ergebnis. Wir können das gut an der Ausgabe auslesen, dass drinkRum ganz am Anfang schon ausgeführt wird, und zwar vom Thread[main,5,main], nicht vom ForkJoinPool, weil es unabhängig von den anderen läuft.