JAVA-B4
Parallel Computing (Parallelism)
Parallelism
Multithreading
Klassische Threads
Threads & Runnables
Overview
Thread (Begriff)
DruckThread
DruckRunnable
ThreadDemo
Output
Thread Lifecycle
Overview
start()
run()
join()
stop()?
StartRunJoinDemo Example
Von allein endende Threads
Threads mit while(criteria)
Daemon-Threads
Overview
ThreadDaemonDemo
Daemon (Summary)
Thread-Priority
Overview
t.setPriority(..)
MachineThread Klasse (Example)
ThreadPriorityDemo (Example)
Thread Notification
Grundlagen
wait
notify
notifyAll
sleep
sleep vs. wait
Thread Notification Demo
sychronized(monitor)
Notification/Monitor Zusammenfassung
Concurrent API
Vom Thread zur Concurrent API
Was ist Concurrency?
Was ist Concurrency?
Problemstellungen bei herkömmlichen Threads
Features der Concurrent API
Warum: Bedeutung des Umstiegs
Themenüberblick
Technischer Hintergrund
Thread.sleep(0, 500_000)
System.nanoTime()
Executors (Übersicht) #310
ExecutorService (Page)
Begriffsbestimmung
Excetutor
.execute()
ExecutorService (Begriff)
ExecutorService Interface Methoden
Mehrere Callables und Runnables auf einmal starten
Kontrolliertes Beenden
Executors-Utilily Klasse
newCachedThreadPool()
newSingleThreadExecutor()
newFixedThreadPool(int nThreads)
newWorkStealingPool()
newScheduledThreadPool(int corePoolSize)
Beispiel
ForkJoinPool, RecursiveTask, Executors Example#09
A | B1,B2,B3 | C mit Summenbildung.
Executor vs. ExecutorService Example#10
Executors.newFixedThreadPool(2)
Beispiel
Zusammenfassung Executors
Callable, Future #330
Callable Future (Begriff)
Callable Interface (Begriff)
Begriff
Callable erzeugen
Callable verwenden
Future
Beispiel
Future-Listen mit invokeAll
Was sind Future-Listen?
Wie erzeuge ich Future-Listen?
Wann blockt eine Future-Liste?
Beispiel mit 12 Callables in 4 Gruppen
Executors (#350) @! Bis auf Hinweis doppelt
Overview
Executor (Begriff)
ExecutorService (Begriff)
Executors.new~ Factory Methoden
Executors für ThreadPoolExecutor mit fixed Pool
ScheduledExecutorService
ScheduledExecutorService
Superklasse ExecutorService
schedule(*)
scheduleAtFixedRate(*)
scheduleWithFixedDelay(*)
ScheduledExecutorService (xmpl)
ProductClock(Main, Bsp.)
AbstractMachine**
AbstractMaterial**
Coal/Iron/Steel**
Coal-/Iron-/Steel-Machine**
Material**
ProductionBus-Machine**
Storage T**
ThreadPoolExecutor (Detail)
Executor Implementierung
ThreadPoolExecutor (Term)
ThreadPoolExecutor Beispiele
"FixedSizeExecutorExample" EXAMPLE C20
"CachedExecutorExample" EXAMPLE C21
SynchronousQueue
"ScheduledExecutorExample" EXAMPLE C22
"CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23
"CustomPolicyRejectedExecutorExample" EXAMPLE C24
"PrioritizedCombinerRunnableComponent" EXAMPLE C30
"BufferingRejectedTasksExample" EXAMPLE C31
Java RejectedExecutionHandler
ThreadPoolExecutor FAQ
Synchronizer
Synchronizer (overview)
Synchronizer Begriff
CountDownLatch
CountDownLatch Example
CyclicBarrier
CyclicBarrier Example (@Zeit ausgeben!)
Phaser
Phaser Example(@Erl!)
Exchanger
Exchanger ping/pong
Semaphore
CompletableFuture
async!!!
SemaphoreCompletableFutureDemo (@!?)
Monitoring
JMX
ThreadPoolExecutor "live" überwachen
getActiveCount()
getPoolSize()
getCompletedTaskCount()
getQueue().size()
Concurrency
Example:DeterministicCopy @todo FILE LINK FEHLT!
Example:NonDeterministicCopy @todo FILE LINK FEHLT!
Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T
Overview
JAVA-B4 | Future-List & invokeAll
Future-List: invokeAll in Verbindung mit Futures.
Bevor wir uns im Detail mit den jeweiligen Features befassen geben wir zu Beginn einen Einblick und Überblick, warum, wo und ab wann man diese Techniken einsetzen und herkömmliche Verfahren mit klassischen Threads überarbeiten sollte. Unsere Intro für den Sprung von Threads zu Concurrency API.
Future-List & invokeAll
Future-List & invokeAll | Begriffsbestimmung
Collection zur Speicherung von Future-Instanzen.
Sie können im Prinzip jede Collection nutzen, aber wenn Sie eine List haben möchten: Nutzen Sie z. B. die ArrayList.
Ein Future hat immer einen Typparameter T, speichert im Beispiel also String-Objekte.
List<Future<String>> liste;
Mehrere Tasks dem Executor auf einmal mitteilen.
Wenn man dem Excetutor nicht einen einzigen Task als Callable mit .submit(..) sondern eine ganze Liste mit .invokeAll(...) zuweist, verwaltet der Executor alle diese Threads und wird so viele, wie es ihm bei seiner Pool-Größe erlaubt ist, dann auch starten; der Test muss warten bis ein Platz frei ist.
Näheres siehe Beispiel.
Example
Future-List & invokeAll | Beispiel
Im Beispiel erzeugen wir 12 Callables und starten diese in Gruppen zu je 4 Threads.
package com.stuelken.java.b4.parallel.c12.concurrentinvokeall;
import java.util.concurrent.*;
import java.util.*;
/**
* @author t2m
*/
public class InvokeAllDemo {
// Startzeitpunkt
public static long start;
// Hilfsmethode für Differenz der Zeit.
public static String getDifferenzZeit() {
return (System.currentTimeMillis() - start) + "ms";
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Sicherstellen, dass wir immer 4 Threads auf einmal laufen lassen können.
// Diese Limitierung ist gewollt.
ExecutorService executor = Executors.newFixedThreadPool(4);
// Für die Erfassung von Referenzen auf Callables eine Collection nutzen.
List<Callable<String>> taskList = new ArrayList<>();
// Ein paar Callables erzeugen.
for (int i = 1; i <= 12; i++) {
final int id = i; // Nur finale Werte können an Lambdas durchgereicht werden.
taskList.add(() -> {
System.out.printf(
"Task #%02d gestartet auf Thread %s | %s%n", //
id, Thread.currentThread().getName(), //
getDifferenzZeit() //
);
Thread.sleep(5000);
System.out.printf("Task #%02d beendet auf Thread %s | %s%n", //
id, Thread.currentThread().getName(), //
getDifferenzZeit() //
);
return "Ergebnis von Task #" + id;
});
}
System.out.println("Alle Tasks werden parallel eingereicht…");
start = System.currentTimeMillis();
List<Future<String>> futures = executor.invokeAll(taskList);
System.out.println("Alle Tasks ausgeführt, warte auf Ergebnisse…");
for (Future<String> future : futures) {
String result = future.get(); // blockierend
System.out.println("result: " + result);
}
executor.shutdown();
System.out.println("Dauer: " + getDifferenzZeit());
}
}
// @formatter:off
/*
Alle Tasks werden parallel eingereicht�
Task #01 gestartet auf Thread pool-1-thread-1 | 11ms
Task #04 gestartet auf Thread pool-1-thread-4 | 11ms
Task #02 gestartet auf Thread pool-1-thread-2 | 11ms
Task #03 gestartet auf Thread pool-1-thread-3 | 11ms
Task #01 beendet auf Thread pool-1-thread-1 | 5081ms
Task #04 beendet auf Thread pool-1-thread-4 | 5081ms
Task #02 beendet auf Thread pool-1-thread-2 | 5081ms
Task #05 gestartet auf Thread pool-1-thread-2 | 5086ms
Task #06 gestartet auf Thread pool-1-thread-4 | 5086ms
Task #07 gestartet auf Thread pool-1-thread-1 | 5088ms
Task #03 beendet auf Thread pool-1-thread-3 | 5089ms
Task #08 gestartet auf Thread pool-1-thread-3 | 5090ms
Task #05 beendet auf Thread pool-1-thread-2 | 10088ms
Task #09 gestartet auf Thread pool-1-thread-2 | 10089ms
Task #08 beendet auf Thread pool-1-thread-3 | 10093ms
Task #06 beendet auf Thread pool-1-thread-4 | 10093ms
Task #07 beendet auf Thread pool-1-thread-1 | 10093ms
Task #11 gestartet auf Thread pool-1-thread-4 | 10096ms
Task #10 gestartet auf Thread pool-1-thread-3 | 10095ms
Task #12 gestartet auf Thread pool-1-thread-1 | 10097ms
Task #09 beendet auf Thread pool-1-thread-2 | 15093ms
Task #12 beendet auf Thread pool-1-thread-1 | 15105ms
Task #11 beendet auf Thread pool-1-thread-4 | 15106ms
Task #10 beendet auf Thread pool-1-thread-3 | 15105ms
Alle Tasks ausgef�hrt, warte auf Ergebnisse�
result: Ergebnis von Task #1
result: Ergebnis von Task #2
result: Ergebnis von Task #3
result: Ergebnis von Task #4
result: Ergebnis von Task #5
result: Ergebnis von Task #6
result: Ergebnis von Task #7
result: Ergebnis von Task #8
result: Ergebnis von Task #9
result: Ergebnis von Task #10
result: Ergebnis von Task #11
result: Ergebnis von Task #12
Dauer: 15111ms
*/
Zeiterfassung
Es ist immer clever, wenn man mal die Laufzeiten misst.
Executors.newFixedThreadPool(4);
Eine Limitierung von Thread-Pools ist mehr als clever. So kann man die Anzahl der Thread limitieren.
Mit 4 Threads bilden wir bei 12 Task jetzt 3 Ausführungsgruppen. Anhand der Zeit lässt sich das erkennen.
futures = executor.invokeAll(taskList)
Wir starten hierbei nun alle Threads auf einmal. Der Pool bleibt aber weiterhin auf 4 Threads limitiert bis zu dem Zeitpunkt, wo wir den Pool vergrößern würden.
Wann wird geblockt?
Futures werden als Referenzen weitergereicht und dienen schlichtweg als Referenz auf ein Ergebnis, was irgendwann dann noch kommen wird.
So lange man aus einem Future den Wert nicht mit Future.get() abzurufen versucht, wird auch nicht geblockt.
Die Besonderheit ist bei einem Future immer die, dass wir über eine Reihenfolge der Future.get() Befehle mindestens so lange warten, bis zumindest der benötigte Wert vorliegt.
Uns liegen also die Werte später in der korrekten Reihenfolge vor, auch wenn die wirklichen Ergebnisse mancher Futures schon seit Stunden vorlagen.
Was passiert, wenn es zu lange dauert?
invokeAll(taskList, timeout, unit) ermöglicht die Angabe eines Zeitwertes mit Angabe der Zeiteinheit.
Timeout- und Error-Handling bei Callables und Futures oder auch CompletionService sind ein eigenes Thema.
UIO3 Es ist einfacher als Du denkst.
Stelle noch heute Deine Anfrage.
