Tasks parallel ausführen mit Java Future

2 Kommentare

Kürzlich hatte ich die Gelegenheit einen sehr interessanten Vortrag von Adam Bien anzuhören, der unter anderem Future in einem Teil seiner Beispielanwendung benutzt hat. Future bietet einen sehr eleganten Ansatz um Aufgaben in Java parallel auszuführen. Warum es also nicht mal selber in einem kleinen Beispiel ausprobieren und bei der Gelegenheit das Ganze in einen kurzen Blogbeitrag verpacken. Also los geht’s.

Der beste Anfang ist hier erstmal unsere Beispielklasse:

package de.codecentric.blog.sample;
 
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
 
public class FutureTaskDemo {
 
    /**
     * Maximum amount of numbers to check
     */
    public static final int MAX_NUMBER = 2000000000;
 
    /**
     * Returns the amount of numbers that can be divided by the divisor without remainder.
     * @param first First number to check
     * @param last Last number to check
     * @param divisor Divisor
     * @return Amount of numbers that can be divided by the divisor without remainder
     */
    public static int amountOfDivisibleBy(int first, int last, int divisor) {
 
        int amount = 0;
        for (int i = first; i <= last; i++) {
            if (i % divisor == 0) {
                amount++;
            }
        }
        return amount;
    }
 
    /**
     * Returns the amount of numbers that can be divided by the divisor without remainder (using parallel execution).
     * @param first First number to check
     * @param last Last number to check
     * @param divisor Divisor
     * @return Amount of numbers that can be divided by the divisor without remainder
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static int amountOfDivisibleByFuture(final int first, final int last, final int divisor)
            throws InterruptedException, ExecutionException {
 
        int amount = 0;
 
        // Prepare to execute and store the Futures
        int threadNum = 2;
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
 
        // Start thread for the first half of the numbers
        FutureTask<Integer> futureTask_1 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return FutureTaskDemo.amountOfDivisibleBy(first, last / 2, divisor);
            }
        });
        taskList.add(futureTask_1);
        executor.execute(futureTask_1);
 
        // Start thread for the second half of the numbers
        FutureTask<Integer> futureTask_2 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return FutureTaskDemo.amountOfDivisibleBy(last / 2 + 1, last, divisor);
            }
        });
        taskList.add(futureTask_2);
        executor.execute(futureTask_2);
 
        // Wait until all results are available and combine them at the same time
        for (int j = 0; j < threadNum; j++) {
            FutureTask<Integer> futureTask = taskList.get(j);
            amount += futureTask.get();
        }
        executor.shutdown();
 
        return amount;
    }
 
    /**
     * Executing the example.
     * @param args Command line arguments
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
 
        // Sequential execution
        long timeStart = Calendar.getInstance().getTimeInMillis();
        int result = FutureTaskDemo.amountOfDivisibleBy(0, MAX_NUMBER, 3);
        long timeEnd = Calendar.getInstance().getTimeInMillis();
        long timeNeeded = timeEnd - timeStart;
        System.out.println("Result         : " + result + " calculated in " + timeNeeded + " ms");
 
        // Parallel execution
        long timeStartFuture = Calendar.getInstance().getTimeInMillis();
        int resultFuture = FutureTaskDemo.amountOfDivisibleByFuture(0, MAX_NUMBER, 3);
        long timeEndFuture = Calendar.getInstance().getTimeInMillis();
        long timeNeededFuture = timeEndFuture - timeStartFuture;
        System.out.println("Result (Future): " + resultFuture + " calculated in " + timeNeededFuture + " ms");
    }	
}

Der hier implementierte Algorithmus (nicht sicher, ob ich es wirklich einen Algorithmus nennen sollte ;)) prüft für eine gegebene Menge von Zahlen, wie viele von diesen durch eine bestimmte Zahl teilbar sind. Dies soll wirklich nur als Beispiel für die Implementierungstechnik dienen, natürlich könnten wir auch berechnen warum 42 die Antwort auf die ultimative Frage des Lebens, der Universums uns allem Anderen ist. Für den Moment möchte ich persönlich aber doch eher bei den trivialen Problemen bleiben.

Die main-Methode wird nur gebraucht, um die beiden Varianten der Berechnung aufzurufen und damit die Zeit für die Ausführung zu berechnen. Die erste Methode amountOfDivisibleBy ist dabei wirklich trivial und braucht denke ich keine weitere Erläuterung. Bei der zweiten Methode amountOfDivisibleByFuture wird es interessant.

Zunächst brauchen wir hier einen Executor, der später benutzt wird um die Future-Tasks zu starten. Die Tasks werden zur späteren Verarbeitung in einer Liste abgelegt:

        int threadNum = 2;
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

Um dieses Beispiel möglichst einfach zu halten werden die beiden Future-Instanzen hart-codiert erzeugt, zur Liste hinzugefügt und dann mit Hilf des Executors ausgeführt. Und jetzt kommt der Kniff an der ganzen Sache. Wir haben jetzt zwei Objekte, welche die Ausführung unseres Algorithmus übernehmen.

        // Start thread for the first half of the numbers
        FutureTask<Integer> futureTask_1 = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() {
                return FutureTaskDemo.amountOfDivisibleBy(first, last / 2, divisor);
            }
        });
        taskList.add(futureTask_1);
        executor.execute(futureTask_1);

Für diese Objekte gibt es nun verschiedene Möglichkeiten zu prüfen, ob die Verarbeitung bereits fertig ist. In diesem Beispiel macht es Sinn in einer Schleife den „blockierenden Aufruf“ get auf unseren Future-Objekten auszuführen. ein solcher Aufruf kommt erst zurück, wenn die Bearbeitung abgeschlossen ist, d.h. der erste Aufruf wird vermutlich länger dauern und der zweite wird fast direkt ein Ergebnis zurückliefern, vorausgesetzt die Hardware ermöglicht eine schnelle parallel Verarbeitung. Die Ergebnisse der beiden Teilaufgaben werden dann nur noch addiert und zurück geliefert am Ende der Methode

        // Wait until all results are available and combine them at the same time
        for (int j = 0; j < threadNum; j++) {
            FutureTask<Integer> futureTask = taskList.get(j);
            amount += futureTask.get();
        }
        executor.shutdown();

Und tatsächlich kann man sehen, dass die Methode welche Future nutzt fast doppelt so schnell ist wie die rein sequentielle Verarbeitung.

Result         : 666666667 calculated in 12500 ms
Result (Future): 666666667 calculated in 6922 ms

Wie so oft in dieser Art Blogbeitrag ist das Beispiel natürlich recht künstlich, aber hoffentlich wird trotzdem deutlich wie Future genutzt werden kann, um Tasks in Java parallel auszuführen. Und vielleicht wartet ja schon eine echte Aufgabe darauf mit Hilfe dieses Features gelöst zu werden.

Thomas Jaspers

Langjährige Erfahrung in agilen Softwareprojekten unter Einsatz von Java-Enterprise-Technologien. Schwerpunkt im Bereich der Testautomatisierung (Konzepte und Open-Source Werkzeuge).

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentare

  • Thomas Scherm

    18. Oktober 2011 von Thomas Scherm

    Futures & GC

    Die Implementierungen von Future halten intern eine Referenz auf die ausführende Callable Instanz. Diese wird auch nach Beendigung der Ausführung im Future gehalten.

    Bei massiven Einsatz von Futures (d.h. viele Sammeln, bis man sie weiterverarbeiten kann) kann sich so einiges im Heap ansammeln.

    Kann man ein solches Design nicht vermeiden, sollte mann vor Beendigung in Betracht ziehen explizit im Callable lokale Objekte zu dereferenzieren.

  • Robert Spielmann

    8. September 2016 von Robert Spielmann

    Schöner und hilfreicher Artikel, vielen lieben Dank 🙂

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.