Fork und Join aus Java 7

Gesucht ist ein Framework zum Lösen von parallelen D&C-Algorithmen, die berechnungsintensiv sind. Sun hat in Java 7 das Fork/Join integriert, was im Rahmen von jsr166y (http://gee.cs.oswego.edu/dl/concurrency-interest/) unter maßgeblicher Arbeit von Doug Lea entwickelt wurde. Die grundlegende Idee ist, neben Threads, noch eine andere Arbeitseinheit einzuführen, die Tasks.
  • Threads: Werden vom Betriebssystem verwaltet und laufen entweder pseudo-parallel auf einem Prozessor/Core oder echt parallel. Threads können sich mit anderen Threads koordinieren. Zu viele Threads, die sich im Weg stehen und aufeinander Warten führen zu keiner verbesserten Ausführungszeit gegenüber einer sequentiellen Lösung.
  • Tasks: Werden von Threads bzw. einem Thread-Pool ausgeführt. Sie sind Arbeitseinheiten, die nicht auf andere Tasks warten

Die Tasks sind kleine Arbeitspakete und werden in eine Task-Queue gelegt und dann von Threads abgearbeitet. Hat das System zwei Prozessoren und hat der Thread-Pool die Größe 2, so ist es wahrscheinlich, dass 2 Tasks parallel abgearbeitet werden. Gibt es 4 Prozessoren, können 4 Tasks vielleicht parallel laufen. Tasks lassen sich also grundsätzlich auf einer beliebigen Anzahl Threads und somit Prozessoren/Cores bringen, wobei im Gegensatz die Effektivität von Threads immer mit der physikalischen Anzahl von Prozessoren/Cores assoziiert ist.

Das Fork/Join-Framework löst die Probleme effektiv. Wie der Name schon andeutet, geht es bei Fork um das Erstellen eines neuen Tasks und bei Join um das Zusammenführen der Ergebnisse. Die Fork/Join-Bibliothek bietet dazu die Klasse ForkJoinPool und zwei zentrale Methoden: fork() und join(). Zur Abarbeitung der Tasks stellt das Framework die Threads zu Verfügung, deren Anzahl wir zwar selbst bestimmen können, aber die Anzahl Prozessoren/Cores eine gute Standardgröße ist.[1] Die Methode fork() erzeugt einen neuen Task, der an den Anfang (!) einer Queue gestellt wird. Dabei haben alle Threads eine Queue für ihre Arbeitsaufträge und sollte einmal eine Queue leer gelaufen sein, so nimmt sich der Thread einfach einen Task vom Ende (!) einer anderen nicht-leeren Queue. (Das nennt sich work-stealing und ist in der Realwelt ziemlich selten anzutreffen.) Dass neue Tasks an den Anfang gestellt werden ist einfach zu erklären: Die Tasks werden ja immer kleiner und somit stehen die kleinen, schnell lösbaren Aufgaben vorne. Erst später folgen die größeren Aufgaben, die auf die Ergebnisse der kleinen Aufgaben zurückgreifen, die dann logischerweise schon berechnet wurden.

Zur Theorie ein Beispiel: Es geht darum, mit Fork/Join ein Programm zu haben, welches parallel das Maximum eines Arrays sucht. Der Start ist:

int[] array = { 0, 9, 10, 111, 1, 12, 13, 14, 17 };
System.out.println( MaxElementInArrayFinder.findMax( array ) );

Die eigene Klasse MaxElementInArrayFinder bietet die Methode findMax(), die auf den ForkJoinPool zurückgreift, um mit invoke() den Haupt-Task abzusetzen.

class MaxElementInArrayFinder
{
private static final ForkJoinPool fjPool = new ForkJoinPool();
...
public static int findMax( int[] array )
{
return fjPool.invoke( new MaxElemTask( array, 0, array.length -1 ) );
}
}

Die Klasse MaxElemTask repräsentiert unser Arbeitspakt. Die Tasks referenzieren jeweils das Array, und die Anfangs-/Endeposition, ab der sie nach dem Maximum suchen sollen.

private static class MaxElemTask extends RecursiveTask<Integer>
{
private final int[] array;
private final int start, end;
MaxElemTask( int[] array, int start, int end )
{
assert array != null && start >= 0 && start <= end;
this.array = array;
this.start = start;
this.end = end;
}
@Override protected Integer compute()
{

}
}

Unsere Klasse erweitert die Basisklasse RecursiveTask<Integer> und deutet durch den generischen Typ schon an, das das Ergebnis des Tasks eine Ganzzahl sein wird, nämlich das Feldmaximum aus dem gewünschten Bereich. Der Konstruktor sichert die Werte und compute() führt die eigentliche Arbeit aus: Es löst entweder das Problem direkt, wenn es klein genug ist, oder spannt Unter-Tasks auf und wartet anschließend auf deren Ergebnisse.

@Override protected Integer compute()
{
assert array != null && array.length > 0;
System.out.printf( "max( start=%d, end=%d )%n", start, end );
if ( end - start < 4 )
{
int max = array[start];
for ( int i = start + 1; i <= end; i++ )
if ( array[i] > max )
max = array[i];
return max;
}
int middle = (start + end) / 2;
MaxElemTask leftTask = new MaxElemTask( array, start, middle );
leftTask.fork();
MaxElemTask rightTask = new MaxElemTask( array, middle + 1, end );
int rightMax = rightTask.compute();
int leftMax = leftTask.join();
return Math.max( rightMax, leftMax );
}

[1] Das die Maximalanzahl von Threads beim ForkJoinPool zurzeit 32767 ist, dürfte für normale Nutzer keine Einschränkung sein.

Labels: ,

3 Antwort(en) auf ›Fork und Join aus Java 7‹

  1. # Blogger m_hess

    Typo im Titel --> Foin  

  2. # Anonymous Anonym

    Zitat: "Die Tasks speichern jeweils das Array"
    Das ist etwas ungenau ausgedrückt - sie speichern die Referenz auf das Array. Wenn das ganze Feld kopiert würde (sagen wir, es ist ein wirklich großes Feld), wäre das viel zu aufwendig, und würde den Nutzen der nebenläufigen Abarbeitung wahrscheinlich zunichte machen.  

  3. # Anonymous Anonym

    Die Wortwahl passt schon.
    Üblicherweise werden Referenzen benutzt, eine Kopie "müsste" im Gegenteil explizit genannt werden.

    Natürlich darf man dann auch den vorhandenen Quelltext nicht ignorieren, wenn der nicht klar macht was gespeichert wird, ist man vielleicht im falschen Blog gelandet.  

Kommentar veröffentlichen