JAVA-B4
Parallel Computing (Parallelism)
Parallelism
Multithreading
Klassische Threads
Threads & Runnables
Overview
Thread (Begriff)
DruckThread
DruckRunnable
ThreadDemo
Output
Thread Lifecycle
Overview
start()
run()
join()
stop()?
StartRunJoinDemo Example
Von allein endende Threads
Threads mit while(criteria)
Daemon-Threads
Overview
ThreadDaemonDemo
Daemon (Summary)
Thread-Priority
Overview
t.setPriority(..)
MachineThread Klasse (Example)
ThreadPriorityDemo (Example)
Thread Notification
Grundlagen
wait
notify
notifyAll
sleep
sleep vs. wait
Thread Notification Demo
sychronized(monitor)
Notification/Monitor Zusammenfassung
Concurrent API
Vom Thread zur Concurrent API
Was ist Concurrency?
Was ist Concurrency?
Problemstellungen bei herkömmlichen Threads
Features der Concurrent API
Warum: Bedeutung des Umstiegs
Themenüberblick
Technischer Hintergrund
Thread.sleep(0, 500_000)
System.nanoTime()
Executors (Übersicht) #310
ExecutorService (Page)
Begriffsbestimmung
Excetutor
.execute()
ExecutorService (Begriff)
ExecutorService Interface Methoden
Mehrere Callables und Runnables auf einmal starten
Kontrolliertes Beenden
Executors-Utilily Klasse
newCachedThreadPool()
newSingleThreadExecutor()
newFixedThreadPool(int nThreads)
newWorkStealingPool()
newScheduledThreadPool(int corePoolSize)
Beispiel
ForkJoinPool, RecursiveTask, Executors Example#09
A | B1,B2,B3 | C mit Summenbildung.
Executor vs. ExecutorService Example#10
Executors.newFixedThreadPool(2)
Beispiel
Zusammenfassung Executors
Callable, Future #330
Callable Future (Begriff)
Callable Interface (Begriff)
Begriff
Callable erzeugen
Callable verwenden
Future
Beispiel
Future-Listen mit invokeAll
Was sind Future-Listen?
Wie erzeuge ich Future-Listen?
Wann blockt eine Future-Liste?
Beispiel mit 12 Callables in 4 Gruppen
Executors (#350) @! Bis auf Hinweis doppelt
Overview
Executor (Begriff)
ExecutorService (Begriff)
Executors.new~ Factory Methoden
Executors für ThreadPoolExecutor mit fixed Pool
ScheduledExecutorService
ScheduledExecutorService
Superklasse ExecutorService
schedule(*)
scheduleAtFixedRate(*)
scheduleWithFixedDelay(*)
ScheduledExecutorService (xmpl)
ProductClock(Main, Bsp.)
AbstractMachine**
AbstractMaterial**
Coal/Iron/Steel**
Coal-/Iron-/Steel-Machine**
Material**
ProductionBus-Machine**
Storage T**
ThreadPoolExecutor (Detail)
Executor Implementierung
ThreadPoolExecutor (Term)
ThreadPoolExecutor Beispiele
"FixedSizeExecutorExample" EXAMPLE C20
"CachedExecutorExample" EXAMPLE C21
SynchronousQueue
"ScheduledExecutorExample" EXAMPLE C22
"CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23
"CustomPolicyRejectedExecutorExample" EXAMPLE C24
"PrioritizedCombinerRunnableComponent" EXAMPLE C30
"BufferingRejectedTasksExample" EXAMPLE C31
Java RejectedExecutionHandler
ThreadPoolExecutor FAQ
Synchronizer
Synchronizer (overview)
Synchronizer Begriff
CountDownLatch
CountDownLatch Example
CyclicBarrier
CyclicBarrier Example (@Zeit ausgeben!)
Phaser
Phaser Example(@Erl!)
Exchanger
Exchanger ping/pong
Semaphore
CompletableFuture
async!!!
SemaphoreCompletableFutureDemo (@!?)
Monitoring
JMX
ThreadPoolExecutor "live" überwachen
getActiveCount()
getPoolSize()
getCompletedTaskCount()
getQueue().size()
Concurrency
Example:DeterministicCopy @todo FILE LINK FEHLT!
Example:NonDeterministicCopy @todo FILE LINK FEHLT!
Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T
Overview
ScheduledExecutorService | Overview
Die Interfaces Executor, ExecutorService, ScheduledExecutorServices sind eine Vererbungskette welche uns den Zugriff auf Methoden eines ThreadPoolExecutors ermöglichen.
scheduler.scheduleAtFixedRate(bus::fireTick, 0, 2, TimeUnit.SECONDS)
ist ein klassisches Pattern für die Verwendung eines Schedulers.
ScheduledExecutorService
ScheduledExecutorService | Begriffsbestimmung
ScheduledExecutorService führt Tasks zeitgesteuert aus.
Ein Callable<T> Der ScheduledExecutorService ist ein zeitgesteuerter Task-Executor, der Aufgaben verzögert oder regelmäßig ausführt.
Man kann sich diesen wie einen fortgeschrittenen Timer vorstellen.
Er verwendet aber Threads aus einem Pool und ist somit skalierbar und robust.
Es ist funktional: Man kann Lambda-Ausdrücke dafür verwenden.
Es liefert bei einer Ausführung dem im Typ-Parameter angegeben Datentyp mit einem Wert oder vom Prinzip her auch null.
Es kann checked Exceptions auswerten mit throw Exception.
Executors.newScheduledThreadPool(1)
Man nutzt die statische Methode newScheduledThreadPool(..) von Executors.
Für die Deklaration eines Callables reicht ein Lambda-Ausdruck.
// Den ~Service instanzieren.
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// Dem Service einen Task mit einem Zeitplan mitteilen.
scheduler.schedule(() -> System.out.println("Start nach 3 Sekunden"), 3, TimeUnit.SECONDS);
// ...
Zeitregeln
Mit .submit(task) kann man im Prinzip eine Aufgabe für eine potentiell sofortige Ausführung hinzu.
Mit .schedule(Runnable, delay, timeUnit) kann man einen Task EINMALIG ausführen lassen, das allerdings verzögert; im Prinzip wie ein rückwärts laufender Timer, nur dass Sie den Timer nicht sehen können.
.scheduledAtFixedRate(..) ermöglicht eine wiederholte Ausführung in einem festen Zeitrythmus.
.scheduleWithFixedDelay(..) wiederholt die Ausführung mit einer Pause nach der Beendigung der letzten Ausführung.
schedule(Runnable, delay, unit)
scheduleAtFixedRate(Runnable, initialDelay, period, unit)
scheduleWithFixedDelay(Runnable, initialDelay, delay, unit)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// Aufgabe einmalig nach 3 Sekunden
scheduler.schedule(() -> System.out.println("Start verzögert"), 3, TimeUnit.SECONDS);
// Aufgabe wiederholt alle 2 Sekunden
scheduler.scheduleAtFixedRate(() -> System.out.println("Tick"), 0, 2, TimeUnit.SECONDS);
ProductionClockDEMO
ScheduledExecutorService | ProductionClockDEMO
Im Beispiel werden 4 Speicherplätze für zu produzierende Rohstoffe angelegt und zu Beginn eine Reihe von Rohstoffen generiert, damit das Programm die weitere Fertigung simulieren kann. Alle Maschinen werden als AbstractMachine definiert und verfügen über eine fireTick() Methode, welche man in einem Zeittakt dann durch den Scheduler aufrufen lassen kann.
Das Nehmen und Einspeichern von Material und Produkten in den jeweiligen Speichern wird über synchronisierte Methoden Thread-sicher ausgeführt. Wartende andere Threads werden mit notifyAll benachrichtigt.
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import java.util.concurrent.*;
/**
* Diese Klasse beinhaltet das Hauptprogramm mit der {@link #main(String[])} Methode
* in welchemn verschiedene {@link Storage} Instanzen
*
* @author t2m
*/
public class ProductionClockDEMO {
public static void main(String[] args) throws InterruptedException {
// {#10} Speicher anlegen.
Storage<IronOre> oreStorage = new Storage<>("ErzDepot");
Storage<Coal> coalStorage = new Storage<>("KohleDepot ");
Storage<Iron> ironStorage = new Storage<>("EisenDepot ");
Storage<Steel> steelStorage = new Storage<>("StahlDepot ");
// {#20} Vorab bereits etwas Eisenerz speichern.
for (int i = 0; i < 20; i++) {
oreStorage.add(new IronOre());
}
// {#30} Maschinen für die Produktion registrieren.
ProductionBus bus = new ProductionBus();
bus.register(new IronMachine(oreStorage, ironStorage));
bus.register(new CoalMachine(coalStorage));
bus.register(new SteelMachine(ironStorage, coalStorage, steelStorage));
// {#40} Einen Scheduler erzeugen für einen einzigen Thread.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
long delay = (long) 0; // {#41}
long period = (long) 2; // {#42} Die Periode darf NICHT 0 sein.
System.out.println("main: scheduler.scheduleAtFixedRate(..)");
scheduler.scheduleAtFixedRate(bus::fireTick, delay, period, TimeUnit.SECONDS); // {#43}
System.out.println("main: Thread.sleep(..)");
Thread.sleep(20_000);
System.out.println("main: scheduler.shutdownNow(..)");
scheduler.shutdownNow();
System.out.println("main: Produktion beendet.");
}
}
Storage
Jedes Storage-Objekt dient zum Speichern von Materialien. Alle Materialien implementieren vom Prinzip her das selbe Interface, aber für eine Typisierung werden hier Typparameter verwendet, um die jeweiligen Storage-Instanzen sortenrein befüllen zu können.
~Bus.register(..)
Die jeweiligen Maschinen werden im ~Bus registriert,
damit später der ScheduledExecutorService als
scheduler Instanz
mit einer fixen Zeitrate an jedem dieser Maschine-Objekte
die gleiche Methode über bus::fireTick
aufrufen kann.
bus::fireTick
Das ist eine Methoden-Referenz und damit eine Referenz auf Callable, welches derr Scheduler über scheduleAtFixedRate(..) für Callable, delay und period als Dauer ausführen soll.
Hinweis: period muss größer 0 sein! Die Zahlenangaben sind in long udnd erfordern eine TimeUnit TimeUnit
Tasks
Das Task besteht hierbei darin, an jeder Maschine deren fireTick() Methode aufzurufen. Ob es sich um 3 Maschinen oder 30.000 Objekte handelt, welche dann getaktet (nacheinander) gesteuert über den ~Bus ihre Arbeit aufnehmen, spielt keine wirkliche Rolle.
shutdownnow()
Ein Scheduler dient zum Starten und zum Stoppen der von ihm verwalteten nebenläufigen Prozesse.
Auch wenn die fireTick() Methoden blockend nacheinander im Takt ausgeführt werden, so führt dennoch der Scheduler diese Aufgaben nebenläufig aus.
Bei Shutdown werden die gerade noch laufenden Threads NICHT abgebrochen sondern noch kontrolliert zu Ende gebracht.
Die bus::fireTick Methode, die als Methoden-Referenz dem Scheduler als Callable angegeben wurde, führt also noch die 3 Aufrufe an die 3 Maschinen zuende durch.
Code
ScheduledExecutorService (Example) | Code ***
Das Beispiel erfordert ein paar mehr Files.
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
/**
* Superklasse für {@link CoalMachine}, {@link IronMachine}, {@link SteelMachine}.
*
* Implementierung: Jede Implementierungsklasse benötigt zwingend eine Methode
* {@link #tick()} welche vom Scheduler aufgerufen wird.
*
* Es gibt drei Phasen: {@link #beginWarmUp()}, #prod
*
* @author t2m
*/
public abstract class AbstractMachine {
protected int phase = 0;
public void tick() {
switch (phase) {
case 0 -> beginWarmUp();
case 1 -> produce();
case 2 -> cooldown();
default -> phase = 0;
}
}
protected abstract void beginWarmUp();
protected abstract void produce();
protected abstract void cooldown();
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
/**
* Basisklasse für das {@link Material} wie beispielsweise {@link Coal}.
*
* @author t2m
*
*/
public class AbstractMaterial implements Material {
public AbstractMaterial() {
this.name = this.getClass().getSimpleName();
}
private String name;
@Override
public String getName() {
return this.name;
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
public class Coal extends AbstractMaterial {
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import com.stuelken.java.b4.parallel.c13.scheduledexecutionservice.*;
/**
*
* @author tinyone
*
*/
public class CoalMachine extends AbstractMachine {
private final Storage<Coal> output;
public CoalMachine(Storage<Coal> output) {
this.output = output;
}
protected void beginWarmUp() {
System.out.println("[CoalMachine] WarmUp");
phase++;
}
protected void produce() {
output.add(new Coal());
System.out.println("[CoalMachine] Producing Coal");
phase++;
}
protected void cooldown() {
System.out.println("[CoalMachine] Cooldown");
phase = 0;
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
public class Iron extends AbstractMaterial {
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import com.stuelken.java.b4.parallel.c13.scheduledexecutionservice.*;
/**
* @author t2m
*/
public class IronMachine extends AbstractMachine {
private final Storage<IronOre> input;
private final Storage<Iron> output;
public IronMachine(Storage<IronOre> input, Storage<Iron> output) {
this.input = input;
this.output = output;
}
protected void beginWarmUp() {
System.out.println("[IronMachine] WarmUp");
phase++;
}
protected void produce() {
IronOre ore = input.take();
System.out.printf("[IronMachine] > Iron from %s%n", ore.getName());
output.add(new Iron());
phase++;
}
protected void cooldown() {
System.out.println("[IronMachine] Cooldown");
phase = 0;
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
public class IronOre extends AbstractMaterial {
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
/**
* Interface für {@link AbstractMaterial}
* @author t2m
*/
public interface Material {
/**
* Name oder Bezeichner des Materials.
* @return
*/
String getName();
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import java.util.*;
/**
* Eine {@link ProductionBus} Instanz
* speichert intern die Referenzen auf {@link #machines} {@link AbstractMachine}
* Instanzen.
*
* @author t2m
*/
public class ProductionBus {
private final List<AbstractMachine> machines = new ArrayList<>();
public void register(AbstractMachine m) {
machines.add(m);
}
/**
*
*/
public void fireTick() {
System.out.println("\n Neuer Takt:");
machines.forEach(AbstractMachine::tick);
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import java.util.concurrent.*;
/**
* Diese Klasse beinhaltet das Hauptprogramm mit der {@link #main(String[])} Methode
* in welchemn verschiedene {@link Storage} Instanzen
*
* @author t2m
*/
public class ProductionClockDEMO {
public static void main(String[] args) throws InterruptedException {
// {#10} Speicher anlegen.
Storage<IronOre> oreStorage = new Storage<>("ErzDepot");
Storage<Coal> coalStorage = new Storage<>("KohleDepot ");
Storage<Iron> ironStorage = new Storage<>("EisenDepot ");
Storage<Steel> steelStorage = new Storage<>("StahlDepot ");
// {#20} Vorab bereits etwas Eisenerz speichern.
for (int i = 0; i < 20; i++) {
oreStorage.add(new IronOre());
}
// {#30} Maschinen für die Produktion registrieren.
ProductionBus bus = new ProductionBus();
bus.register(new IronMachine(oreStorage, ironStorage));
bus.register(new CoalMachine(coalStorage));
bus.register(new SteelMachine(ironStorage, coalStorage, steelStorage));
// {#40} Einen Scheduler erzeugen für einen einzigen Thread.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
long delay = (long) 0; // {#41}
long period = (long) 2; // {#42} Die Periode darf NICHT 0 sein.
System.out.println("main: scheduler.scheduleAtFixedRate(..)");
scheduler.scheduleAtFixedRate(bus::fireTick, delay, period, TimeUnit.SECONDS); // {#43}
System.out.println("main: Thread.sleep(..)");
Thread.sleep(30_000);
System.out.println("main: scheduler.shutdownNow(..)");
scheduler.shutdownNow();
System.out.println("main: Produktion beendet.");
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
public class Steel extends AbstractMaterial {
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
/**
* @author t2m
*/
public class SteelMachine extends AbstractMachine {
private final Storage<Iron> ironInput;
private final Storage<Coal> coalInput;
private final Storage<Steel> output;
public SteelMachine(Storage<Iron> ironInput, Storage<Coal> coalInput, Storage<Steel> output) {
this.ironInput = ironInput;
this.coalInput = coalInput;
this.output = output;
}
protected void beginWarmUp() {
System.out.println("[SteelMachine] WarmUp");
phase++;
}
protected void produce() {
Iron iron = ironInput.take();
Coal coal = coalInput.take();
System.out.printf("[SteelMachine] > Steel from %s + %s%n", iron.getName(), coal.getName());
output.add(new Steel());
phase++;
}
protected void cooldown() {
System.out.println("[SteelMachine] Cooldown");
phase = 0;
}
}
Code
package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;
import java.util.*;
/**
* Eine Klasse welche zur Speicherung von
* Objekten in einer {@link ArrayList}
* als {@link List} vom Typ
*
* @author t2m
*
* @param <T>
*/
public class Storage<T extends Material> {
private final List<T> pool = new ArrayList<>();
private final String name;
/**
* Konstruktor
* @param name Name des Speichers.
*/
public Storage(String name) {
this.name = name;
}
/**
* Synchronsiertes Hinzufügen von Items.
* @param item
*/
public synchronized void add(T item) {
pool.add(item); /* {#30} */
notifyAll(); /* {#31} */
System.out.printf("[%s] Eingelagert: %s (%d)%n", name, item.getName(), pool.size());
}
public synchronized T take() {
while (pool.isEmpty()) {
try {
wait();
} catch (InterruptedException ignored) {
}
}
return pool.remove(0);
}
public String getName() {
return name;
}
public synchronized int size() {
return pool.size();
}
}
Code
[ErzDepot] Eingelagert: IronOre (1)
[ErzDepot] Eingelagert: IronOre (2)
[ErzDepot] Eingelagert: IronOre (3)
[ErzDepot] Eingelagert: IronOre (4)
[ErzDepot] Eingelagert: IronOre (5)
[ErzDepot] Eingelagert: IronOre (6)
[ErzDepot] Eingelagert: IronOre (7)
[ErzDepot] Eingelagert: IronOre (8)
[ErzDepot] Eingelagert: IronOre (9)
[ErzDepot] Eingelagert: IronOre (10)
[ErzDepot] Eingelagert: IronOre (11)
[ErzDepot] Eingelagert: IronOre (12)
[ErzDepot] Eingelagert: IronOre (13)
[ErzDepot] Eingelagert: IronOre (14)
[ErzDepot] Eingelagert: IronOre (15)
[ErzDepot] Eingelagert: IronOre (16)
[ErzDepot] Eingelagert: IronOre (17)
[ErzDepot] Eingelagert: IronOre (18)
[ErzDepot] Eingelagert: IronOre (19)
[ErzDepot] Eingelagert: IronOre (20)
main: scheduler.scheduleAtFixedRate(..)
main: Thread.sleep(..)
Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp
Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (1)
Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp
Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (2)
Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp
Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (3)
Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp
Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (4)
Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp
Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (5)
Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
main: scheduler.shutdownNow(..)
main: Produktion beendet.
FootNotes, Keywords, Tags
java/TimeUnit, java/Enumerationskonstante
Hinweise, Rechte, Marken
UIO3 Es ist einfacher als Du denkst.
Stelle noch heute Deine Anfrage.
