uio--WebPageMain-Module

JAVA-C2 Collection Advanced Collection Advanced Problemstellung Diagramm

Concurrent Collection Interfaces & Classes Thread-sichere Lists Collection.synchronizedList() CopyOnWriteArrayList (@Bsp?) Thread-sichere Sets Collection.synchronizedSet() CopyOnWriteArraySet Thread-sichere Queues BlockingQueue ArrayBlockingQueue LinkedBlockingQueue PriorityBlockingQueue Nicht-serialisierbare Queues SynchronousQueue DelayQueue Thread-sichere Maps Collection.synchronizedMap(..) ConcurrentHashMap

Aggregate Operations Streams & Aggregate Operations (Overview) @! 15 Page Section Links fehlen! Person ExampleData Aggregate Operations @!section? stream() @!section? for-Loop for-Loop stream() und forEach() Streams und Pipelines Filter und Predicates EXAMPLE 03 Stream/Predicate/Consumer EXAMPLE 04 Consumer mit Generic Type Parameter EXAMPLE 05 Predicates/Enums/Collection "Pipelines" EXAMPLE 06 Mapping to new Type EX6 Summary maptTOInt, ToIntFunction, IntStream.average() OptionalDouble EXAMPLE 07 Map Key-Value-Array zu LinkedHashMap Stream.mapToObj(k,v) collect(Collectors.toMap(a,b,c,d) Map.entry(k,v) map(*).findFirst().orElse(*) Map.getOrDefault(..)

Collection Reduction stream.reduce Collection Collecting Collection stream.collect



@! Kapitel überprüfen!@! Java-Tier von B4 auf B2 verschoben!@! Alle Links/Referenzen checken!

Collection-Parallelism Collection Stream API ab JAVA 8 .parallelStream() .parallelStream() .parallelStream().mapToInt(*).average).getAsDouble() Typ OptionalDouble Begriff Fork Begriff Join Executing Streams in Parallel (01) Concurrent Reduction (02) Ordered Sequentiell (03) Ordered Parallel (04) Side Effects Parallel Computing "Lazyness" (06) Interference (07) State-Full-Lambda Expressions (08) zustandsbehaftete Lambda-Ausdrücke (08)

Collections, Streams & AlgorithmsCollection Streams (Overview) UIO SimulateDownloadDataProvider** connectSimulated():ListINTFilter (prime, schnapps, quer, mod, Operationsfilter, map, sorted, collect, reduce, peek, limit



(Collection FAQ) Datenquellen zu Streams konvertieren Array-Stream Collection-Stream Map-Stream Primitive-Stream Generatoren-Stream Iteratoren-Stream (Collection Implementations) (Collection Algorithms)

Overview

Parallel Computing | Overview

Parallel Computing als parallele Datenverarbeitung gibt es in JAVA inzwischen in mehreren Varianten. Wir können herkömmlich Threads und Runnables anlegen oder aber auch das JAVA Fork-/Join-Framework von JAVA Concurrency verwenden, welches für viele Problemstellungen standartisierte Ansätze bietet.

Intro

Parallel Computing | Intro

Bevor wir uns mit den jeweiligen Beispielen zum Thema befassen ist es anzuraten, im Überblick zuerst ein paar Begriffe und Problemstellungen rund um nebenläufige Verarbeitung von Daten zu kennen.

Diese Probleme in nebenläufiger Verarbeitung von Daten, als Parallel Computing bezeichnet, beschränken sich nicht nur auf JAVA [1] sondern gelten für alle Programmiersprachen, welche mehrere Prozessoren und damit echtes Multi-Processing wie mitunter auch C++ oder C# ermöglichen. JAVASCRIPT [2] ist bei bislang fast allen Runtimes für JAVA im Browser oder mit NodeJS auf Serverseite letztendlich Single-Threaded, dh. auch wenn ein Rechner mehrere Prozessoren hat, werden nebenläufige Operationen in JavaScript auf dem selben Prozessor atomar zerlegt, erfordern aber keine Synchronisierung wie in JAVA und C#, weil es in JS keinen parallel Zugriff auf Code-Segmente oder Variablen geben kann, in JAVA und C# aber durchaus.

parallele Datenverarbeitung

Parallel-Computing dt. parallele Datenverarbeitung, bedeutet, dass man eine Problemstellung dahingehend in Teilprobleme gliedert, welche anschließend in separaten Threads gleichzeitig im Sinne von «zeitlich parallel» ausgeführt werden.

Trennen und Zusammenfügen des Programmverlaufs

Infolge der Aufteilung der Verarbeitung von Problemstellungen in nebenläufige (engl.) Threads (eigentlich dt. «Faden») , dem sogenannten (engl.) Fork, besteht nun wiederum die Problemstellung, dass das Gesamtergebnis dieser Verarbeitung nur dann möglich ist, wenn man diese nebenläufig auf mehreren Prozessoren ablaufenden Sequenzen von Programmschritten an irgendeinem Punkt, Join bezeichnet, kontrolliert wieder zusammenführen kann.

JAVA verfügt über genau ein solches Fork-Join-Framework Framework.

Trennen und Zusammenfügen des Programmverlaufs

Infolge der Aufteilung der Verarbeitung von Problemstellungen in nebenläufige (engl.) Threads (eigentlich dt. «Faden») , dem sogenannten (engl.) Fork, besteht nun wiederum die Problemstellung, dass das Gesamtergebnis dieser Verarbeitung nur dann möglich ist, wenn man diese nebenläufig auf mehreren Prozessoren ablaufenden Sequenzen von Programmschritten an irgendeinem Punkt, Join bezeichnet, kontrolliert wieder zusammenführen kann.

Die Runtime übernimmt bei Aggregation-Operations die Kontrolle

Dieses Fork-Join-Framework kann die Einführung der parallelen Datenverarbeitung in Programmen stark vereinfachen. Man muss hierzu aber vorgeben, wie denn die Teilprobleme parallel verarbeitet und dahingehend zuerst einmal geteilt werden müssen und wie diese später wieder zusammengeführt werden.

Gegenüber einer herkömmlichen Programmierung von Thread Instanzen oder eben Klassen, die das Runnable Interface implementiert haben, und hierbei der Entwickler die Kontrolle über das Entstehen und das Beenden von Threads hatte, wird bei Verwendung des Frameworks diese Kontrolle über die Runtime übernommen.

Thread-Interferenzen führen Daten-Inkonsistenzen

Wenn mehrere Threads zeitlich parallel auf die gleiche Collection zugreifen, kann es Fehler bei der Speicher-Konsistenz geben.

Mit Hilfe von synchronized { } Blöcken oder synchronized (variable) { .. } kann man zwar den zeitlichen Zugriff auf Programmcode-Sequenzen und Variablen durch mehrere Prozessoren auf stets nur 1 Prozess limitieren.

Wenn die Verarbeitung von Daten in Collections sich in Abhängigkeit von der Reihenfolge, in welcher diese Arbeitsschritte durchgeführt werden, im Ergebnis sich verändert, führt das zu einem Sideeffect, dt. Nebeneffekt.

Thread-Interferenzen führen Daten-Inkonsistenzen

Das Collection-Framework stellt Synchronisations-Wrapper bereit. Mit diesen lässt sich einer jeden Collection eine «automatische Synchronisation» hinzügen.

Damit werden diese Collections thread-safe bzw. thread-sicher.

Dieses Verfahren kann dennoch aber zu Problemem führen: Threads können sich gegenseitig blockieren.

Thread-Blocking Konflikte

Wenn Threads auf den gleichen mit synchronized synchronisierten Code-Block oder eine Variable zugreifen wollen, können sich Threads gegenseitig blockieren, wenn Thread A in der Bearbeitung des Blocks a auf die Variable b wartet, während Thread B im Block b gerade darauf wartet, den Zugriff auf den Block a zu bekommen.


Dieses Prinzip kennt man schlussendlich auch im Straßenverkehr: Wenn in einer Straße mit zwei Richtungen ein Engpass besteht, durch den nur ein Fahrzeug passt, gilt im Straßenverkehr die Regel, dass derjenige warten möge, auf dessen Seite das Hindernis steht.

Besteht das Hindernis aber an beiden Seiten, hat keine Seite ein Vorrecht, dh. jeder kann seinen eigenen Bereich für den Zugang nur dann freigeben, wenn der jeweils andere seinen Bereich verlässt. Da dieses hier aber nicht möglich ist, blockieren sich nicht nur Fahrzeuge an Engpässen sondern auch Threads in der Programmierung.

parallelStream() mit Aggregation-Operations

Unter der Voraussetzung, dass eine Collection, die in einem Stream über .parallelStream() nebenläufig (statt sequenziell mit .stream() verarbeitet wird, in diesem Zeitraum NICHT verändert wird, ermöglichen die Aggregation-Operations eine nebenläufige Verarbeitung von Daten und verhindern hierbei das Thread-Blocking.

Serielle Ausführung kann schneller sein.

Die Voraussetzung dafür, dass eine parallele Verarbeitung schneller ist als eine sequenzielle herkömmliche Verarbeitung, besteht in der Verfügbarkeit mehrerer ProzessorKerne und auch einer gewissen Datenmenge.

Das über die Aggregation-Operations automatisierte Erzeugen von Threads, das Aufspalten in Thread-Forks und das Zusammenführen über Thread-Joins erfordert Zeit für das Anlegen von Objekten im Arbeitsspeicher.

Serielle Ausführung kann schneller sein.

Die zugehörigen Klassen, Interfaces, Enumerations und anderen Types des JAVA Fork-Join-Framework(s) sind im Package java.util.concurrency enthalten.

https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

Parallel Stream

Parallel Computing | Executing Streams in Parallel / StreamParallel01

Wir verwenden hier .parallelStream() mit .filter(..) für die Reduzierung der Elemente im Stream und anschließender üblicher Ermittlung des Durchschnitts über IntStream.average() als Aggregate-Operation.

In diesem Beispiel berechnen wir in paralleler Ausführung das Durschnittsalter der Frauen, Männer und derer, die keines von beiden sind.

Beispiel



package com.stuelken.java.b1.collections.parallelism;

import java.util.List;

import com.stuelken.java.b2.exampledata.Person;

public class Parallel01 {

	public static void main(String[] args) {

		/**
		 * 
		 */
		List<Person> gruppe = Person.createGruppe();

		// @formatter:off				
		
		double durchschnittsalterFrauen = gruppe
		 .parallelStream() // (1) Stream<Person>
		 .filter(p -> p.getGender() == Person.Sex.FEMALE) // (2) filter(Predicate<Person> predicate)
		 .mapToInt(Person::getAge) // (3) mapToInt(mapper): Stream<Integer)
		 .average() // (4) Aggregate-Operation IntStream.average(): OptionalDouble
		 .getAsDouble(); // (5) OptionalDouble.getAsDouble()
		
		System.out.println("Frauen:"+durchschnittsalterFrauen);
		
		double durchschnittsalterMannen = gruppe
		 .parallelStream()
		 .filter(p -> p.getGender() == Person.Sex.MALE)
		 .mapToInt(Person::getAge)
		 .average()
		 .getAsDouble();
		System.out.println("Mannen:"+durchschnittsalterMannen);
		
		double durchschnittsalterAndere = gruppe
		 .parallelStream()
		 .filter(p -> p.getGender() == Person.Sex.UNDEFINED)
		 .mapToInt(Person::getAge)
		 .average()
		 .getAsDouble();
		
		System.out.println("Andere:"+durchschnittsalterAndere);
		
		// @formatter:on				

	}

}
// @formatter:off				
/*
22.0
40.166666666666664
24.0 
*/


 
22.0 (Beispiel)
40.166666666666664
24.0 

Stream T für parallele Verarbeitung

Mit Einführung der Stream-API und des JAVA Concurrency-Frameworks kam über die sequentielle Verarbeitung von Streams über .stream() auch Methode .parallelStream hinzu.

Auch hier wird ein Stream<T> bzw. Stream<Person> erzeugt.

Die nebenläufige Verarbeitung erfolgt hierbei automatisch über die Runtime. Wir brauchen also nicht mehr manuell Threads erzeugen.

Das Mappen auf einen anderen Typ

Erzeugt wird hier eine zweite Collection für Integer-Werte, dh. für jedes Element im Stream wird automatisch ein Element im zweiten Stream angelegt.

Die Besonderheit hierbei besteht darin, dass die Runtime selbst entscheidet, in welcher Reihenfolge die Elemente verarbeitet werden.

Wenn vermutlich ein double-Wert kommen wird

Die average() Methode von IntStream ist eine Aggregate-Operation und sorgt dafür, dass wir letztendlich unsere Collection, die wir da im Stream verarbeiten, schlussendlich wieder auf einen einzigen Wert zusammenfassen können.

Nun kann es aber passieren, dass die Collection keinen einzigen Wert hat; wir wissen also nicht, ob ein double-Wert herauskommt oder doch eher gar nichts. Hierfür ist OptionalDouble gedacht.

JAVADOC, Eclipse

Diese Anweisung dient nur der Formatierung des Quellcodes in Eclipse.

Executing Streams in Parallel

Man kann Streams entweder seriell oder # parallel ausführen.

Wenn ein Stream parallel ausgeführt wird, partitioniert die Java-Laufzeitumgebung den Stream in mehrere Teilstreams. Diesen Prozess bezeichnet man als Fork.

Aggregatsoperationen iterieren über diese Teilstreams, verarbeiten sie parallel und kombinieren anschließend die Ergebnisse. Dieses Zusammenfügen der parallen Prozesse bezeichnet man als Join

Standardmäßig wird ein Stream immer als serieller Stream erzeugt, sofern nicht anders angegeben.

Um einen parallelen Stream zu erstellen, nutzt man die Operation Collection.parallelStream() Operation.

Alternativ kannst du die Methode BaseStream.parallel() verwenden, java.util.stream.BaseStream [3]

Concurrent Reduction

Parallel Computing | Concurrent Reduction

StreamParallel02

In diesem Beispiel berechnen wir in paralleler Ausführung das Durschnittsalter der Frauen, Männer und derer, die keines von beiden sind. Wirkt nicht wirklich spannend, ist es aber: Wir müssen nämlich nicht einen einzigen Thread manuell erzeugen, weil das alles uns die Stream-API abnimmt.

Wir verwenden hier .parallelStream() mit .filter(..) für die Reduzierung der Elemente im Stream und anschließender üblicher Ermittlung des Durchschnitts über IntStream.average() als Aggregate-Operation.

Beispiel



package com.stuelken.java.b1.collections.parallelism;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import com.stuelken.java.b2.exampledata.Person;

public class Parallel02_ConcurrentReductionToMap {

	public static void main(String[] args) {

		// Gruppe als Collection
		List<Person> gruppe = Person.createGruppe();

		// @formatter:off				

		// Sequentielles Verfahren mit .stream()
		Map<Person.Sex, List<Person>> mapPersonenGruppiertNachGeschlecht_sequentiell =
				gruppe
		 .stream()
		 .collect(
		 Collectors.groupingBy(Person::getGender));

		// Paralleles Verfahren mit .parallelStream() und 
		// als Map eine java.util.concurrent.ConcurrentMap;

		ConcurrentMap<Person.Sex, List<Person>> mapPersonenGruppiertNachGeschlecht_parallel =
				gruppe
		 .parallelStream()
		 .collect(
		 Collectors.groupingByConcurrent(Person::getGender));
		
		System.out.println(mapPersonenGruppiertNachGeschlecht_parallel);
		System.out.println(mapPersonenGruppiertNachGeschlecht_sequentiell);
		
		// @formatter:on				

	}

}
// @formatter:off				
/*
{FEMALE=[
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:, 
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com], MALE=[
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com, 
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com, 
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com, 
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com, 
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com, 
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
{FEMALE=[
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com, 
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:], MALE=[
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com, 
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com, 
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com, 
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com, 
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com, 
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
*/


 
{FEMALE=[
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:, 
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com], MALE=[
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com, 
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com, 
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com, 
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com, 
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com, 
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
{FEMALE=[
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com, 
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:], MALE=[
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com, 
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com, 
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com, 
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com, 
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com, 
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}

Beide Varianten

Das Beispiel zeigt beide Varianten, wie man sich sequentiell eine Map und parallel eine ConcurrentMap beschaffen kann.

Parallele Datenverarbeitung bei Maps

Dieses Beispiel verwendet die ConcurrentMap. .groupingByConcurrent(..) ist performanter als die normale groupingBy Operation, wenn es um eine parallele Verarbeitung mit .parallelStream() geht.

Das gilt sinngemäß auch für die Verwendung von Collectors.toConcurrentMap()

Das Vereinen (engl. merge) von zweiter Map Collections basierend auf dem jeweiligen Schlüssel (engl. Key) wird für die parallele Verarbeitung optimiert.

Concurrent Reduction

JAVA führt eine sogenannte ConcurrentReduction dann durch, wenn für die aktuelle Pipeline eine Reihe von Bedinungen erfüllt sind.

parallele Datenverarbeitung

Der Stream ist parallel. Dieses wird über Collection.parallelStream() bewirkt.

Collector.Characteristics.CONCURRENT

Der characteristic-Parameter von collect(characteristic,..) hat den einen Collector.Characteristics.CONCURRENT [4] Wert. Will man feststellen, welche Charakteristik ein java.util.function.Collector hat, ruft man die Collector.characteristics() Methode.

Collector.Characteristics [5] ist eine Enumeration in java.util.stream.Collector.Characteristics.

Stream ungeordnet oder Collector mit ungeordneter Characteristic

Entweder der Stream ist unordnet (engl.unordered) oder aber der Collector hat die Characteristic Collector.Characteristics.UNORDERED.

Will man sicherstellen, dass ein Stream ungeordnet sein soll, so kann man die BaseStream.unordered() Operation, BaseStream.unordered() [6] , aufrufen.

Geordnete Collections sind für Concurrent Reduction Verfahren ein Problem

Immer dann, wenn eine Reihenfolge oder Ordnung erhalten werden soll, muss man einen anderen Weg nutzen.

Ordered Sequentual

Parallel Computing | Ordered Sequentual Streams

Auch bei einer parallelen Verarbeitung lässt sich die Reihenfolge in Collections während der Verarbeitung sicherstellen. Hierbei gilt es aber Voraussetzungen zu schaffen.

In diesem Beispiel zeigen wir, wie man Arrays über Collections.sort(..) in Verbindung mit einem Comparator neu sortieren kann. Wir sortieren hier Integer-Werte und zeigen, wie wir über Methodenreferenzen auf Funktionale Interfaces und Methoden diese Sortierung übergeben können.

Beispiel



package com.stuelken.java.b1.collections.parallelism;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// @formatter:off	
// @formatter:on				

public class Parallel03_Sequential {

	public static void main(String[] args) {

		Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // Eine geordnete Liste von int-Werten.

		List<Integer> zahlenListOfInteger = // Liste als Collection
		 new ArrayList<>(Arrays.asList(intArray)); // Umlesen des Array in ArrayList

		Integer[] arrayClone = intArray.clone();
		List<Integer> zahlenListOfInteger_Clone = new ArrayList<>(Arrays.asList(arrayClone));

		/*
		 * Sequentiell mit .stream().forEach(..)
		 */
		System.out.println("\nlistOfIntegers:");
		zahlenListOfInteger //
		 .stream() //
		 .forEach(e -> System.out.print(e + " ")); //
		System.out.println("");

		/*
		 * Methoden-Referenzen definieren
		 */
		System.out.println("\\nlistOfIntegers sorted in reverse order:");

		Comparator<Integer> normalComparator = Integer::compare; // Methoden_Referenz auf java.lang.Integer.compare
		Comparator<Integer> reversedComparator = normalComparator.reversed(); // Methodenreferenz auf
		 // java.util.Comparator.reversed

		System.out.println("\nSEQUENTIELL");

		/*
		 * Sortiertierte Ausgabe umgekehrt, ohne die Collection zu verändern
		 */
		Collections.sort(zahlenListOfInteger, reversedComparator); // Collection.sort(
		zahlenListOfInteger //
		 .stream() //
		 .forEach(e -> System.out.print(e + " ")); //
		System.out.println("");

	}

}
// @formatter:off				
/*

listOfIntegers:
0 1 2 3 4 5 6 7 8 9 
\nlistOfIntegers sorted in reverse order:

SEQUENTIELL
9 8 7 6 5 4 3 2 1 0 
*/

 
listOfIntegers:
0 1 2 3 4 5 6 7 8 9 
\nlistOfIntegers sorted in reverse order:


SEQUENTIELL
9 8 7 6 5 4 3 2 1 0 


PARALLEL
Parallel stream : 3 4 1 0 2 8 7 5 9 6 
Another parallel stream: 7 8 3 4 1 5 6 0 2 9 
With forEachOrdered (>): 9 8 7 6 5 4 3 2 1 0 


PARALLEL (Clone) zahlenListOfInteger_Clone 
Parallel stream : 6 5 2 4 3 9 7 8 0 1 
Another parallel stream: 2 7 8 9 5 1 4 6 0 3 
With forEachOrdered (>): 0 1 2 3 4 5 6 7 8 9 

stream() ist sequentiell

Hier gibt es keine Überraschungen.

Umgekehrte Reihenfolge

Collections.sort(..) verändert hier das Original, dh. die ArrayList von int-Werten, welche wir über das List-Interfaces adressieren, hat anschließend alle Zahlen in der umgekehrte Reihenfolge erfasst.

Sequentielle Verarbeitung mit «forEach(..)»

Die sequentielle Verarbeitung über .stream() und .foreach(consumer) macht genau das, was diese Operation tun sol..

Der Comparator of Integer wird hier über eine Methoden-Referenz auf .comparator() der Wrapperklasse für int-Werte, Integer beschafft.

Da jeder Comparator eine Reihe bestimmter Methoden wie mitunter .reverse() bietet, verfügt auch der von Integer.comparor erhalte Comparator über diese Methode.

Ordered Parallel

Parallel Computing | Ordered Parallel

Auch bei einer parallelen Verarbeitung lässt sich die Reihenfolge in Collections während der Verarbeitung sicherstellen. Hierbei gilt es aber Voraussetzungen zu schaffen, denn mit der üblichen forEach(..) Methode funktioniert es nicht.

In diesem Beispiel zeigen wir, wie man Arrays über Collections.sort(..) in Verbindung mit einem Comparator neu sortieren kann. Wir sortieren hier Integer-Werte und zeigen, wie wir über Methodenreferenzen auf Funktionale Interfaces und Methoden diese Sortierung übergeben können.

Beispiel



package com.stuelken.java.b1.collections.parallelism;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// @formatter:off	
// @formatter:on				

public class Parallel03_Sequential {

	public static void main(String[] args) {

		Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // Eine geordnete Liste von int-Werten.

		List<Integer> zahlenListOfInteger = // Liste als Collection
		 new ArrayList<>(Arrays.asList(intArray)); // Umlesen des Array in ArrayList

		Integer[] arrayClone = intArray.clone();
		List<Integer> zahlenListOfInteger_Clone = new ArrayList<>(Arrays.asList(arrayClone));

		/*
		 * Sequentiell mit .stream().forEach(..)
		 */
		System.out.println("\nlistOfIntegers:");
		zahlenListOfInteger //
		 .stream() //
		 .forEach(e -> System.out.print(e + " ")); //
		System.out.println("");

		/*
		 * Methoden-Referenzen definieren
		 */
		System.out.println("\\nlistOfIntegers sorted in reverse order:");

		Comparator<Integer> normalComparator = Integer::compare; // Methoden_Referenz auf java.lang.Integer.compare
		Comparator<Integer> reversedComparator = normalComparator.reversed(); // Methodenreferenz auf
		 // java.util.Comparator.reversed

		System.out.println("\nSEQUENTIELL");

		/*
		 * Sortiertierte Ausgabe umgekehrt, ohne die Collection zu verändern
		 */
		Collections.sort(zahlenListOfInteger, reversedComparator); // Collection.sort(
		zahlenListOfInteger //
		 .stream() //
		 .forEach(e -> System.out.print(e + " ")); //
		System.out.println("");

	}

}
// @formatter:off				
/*

listOfIntegers:
0 1 2 3 4 5 6 7 8 9 
\nlistOfIntegers sorted in reverse order:

SEQUENTIELL
9 8 7 6 5 4 3 2 1 0 
*/

 
listOfIntegers:
0 1 2 3 4 5 6 7 8 9 
\nlistOfIntegers sorted in reverse order:


SEQUENTIELL
9 8 7 6 5 4 3 2 1 0 


PARALLEL
Parallel stream : 3 4 1 0 2 8 7 5 9 6 
Another parallel stream: 7 8 3 4 1 5 6 0 2 9 
With forEachOrdered (>): 9 8 7 6 5 4 3 2 1 0 


PARALLEL (Clone) zahlenListOfInteger_Clone 
Parallel stream : 6 5 2 4 3 9 7 8 0 1 
Another parallel stream: 2 7 8 9 5 1 4 6 0 3 
With forEachOrdered (>): 0 1 2 3 4 5 6 7 8 9 

stream() ist sequentiell

Hier gibt es keine Überraschungen.

Umgekehrte Reihenfolge

Collections.sort(..) verändert hier das Original, dh. die ArrayList von int-Werten, welche wir über das List-Interfaces adressieren, hat anschließend alle Zahlen in der umgekehrte Reihenfolge erfasst.

Parallele Verarbeitung «forEachOrdered()»

Wir können auch bei einer parallen Verarbeitung die Ordnung erhalten, indem nicht forEach(..) sondern forEachOrdered(..) nutzen. Damit das allerdings funktionieren kann, sind ein paar Voraussetzungen erforderlich.

parallele Datenverarbeitung

Der Stream ist parallel. Dieses wird über Collection.parallelStream() bewirkt.

Collector.Characteristics.CONCURRENT

Der characteristic-Parameter von collect(characteristic,..) hat den einen Collector.Characteristics.CONCURRENT [7] Wert.

Will man feststellen, welche Charakteristik ein java.util.function.Collector hat, ruft man die Collector.characteristics() Methode.

Collector.Characteristics [8] ist eine Enumeration in java.util.stream.Collector.Characteristics.

Stream ungeordnet oder Collector mit ungeordneter Characteristic

Entweder der Stream ist ungeordnet (engl.unordered) oder aber der Collector hat die Characteristic Collector.Characteristics.UNORDERED.

Will man sicherstellen, dass ein Stream ungeordnet sein soll, so kann man die BaseStream.unordered() Operation, BaseStream.unordered() [9] , aufrufen.

Geordnete Collections sind für Concurrent Reduction Verfahren ein Problem

Immer dann, wenn eine Reihenfolge oder Ordnung erhalten werden soll, muss man einen anderen Weg nutzen.

Side Effects

Parallel Computing | Seiteneffekte

Seiteneffekte, Side Effects

Seitenffekte

Wann hat eine Methode/Ausruck einen Seiteneffekt?

Eine Methode oder ein Ausdruck hat einen Seiteneffekt, wenn er neben der Rückgabe bzw. Erzeugung eines Wertes auch den Zustand der Daten im Computer verändert.

Beispiele hierfür sind veränderbare Reduktionen (engl. Mutable Reductions), dh. Operationen, welche die collect-Operation verwenden.

Wann hat eine Methode/Ausruck einen Seiteneffekt?

Die Methode .collect(..) kann Site Effects bewirken. Sie wird im Kapitel de/uio3-docs/java/java-c2/collection-reduction erläutert.

Der Zugriff auf Daten während der Verarbeitung ist ein Seiteneffekt

Der Zugriff auf Daten während der Verarbeitung ist ein Seiteneffekt

JAVA API .parallelStream.collect(..) ist sicherer

Relativ sicher

Eine Reduzierung der Elemente des Streams auf ein Ergebnis mit der collect(..) Methode im Falle von .parallelStream() beinhandelt mögliche Seiteneffekte sehr gut.

Das bedeutet sinngemäß: Veränderungen an den Daten führen zu einer Exception.

Wert beschaffen

Man sollte aber immer Vorsicht walten lassen. In einer parallelen Verarbeitung kann es sein, dass ein übergebener Lambda-Ausdruck in mehreren Threads gleichzeitig ausgeführt wird.

Wert beschaffen

Man sollte aber immer Vorsicht walten lassen. In einer parallelen Verarbeitung kann es sein, dass ein übergebener Lambda-Ausdruck in mehreren Threads gleichzeitig ausgeführt wird.

Beispiele

Die nachfolgenden Abschnitte behandeln Interferencen und zustandsbehaftete Lambda-Ausdrücke, Statefull-Lambda-Expressions. In beiden Fällen können Seiteneffekte entstehen und damit zu inkonsistenten und unvorhersagbaren Ergebnissen führen.

Dies gilt insbesondere für parallele Datenverarbeitung mit .parallelStream().

Anmerkung: Das gilt auch dann, wenn man stattdessen mit Threads arbeitet. Die Packages und Klassen sind zwar erst einmal dann andere, aber das Prinzip ist das selbe.

Lazyness

Parallel Computing | Lazyness

Alle Zwischenoperationen in der sogenannten Pipeline auf einem Stream sind lazy. Dieses Prinzip man auch als Lazyness.

Ein Ausdruck, eine Methode oder ein Algorithmus ist lazy, wenn sein Wert nur dann berechnet wird, wenn er benötigt wird. (Ein Algorithmus ist eager, wenn er sofort ausgewertet oder verarbeitet wird.

Zwischenoperationen sind lazy, weil die Verarbeitung des Inhalts eines Streams erst dann beginnt, wenn die abschließende (terminal) Operation startet.

Durch das lazy Verarbeiten von Streams können der Java-Compiler und die Laufzeitumgebung die Stream-Verarbeitung optimal gestalten.

Siehe Terminal Operation, Aggregate-Operations, reduce(), collect()


Zum Beispiel könnte in einer Pipeline wie im Abschnitt Aggregate Operations am Beispiel filter-mapToInt-average die average-Operation zunächst einige Ganzzahlen aus dem von mapToInt erzeugten Stream übernehmen (der seinerseits Elemente aus der filter-Operation liefert). Dieser Vorgang wird solange wiederholt, bis alle erforderlichen Elemente aus dem Stream bezogen wurden, und erst danach wird der Durchschnitt berechnet.

Dieses Beispiel verkettet die in listOfStrings enthaltenen Zeichenketten zu einem Optional <String>-Wert mittels der reduce(..)-Operation, welche eine Terminal-Operation ist.

Allerdings ruft die Pipeline hier die Zwischenoperation peek auf, die versucht, während der Ausführung der Pipeline ein neues Element zu listOfStrings hinzuzufügen.




/*
Dieses Beispiel ist im Kapitel Aggregation-Operation enthalten.
Es fehlt aktuell allerdings der Link und/oder die Referenzierung.
*/


 


/* Beispiel fehlt. */

...

Das bedeutet, dass die Pipeline in diesem Beispiel erst gestartet wird, wenn die get-Operation aufgerufen wird, und die Ausführung erst endet, wenn die get-Operation abgeschlossen ist.

Interference

Interference

Parallel Computing | Interference

Intermedia Operations werden in der parallelen Datenverarbeitung infolge von .parallelStream() in einer völlig zufälligen, nicht-deteriministischen Reihenfolge verarbeitet. Jede Veränderung oder bereits ein Zugriff zur Ausgabe kann zu Interferenzen führen.



package com.stuelken.java.b1.collections.parallelism;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author t2m
 */
public class Parallel07_SideEffects_Interference {

	public static void main(String[] args) {

		try { // (1) Wirft ConcurrentModificationException aus 
			List<String> listOfStrings = new ArrayList<>( // (2)
			 Arrays.asList("eins", "zwei", "drei")); // (3)

			// This will fail as the peek operation will attempt to add the
			// string "three" to the source after the terminal operation has
			// commenced.

			String concatenatedString = listOfStrings.stream() // (4)

			 .peek(s -> listOfStrings.add("vier")) // (5) Interferenz!!!

			 .reduce((a, b) -> a + " " + b).get(); // (6)

			System.out.println("Concatenated string: " + concatenatedString); // (7)

		} catch (Exception e) { // (8)
			System.out.println("Exception caught: " + e.toString()); // (9)
		}
	}

}

/*
 * Output: Exception caught: java.util.ConcurrentModificationException
 */


 
/*
 * Output: Exception caught: java.util.ConcurrentModificationException
 */

Direkt eine ArrayList erzeugen

Die Arrays Klasse bietet diverse statische praktische Methoden rund um das Arbeiten mit Arrays.

Das ist hier unzulässig

Die .peek Methode sorgt hier dafür, dass wir versuchen, dem Stream noch eine Zahl "vier" hinzufügen. .peek(comsumer) liefert neuen Stream und verändert den bisherigen.

Veränderungen werden unterbunden

Der praktische Aspekt dieser Ausnahme besteht darin, dass wir diesen Fehler kontrolliert abfangen können, allerdings beendet dieser Fehler auch die Durchführung der Verarbeitung.

... ...

Zwischenoperationen können zwar mal filtern oder mappen, aber Sie sollten dennoch nicht die Datenkonsistenz verändern.

Interference

Statefull-Lambda-Expressions

Parallel Computing | Statefull-Lambda-Expressions

Vermeide Sie es, zustandsbehaftete Lambda-Ausdrücke als Parameter in Stream-Operationen zu verwenden.

Ein zustandsbehafteter Lambda-Ausdruck ist einer, dessen Ergebnis von einem Zustand abhängt, der sich während der Ausführung einer Pipeline ändern kann.


Das folgende Beispiel fügt mit der Zwischenoperation map Elemente aus der Liste listOfIntegers zu einer neuen List-Instanz hinzu. Dies wird zweimal durchgeführt: zuerst mit einem seriellen Stream und danach mit einem parallelen Stream.


Die Operation forEachOrdered verarbeitet Elemente in der Reihenfolge, die im Stream festgelegt ist – unabhängig davon, ob der Stream seriell oder parallel ausgeführt wird.

Wird ein Stream jedoch parallel ausgeführt, verarbeitet die map-Operation die Elemente gemäß der Vorgaben der Java-Laufzeitumgebung und des Compilers.

Folglich kann sich die Reihenfolge, in der der Lambda-Ausdruck e -> { parallelStorage.add(e); return e; } Elemente zur Liste parallelStorage hinzufügt, bei jedem Programmdurchlauf ändern.

Für deterministische und vorhersehbare Ergebnisse sollte man darauf achten, dass Lambda-Ausdrücke in Stream-Operationen keinen zustandsbehafteten Code enthalten.

Hinweis: In diesem Beispiel wird die Methode synchronizedList aufgerufen, sodass die Liste parallelStorage threadsicher ist.

Bedenken Sie, dass Collections standardmäßig nicht threadsicher sind – das bedeutet, dass mehrere Threads nicht gleichzeitig auf dieselbe Collection zugreifen sollten.

Wenn wir beim Erzeugen von parallelStorage nicht die Methode synchronizedList aufrufen, List<Integer> parallelStorage = new ArrayList<>(); > > > verhält sich das Beispiel unvorhersehbar, weil mehrere Threads gleichzeitig auf parallelStorage zugreifen und diese modifizieren, ohne dass eine Synchronisierung erfolgt.




package com.stuelken.java.b1.collections.parallelism; // (1)

import java.lang.Integer; // (2)
import java.util.ArrayList; // (3)
import java.util.Arrays; // (4)
import java.util.Collections; // (5)
import java.util.List; // (6)

/**
 * 
 * @author t2m
 *
 */
public class Parallel08_SideEffects_StatefullLambdaExpressions { // (7)
	public static void main(String[] args) { // (8)
		
		
		List<Integer> serialStorage = new ArrayList<>(); // (9) Container für Werte
		Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // (10) Werte
		List<Integer> zahlenListOfInteger = new ArrayList<>(Arrays.asList(intArray)); // (11) Liste
		
		/*
		 * ===
		 */
		
		System.out.println("(1) seriell: " // 
				+ "\nstream().map(e->serialStorage.add(e);return(e)}).foreachOrdered(..): "); // (12)
		zahlenListOfInteger.stream() // (13) Sequenzieller Durchlauf
		 .map(e -> { // (14)
			 System.out.println(" .stream().map(e ) mit e:" + e); // (15) Debugging
			 serialStorage.add(e); // (16) Seriellen Storage Werte der ArrayList geordnet geben.
			 return e; // (17)
		 }) // (18)
		 .forEachOrdered(e -> System.out.print(e + " ")); // (19)
		
		System.out.println("\n"); // (20)
		serialStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); // (21) Geordnet.
		System.out.println("\n"); // (22)
		
		/*
		 * ===
		 */
		
		System.out.println("(2) Parallel stream "// 
				+ "\nmit parallelStorage, Collections.synchronizedList(..):"); // (23)
		
		// WICHTIG: Thread-Sichere Liste erzeugen
		List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>()); // (24)
		zahlenListOfInteger.parallelStream() // (25) Paralleler Stream, nebenläufig
		 .map(e -> { // (26) 
			 parallelStorage.add(e); // (27) Reihenfolge ist NICHT deteriministisch, also zufällig 
			 return e; // (28)
		 }) // (29)
		 .forEachOrdered(e -> System.out.print(e + " ")); // (30)
		System.out.println(""); // (31)

		/*
		 * ===
		 */
		
		System.out.println("\n"); // (32)
		System.out.println("(3) parallelStrorage mit .stream().foreachOrdered(..):"); // (33)
		parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); // (34)
		System.out.println(""); // (35)
	} // (36)
} // (37)


 
(1) seriell: 
stream().map(e->serialStorage.add(e);return(e)}).foreachOrdered(..): 
 .stream().map(e ) mit e:0
0 .stream().map(e ) mit e:1
1 .stream().map(e ) mit e:2
2 .stream().map(e ) mit e:3
3 .stream().map(e ) mit e:4
4 .stream().map(e ) mit e:5
5 .stream().map(e ) mit e:6
6 .stream().map(e ) mit e:7
7 .stream().map(e ) mit e:8
8 .stream().map(e ) mit e:9
9 

0 1 2 3 4 5 6 7 8 9 

(2) Parallel stream mit parallelStorage, Collections.synchronizedList(..):
0 1 2 3 4 5 6 7 8 9 


(3) parallelStrorage mit .stream().foreachOrdered(..):
8 1 2 6 9 7 5 0 3 4 

Sychronisiert für parallele Verarbeitung.

Wichtig zum Verständnis: Normalerweise sind Collections NICHT synchronisiert. Um also eine Collection thread-sicher verarbeiten zu können, müssen wir diese in eine synchronisierte Fassung übersetzen.

Dieses Pattern sorgt für die synchronisierte Verarbeitung

Wir können also parallel arbeiten, ... ABER ...

Die Reihenfolge geht hier verloren

Gibt man den parallelStorage also mit .stream() mal in der Reihenfolge aus, die wir dort erfasst haben, kann man gut erkennen, dass wir zwar die Elemente geordnet im .parallelStream() letzendlich ausgeben, aber WÄHREND der Verarbeitung mit der Intermedia Operation map wir eine völlig willkürliche Reihenfolge haben.

Collection.synchronizedList(c) wandelt eine andere Collection wie eine ArrayList für Integer in eine List<Integer> um, womit diese Collecion thread-sicher verrabeitet werden kann.

Die Reihenfolge bei Intermedia Operations wie .map(..) ist aber innerhalb der Pipeline im Falle von .parallelStream() weiterhin völlig willkürlich.

Links

https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html

Quellen, Notes, Tags


  • [1]↑ JAVA: Programmiersprache
  • [2]↑ JAVASCRIPT: Programmiersprache, nicht JAVA
  • [3]↑ java.util.stream.BaseStream: https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.htm
  • [4]↑ Collector.Characteristics.CONCURRENT : https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html#CONCURRENT
  • [5]↑ Collector.Characteristics: https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html
  • [6]↑ BaseStream.unordered(): https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#unordered--
  • [7]↑ Collector.Characteristics.CONCURRENT : https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html#CONCURRENT
  • [8]↑ Collector.Characteristics: https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html
  • [9]↑ BaseStream.unordered(): https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#unordered--

UI ORGANIZED.

UIO3 Es ist einfacher als Du denkst.

Stelle noch heute Deine Anfrage.

uio--WebPageFooter-Module