uio--WebPageMain-Module

JAVA-B4 Parallel Computing (Parallelism) Parallelism Multithreading

Klassische Threads Threads & Runnables Overview Thread (Begriff) DruckThread DruckRunnable ThreadDemo Output Thread Lifecycle Overview start() run() join() stop()? StartRunJoinDemo Example Von allein endende Threads Threads mit while(criteria) Daemon-Threads Overview ThreadDaemonDemo Daemon (Summary) Thread-Priority Overview t.setPriority(..) MachineThread Klasse (Example) ThreadPriorityDemo (Example) Thread Notification Grundlagen wait notify notifyAll sleep sleep vs. wait Thread Notification Demo sychronized(monitor) Notification/Monitor Zusammenfassung

Concurrent API Vom Thread zur Concurrent API Was ist Concurrency? Was ist Concurrency? Problemstellungen bei herkömmlichen Threads Features der Concurrent API Warum: Bedeutung des Umstiegs Themenüberblick Technischer Hintergrund Thread.sleep(0, 500_000) System.nanoTime()

Executors (Übersicht) #310 ExecutorService (Page) Begriffsbestimmung Excetutor .execute() ExecutorService (Begriff) ExecutorService Interface Methoden Mehrere Callables und Runnables auf einmal starten Kontrolliertes Beenden Executors-Utilily Klasse newCachedThreadPool() newSingleThreadExecutor() newFixedThreadPool(int nThreads) newWorkStealingPool() newScheduledThreadPool(int corePoolSize) Beispiel ForkJoinPool, RecursiveTask, Executors Example#09 A | B1,B2,B3 | C mit Summenbildung. Executor vs. ExecutorService Example#10 Executors.newFixedThreadPool(2) Beispiel Zusammenfassung Executors

Callable, Future #330 Callable Future (Begriff) Callable Interface (Begriff) Begriff Callable erzeugen Callable verwenden Future Beispiel

Future-Listen mit invokeAll Was sind Future-Listen? Wie erzeuge ich Future-Listen? Wann blockt eine Future-Liste? Beispiel mit 12 Callables in 4 Gruppen

Executors (#350) @! Bis auf Hinweis doppelt Overview Executor (Begriff) ExecutorService (Begriff) Executors.new~ Factory Methoden Executors für ThreadPoolExecutor mit fixed Pool

ScheduledExecutorService ScheduledExecutorService Superklasse ExecutorService schedule(*) scheduleAtFixedRate(*) scheduleWithFixedDelay(*) ScheduledExecutorService (xmpl) ProductClock(Main, Bsp.) AbstractMachine** AbstractMaterial** Coal/Iron/Steel** Coal-/Iron-/Steel-Machine** Material** ProductionBus-Machine** Storage T**
ThreadPoolExecutor (Detail) Executor Implementierung ThreadPoolExecutor (Term) ThreadPoolExecutor Beispiele "FixedSizeExecutorExample" EXAMPLE C20 "CachedExecutorExample" EXAMPLE C21 SynchronousQueue "ScheduledExecutorExample" EXAMPLE C22 "CustomSingleThreadExecutorRunnableComponent" EXAMPLE C23 "CustomPolicyRejectedExecutorExample" EXAMPLE C24 "PrioritizedCombinerRunnableComponent" EXAMPLE C30 "BufferingRejectedTasksExample" EXAMPLE C31 Java RejectedExecutionHandler ThreadPoolExecutor FAQ

Synchronizer Synchronizer (overview) Synchronizer Begriff CountDownLatch CountDownLatch Example CyclicBarrier CyclicBarrier Example (@Zeit ausgeben!) Phaser Phaser Example(@Erl!) Exchanger Exchanger ping/pong Semaphore CompletableFuture async!!! SemaphoreCompletableFutureDemo (@!?)

Monitoring JMX ThreadPoolExecutor "live" überwachen getActiveCount() getPoolSize() getCompletedTaskCount() getQueue().size()

Concurrency Example:DeterministicCopy @todo FILE LINK FEHLT! Example:NonDeterministicCopy @todo FILE LINK FEHLT!

Kapitel java.util.concurrent.Flow @!fehltReactive Flow @!fehltPublisher TSubscriber TSubscription TProcressor TProcressor T

Overview

ScheduledExecutorService | Overview

Die Interfaces Executor, ExecutorService, ScheduledExecutorServices sind eine Vererbungskette welche uns den Zugriff auf Methoden eines ThreadPoolExecutors ermöglichen.

scheduler.scheduleAtFixedRate(bus::fireTick, 0, 2, TimeUnit.SECONDS) ist ein klassisches Pattern für die Verwendung eines Schedulers.

ScheduledExecutorService

ScheduledExecutorService | Begriffsbestimmung

ScheduledExecutorService führt Tasks zeitgesteuert aus.

Ein Callable<T> Der ScheduledExecutorService ist ein zeitgesteuerter Task-Executor, der Aufgaben verzögert oder regelmäßig ausführt.

Man kann sich diesen wie einen fortgeschrittenen Timer vorstellen.

Er verwendet aber Threads aus einem Pool und ist somit skalierbar und robust.

Es ist funktional: Man kann Lambda-Ausdrücke dafür verwenden.

Es liefert bei einer Ausführung dem im Typ-Parameter angegeben Datentyp mit einem Wert oder vom Prinzip her auch null.

Es kann checked Exceptions auswerten mit throw Exception.

Executors.newScheduledThreadPool(1)

Man nutzt die statische Methode newScheduledThreadPool(..) von Executors.

Für die Deklaration eines Callables reicht ein Lambda-Ausdruck.


// Den ~Service instanzieren. 
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
// Dem Service einen Task mit einem Zeitplan mitteilen. 
scheduler.schedule(() -> System.out.println("Start nach 3 Sekunden"), 3, TimeUnit.SECONDS); 
 
// ... 
 

Zeitregeln

Mit .submit(task) kann man im Prinzip eine Aufgabe für eine potentiell sofortige Ausführung hinzu.

Mit .schedule(Runnable, delay, timeUnit) kann man einen Task EINMALIG ausführen lassen, das allerdings verzögert; im Prinzip wie ein rückwärts laufender Timer, nur dass Sie den Timer nicht sehen können.

.scheduledAtFixedRate(..) ermöglicht eine wiederholte Ausführung in einem festen Zeitrythmus.

.scheduleWithFixedDelay(..) wiederholt die Ausführung mit einer Pause nach der Beendigung der letzten Ausführung.


schedule(Runnable, delay, unit) 
scheduleAtFixedRate(Runnable, initialDelay, period, unit) 
scheduleWithFixedDelay(Runnable, initialDelay, delay, unit)
 
 
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// Aufgabe einmalig nach 3 Sekunden
scheduler.schedule(() -> System.out.println("Start verzögert"), 3, TimeUnit.SECONDS);

// Aufgabe wiederholt alle 2 Sekunden
scheduler.scheduleAtFixedRate(() -> System.out.println("Tick"), 0, 2, TimeUnit.SECONDS); 

ProductionClockDEMO

ScheduledExecutorService | ProductionClockDEMO

Im Beispiel werden 4 Speicherplätze für zu produzierende Rohstoffe angelegt und zu Beginn eine Reihe von Rohstoffen generiert, damit das Programm die weitere Fertigung simulieren kann. Alle Maschinen werden als AbstractMachine definiert und verfügen über eine fireTick() Methode, welche man in einem Zeittakt dann durch den Scheduler aufrufen lassen kann.

Das Nehmen und Einspeichern von Material und Produkten in den jeweiligen Speichern wird über synchronisierte Methoden Thread-sicher ausgeführt. Wartende andere Threads werden mit notifyAll benachrichtigt.



package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import java.util.concurrent.*;

/**
 * Diese Klasse beinhaltet das Hauptprogramm mit der {@link #main(String[])} Methode
 * in welchemn verschiedene {@link Storage} Instanzen 
 * 
 * @author t2m
 */
public class ProductionClockDEMO {

	public static void main(String[] args) throws InterruptedException {

		// {#10} Speicher anlegen.
		Storage<IronOre> oreStorage = new Storage<>("ErzDepot");
		Storage<Coal> coalStorage = new Storage<>("KohleDepot ");
		Storage<Iron> ironStorage = new Storage<>("EisenDepot ");
		Storage<Steel> steelStorage = new Storage<>("StahlDepot ");

		// {#20} Vorab bereits etwas Eisenerz speichern.
		for (int i = 0; i < 20; i++) {
			oreStorage.add(new IronOre());
		}

		// {#30} Maschinen für die Produktion registrieren.
		ProductionBus bus = new ProductionBus();
		bus.register(new IronMachine(oreStorage, ironStorage));
		bus.register(new CoalMachine(coalStorage));
		bus.register(new SteelMachine(ironStorage, coalStorage, steelStorage));

		// {#40} Einen Scheduler erzeugen für einen einzigen Thread.

		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		long delay = (long) 0; // {#41}
		long period = (long) 2; // {#42} Die Periode darf NICHT 0 sein. 

		System.out.println("main: scheduler.scheduleAtFixedRate(..)");
		scheduler.scheduleAtFixedRate(bus::fireTick, delay, period, TimeUnit.SECONDS); // {#43}

		System.out.println("main: Thread.sleep(..)");
		Thread.sleep(20_000);

		System.out.println("main: scheduler.shutdownNow(..)");
		scheduler.shutdownNow();

		System.out.println("main: Produktion beendet.");
	}
}

 

Storage

Jedes Storage-Objekt dient zum Speichern von Materialien. Alle Materialien implementieren vom Prinzip her das selbe Interface, aber für eine Typisierung werden hier Typparameter verwendet, um die jeweiligen Storage-Instanzen sortenrein befüllen zu können.

~Bus.register(..)

Die jeweiligen Maschinen werden im ~Bus registriert, damit später der ScheduledExecutorService als scheduler Instanz mit einer fixen Zeitrate an jedem dieser Maschine-Objekte die gleiche Methode über bus::fireTick aufrufen kann.

bus::fireTick

Das ist eine Methoden-Referenz und damit eine Referenz auf Callable, welches derr Scheduler über scheduleAtFixedRate(..) für Callable, delay und period als Dauer ausführen soll.

Hinweis: period muss größer 0 sein! Die Zahlenangaben sind in long udnd erfordern eine TimeUnit TimeUnit

Tasks

Das Task besteht hierbei darin, an jeder Maschine deren fireTick() Methode aufzurufen. Ob es sich um 3 Maschinen oder 30.000 Objekte handelt, welche dann getaktet (nacheinander) gesteuert über den ~Bus ihre Arbeit aufnehmen, spielt keine wirkliche Rolle.

shutdownnow()

Ein Scheduler dient zum Starten und zum Stoppen der von ihm verwalteten nebenläufigen Prozesse.

Auch wenn die fireTick() Methoden blockend nacheinander im Takt ausgeführt werden, so führt dennoch der Scheduler diese Aufgaben nebenläufig aus.

Bei Shutdown werden die gerade noch laufenden Threads NICHT abgebrochen sondern noch kontrolliert zu Ende gebracht.

Die bus::fireTick Methode, die als Methoden-Referenz dem Scheduler als Callable angegeben wurde, führt also noch die 3 Aufrufe an die 3 Maschinen zuende durch.

Code

ScheduledExecutorService (Example) | Code ***

Das Beispiel erfordert ein paar mehr Files.

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

/**
 * Superklasse für {@link CoalMachine}, {@link IronMachine}, {@link SteelMachine}.
 * 
 * Implementierung: Jede Implementierungsklasse benötigt zwingend eine Methode
 * {@link #tick()} welche vom Scheduler aufgerufen wird.
 * 
 * Es gibt drei Phasen: {@link #beginWarmUp()}, #prod
 * 
 * @author t2m
 */
public abstract class AbstractMachine {
	
 protected int phase = 0;

 public void tick() {
 switch (phase) {
 case 0 -> beginWarmUp();
 case 1 -> produce();
 case 2 -> cooldown();
 default -> phase = 0;
 }
 }

 protected abstract void beginWarmUp();
 protected abstract void produce();
 protected abstract void cooldown();
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

/**
 * Basisklasse für das {@link Material} wie beispielsweise {@link Coal}.
 * 
 * @author t2m
 *
 */
public class AbstractMaterial implements Material {
	
	public AbstractMaterial() {
		this.name = this.getClass().getSimpleName();
	}

	private String name;

	@Override
	public String getName() {
		return this.name;
	}

}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

public class Coal extends AbstractMaterial {
}

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import com.stuelken.java.b4.parallel.c13.scheduledexecutionservice.*;
/**
 * 
 * @author tinyone
 *
 */
public class CoalMachine extends AbstractMachine {
 private final Storage<Coal> output;

 public CoalMachine(Storage<Coal> output) {
 this.output = output;
 }

 protected void beginWarmUp() {
 System.out.println("[CoalMachine] WarmUp");
 phase++;
 }

 protected void produce() {
 output.add(new Coal());
 System.out.println("[CoalMachine] Producing Coal");
 phase++;
 }

 protected void cooldown() {
 System.out.println("[CoalMachine] Cooldown");
 phase = 0;
 }
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

public class Iron extends AbstractMaterial {


}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import com.stuelken.java.b4.parallel.c13.scheduledexecutionservice.*;

/**
 * @author t2m
 */
public class IronMachine extends AbstractMachine {
 private final Storage<IronOre> input;
 private final Storage<Iron> output;

 public IronMachine(Storage<IronOre> input, Storage<Iron> output) {
 this.input = input;
 this.output = output;
 }

 protected void beginWarmUp() {
 System.out.println("[IronMachine] WarmUp");
 phase++;
 }

 protected void produce() {
 IronOre ore = input.take();
 System.out.printf("[IronMachine] > Iron from %s%n", ore.getName());
 output.add(new Iron());
 phase++;
 }

 protected void cooldown() {
 System.out.println("[IronMachine] Cooldown");
 phase = 0;
 }
}

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

public class IronOre extends AbstractMaterial {

}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

/**
 * Interface für {@link AbstractMaterial}
 * @author t2m
 */
public interface Material {

	/**
	 * Name oder Bezeichner des Materials.
	 * @return
	 */
	String getName();
	
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import java.util.*;

/**
 * Eine {@link ProductionBus} Instanz
 * speichert intern die Referenzen auf {@link #machines} {@link AbstractMachine}
 * Instanzen. 
 * 
 * @author t2m 
 */
public class ProductionBus {
	
 private final List<AbstractMachine> machines = new ArrayList<>();

 public void register(AbstractMachine m) {
 machines.add(m);
 }

 /**
 * 
 */
 public void fireTick() {
 System.out.println("\n Neuer Takt:");
 machines.forEach(AbstractMachine::tick);
 }
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import java.util.concurrent.*;

/**
 * Diese Klasse beinhaltet das Hauptprogramm mit der {@link #main(String[])} Methode
 * in welchemn verschiedene {@link Storage} Instanzen 
 * 
 * @author t2m
 */
public class ProductionClockDEMO {

	public static void main(String[] args) throws InterruptedException {

		// {#10} Speicher anlegen.
		Storage<IronOre> oreStorage = new Storage<>("ErzDepot");
		Storage<Coal> coalStorage = new Storage<>("KohleDepot ");
		Storage<Iron> ironStorage = new Storage<>("EisenDepot ");
		Storage<Steel> steelStorage = new Storage<>("StahlDepot ");

		// {#20} Vorab bereits etwas Eisenerz speichern.
		for (int i = 0; i < 20; i++) {
			oreStorage.add(new IronOre());
		}

		// {#30} Maschinen für die Produktion registrieren.
		ProductionBus bus = new ProductionBus();
		bus.register(new IronMachine(oreStorage, ironStorage));
		bus.register(new CoalMachine(coalStorage));
		bus.register(new SteelMachine(ironStorage, coalStorage, steelStorage));

		// {#40} Einen Scheduler erzeugen für einen einzigen Thread.

		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		long delay = (long) 0; // {#41}
		long period = (long) 2; // {#42} Die Periode darf NICHT 0 sein. 

		System.out.println("main: scheduler.scheduleAtFixedRate(..)");
		scheduler.scheduleAtFixedRate(bus::fireTick, delay, period, TimeUnit.SECONDS); // {#43}

		System.out.println("main: Thread.sleep(..)");
		Thread.sleep(30_000);

		System.out.println("main: scheduler.shutdownNow(..)");
		scheduler.shutdownNow();

		System.out.println("main: Produktion beendet.");
		
	}
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

public class Steel extends AbstractMaterial {

}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

/**
 * @author t2m
 */
public class SteelMachine extends AbstractMachine {
	private final Storage<Iron> ironInput;
	private final Storage<Coal> coalInput;
	private final Storage<Steel> output;

	public SteelMachine(Storage<Iron> ironInput, Storage<Coal> coalInput, Storage<Steel> output) {
		this.ironInput = ironInput;
		this.coalInput = coalInput;
		this.output = output;
	}

	protected void beginWarmUp() {
		System.out.println("[SteelMachine] WarmUp");
		phase++;
	}

	protected void produce() {
		Iron iron = ironInput.take();
		Coal coal = coalInput.take();
		System.out.printf("[SteelMachine] > Steel from %s + %s%n", iron.getName(), coal.getName());
		output.add(new Steel());
		phase++;
	}

	protected void cooldown() {
		System.out.println("[SteelMachine] Cooldown");
		phase = 0;
	}
}
 

Code


package com.stuelken.java.b4.parallel.c13.scheduledexecutionservice;

import java.util.*;

/**
 * Eine Klasse welche zur Speicherung von 
 * Objekten in einer {@link ArrayList}
 * als {@link List} vom Typ 
 * 
 * @author t2m
 *
 * @param <T>
 */
public class Storage<T extends Material> {
	
	private final List<T> pool = new ArrayList<>();
	private final String name;

	/**
	 * Konstruktor 
	 * @param name Name des Speichers. 
	 */
	public Storage(String name) {
		this.name = name;
	}

	/**
	 * Synchronsiertes Hinzufügen von Items.
	 * @param item
	 */
	public synchronized void add(T item) { 
		pool.add(item); /* {#30} */
		notifyAll(); /* {#31} */
		System.out.printf("[%s] Eingelagert: %s (%d)%n", name, item.getName(), pool.size());
	}

	public synchronized T take() {
		while (pool.isEmpty()) {
			try {
				wait();
			} catch (InterruptedException ignored) {
			}
		}
		return pool.remove(0);
	}

	public String getName() {
		return name;
	}

	public synchronized int size() {
		return pool.size();
	}
}
 

Code


[ErzDepot] Eingelagert: IronOre (1)
[ErzDepot] Eingelagert: IronOre (2)
[ErzDepot] Eingelagert: IronOre (3)
[ErzDepot] Eingelagert: IronOre (4)
[ErzDepot] Eingelagert: IronOre (5)
[ErzDepot] Eingelagert: IronOre (6)
[ErzDepot] Eingelagert: IronOre (7)
[ErzDepot] Eingelagert: IronOre (8)
[ErzDepot] Eingelagert: IronOre (9)
[ErzDepot] Eingelagert: IronOre (10)
[ErzDepot] Eingelagert: IronOre (11)
[ErzDepot] Eingelagert: IronOre (12)
[ErzDepot] Eingelagert: IronOre (13)
[ErzDepot] Eingelagert: IronOre (14)
[ErzDepot] Eingelagert: IronOre (15)
[ErzDepot] Eingelagert: IronOre (16)
[ErzDepot] Eingelagert: IronOre (17)
[ErzDepot] Eingelagert: IronOre (18)
[ErzDepot] Eingelagert: IronOre (19)
[ErzDepot] Eingelagert: IronOre (20)
main: scheduler.scheduleAtFixedRate(..)
main: Thread.sleep(..)

 Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp

 Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (1)

 Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown

 Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp

 Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (2)

 Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown

 Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp

 Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (3)

 Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown

 Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp

 Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (4)

 Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown

 Neuer Takt:
[IronMachine] WarmUp
[CoalMachine] WarmUp
[SteelMachine] WarmUp

 Neuer Takt:
[IronMachine] > Iron from IronOre
[EisenDepot ] Eingelagert: Iron (1)
[KohleDepot ] Eingelagert: Coal (1)
[CoalMachine] Producing Coal
[SteelMachine] > Steel from Iron + Coal
[StahlDepot ] Eingelagert: Steel (5)

 Neuer Takt:
[IronMachine] Cooldown
[CoalMachine] Cooldown
[SteelMachine] Cooldown
main: scheduler.shutdownNow(..)
main: Produktion beendet.
 

FootNotes, Keywords, Tags


    java/TimeUnit, java/Enumerationskonstante

    Hinweise, Rechte, Marken

    UI ORGANIZED.

    UIO3 Es ist einfacher als Du denkst.

    Stelle noch heute Deine Anfrage.

    uio--WebPageFooter-Module