uio--WebPageMain-Module

JAVA-B4 Parallel Computing (Parallelism) Parallelism Multithreading

Klassische Threads Threads & Runnables Overview Thread (Begriff) DruckThread DruckRunnable ThreadDemo Output Thread Lifecycle Overview start() run() join() stop()? StartRunJoinDemo Example Von allein endende Threads Threads mit while(criteria) Daemon-Threads Overview ThreadDaemonDemo Daemon (Summary) Thread-Priority Overview t.setPriority(..) MachineThread Klasse (Example) ThreadPriorityDemo (Example) Thread Notification Grundlagen wait notify notifyAll sleep sleep vs. wait Thread Notification Demo sychronized(monitor) Notification/Monitor Zusammenfassung

Concurrent API Vom Thread zur Concurrent API Was ist Concurrency? Was ist Concurrency? Problemstellungen bei herkömmlichen Threads Features der Concurrent API Warum: Bedeutung des Umstiegs Themenüberblick Technischer Hintergrund Thread.sleep(0, 500_000) System.nanoTime()

Executors (Übersicht) #310 ExecutorService (Page) Begriffsbestimmung Excetutor .execute() ExecutorService (Begriff) ExecutorService Interface Methoden Mehrere Callables und Runnables auf einmal starten Kontrolliertes Beenden Executors-Utilily Klasse newCachedThreadPool() newSingleThreadExecutor() newFixedThreadPool(int nThreads) newWorkStealingPool() newScheduledThreadPool(int corePoolSize) Beispiel ForkJoinPool, RecursiveTask, Executors Example#09 A | B1,B2,B3 | C mit Summenbildung. Executor vs. ExecutorService Example#10 Executors.newFixedThreadPool(2) Beispiel Zusammenfassung Executors

Callable, Future #330 Callable Future (Begriff) Callable Interface (Begriff) Begriff Callable erzeugen Callable verwenden Future Beispiel

Future-Listen mit invokeAll Was sind Future-Listen? Wie erzeuge ich Future-Listen? Wann blockt eine Future-Liste? Beispiel mit 12 Callables in 4 Gruppen

Executors (#350) @! Bis auf Hinweis doppelt Overview Executor (Begriff) ExecutorService (Begriff) Executors.new~ Factory Methoden Executors für ThreadPoolExecutor mit fixed Pool

ScheduledExecutorService ScheduledExecutorService Superklasse ExecutorService schedule(*) scheduleAtFixedRate(*) scheduleWithFixedDelay(*) ScheduledExecutorService (xmpl) ProductClock(Main, Bsp.) AbstractMachine** AbstractMaterial** Coal/Iron/Steel** Coal-/Iron-/Steel-Machine** Material** ProductionBus-Machine** Storage T**
ThreadPoolExecutor (Detail) Executor Implementierung ThreadPoolExecutor (Term) ThreadPoolExecutor Beispiele "FixedSizeExecutorExample" EXAMPLE C20 "CachedExecutorExample" EXAMPLE C21 SynchronousQueue "ScheduledExecutorExample" EXAMPLE C22 "CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23 "CustomPolicyRejectedExecutorExample" EXAMPLE C24 "PrioritizedCombinerRunnableComponent" EXAMPLE C30 "BufferingRejectedTasksExample" EXAMPLE C31 Java RejectedExecutionHandler ThreadPoolExecutor FAQ

Synchronizer Synchronizer (overview) Synchronizer Begriff CountDownLatch CountDownLatch Example CyclicBarrier CyclicBarrier Example (@Zeit ausgeben!) Phaser Phaser Example(@Erl!) Exchanger Exchanger ping/pong Semaphore CompletableFuture async!!! SemaphoreCompletableFutureDemo (@!?)

Monitoring JMX ThreadPoolExecutor "live" überwachen getActiveCount() getPoolSize() getCompletedTaskCount() getQueue().size()

Concurrency Example:DeterministicCopy @todo FILE LINK FEHLT! Example:NonDeterministicCopy @todo FILE LINK FEHLT!

Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T

Overview

Executors | Overview

Übersicht: Executor-Interfaces und Klassen ersetzen in Java Concurrency das herkömmliche thread.start() sowie mitunter hierbei zugleich auch das Problem, wie man diesen Start für viele Threads und auch deren Ende eigentlich kontrolliert steuert.

Das wichtigste Grundprinzip eines Executors vorab: Man erzeugt diese über die Factory-Methoden von Executors.*

Begriffsbestimmung

Executors | Begriffsbestimmung

Rund um Executors gilt es drei ähnlich klingende Bezeichnungen zu verstehen: Executor, ExecutorService und Executors.

Executor

Executors | Executor

Executor ist das Basis-Interface

Executor ist ein Interface in Java, das die Ausführung von Runnable-Objekten abstrahiert, java.util.concurrent.Executor


public interface Executor {
 void execute(Runnable command);
}
 
 

Die Methode execute(Runnable r) startet den Task. Zugleich ist diese Methode quasi die einzige Methode, die das Interface defininert.

Entkopplung der Aufforderung zur Ausgabe von den Details.

Executor als Interface führt dazu, dass man die Art und Weise der Ausführung, die man früher bei Threads genau angeben musste, bei Verwendung eines Executor(s) entkoppeln kann. Es reicht quasi zu wissen, dass man den Excecutor ausführen kann.

Ein Executor fordert dafür keine Methode

Das Executor-Interface fordert nur die execute(Runnable r) Methode. Ein Runnable hat aber nur eine einzige Methode den Rückgabe-Wert stehts void ist.

Dem Executor fehlt die shutDown(..) Methode

Das bedeutet für die Praxis, dass man zwar Runnables starten kann, aber selbst wenn die Tasks selbst fertig sind, ist der Executor noch immer am Laufen.

Der Executor ist quasi ein ThreadPoolExecutor, welcher darauf wartet, dass er weiter Runnables mit .submit(..) zur Ausführung bekommt. Wenn diese ihm aber nicht geben, wartet er einfach weiter; und bewirkt, dass das Programm bzw. die main-Methode nicht endet.

ExecutorService

Executors | ExecutorService

Auch ExceutorService ist ein Interface

Der java.util.concurrent.ExecutorService erweitert das Executor Interface durch eine Reihe weiterer Methoden:

Interface:


// Das offizielle Interface
public interface ExecutorService extends Executor { // Erweitert Executor
 Future<?> submit(Runnable task); // Fordert eine submit(..) Methode für Runnables
 <T> Future<T> submit(Callable<T> task); // Fordert eine submit(..) für Callables
 void shutdown(); // Fordert die Option für eine Aufforderung zum Beenden von Tasks
 List<Runnable> shutdownNow(); // Fordert die sofortige Beendigung. 
 boolean awaitTermination(long timeout, TimeUnit unit); 
 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // Bulk
 <T> T invokeAny(Collection<? extends Callable<T>> tasks); // Bulk
 // …
}

Ein ExecutorService verwaltet Tasks als Callable<T>

Mit Hilfe von .submit(callable) kann man einen Task übertragenl, welcher sogleich auch ausgeführt wird bzw. als Thread nebenläufig ausgeführt werden kann, wenn denn die Runtime diesem Prozessorzeit gönnt.

Die Besonderheit besteht darin, dass ein Callable einen Rückgabewert vom Typ Future T erzeugen kann. Damit ist es also möglich, dass ein ExecutorService nicht nur nebenläufige Prozesse startet sondern jeder dieser Prozesse auch einen Rückgabe-Wert haben darf.

... und mehrere Futures T als Antwort bekommen.

invokeAny(..) und invokeAll(..) ermöglichen eine sogeannte «Bulk»-Ausführung von Callable Tasks: Anstatt also wie bei submit(Callable task) nur einen einzigen Task je Befehl zu übergeben kann man auch mehr übertragen.

shutDown() und shutDownNow() erleichtern die Arbeit.

Ein ExecutorService bietet die Kontrolle über den Lebenszyklus (z. B. shutdown(), awaitTermination(), shutdownNow. Man kann also Tasks kontrolliert ordentlich beenden lassen oder das Beenden auch forcieren.

Er unterstützt Pools (z. B. FixedThreadPool, CachedThreadPool).

Mehrfaches Aufrufen ist möglich

Ein ExcecutorService kann mehrere Tasks gleichzeitig starten und überwachen. Man kann also mehrere nacheinander mit submit(..) übergeben oder auch mehrere mit InvokeAny und InvokeAll.

Executors Utility-Klasse

Executors | Executors Utility-Klasse

...

Die Klasse Executors ist ein Utility für Factory-Methoden und Wrapper. Man verwendet diese Klasse dazu, um neue Executor-Instanzen erzeugen zu können.

newCachedThreadPool(), newFixedThreadPool(int nThreads) und weitere Methoden mehr erzeugen ThreadPools.

...

Dynamisch wachsendes Pool, Threads werden bis zu 60 s idle gehalten.

Nach Ablauf dieser 60s werden diese Threads beendet, auch ohne dass diese gelaufen sind.

...

Fester Pool mit nThreads und unbeschränkter Warteschlange.

Sehr praktisch für den Fall, dass man beispielsweise nur 2 Threads erlauben möchte.

...

Ein einzelner Worker-Thread, Aufgaben in Reihenfolge.

Auch wenn man mehrere Runnables, Callables, Tasks überträgt: Es wird immer maximal ein Thread zur Zeit aktiv sein, wie man es quasi von JavaScript kennt.

Auch dieser Task kann mal eine Weile schlafen und wieder starten, ist aber erst wieder an der Reihen, wenn es keinen zweiten gibt, der schon läuft.

...

Fork/Join-basiertes Pool für Work-Stealing.

...

Pool für zeitversetzte und periodische Aufgaben.

Hierzu sei erwähnt, dass es mehrere Implementierungsklassen für Executor-Typen gibt, darunter den ScheduledThreadPoolExecutor.


Auf die ThreadPoolExecutor-Klasse als Implementierungsklasse wird noch später eingangen werden.

...

Wrapper, der keine Implementierungs-Methoden freigibt.

...

Erzeugung von ThreadFactory

...

Liefert Callable-Adapter-Objekte.

Beispiel

Executors | Beispiel

Executor, ExecutorService, Executors werden in vielen unserer Beispiele verwendet.

Executor vs. ExecutorService

Executors | ForkJoinPool A|B1,B2,B3|C

Die Executors-Utility-Klasse kann beispielsweise auch von ce:RecursiveTask abgeleitete Objekte dazu verwenden eine Verschachtelung von Tasks mit Task-Fork und Task-Join zu verwalten.

Das nachfolgende Beispiel zeigt, wie man einen Task A beginnt, welcher intern 3 Sub-Tasks B1, B2, B3 ausführt.


package com.stuelken.java.b4.parallel.c09.concurrentstealing;

import java.util.concurrent.*;

/**
 * Es soll folgende Tasks geben: A ist der erste Task. Daraus sollen 3 Tasks B1,
 * B2, B3 als "Fork" erstellt werden. Abschließend sollen die drei wieder als
 * "Join" zu C fortgesetzt werden.
 * 
 * @author t2m
 */
public class WorkStealingForkJoinDemo {

	/**
	 * Oberaufgabe oder erster Task, der von drei Unteraufgaben B1–B3 "forkt" und
	 * deren Ergebnisse "joint".
	 * 
	 * Erweitert die Klasse {@link RecursiveTask} mit dem
	 * Typparameter {@link Integer}, damit unserer Task
	 * schlussendlich auch ein Ergebnis berechnen kann. 
	 * 
	 * @author t2m
	 */
	static class TaskA extends RecursiveTask<Integer> {

		/**
		 * 
		 */
		private static final long serialVersionUID = -506106973422841988L;

		/**
		 * "The main computation performed by this task." (JavaDoc).
		 */
		@Override
		protected Integer compute() {
			
			System.out.println("TaskA compute begonnen.");

			// Wir erzeugen 3 Tasks für unseren Fork, welche aber
			// alle auf dem selben Typ basieren und sich nur über den
			// Wert für den Konstruktor unterscheiden.
			
			System.out.println("TaskA erzeugt b1, 2, b3 mit b1.fork() etc..");
			
			TaskB b1 = new TaskB(1); // Aufgabe B mit Wert 1
			TaskB b2 = new TaskB(2); // Aufgabe B mit Wert 2
			TaskB b3 = new TaskB(3); // Aufgabe B mit Wert 3

			// Fork der Unteraufgaben
			// Dieser Befehl bewirkt, dass sich dieser Task in dem Container-Thread,
			// in welchem er sich befindet, in einen "common pool" einzufinden
			// versucht, um dort asynchron nebenläufig mit anderen weiteren
			// Tasks ausgeführt zu werden.
			b1.fork();
			b2.fork();
			b3.fork();
			
			

			// Join: Ergebnisse zusammenrechnen
			// Mit .join() liefert jeder Task seinen berechneten Wert
			// zurück. Die .join() Methode blockt entsprechend, bis
			// diesen Wert denn auch gibt.
			//
			// Vergleiche auch .get() welche bei Callables ein Future<T> liefert.
			
			int sum = b1.join() + b2.join() + b3.join();
			System.out.printf( //
			 "A collected sum=%d in %s%n", //
			 sum, //
			 Thread.currentThread().getName() //
			);
			
			System.out.println("TaskA compute beendet.");
			
			return sum;
		}
	}

	/**
	 * B: Eine ganz einfache Aufgabe, liefert id*10
	 * 
	 * Von RecursiveTask T können KEINE Objekte erzeugt werden; man kann diese
	 * Klasse allerdings ableiten. 
	 * 
	 * @author t2m
	 *
	 */
	static class TaskB extends RecursiveTask<Integer> {
		private final int id;

		/**
		 * Jeder Tasks hat eine id, damit wir diesen unterscheiden können.
		 * 
		 * @param id
		 */
		TaskB(int id) {
			this.id = id;
		}
		
	

		/**
		 * Jeder {@link RecursiveTask} fordert die compute() 
		 * Methode, damit in unserem Fall ein Task für den Typ Integer
		 * am Ende mit compute() auch einen Integer-Wert liefrt. 
		 */
		@Override
		protected Integer compute() {
			
			// Ausgabe, welcher B-Task hier gerade läuft.
			System.out.printf("Running B%d in %s%n",
			 id, Thread.currentThread().getName());
			return id * 10; // Berechnung multipliziert ID einfach mit 10.
		}
	}

	/**
	 * 
	 * @param args Keine Argumente
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		
		// {#1} Erzeugt einen Work-Stealing-Pool mit (hier) 4 parallel arbeitenden Threads
		// {#2} Executors erzeugt stets Objekte einer Implementierungsklasse
	// {#3} Der Rückgabewert der Factory-Methoden ist aber ein Executor-Interface.
	// {#4} Wir müssen deshalb den Wert casten.
		//
		// WICHTIG: Dieser spezielle Pools ist in der Lage, verschachtelte
		// Tasks verwalten zu können.
		ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(4);

		// A wird in den Pool eingereicht
		// B1 bis B3 brauchen wir nicht übergeben, weil diese innerhalb von A
		// erzeugt werden. Der Task A ist also der Container für den Fork von 3
		// Sub-Tasks. 
		System.out.println("main: pool.submit(new TaskA() vorher");
		ForkJoinTask<Integer> taskA = pool.submit(new TaskA());
		System.out.println("main: pool.submit(new TaskA() nachher");

		// C: Holt das Gesamtergebnis ab
		// Einen Task "C" sparen wir uns weil dieser einfach der letzte
		// unserer main-Methode und damit dem Haupthread ist.
		System.out.printf("Final result in main: %d%n", taskA.get());
		
		// ! Man beachte, dass task.get() ein Future T liefert und damit zuerst einmal blockt is der Wert kommt. 

		// Abschließend unseren Executor für den Pool kontrolliert beenden.
		pool.shutdown();
	}
}


 
Erläuterungen

Ausgabe



// @formatter:off
/*
main: pool.submit(new TaskA() vorher
main: pool.submit(new TaskA() nachher
TaskA compute begonnen.
TaskA erzeugt b1, 2, b3 mit b1.fork() etc..
Running B1 in ForkJoinPool-1-worker-1
Running B3 in ForkJoinPool-1-worker-3
Running B2 in ForkJoinPool-1-worker-2
A collected sum=60 in ForkJoinPool-1-worker-1
TaskA compute beendet.
Final result in main: 60
*/ 
 

Beispiel

Executors | Beispiel

Executor, ExecutorService, Executors werden in vielen unserer Beispiele verwendet.

Executor vs. ExecutorService

Executors | Executor vs. ExecutorService

Den Unterschied zwischen einem Executor und einem ExecutorServices demonstriert unser Beispiel.

In beiden Fällen erfolgt der Zugriff auf die API über die java.util.concurrent.Executor Klasse, was es recht einfach macht, sich das zu merken.


package com.stuelken.java.b4.parallel.c10.concurrentexecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * @param args
 */
public class ExecutorServiceDemo {

	public static void main(String[] args) {

		/*
		 * === Einfacher Executor
		 */

		Executor simpleExecutor = Executors.newSingleThreadExecutor();
		simpleExecutor.execute(() -> {
			System.out.println("Task mit einfachem Executor laeuft");
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

		((ExecutorService) simpleExecutor).isShutdown();

		/*
		 * === Vollwertiger ExecutorService
		 */

		ExecutorService poolExecutor = Executors.newFixedThreadPool(2);

		poolExecutor.submit(() -> {
			System.out.println("Task 1 via ExecutorService laeuft");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

		poolExecutor.submit(() -> {
			System.out.println("Task 2 via ExecutorService laeuft");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

		// Beispiel #3: Kontrolle via shutdown()
		System.out.println("main: sleep 10000!");
		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("main: poolExecutor.shutdown!");
		poolExecutor.shutdown();

		System.out.println("main: Letzte Zeile. ");

		///////////////////////////////////
		// TODO: endet das Programm nicht?
		///////////////////////////////////
		if (false) {
			((ExecutorService) simpleExecutor).shutdown();
			System.out.println("main: Jetzt hat auch simple~ geendet. ");
		}

	}

}

 
Erläuterungen

Ausgabe

// @formatter:off
/*
Task mit einfachem Executor laeuft
Task 1 via ExecutorService laeuft
main: sleep 10000!
Task 2 via ExecutorService laeuft
main: poolExecutor.shutdown!
main: Letzte Zeile. 
 
*/ 
 

Zusammenfassend

Executors | Zusammenfassend

Executor, ExecutorService und die Executors-Utility-Klasse erleicht das Starten, Verwalten und Beenden von Runnables und Callables, wobei im Falle von Callables wir auch noch Rückgabe-Werte bekommen können.

Callables und Futures werden im nächsten Kapitel erläutert.

Implementierungen für ExecutorService: ThreadPoolExecutor als Superklasse für ScheduledThreadsPoolExecutor sowie ForkJoinPool.

Pool-Size

Anzahl der CPU-Cores? I/O-Last und Laufzeiten für Tasks?

UI ORGANIZED.

UIO3 Es ist einfacher als Du denkst.

Stelle noch heute Deine Anfrage.

uio--WebPageFooter-Module