- TPX | Begriffsbestimmung
- TPX | Examples
- TPX | FixedSizeExecutorExample #c20
- TPX | CachedExecutorExample #c21
- TPX | ScheduledExecutorExample #c22
- TPX | CustomPolicyRejectedExecutorExample #c24
- TPX | Custom-Executor-Components
- TPX | PrioritizedCombinerRunnableComponent #c30
- TPX | BufferingRejectedTasksExample #c31
- TPX | FAQ
JAVA-B4
Parallel Computing (Parallelism)
Parallelism
Multithreading
Klassische Threads
Threads & Runnables
Overview
Thread (Begriff)
DruckThread
DruckRunnable
ThreadDemo
Output
Thread Lifecycle
Overview
start()
run()
join()
stop()?
StartRunJoinDemo Example
Von allein endende Threads
Threads mit while(criteria)
Daemon-Threads
Overview
ThreadDaemonDemo
Daemon (Summary)
Thread-Priority
Overview
t.setPriority(..)
MachineThread Klasse (Example)
ThreadPriorityDemo (Example)
Thread Notification
Grundlagen
wait
notify
notifyAll
sleep
sleep vs. wait
Thread Notification Demo
sychronized(monitor)
Notification/Monitor Zusammenfassung
Concurrent API
Vom Thread zur Concurrent API
Was ist Concurrency?
Was ist Concurrency?
Problemstellungen bei herkömmlichen Threads
Features der Concurrent API
Warum: Bedeutung des Umstiegs
Themenüberblick
Technischer Hintergrund
Thread.sleep(0, 500_000)
System.nanoTime()
Executors (Übersicht) #310
ExecutorService (Page)
Begriffsbestimmung
Excetutor
.execute()
ExecutorService (Begriff)
ExecutorService Interface Methoden
Mehrere Callables und Runnables auf einmal starten
Kontrolliertes Beenden
Executors-Utilily Klasse
newCachedThreadPool()
newSingleThreadExecutor()
newFixedThreadPool(int nThreads)
newWorkStealingPool()
newScheduledThreadPool(int corePoolSize)
Beispiel
ForkJoinPool, RecursiveTask, Executors Example#09
A | B1,B2,B3 | C mit Summenbildung.
Executor vs. ExecutorService Example#10
Executors.newFixedThreadPool(2)
Beispiel
Zusammenfassung Executors
Callable, Future #330
Callable Future (Begriff)
Callable Interface (Begriff)
Begriff
Callable erzeugen
Callable verwenden
Future
Beispiel
Future-Listen mit invokeAll
Was sind Future-Listen?
Wie erzeuge ich Future-Listen?
Wann blockt eine Future-Liste?
Beispiel mit 12 Callables in 4 Gruppen
Executors (#350) @! Bis auf Hinweis doppelt
Overview
Executor (Begriff)
ExecutorService (Begriff)
Executors.new~ Factory Methoden
Executors für ThreadPoolExecutor mit fixed Pool
ScheduledExecutorService
ScheduledExecutorService
Superklasse ExecutorService
schedule(*)
scheduleAtFixedRate(*)
scheduleWithFixedDelay(*)
ScheduledExecutorService (xmpl)
ProductClock(Main, Bsp.)
AbstractMachine**
AbstractMaterial**
Coal/Iron/Steel**
Coal-/Iron-/Steel-Machine**
Material**
ProductionBus-Machine**
Storage T**
ThreadPoolExecutor (Detail)
Executor Implementierung
ThreadPoolExecutor (Term)
ThreadPoolExecutor Beispiele
"FixedSizeExecutorExample" EXAMPLE C20
"CachedExecutorExample" EXAMPLE C21
SynchronousQueue
"ScheduledExecutorExample" EXAMPLE C22
"CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23
"CustomPolicyRejectedExecutorExample" EXAMPLE C24
"PrioritizedCombinerRunnableComponent" EXAMPLE C30
"BufferingRejectedTasksExample" EXAMPLE C31
Java RejectedExecutionHandler
ThreadPoolExecutor FAQ
Synchronizer
Synchronizer (overview)
Synchronizer Begriff
CountDownLatch
CountDownLatch Example
CyclicBarrier
CyclicBarrier Example (@Zeit ausgeben!)
Phaser
Phaser Example(@Erl!)
Exchanger
Exchanger ping/pong
Semaphore
CompletableFuture
async!!!
SemaphoreCompletableFutureDemo (@!?)
Monitoring
JMX
ThreadPoolExecutor "live" überwachen
getActiveCount()
getPoolSize()
getCompletedTaskCount()
getQueue().size()
Concurrency
Example:DeterministicCopy @todo FILE LINK FEHLT!
Example:NonDeterministicCopy @todo FILE LINK FEHLT!
Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T
ThreadPoolExecutor
TPX | Begriffsbestimmung
Implementierungsklasse für Executor
Die Klasse ThreadPoolExecutor ist die zentrale Implementierungsklasse für die Interfaces Executor, ExecutorService und ScheduledExecutorService und zugleich die Basisklasse für den ScheduledThreadPoolExecutor.
Ein ThreadPoolExecutor ermöglicht Veränderung der Thread-Anzahl
Die Klasse ThreadPoolExecutor ist die zentrale Implementierungsklasse für die Interfaces Executor, ExecutorService und ScheduledExecutorService und zugleich die Basisklasse für den ScheduledThreadPoolExecutor.
Ein ThreadPoolExecutor ist ein Objekt welches die Größe des Thread-Pools zu Beginn beim Start und die maximale Größe insgesamt definiert.
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
pool.setCorePoolSize(10); // dynamisch erhöhen
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, // corePoolSize
20, // maximumPoolSize
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
TPX | Examples
TPX | Examples
In den nachfolgenden Beispielen spielen wir eine Reihe typischer Anwendungsfälle, Problemstellungen und Lösungsansätze durch.
Der ThreadPoolExecutor ist die Implementierungsklasse für
das Executor-Interface sowie ExecutorService-Interface. Immer
dann, wenn man sich mit Executors.*
und einer der Methoden Executor-Instanzen erzeugt, ist das
eigentliche Objekt zumeist eine ThreadPoolExecutor Instanz.
TPX | FixedSizeExecutorExample #c20
FixedSizeExecutorExample
In diesem Beispiel definieren wir einen ThreadPoolExecutor mit einer festen Pool-Größe von 2.
Mit new ThreadPoolExecutor(2,2,..) wird
im Konstruktor die initiale Größe des Thread-Pools sowie die maximale
Größe des Thread-Pools bestimmt.
Code
package com.stuelken.java.b4.parallel.c20.fixedsizeexecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//Frage: Warum endes Programm nicht?
/**
* Diese Klasse beinhaltet eine {@link #run()} Methode und lässt sich als
* {@link Runnable} deshalb nebenläufig als beispielsweise {@link Thread}
* starten.
*
* Use Case: stabile Auslastung, kein dynamisches Wachstum.
*
* @author t2m
*
*/
public class FixedSizeExecutorExample implements Runnable {
/**
* Nummer der Gruppe
*/
private final int gruppe;
public FixedSizeExecutorExample(int i) {
this.gruppe = i;
}
/**
*
*/
@Override
public void run() {
ExecutorService pool = new ThreadPoolExecutor(
2, 2, //
0L, TimeUnit.MILLISECONDS, //
new LinkedBlockingQueue<Runnable>()); //
for (int i = 0; i < 10; i++) { //
final int id = i; //
pool.submit(
// Neuen Task "i" dem Pool hinzufügen und auch zur Ausführung sofort freigebeen.
() -> {
System.out.println("Fixed Task (gruppe: " + this.gruppe + " id: " + id + " Beginn");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}); //
System.out.println("Fixed Task (gruppe: " + this.gruppe + " id: " + id + " Ende");
} //
pool.shutdown(); //
}
/**
*
* #2: xeq.execute: Führt den gegebenen Befehl in einem Zeitpunkt in der Zukunft aus.
* Das Kommando kann in einem neuen Thread, in einem Pooled Thread, im Calling Thread
* ausgeführt werden. Das hängt von der Implementierung des {@link Executor} ab.
*
*
* @param args
*/
public static void main(String[] args) {
Executor xeq = Executors.newSingleThreadExecutor();
System.out.println("main: new FixedSizeExecutorExample() Nr. 1");
xeq.execute(new FixedSizeExecutorExample(1)); // {#2}
System.out.println("main: new FixedSizeExecutorExample() Nr. 2");
xeq.execute(new FixedSizeExecutorExample(2));
System.out.println("main: shutdown");
((ExecutorService) xeq).shutdown();
}
}
//@formatter:off
/*
Fixed Task 0 Ende
Fixed Task 1 Ende
Fixed Task 2 Ende
Fixed Task 0 Beginn
Fixed Task 1 Beginn
Fixed Task 3 Ende
Fixed Task 4 Ende
Fixed Task 5 Ende
Fixed Task 6 Ende
Fixed Task 7 Ende
Fixed Task 8 Ende
Fixed Task 9 Ende
Fixed Task 2 Beginn
Fixed Task 3 Beginn
Fixed Task 4 Beginn
Fixed Task 5 Beginn
Fixed Task 6 Beginn
Fixed Task 7 Beginn
Fixed Task 8 Beginn
Fixed Task 9 Beginn
*/
Wichtige Erläuterungen
.execute(..)
Die Methode .execute(..) funktioniert ähnlich wie .submit(..): Die zugehörigen Tasks werden zuerst einmal erfasst und der Befehl gegeben, diese Tasks auch auszuführen. Die eigentlche Ausführungszeit aber liegt in der Zukunft.
Das zweifache Aufrufen von .execute(..) bewirkt, dass 2x jeweils 10 Tasks in zwei Gruppen ergänzt werden.
Konstruktor der Custom-Klasse mit Gruppennummer
Das Beispiel zeigt, dass wir unserer Gruppe von Tasks über die Kapselung in einem Objekt unserer Klasse auch eine Gruppennummer zuteilen können.
Obwohl die zweite Gruppe aber später hinzugefügt wird, ist dem Executor das egal: Er erfasst schlichtweg Tasks und sucht sich für jede Ausführung einen aus.
Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor()
erzeugt einen Executor welcher nur EINEN EINZIGEN WORKER THREAD nutzt,
auch wenn die zugehörige Worker-Queue unbegrenzt ist.
Wir können also durchaus dem Executor 10 Tasks (z. B. als Runnable) als funktionalen Ausdruck zuweisen, welche auch alle im Pool erfasst werden. Es wird aber stets nur 1 einziger Thread ausgeführt.
(Anmerkung: Wenn dieser Single Thread wegen einer Exception oder aus anderen Gründen vorzeitig beendet wird, bevor der Excecutor über den ExecutorService mit shutdown() heruntergefahren wird, wird ein neuer und damit quasi zweiter Thread als Ersatz erzeugt.)
Es wird garantiert, dass alle Tasks sequentiell ausgeführt werden. Es ist also immer nur maximal 1 Thread aktiv zur Zeit und niemals 2 oder mehr.
Die Reihenfolge, in welcher dieser "SingleThreadExecutor" die Tasks dann auswählt, ist zufällig.
Im Gegensatz zu newFixedThreadPool(1),
wo zu Beginn auch erst einmal nur 1 einziger Thread vom Executor
verwaltet wird, ist es bei newFixedThreadPool(1)
möglich, die Anzahl der Threads nachträglich zu erhöhen.
Executors.newSingleThreadExecutor() erlaubt KEINE Erhöhung der Anzahl der Threads.
((ExecutorService) xeq).shutdown();
Das Executor-Interface bietet KEINE shutdown() Methode. Da die Implementierungsklasse ThreadPoolExecutor aber diese Methode durchaus besitzt, benötigen wir nur das andere Interface.
Ausgabe
//@formatter:off
/*
Fixed Task 0 Ende
Fixed Task 1 Ende
Fixed Task 2 Ende
Fixed Task 0 Beginn
Fixed Task 1 Beginn
Fixed Task 3 Ende
Fixed Task 4 Ende
Fixed Task 5 Ende
Fixed Task 6 Ende
Fixed Task 7 Ende
Fixed Task 8 Ende
Fixed Task 9 Ende
Fixed Task 2 Beginn
Fixed Task 3 Beginn
Fixed Task 4 Beginn
Fixed Task 5 Beginn
Fixed Task 6 Beginn
Fixed Task 7 Beginn
Fixed Task 8 Beginn
Fixed Task 9 Beginn
*/
CachedExecutorExample
TPX | CachedExecutorExample #c21
data
Code
package com.stuelken.java.b4.parallel.c21.cachedexecutorsyncqueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Frage: Warum endes Programm nicht?
/**
* Der {@link Executor} in diesem Beispiel verwaltet seine
* Threads über den {@link ThreadPoolExecutor}
* einer {@link SynchronousQueue}.
*
* @author t2m
*/
public class CachedExecutorExample implements Runnable {
@Override
public void run() {
ExecutorService pool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>()
);
for (int i = 0; i < 10; i++) {
final int id=i;
pool.submit(() -> System.out.println("Cached Task Nr. "+id));
}
System.out.println("run: pool.shutdown()");
pool.shutdown();
System.out.println("run: Done.");
}
public static void main(String[] args) {
Executor xeq = Executors.newSingleThreadExecutor();
xeq.execute(new CachedExecutorExample());
System.out.println("main: Done.");
}
}
Erläuterung
SynchronousQueue
Die SynchronousQueue ist eine BlockingQueue und damit eine blockende, Thread-sichere Queue. Diese Queue ist aber in jeder Hinsicht als andere Queues.
Sie hat eigentlich keine Elemente bzw. es gibt nur dann kurz mal ein Element, wenn jemand ein Element anbietet und zugleich jemand anders es abholt.
Das ist das Handschlag- oder Handshake-Prinzip.
Ausgabe
//@formatter:off
/*
main: Done.
Cached Task Nr. 0
run: pool.shutdown()
Cached Task Nr. 1
run: Done.
Cached Task Nr. 2
Cached Task Nr. 4
Cached Task Nr. 6
Cached Task Nr. 7
Cached Task Nr. 3
Cached Task Nr. 8
Cached Task Nr. 9
Cached Task Nr. 5
*/
TPX | ScheduledExecutorExample #c22
C
data
Code
package com.stuelken.java.b4.parallel.c22.singethreadexecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
* @author t2m
*
*/
public class ScheduledExecutorExample implements Runnable {
/**
* Für ein Zählen der Zugriffe wird ein Objekt, kein int-Wert benötigt.
*
* @author t2m
*
*/
public static class CounterContainer {
public int counter = 0;
}
/**
* Statische Eigenschaft mit einem Objekt welches als Monitor und Zähler genutzt werden kann.
*/
public static CounterContainer counter = new CounterContainer();
/**
* Unseren {@link ScheduledExecutorExample} {@link Runnable} ausführen.
*/
@Override
public void run() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
() -> {
int count;
synchronized(ScheduledExecutorExample.counter) {
count = ScheduledExecutorExample.counter.counter++;
}
System.out.println("Scheduled Task, counter: " + count);
},
0, 2, TimeUnit.SECONDS);
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
}
scheduler.shutdown();
}
/**
* Demo
* @param args
*/
public static void main(String[] args) {
Executor xeq = Executors.newSingleThreadExecutor();
xeq.execute(new ScheduledExecutorExample());
System.out.println("main: Done.");
}
}
Ausgabe
//@formatter:off
/*
main: Done.
Scheduled Task, counter: 0
*/
TPX | CustomSingleThreadExecutorRunnableComponent
C
Diese {@link Runnable} artige Komponente erzeugt sich 100 kleine Tasks, erzeugt einen newSingleThreadExecutor und lässt diese dann nach und nach über eine Schleife über pool.submit(..) hinzufügen und sogleich ausführen.
Dieses Beispiel kann die Basisklasse für eine Vielzahl von Klassen sein.
Ein GameStepExecutor spezialisiert auf Spielphasen, TurnEngine Ausführungsmotor für Züge, GameRunner Wrapper für Executor + Datenmodell, CustomPlayLoop betont den eigenen Regelablauf, AsyncGameCore für asynchronen Ablauf.
1. Entkopplung von Task-Erzeugung und -Ausführung
Die eigentliche Logik (100 Tasks) ist separiert vom Ausführungsmechanismus.
Das erlaubt später leichtes Umstellen auf andere Executor-Modelle (z. B. CachedThreadPool, ScheduledExecutor).
2. Sicherer Zugriff auf gemeinsam genutzte Ressourcen
Durch SingleThread-Ausführung gibt es keine Race Conditions. Ideal für Logging, Zugriff auf UI-Komponenten oder Dateioperationen.
3. Animationssystem oder Simulation
Jedes Task ist wie ein "Frame" in der Animation oder ein "Schritt" in einer Simulation.
Denkbar etwa für einfache Game-Loop-Simulationen, animierte Visualisierungen oder Timelines.
4. Test- und Diagnostik-Komponenten
Sinnvoll für Load-Tests mit künstlicher Verzögerung (Thread.sleep()). So könnte diese Component beispielsweise alle t-Sekunden auf eine Ressource zugreifen.
Code
package com.stuelken.java.b4.parallel.c23.scheduledexecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Diese {@link Runnable} artige Komponente erzeugt sich 100 kleine Tasks,
* erzeugt einen newSingleThreadExecutor und lässt diese dann nach und nach über
* eine Schleife über pool.submit(..) hinzufügen und sogleich ausführen.
* @author t2m
*
*/
public class CustomSingleThreadExecutorRunnableComponent implements Runnable {
@Override
public void run() {
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
final int id = i;
pool.submit(() -> {
System.out.println("Single Thread Task " + id + " Start");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Single Thread Task " + id + " Ende");
;
});
}
pool.shutdown();
}
public static void main(String[] args) throws InterruptedException {
ExecutorService xeq = (ExecutorService) Executors.newSingleThreadExecutor();
xeq.execute(new CustomSingleThreadExecutorRunnableComponent());
ExecutorService exs = xeq;
Thread.sleep(5);
exs.shutdown();
exs.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("main: Done.");
}
}
Ausgabe
//@formatter:off
/*
main: Done.
Single Thread Task 0 Start
Single Thread Task 0 Ende
Single Thread Task 1 Start
Single Thread Task 1 Ende
Single Thread Task 2 Start
Single Thread Task 2 Ende
Single Thread Task 3 Start
Single Thread Task 3 Ende
Single Thread Task 4 Start
Single Thread Task 4 Ende
Single Thread Task 5 Start
Single Thread Task 5 Ende
Single Thread Task 6 Start
Single Thread Task 6 Ende
Single Thread Task 7 Start
Single Thread Task 7 Ende
Single Thread Task 8 Start
Single Thread Task 8 Ende
Single Thread Task 9 Start
Single Thread Task 9 Ende
Single Thread Task 10 Start
Single Thread Task 10 Ende
Single Thread Task 11 Start
Single Thread Task 11 Ende
Single Thread Task 12 Start
Single Thread Task 12 Ende
Single Thread Task 13 Start
Single Thread Task 13 Ende
Single Thread Task 14 Start
Single Thread Task 14 Ende
Single Thread Task 15 Start
Single Thread Task 15 Ende
Single Thread Task 16 Start
Single Thread Task 16 Ende
Single Thread Task 17 Start
Single Thread Task 17 Ende
Single Thread Task 18 Start
Single Thread Task 18 Ende
Single Thread Task 19 Start
Single Thread Task 19 Ende
Single Thread Task 20 Start
Single Thread Task 20 Ende
Single Thread Task 21 Start
Single Thread Task 21 Ende
Single Thread Task 22 Start
Single Thread Task 22 Ende
Single Thread Task 23 Start
Single Thread Task 23 Ende
Single Thread Task 24 Start
Single Thread Task 24 Ende
Single Thread Task 25 Start
Single Thread Task 25 Ende
Single Thread Task 26 Start
Single Thread Task 26 Ende
Single Thread Task 27 Start
Single Thread Task 27 Ende
Single Thread Task 28 Start
Single Thread Task 28 Ende
Single Thread Task 29 Start
Single Thread Task 29 Ende
Single Thread Task 30 Start
Single Thread Task 30 Ende
Single Thread Task 31 Start
Single Thread Task 31 Ende
Single Thread Task 32 Start
Single Thread Task 32 Ende
Single Thread Task 33 Start
Single Thread Task 33 Ende
Single Thread Task 34 Start
Single Thread Task 34 Ende
Single Thread Task 35 Start
Single Thread Task 35 Ende
Single Thread Task 36 Start
Single Thread Task 36 Ende
Single Thread Task 37 Start
Single Thread Task 37 Ende
Single Thread Task 38 Start
Single Thread Task 38 Ende
Single Thread Task 39 Start
Single Thread Task 39 Ende
Single Thread Task 40 Start
Single Thread Task 40 Ende
Single Thread Task 41 Start
Single Thread Task 41 Ende
Single Thread Task 42 Start
Single Thread Task 42 Ende
Single Thread Task 43 Start
Single Thread Task 43 Ende
Single Thread Task 44 Start
Single Thread Task 44 Ende
Single Thread Task 45 Start
Single Thread Task 45 Ende
Single Thread Task 46 Start
Single Thread Task 46 Ende
Single Thread Task 47 Start
Single Thread Task 47 Ende
Single Thread Task 48 Start
Single Thread Task 48 Ende
Single Thread Task 49 Start
Single Thread Task 49 Ende
Single Thread Task 50 Start
Single Thread Task 50 Ende
Single Thread Task 51 Start
Single Thread Task 51 Ende
Single Thread Task 52 Start
Single Thread Task 52 Ende
Single Thread Task 53 Start
Single Thread Task 53 Ende
Single Thread Task 54 Start
Single Thread Task 54 Ende
Single Thread Task 55 Start
Single Thread Task 55 Ende
Single Thread Task 56 Start
Single Thread Task 56 Ende
Single Thread Task 57 Start
Single Thread Task 57 Ende
Single Thread Task 58 Start
Single Thread Task 58 Ende
Single Thread Task 59 Start
Single Thread Task 59 Ende
Single Thread Task 60 Start
Single Thread Task 60 Ende
Single Thread Task 61 Start
Single Thread Task 61 Ende
Single Thread Task 62 Start
Single Thread Task 62 Ende
Single Thread Task 63 Start
Single Thread Task 63 Ende
Single Thread Task 64 Start
Single Thread Task 64 Ende
Single Thread Task 65 Start
Single Thread Task 65 Ende
Single Thread Task 66 Start
Single Thread Task 66 Ende
Single Thread Task 67 Start
Single Thread Task 67 Ende
Single Thread Task 68 Start
Single Thread Task 68 Ende
Single Thread Task 69 Start
Single Thread Task 69 Ende
Single Thread Task 70 Start
Single Thread Task 70 Ende
Single Thread Task 71 Start
Single Thread Task 71 Ende
Single Thread Task 72 Start
Single Thread Task 72 Ende
Single Thread Task 73 Start
Single Thread Task 73 Ende
Single Thread Task 74 Start
Single Thread Task 74 Ende
Single Thread Task 75 Start
Single Thread Task 75 Ende
Single Thread Task 76 Start
Single Thread Task 76 Ende
Single Thread Task 77 Start
Single Thread Task 77 Ende
Single Thread Task 78 Start
Single Thread Task 78 Ende
Single Thread Task 79 Start
Single Thread Task 79 Ende
Single Thread Task 80 Start
Single Thread Task 80 Ende
Single Thread Task 81 Start
Single Thread Task 81 Ende
Single Thread Task 82 Start
Single Thread Task 82 Ende
Single Thread Task 83 Start
Single Thread Task 83 Ende
Single Thread Task 84 Start
Single Thread Task 84 Ende
Single Thread Task 85 Start
Single Thread Task 85 Ende
Single Thread Task 86 Start
Single Thread Task 86 Ende
Single Thread Task 87 Start
Single Thread Task 87 Ende
Single Thread Task 88 Start
Single Thread Task 88 Ende
Single Thread Task 89 Start
Single Thread Task 89 Ende
Single Thread Task 90 Start
Single Thread Task 90 Ende
Single Thread Task 91 Start
Single Thread Task 91 Ende
Single Thread Task 92 Start
Single Thread Task 92 Ende
Single Thread Task 93 Start
Single Thread Task 93 Ende
Single Thread Task 94 Start
Single Thread Task 94 Ende
Single Thread Task 95 Start
Single Thread Task 95 Ende
Single Thread Task 96 Start
Single Thread Task 96 Ende
Single Thread Task 97 Start
Single Thread Task 97 Ende
Single Thread Task 98 Start
Single Thread Task 98 Ende
Single Thread Task 99 Start
Single Thread Task 99 Ende
*/
Rejecting Tasks
TPX | CustomPolicyRejectedExecutorExample #c24
Executor mit RejectExecutionHandler: Wenn ein Task abgewiesen wurde, so kann man auf diesen mit einem eigenen Rejector reagieren. Man kann aber nicht nur abgewiesene Tasks auf der Konsole ausgeben sondern diese auch mit einem Retry-Loop erneut starten. Task-Buffer-Techniken bei einem Executor im Händler-Paramater im Konstruktor realisierbar.
(r,executor)->System.out.println("Task rejected: " + r);
Die abgewiesen Tasks befinden sich in einer ConcurrentLinkedQueue und damit einer Thread-sicheren
Queue welche sich die Reihenfolge der Elemente gemerkt hat. Über eine Schleife kann man dann jeden abgewiesenen Task
einfach mit .submit(meinTask) wieder dem Executor hinzufügen, damit dieser dann
den wiederum ablehnen darf. Ein späteres Beispiel #c31 zeigt, wie es geht.
Executor met RejectExecutionHandler: Als een taak is afgewezen, kun je hierop reageren met een eigen (r,executor)->System.out.println(“Task rejected: ” + r);.
Je kunt echter niet alleen afgewezen taken op de console weergeven, maar deze ook opnieuw starten met een retry-loop. Je kunt echter niet alleen afgewezen taken op de console weergeven, maar deze ook opnieuw starten met een retry-loop.
Code
package com.stuelken.java.b4.parallel.c24.custompolicyexecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Ein Excecutor für sensible Systeme mit Fehlertoleranz. Dieser
* {@link CustomPolicyRejectedExecutorExample} hat einen
* {@link RejectedExecutionHandler} für den Fall, dass der Executor aktuell
* keine Kapazitäten mehr frei hat.
*
* @author t2m
*/
public class CustomPolicyRejectedExecutorExample implements Runnable {
@Override
public void run() {
/**
*
*/
RejectedExecutionHandler handler = //
(r, executor) -> System.out.println("Task rejected: " + r.toString() //
); //
ExecutorService pool = new ThreadPoolExecutor( //
1, 1, // sizeBegin, sizeMax
0L, TimeUnit.MILLISECONDS, // aliveTime
new ArrayBlockingQueue<>(2), //
// handler the handler to use when execution is blocked because the thread
// bounds and queue capacities are reached
handler);
for (int i = 0; i < 10; i++) {
final int id = i;
System.out.println("main: submit Task " + i);
pool.submit(() -> System.out.println("Custom Task " + id));
}
pool.shutdown();
}
/**
* @param args Keine.
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
Executor xeq = Executors.newSingleThreadExecutor();
xeq.execute(new CustomPolicyRejectedExecutorExample());
// Auch wenn CustomPolicyExecutorExample.run() beendet wurde endet das Programm
// nicht.
// xeq hat KEIN shutdown sondern NUR execute().
// Lösung
ExecutorService exs = (ExecutorService) xeq;
// Beende den äußeren Executor.
exs.shutdown();
// Sicherstellen, dass alle Tasks abgeschlossen sind, bevor das Programm beendet wird.
exs.awaitTermination(5, TimeUnit.SECONDS);
// Wie beendet man den Executor?
// Warum hört das Programm nicht auf?
System.out.println("main: Done.");
}
}
Ausgabe
// @formatter:off
/*
main: submit Task 0
main: submit Task 1
main: submit Task 2
main: submit Task 3
Task rejected: java.util.concurrent.FutureTask@298c700b[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@72d51680[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@7cc94cf5]]
main: submit Task 4
Task rejected: java.util.concurrent.FutureTask@1b100a02[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@77b33cc2[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@2d8855a2]]
main: submit Task 5
Task rejected: java.util.concurrent.FutureTask@50a7298d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@51563d43[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@3f6d5e87]]
main: submit Task 6
Task rejected: java.util.concurrent.FutureTask@7e6c131d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4bfc7981[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@2b20e998]]
main: submit Task 7
Task rejected: java.util.concurrent.FutureTask@1336b74e[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@17eea61c[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@3f76f511]]
main: submit Task 8
Task rejected: java.util.concurrent.FutureTask@67369dbf[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@d5cbace[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@6f60333f]]
main: submit Task 9
Task rejected: java.util.concurrent.FutureTask@4769b53c[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@411ccad0[Wrapped task = com.stuelken.java.b4.parallel.c24.custompolicyexecutor.CustomPolicyRejectedExecutorExample$$Lambda$2/0x0000000100000c18@29fe82f7]]
main: Done.
Custom Task 0
Custom Task 1
Custom Task 2
*/
TPX | PrioritizedCombinerRunnableComponent*
TPX | PrioritizedCombinerRunnableComponent*
Auch wenn java.util.concurrent eine Vielzahl von Möglichkeiten bereitstellt, ist es durchaus üblich und möglich, dass man sich basierend auf diesen Executor-Interfaces praktische Components baut, die man wiederverwenden kann.
Die PrioritizedCombinerRunnableComponent*
Klasse ist so ein Beispiel. Wir zeigen, wie man eine Komponente
erschafft, welche als Runnable Daten aus zwei BlockingQueue-Instanzen
lesen könnte.
Die eine BlockingQueue hat Priorität, dh. die Komponente läd immer den nächsten Wert zuerst aus der Queue mit der hohen Priorität; erst wenn dort kein Wert ist, nutzt die Komponente die andere Queue.
Hinweis: Eine BlockingQueue basierend auf ArrayBlockingQueue ist eine Java Collection aus java.util.concurrent.BlockingQueue. Sie unterstützt nebenläufige Programmierung und ist Thread-sicher.
Vgl. hierzu die Einführung in Collection-Interfaces und Collection-Klassen.
Component / Priorität
TPX | PrioritizedCombinerRunnableComponent #c30
In diesem zeigen, wie man eine eigene Java-Klasse
als Komponente programmiert, um die Reihenfolge
von Tasks basiererend auf einer Priorität
zu steuern. Wir haben 2 BlockingQueue T
Instanzen: Eine als Priorität «high» und eine
«low».
Unser eigener "CUSTOM" Executor stellt sicher, dass immer die BlockingQueue high zuerst dran ist, wenn diese einen Wert hat, erst wenn nicht wird die low-Quecke abgearbeitet.
WICHTIG: Eine
BlockingQueue ist ein Interfaces
für Thread-sicherer, blockende Collections aus dem
java.util.concurrent Package.
Diese Collection blockt beim Hinzufügen (add) und beim Entnehmen (take) von Werten und macht das Arbeiten nebenläufige Arbeiten mit Queues von Werten sehr einfach.
Code
package com.stuelken.java.b4.parallel.c30.prioritizedcombiner;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* Diese Klasse ist eine Runnable-artige Component. Sie liefert mit jedem Aufruf
* immer genau 1 Wert, und zwar zuerst immer aus der highPriority-BlockingQueue
* und wenn die alle ist aus der lowPriority-Queue.
*
* @author t2m
*
*/
class PrioritizedCombinerRunnableComponent implements Callable<String> {
private final BlockingQueue<String> highPriority;
private final BlockingQueue<String> lowPriority;
public PrioritizedCombinerRunnableComponent(
BlockingQueue<String> high, // Input {#1}
BlockingQueue<String> low // Input {#2}
) {
this.highPriority = high;
this.lowPriority = low;
}
/**
* Die {@link #call()} Methode eines {@link Callable}
*/
@Override
public String call() throws Exception {
String data = null;
if (!highPriority.isEmpty()) {
data = highPriority.take();
return "Prioritaet ist hoch : " + data;
}
if (!lowPriority.isEmpty()) {
data = lowPriority.take();
return "Prioritaet ist niedrig : " + data;
}
return "Leerlauf";
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
/*
* == Daten defininieren damit wir Daten haben.
*/
// Capacity, fair, Collection
ArrayList<String> listeA = new ArrayList<String>();
for (int i = 0; i < 50; i++) {
listeA.add("A" + String.format("A%03d", i)); // TODO: Bitte auf 3 Stellen formatieren.
}
ArrayList<String> listeB = new ArrayList<String>();
for (int i = 0; i < 50; i++) {
listeB.add("B" + String.format("B%03d", i)); // Auf 3 Stellen formatieren.
}
/*
* Zweie BlockingQueue-artige Collections definieren
*
* #20: BlockingQueue, fair=false: Zufällige Reihenfolge #21: BlockingQueue,
* fair=false: Erhalt der Reihenfolge
*/
BlockingQueue<String> bqA = new ArrayBlockingQueue<String>(listeA.size(), false, listeA); // {#20}}
BlockingQueue<String> bqB = new ArrayBlockingQueue<String>(listeB.size(), true, listeB); // {#21}}
/*
* Einen Executor beschaffen der jetzt unsere RunnableComponent ausführt.
*/
ExecutorService exs = Executors.newFixedThreadPool(2);
Future<String> future = exs.submit(new PrioritizedCombinerRunnableComponent(bqA, bqB));
String result = future.get();
System.out.println("Result = " + result);
System.out.println("00000000000000000000000000000");
Thread.sleep(1000);
for (int i = 0; i < 200; i++) {
Future<String> f = exs.submit(new PrioritizedCombinerRunnableComponent(bqA, bqB));
System.out.println("Result " + i + ": " + f.get());
}
exs.shutdown(); // Schließt den Pool.
exs.shutdownNow(); // versucht laufende Threads abzubrechen.
}
}
Ausgabe
// @formatter:off
/*
Result = Prioritaet ist hoch : AA000
00000000000000000000000000000
Result 0: Prioritaet ist hoch : AA001
Result 1: Prioritaet ist hoch : AA002
Result 2: Prioritaet ist hoch : AA003
Result 3: Prioritaet ist hoch : AA004
Result 4: Prioritaet ist hoch : AA005
Result 5: Prioritaet ist hoch : AA006
Result 6: Prioritaet ist hoch : AA007
Result 7: Prioritaet ist hoch : AA008
Result 8: Prioritaet ist hoch : AA009
Result 9: Prioritaet ist hoch : AA010
Result 10: Prioritaet ist hoch : AA011
Result 11: Prioritaet ist hoch : AA012
Result 12: Prioritaet ist hoch : AA013
Result 13: Prioritaet ist hoch : AA014
Result 14: Prioritaet ist hoch : AA015
Result 15: Prioritaet ist hoch : AA016
Result 16: Prioritaet ist hoch : AA017
Result 17: Prioritaet ist hoch : AA018
Result 18: Prioritaet ist hoch : AA019
Result 19: Prioritaet ist hoch : AA020
Result 20: Prioritaet ist hoch : AA021
Result 21: Prioritaet ist hoch : AA022
Result 22: Prioritaet ist hoch : AA023
Result 23: Prioritaet ist hoch : AA024
Result 24: Prioritaet ist hoch : AA025
Result 25: Prioritaet ist hoch : AA026
Result 26: Prioritaet ist hoch : AA027
Result 27: Prioritaet ist hoch : AA028
Result 28: Prioritaet ist hoch : AA029
Result 29: Prioritaet ist hoch : AA030
Result 30: Prioritaet ist hoch : AA031
Result 31: Prioritaet ist hoch : AA032
Result 32: Prioritaet ist hoch : AA033
Result 33: Prioritaet ist hoch : AA034
Result 34: Prioritaet ist hoch : AA035
Result 35: Prioritaet ist hoch : AA036
Result 36: Prioritaet ist hoch : AA037
Result 37: Prioritaet ist hoch : AA038
Result 38: Prioritaet ist hoch : AA039
Result 39: Prioritaet ist hoch : AA040
Result 40: Prioritaet ist hoch : AA041
Result 41: Prioritaet ist hoch : AA042
Result 42: Prioritaet ist hoch : AA043
Result 43: Prioritaet ist hoch : AA044
Result 44: Prioritaet ist hoch : AA045
Result 45: Prioritaet ist hoch : AA046
Result 46: Prioritaet ist hoch : AA047
Result 47: Prioritaet ist hoch : AA048
Result 48: Prioritaet ist hoch : AA049
Result 49: Prioritaet ist niedrig : BB000
Result 50: Prioritaet ist niedrig : BB001
Result 51: Prioritaet ist niedrig : BB002
Result 52: Prioritaet ist niedrig : BB003
Result 53: Prioritaet ist niedrig : BB004
Result 54: Prioritaet ist niedrig : BB005
Result 55: Prioritaet ist niedrig : BB006
Result 56: Prioritaet ist niedrig : BB007
Result 57: Prioritaet ist niedrig : BB008
Result 58: Prioritaet ist niedrig : BB009
Result 59: Prioritaet ist niedrig : BB010
Result 60: Prioritaet ist niedrig : BB011
Result 61: Prioritaet ist niedrig : BB012
Result 62: Prioritaet ist niedrig : BB013
Result 63: Prioritaet ist niedrig : BB014
Result 64: Prioritaet ist niedrig : BB015
Result 65: Prioritaet ist niedrig : BB016
Result 66: Prioritaet ist niedrig : BB017
Result 67: Prioritaet ist niedrig : BB018
Result 68: Prioritaet ist niedrig : BB019
Result 69: Prioritaet ist niedrig : BB020
Result 70: Prioritaet ist niedrig : BB021
Result 71: Prioritaet ist niedrig : BB022
Result 72: Prioritaet ist niedrig : BB023
Result 73: Prioritaet ist niedrig : BB024
Result 74: Prioritaet ist niedrig : BB025
Result 75: Prioritaet ist niedrig : BB026
Result 76: Prioritaet ist niedrig : BB027
Result 77: Prioritaet ist niedrig : BB028
Result 78: Prioritaet ist niedrig : BB029
Result 79: Prioritaet ist niedrig : BB030
Result 80: Prioritaet ist niedrig : BB031
Result 81: Prioritaet ist niedrig : BB032
Result 82: Prioritaet ist niedrig : BB033
Result 83: Prioritaet ist niedrig : BB034
Result 84: Prioritaet ist niedrig : BB035
Result 85: Prioritaet ist niedrig : BB036
Result 86: Prioritaet ist niedrig : BB037
Result 87: Prioritaet ist niedrig : BB038
Result 88: Prioritaet ist niedrig : BB039
Result 89: Prioritaet ist niedrig : BB040
Result 90: Prioritaet ist niedrig : BB041
Result 91: Prioritaet ist niedrig : BB042
Result 92: Prioritaet ist niedrig : BB043
Result 93: Prioritaet ist niedrig : BB044
Result 94: Prioritaet ist niedrig : BB045
Result 95: Prioritaet ist niedrig : BB046
Result 96: Prioritaet ist niedrig : BB047
Result 97: Prioritaet ist niedrig : BB048
Result 98: Prioritaet ist niedrig : BB049
Result 99: Leerlauf
Result 100: Leerlauf
Result 101: Leerlauf
Result 102: Leerlauf
Result 103: Leerlauf
Result 104: Leerlauf
Result 105: Leerlauf
Result 106: Leerlauf
Result 107: Leerlauf
Result 108: Leerlauf
Result 109: Leerlauf
Result 110: Leerlauf
Result 111: Leerlauf
Result 112: Leerlauf
Result 113: Leerlauf
Result 114: Leerlauf
Result 115: Leerlauf
Result 116: Leerlauf
Result 117: Leerlauf
Result 118: Leerlauf
Result 119: Leerlauf
Result 120: Leerlauf
Result 121: Leerlauf
Result 122: Leerlauf
Result 123: Leerlauf
Result 124: Leerlauf
Result 125: Leerlauf
Result 126: Leerlauf
Result 127: Leerlauf
Result 128: Leerlauf
Result 129: Leerlauf
Result 130: Leerlauf
Result 131: Leerlauf
Result 132: Leerlauf
Result 133: Leerlauf
Result 134: Leerlauf
Result 135: Leerlauf
Result 136: Leerlauf
Result 137: Leerlauf
Result 138: Leerlauf
Result 139: Leerlauf
Result 140: Leerlauf
Result 141: Leerlauf
Result 142: Leerlauf
Result 143: Leerlauf
Result 144: Leerlauf
Result 145: Leerlauf
Result 146: Leerlauf
Result 147: Leerlauf
Result 148: Leerlauf
Result 149: Leerlauf
Result 150: Leerlauf
Result 151: Leerlauf
Result 152: Leerlauf
Result 153: Leerlauf
Result 154: Leerlauf
Result 155: Leerlauf
Result 156: Leerlauf
Result 157: Leerlauf
Result 158: Leerlauf
Result 159: Leerlauf
Result 160: Leerlauf
Result 161: Leerlauf
Result 162: Leerlauf
Result 163: Leerlauf
Result 164: Leerlauf
Result 165: Leerlauf
Result 166: Leerlauf
Result 167: Leerlauf
Result 168: Leerlauf
Result 169: Leerlauf
Result 170: Leerlauf
Result 171: Leerlauf
Result 172: Leerlauf
Result 173: Leerlauf
Result 174: Leerlauf
Result 175: Leerlauf
Result 176: Leerlauf
Result 177: Leerlauf
Result 178: Leerlauf
Result 179: Leerlauf
Result 180: Leerlauf
Result 181: Leerlauf
Result 182: Leerlauf
Result 183: Leerlauf
Result 184: Leerlauf
Result 185: Leerlauf
Result 186: Leerlauf
Result 187: Leerlauf
Result 188: Leerlauf
Result 189: Leerlauf
Result 190: Leerlauf
Result 191: Leerlauf
Result 192: Leerlauf
Result 193: Leerlauf
Result 194: Leerlauf
Result 195: Leerlauf
Result 196: Leerlauf
Result 197: Leerlauf
Result 198: Leerlauf
Result 199: Leerlauf
*/
Re-run Rejected Tasks
TPX | BufferingRejectedTasksExample #c31
Abgelehnte Tasks kann man Puffern und später erneut den Start versuchen. In diesem Beispiel zeigen wir, wie es funktioniert. Man benötigt hierfür einen Handler mit wie RjectedExecutionHandler welcher über eine rejcetedExecution(..) Methode verfügt.
Jeder rejected/abgewiesene Tasks landet als Runnable r hierbei
über rejectedTasks.offer(r)
in einem Pool derjenigen Tasks, die einen weiteren Versuch
bekommen sollen.
Code
package com.stuelken.java.b4.parallel.c31.bufferingrejectedtasks;
/**
* Wenn ein ExecutorService einen Task ablehnt (z. B. bei vollem Task-Queue oder
* abgeschaltetem Executor), übernimmt der RejectedExecutionHandler. Um Tasks
* zwischenzuspeichern, kann man z. B. eine {@code BlockingQueue<Runnable>} verwenden.
*
* @author t2m
*
*/
import java.util.concurrent.*;
import java.util.*;
/**
*
* @author t2m
*
*/
public class BufferingRejectedTasksExample {
private final BlockingQueue<Runnable> rejectedTasks = new LinkedBlockingQueue<>();
public void executeWithBuffering() throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), // ganz kleine Queue
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task wurde abgelehnt, wird gepuffert.");
rejectedTasks.offer(r);
}
}
);
// Erstmal 10 Tasks rein, nur 2 wird ausgeführt, Rest landet in Queue, 1 wird abgelehnt
for (int i = 1; i <= 10; i++) {
int finalI = i;
executor.execute(() -> {
System.out.println("Task " + finalI + " laeuft");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);
System.out.println("Wir starten die gepufferten Tasks neu...");
// Jetzt die abgelehnten Tasks erneut starten
for (Runnable task : rejectedTasks) {
new Thread(task).start(); // oder z. B. ein neuer Executor
}
}
public static void main(String[] args) throws InterruptedException {
new BufferingRejectedTasksExample().executeWithBuffering();
}
}
Erläuterung
Im Beispiel werden 10 Tasks übergeben.
Der Neustart abgelehnter Tasks erfolgt über einen einfachen Thread-Konstruktor mit start() Methode, also ohne Executor. Auch das hätte aber mit einem Executor realisiert werden können.
Ausgabe
//@formatter:off
/*
Task wurde abgelehnt, wird gepuffert.
Task 2 laeuft
Task 1 laeuft
Task wurde abgelehnt, wird gepuffert.
Task wurde abgelehnt, wird gepuffert.
Task wurde abgelehnt, wird gepuffert.
Task wurde abgelehnt, wird gepuffert.
Task wurde abgelehnt, wird gepuffert.
Task wurde abgelehnt, wird gepuffert.
Task 3 laeuft
Wir starten die gepufferten Tasks neu...
Task 4 laeuft
Task 5 laeuft
Task 7 laeuft
Task 8 laeuft
Task 9 laeuft
Task 10 laeuft
Task 6 laeuft
*/
ThreadPoolExecutor FAQ
TPX | FAQ
Ja, kann man.
Ja, aber nicht mit jedem Executor. Man benötigt einen ThreadPoolExecutor.
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
pool.setCorePoolSize(10); // dynamisch erhöhen
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, // corePoolSize
20, // maximumPoolSize
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
Man kann also setCorePoolSize(int) und setMaximumPoolSize(int) zur Laufzeit nutzen. Damit lassen sich Ressourcen dynamisch anpassen – z. B. wenn das System lastabhängig skalieren soll.
UIO3 Es ist einfacher als Du denkst.
Stelle noch heute Deine Anfrage.
