JAVA-B4
Parallel Computing (Parallelism)
Parallelism
Multithreading
Klassische Threads
Threads & Runnables
Overview
Thread (Begriff)
DruckThread
DruckRunnable
ThreadDemo
Output
Thread Lifecycle
Overview
start()
run()
join()
stop()?
StartRunJoinDemo Example
Von allein endende Threads
Threads mit while(criteria)
Daemon-Threads
Overview
ThreadDaemonDemo
Daemon (Summary)
Thread-Priority
Overview
t.setPriority(..)
MachineThread Klasse (Example)
ThreadPriorityDemo (Example)
Thread Notification
Grundlagen
wait
notify
notifyAll
sleep
sleep vs. wait
Thread Notification Demo
sychronized(monitor)
Notification/Monitor Zusammenfassung
Concurrent API
Vom Thread zur Concurrent API
Was ist Concurrency?
Was ist Concurrency?
Problemstellungen bei herkömmlichen Threads
Features der Concurrent API
Warum: Bedeutung des Umstiegs
Themenüberblick
Technischer Hintergrund
Thread.sleep(0, 500_000)
System.nanoTime()
Executors (Übersicht) #310
ExecutorService (Page)
Begriffsbestimmung
Excetutor
.execute()
ExecutorService (Begriff)
ExecutorService Interface Methoden
Mehrere Callables und Runnables auf einmal starten
Kontrolliertes Beenden
Executors-Utilily Klasse
newCachedThreadPool()
newSingleThreadExecutor()
newFixedThreadPool(int nThreads)
newWorkStealingPool()
newScheduledThreadPool(int corePoolSize)
Beispiel
ForkJoinPool, RecursiveTask, Executors Example#09
A | B1,B2,B3 | C mit Summenbildung.
Executor vs. ExecutorService Example#10
Executors.newFixedThreadPool(2)
Beispiel
Zusammenfassung Executors
Callable, Future #330
Callable Future (Begriff)
Callable Interface (Begriff)
Begriff
Callable erzeugen
Callable verwenden
Future
Beispiel
Future-Listen mit invokeAll
Was sind Future-Listen?
Wie erzeuge ich Future-Listen?
Wann blockt eine Future-Liste?
Beispiel mit 12 Callables in 4 Gruppen
Executors (#350) @! Bis auf Hinweis doppelt
Overview
Executor (Begriff)
ExecutorService (Begriff)
Executors.new~ Factory Methoden
Executors für ThreadPoolExecutor mit fixed Pool
ScheduledExecutorService
ScheduledExecutorService
Superklasse ExecutorService
schedule(*)
scheduleAtFixedRate(*)
scheduleWithFixedDelay(*)
ScheduledExecutorService (xmpl)
ProductClock(Main, Bsp.)
AbstractMachine**
AbstractMaterial**
Coal/Iron/Steel**
Coal-/Iron-/Steel-Machine**
Material**
ProductionBus-Machine**
Storage T**
ThreadPoolExecutor (Detail)
Executor Implementierung
ThreadPoolExecutor (Term)
ThreadPoolExecutor Beispiele
"FixedSizeExecutorExample" EXAMPLE C20
"CachedExecutorExample" EXAMPLE C21
SynchronousQueue
"ScheduledExecutorExample" EXAMPLE C22
"CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23
"CustomPolicyRejectedExecutorExample" EXAMPLE C24
"PrioritizedCombinerRunnableComponent" EXAMPLE C30
"BufferingRejectedTasksExample" EXAMPLE C31
Java RejectedExecutionHandler
ThreadPoolExecutor FAQ
Synchronizer
Synchronizer (overview)
Synchronizer Begriff
CountDownLatch
CountDownLatch Example
CyclicBarrier
CyclicBarrier Example (@Zeit ausgeben!)
Phaser
Phaser Example(@Erl!)
Exchanger
Exchanger ping/pong
Semaphore
CompletableFuture
async!!!
SemaphoreCompletableFutureDemo (@!?)
Monitoring
JMX
ThreadPoolExecutor "live" überwachen
getActiveCount()
getPoolSize()
getCompletedTaskCount()
getQueue().size()
Concurrency
Example:DeterministicCopy @todo FILE LINK FEHLT!
Example:NonDeterministicCopy @todo FILE LINK FEHLT!
Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T
Overview
Executors | Overview
Übersicht: Executor-Interfaces und Klassen ersetzen in Java Concurrency das herkömmliche thread.start() sowie mitunter hierbei zugleich auch das Problem, wie man diesen Start für viele Threads und auch deren Ende eigentlich kontrolliert steuert.
Das wichtigste Grundprinzip eines Executors vorab:
Man erzeugt diese über die Factory-Methoden von Executors.*
Begriffsbestimmung
Executors | Begriffsbestimmung
Rund um Executors gilt es drei ähnlich klingende Bezeichnungen zu verstehen: Executor, ExecutorService und Executors.
Executor
Executors | Executor
Executor ist das Basis-Interface
Executor ist ein Interface in Java, das die Ausführung von
Runnable-Objekten abstrahiert, java.util.concurrent.Executor
public interface Executor {
void execute(Runnable command);
}
Die Methode execute(Runnable r)
startet den Task. Zugleich ist diese Methode quasi die einzige
Methode, die das Interface defininert.
Entkopplung der Aufforderung zur Ausgabe von den Details.
Executor als Interface führt dazu, dass man die Art und Weise der Ausführung, die man früher bei Threads genau angeben musste, bei Verwendung eines Executor(s) entkoppeln kann. Es reicht quasi zu wissen, dass man den Excecutor ausführen kann.
Ein Executor fordert dafür keine Methode
Das Executor-Interface fordert nur die execute(Runnable r) Methode.
Ein Runnable hat aber nur eine einzige Methode den Rückgabe-Wert stehts void
ist.
Dem Executor fehlt die shutDown(..) Methode
Das bedeutet für die Praxis, dass man zwar Runnables starten kann, aber selbst wenn die Tasks selbst fertig sind, ist der Executor noch immer am Laufen.
Der Executor ist quasi ein ThreadPoolExecutor, welcher darauf wartet, dass er weiter Runnables mit .submit(..) zur Ausführung bekommt. Wenn diese ihm aber nicht geben, wartet er einfach weiter; und bewirkt, dass das Programm bzw. die main-Methode nicht endet.
ExecutorService
Executors | ExecutorService
Auch ExceutorService ist ein Interface
Der java.util.concurrent.ExecutorService
erweitert das Executor Interface durch eine Reihe weiterer
Methoden:
Interface:
// Das offizielle Interface
public interface ExecutorService extends Executor { // Erweitert Executor
Future<?> submit(Runnable task); // Fordert eine submit(..) Methode für Runnables
<T> Future<T> submit(Callable<T> task); // Fordert eine submit(..) für Callables
void shutdown(); // Fordert die Option für eine Aufforderung zum Beenden von Tasks
List<Runnable> shutdownNow(); // Fordert die sofortige Beendigung.
boolean awaitTermination(long timeout, TimeUnit unit);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // Bulk
<T> T invokeAny(Collection<? extends Callable<T>> tasks); // Bulk
// …
}
Ein ExecutorService verwaltet Tasks als Callable<T>
Mit Hilfe von .submit(callable)
kann man einen Task übertragenl, welcher sogleich auch ausgeführt wird bzw. als Thread
nebenläufig ausgeführt werden kann, wenn denn die Runtime diesem Prozessorzeit
gönnt.
Die Besonderheit besteht darin, dass ein Callable einen Rückgabewert vom
Typ Future T erzeugen kann. Damit ist es also
möglich, dass ein ExecutorService nicht nur nebenläufige Prozesse startet
sondern jeder dieser Prozesse auch einen Rückgabe-Wert haben darf.
... und mehrere Futures T als Antwort bekommen.
invokeAny(..) und invokeAll(..)
ermöglichen eine sogeannte «Bulk»-Ausführung von
Callable Tasks: Anstatt also wie bei
submit(Callable task) nur einen einzigen Task
je Befehl zu übergeben kann man auch mehr übertragen.
shutDown() und shutDownNow() erleichtern die Arbeit.
Ein ExecutorService bietet die Kontrolle über den Lebenszyklus (z. B. shutdown(),
awaitTermination(), shutdownNow. Man kann
also Tasks kontrolliert ordentlich beenden lassen oder das Beenden auch forcieren.
Er unterstützt Pools (z. B. FixedThreadPool, CachedThreadPool).
Mehrfaches Aufrufen ist möglich
Ein ExcecutorService kann mehrere Tasks gleichzeitig starten und überwachen.
Man kann also mehrere nacheinander mit submit(..)
übergeben oder auch mehrere mit InvokeAny und
InvokeAll.
Executors Utility-Klasse
Executors | Executors Utility-Klasse
...
Die Klasse Executors ist ein Utility für Factory-Methoden und Wrapper. Man verwendet diese Klasse dazu, um neue Executor-Instanzen erzeugen zu können.
newCachedThreadPool(), newFixedThreadPool(int nThreads) und weitere Methoden mehr erzeugen ThreadPools.
...
Dynamisch wachsendes Pool, Threads werden bis zu 60 s idle gehalten.
Nach Ablauf dieser 60s werden diese Threads beendet, auch ohne dass diese gelaufen sind.
...
Fester Pool mit nThreads und unbeschränkter Warteschlange.
Sehr praktisch für den Fall, dass man beispielsweise nur 2 Threads erlauben möchte.
...
Ein einzelner Worker-Thread, Aufgaben in Reihenfolge.
Auch wenn man mehrere Runnables, Callables, Tasks überträgt: Es wird immer maximal ein Thread zur Zeit aktiv sein, wie man es quasi von JavaScript kennt.
Auch dieser Task kann mal eine Weile schlafen und wieder starten, ist aber erst wieder an der Reihen, wenn es keinen zweiten gibt, der schon läuft.
...
Fork/Join-basiertes Pool für Work-Stealing.
...
Pool für zeitversetzte und periodische Aufgaben.
Hierzu sei erwähnt, dass es mehrere Implementierungsklassen für Executor-Typen gibt, darunter den ScheduledThreadPoolExecutor.
Auf die ThreadPoolExecutor-Klasse als Implementierungsklasse wird noch später eingangen werden.
...
Wrapper, der keine Implementierungs-Methoden freigibt.
...
Erzeugung von ThreadFactory
...
Liefert Callable-Adapter-Objekte.
Beispiel
Executors | Beispiel
Executor, ExecutorService, Executors werden in vielen unserer Beispiele verwendet.
Executor vs. ExecutorService
Executors | ForkJoinPool A|B1,B2,B3|C
Die Executors-Utility-Klasse kann beispielsweise auch von
ce:RecursiveTask abgeleitete
Objekte dazu verwenden eine Verschachtelung von Tasks mit
Task-Fork und Task-Join zu verwalten.
Das nachfolgende Beispiel zeigt, wie man einen Task A beginnt, welcher intern 3 Sub-Tasks B1, B2, B3 ausführt.
package com.stuelken.java.b4.parallel.c09.concurrentstealing;
import java.util.concurrent.*;
/**
* Es soll folgende Tasks geben: A ist der erste Task. Daraus sollen 3 Tasks B1,
* B2, B3 als "Fork" erstellt werden. Abschließend sollen die drei wieder als
* "Join" zu C fortgesetzt werden.
*
* @author t2m
*/
public class WorkStealingForkJoinDemo {
/**
* Oberaufgabe oder erster Task, der von drei Unteraufgaben B1–B3 "forkt" und
* deren Ergebnisse "joint".
*
* Erweitert die Klasse {@link RecursiveTask} mit dem
* Typparameter {@link Integer}, damit unserer Task
* schlussendlich auch ein Ergebnis berechnen kann.
*
* @author t2m
*/
static class TaskA extends RecursiveTask<Integer> {
/**
*
*/
private static final long serialVersionUID = -506106973422841988L;
/**
* "The main computation performed by this task." (JavaDoc).
*/
@Override
protected Integer compute() {
System.out.println("TaskA compute begonnen.");
// Wir erzeugen 3 Tasks für unseren Fork, welche aber
// alle auf dem selben Typ basieren und sich nur über den
// Wert für den Konstruktor unterscheiden.
System.out.println("TaskA erzeugt b1, 2, b3 mit b1.fork() etc..");
TaskB b1 = new TaskB(1); // Aufgabe B mit Wert 1
TaskB b2 = new TaskB(2); // Aufgabe B mit Wert 2
TaskB b3 = new TaskB(3); // Aufgabe B mit Wert 3
// Fork der Unteraufgaben
// Dieser Befehl bewirkt, dass sich dieser Task in dem Container-Thread,
// in welchem er sich befindet, in einen "common pool" einzufinden
// versucht, um dort asynchron nebenläufig mit anderen weiteren
// Tasks ausgeführt zu werden.
b1.fork();
b2.fork();
b3.fork();
// Join: Ergebnisse zusammenrechnen
// Mit .join() liefert jeder Task seinen berechneten Wert
// zurück. Die .join() Methode blockt entsprechend, bis
// diesen Wert denn auch gibt.
//
// Vergleiche auch .get() welche bei Callables ein Future<T> liefert.
int sum = b1.join() + b2.join() + b3.join();
System.out.printf( //
"A collected sum=%d in %s%n", //
sum, //
Thread.currentThread().getName() //
);
System.out.println("TaskA compute beendet.");
return sum;
}
}
/**
* B: Eine ganz einfache Aufgabe, liefert id*10
*
* Von RecursiveTask T können KEINE Objekte erzeugt werden; man kann diese
* Klasse allerdings ableiten.
*
* @author t2m
*
*/
static class TaskB extends RecursiveTask<Integer> {
private final int id;
/**
* Jeder Tasks hat eine id, damit wir diesen unterscheiden können.
*
* @param id
*/
TaskB(int id) {
this.id = id;
}
/**
* Jeder {@link RecursiveTask} fordert die compute()
* Methode, damit in unserem Fall ein Task für den Typ Integer
* am Ende mit compute() auch einen Integer-Wert liefrt.
*/
@Override
protected Integer compute() {
// Ausgabe, welcher B-Task hier gerade läuft.
System.out.printf("Running B%d in %s%n",
id, Thread.currentThread().getName());
return id * 10; // Berechnung multipliziert ID einfach mit 10.
}
}
/**
*
* @param args Keine Argumente
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// {#1} Erzeugt einen Work-Stealing-Pool mit (hier) 4 parallel arbeitenden Threads
// {#2} Executors erzeugt stets Objekte einer Implementierungsklasse
// {#3} Der Rückgabewert der Factory-Methoden ist aber ein Executor-Interface.
// {#4} Wir müssen deshalb den Wert casten.
//
// WICHTIG: Dieser spezielle Pools ist in der Lage, verschachtelte
// Tasks verwalten zu können.
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(4);
// A wird in den Pool eingereicht
// B1 bis B3 brauchen wir nicht übergeben, weil diese innerhalb von A
// erzeugt werden. Der Task A ist also der Container für den Fork von 3
// Sub-Tasks.
System.out.println("main: pool.submit(new TaskA() vorher");
ForkJoinTask<Integer> taskA = pool.submit(new TaskA());
System.out.println("main: pool.submit(new TaskA() nachher");
// C: Holt das Gesamtergebnis ab
// Einen Task "C" sparen wir uns weil dieser einfach der letzte
// unserer main-Methode und damit dem Haupthread ist.
System.out.printf("Final result in main: %d%n", taskA.get());
// ! Man beachte, dass task.get() ein Future T liefert und damit zuerst einmal blockt is der Wert kommt.
// Abschließend unseren Executor für den Pool kontrolliert beenden.
pool.shutdown();
}
}
// @formatter:off
/*
main: pool.submit(new TaskA() vorher
main: pool.submit(new TaskA() nachher
TaskA compute begonnen.
TaskA erzeugt b1, 2, b3 mit b1.fork() etc..
Running B1 in ForkJoinPool-1-worker-1
Running B3 in ForkJoinPool-1-worker-3
Running B2 in ForkJoinPool-1-worker-2
A collected sum=60 in ForkJoinPool-1-worker-1
TaskA compute beendet.
Final result in main: 60
*/
Beispiel
Executors | Beispiel
Executor, ExecutorService, Executors werden in vielen unserer Beispiele verwendet.
Executor vs. ExecutorService
Executors | Executor vs. ExecutorService
Den Unterschied zwischen einem Executor und einem ExecutorServices demonstriert unser Beispiel.
In beiden Fällen erfolgt der Zugriff auf die API über die
java.util.concurrent.Executor
Klasse, was es recht einfach macht, sich das zu merken.
package com.stuelken.java.b4.parallel.c10.concurrentexecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @param args
*/
public class ExecutorServiceDemo {
public static void main(String[] args) {
/*
* === Einfacher Executor
*/
Executor simpleExecutor = Executors.newSingleThreadExecutor();
simpleExecutor.execute(() -> {
System.out.println("Task mit einfachem Executor laeuft");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
((ExecutorService) simpleExecutor).isShutdown();
/*
* === Vollwertiger ExecutorService
*/
ExecutorService poolExecutor = Executors.newFixedThreadPool(2);
poolExecutor.submit(() -> {
System.out.println("Task 1 via ExecutorService laeuft");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
poolExecutor.submit(() -> {
System.out.println("Task 2 via ExecutorService laeuft");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Beispiel #3: Kontrolle via shutdown()
System.out.println("main: sleep 10000!");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main: poolExecutor.shutdown!");
poolExecutor.shutdown();
System.out.println("main: Letzte Zeile. ");
///////////////////////////////////
// TODO: endet das Programm nicht?
///////////////////////////////////
if (false) {
((ExecutorService) simpleExecutor).shutdown();
System.out.println("main: Jetzt hat auch simple~ geendet. ");
}
}
}
// @formatter:off
/*
Task mit einfachem Executor laeuft
Task 1 via ExecutorService laeuft
main: sleep 10000!
Task 2 via ExecutorService laeuft
main: poolExecutor.shutdown!
main: Letzte Zeile.
*/
Zusammenfassend
Executors | Zusammenfassend
Executor, ExecutorService und die Executors-Utility-Klasse erleicht das Starten, Verwalten und Beenden von Runnables und Callables, wobei im Falle von Callables wir auch noch Rückgabe-Werte bekommen können.
Callables und Futures werden im nächsten Kapitel erläutert.
Implementierungen für ExecutorService: ThreadPoolExecutor als Superklasse
für ScheduledThreadsPoolExecutor sowie ForkJoinPool.
Pool-Size
Anzahl der CPU-Cores? I/O-Last und Laufzeiten für Tasks?
UIO3 Es ist einfacher als Du denkst.
Stelle noch heute Deine Anfrage.
