- Overview
- Intro
- Parallel Computing | Executing Streams in Parallel
- Parallel Computing | StreamParallel01
- Parallel Computing | Concurrent Reduction
- Parallel Computing | StreamParallel02
- Parallel Computing | Ordered Sequentual
- Parallel Computing | Ordered Parallel
- Parallel Computing | Side Effects
- Parallel Computing | Lazyness
- Parallel Computing | Interference
- Parallel Computing | Statefull-Lambda-Expressions
- Links
JAVA-C2
Collection Advanced
Collection Advanced
Problemstellung
Diagramm
Concurrent Collection Interfaces & Classes
Thread-sichere Lists
Collection.synchronizedList()
CopyOnWriteArrayList (@Bsp?)
Thread-sichere Sets
Collection.synchronizedSet()
CopyOnWriteArraySet
Thread-sichere Queues
BlockingQueue
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
Nicht-serialisierbare Queues
SynchronousQueue
DelayQueue
Thread-sichere Maps
Collection.synchronizedMap(..)
ConcurrentHashMap
Aggregate Operations
Streams & Aggregate Operations (Overview)
@! 15 Page Section Links fehlen!
Person ExampleData
Aggregate Operations @!section?
stream() @!section?
for-Loop
for-Loop
stream() und forEach()
Streams und Pipelines
Filter und Predicates
EXAMPLE 03 Stream/Predicate/Consumer
EXAMPLE 04 Consumer mit Generic Type Parameter
EXAMPLE 05 Predicates/Enums/Collection
"Pipelines"
EXAMPLE 06 Mapping to new Type
EX6 Summary
maptTOInt, ToIntFunction, IntStream.average() OptionalDouble
EXAMPLE 07 Map Key-Value-Array zu LinkedHashMap
Stream.mapToObj(k,v)
collect(Collectors.toMap(a,b,c,d)
Map.entry(k,v)
map(*).findFirst().orElse(*)
Map.getOrDefault(..)
Collection Reduction
stream.reduce
Collection Collecting
Collection stream.collect
@! Kapitel überprüfen!@! Java-Tier von B4 auf B2 verschoben!@! Alle Links/Referenzen checken!
Collection-Parallelism
Collection Stream API ab JAVA 8
.parallelStream()
.parallelStream()
.parallelStream().mapToInt(*).average).getAsDouble()
Typ OptionalDouble
Begriff Fork
Begriff Join
Executing Streams in Parallel (01)
Concurrent Reduction (02)
Ordered Sequentiell (03)
Ordered Parallel (04)
Side Effects
Parallel Computing "Lazyness" (06)
Interference (07)
State-Full-Lambda Expressions (08)
zustandsbehaftete Lambda-Ausdrücke (08)
Collections, Streams & AlgorithmsCollection Streams (Overview) UIO SimulateDownloadDataProvider** connectSimulated():ListINTFilter (prime, schnapps, quer, mod, Operationsfilter, map, sorted, collect, reduce, peek, limit
(Collection FAQ)
Datenquellen zu Streams konvertieren
Array-Stream
Collection-Stream
Map-Stream
Primitive-Stream
Generatoren-Stream
Iteratoren-Stream
(Collection Implementations)
(Collection Algorithms)
Overview
Parallel Computing | Overview
Parallel Computing als parallele Datenverarbeitung gibt es in JAVA inzwischen in mehreren Varianten. Wir können herkömmlich Threads und Runnables anlegen oder aber auch das JAVA Fork-/Join-Framework von JAVA Concurrency verwenden, welches für viele Problemstellungen standartisierte Ansätze bietet.
Intro
Parallel Computing | Intro
Bevor wir uns mit den jeweiligen Beispielen zum Thema befassen ist es anzuraten, im Überblick zuerst ein paar Begriffe und Problemstellungen rund um nebenläufige Verarbeitung von Daten zu kennen.
Diese Probleme in nebenläufiger Verarbeitung von Daten, als Parallel Computing bezeichnet, beschränken sich nicht nur auf JAVA [1] sondern gelten für alle Programmiersprachen, welche mehrere Prozessoren und damit echtes Multi-Processing wie mitunter auch C++ oder C# ermöglichen. JAVASCRIPT [2] ist bei bislang fast allen Runtimes für JAVA im Browser oder mit NodeJS auf Serverseite letztendlich Single-Threaded, dh. auch wenn ein Rechner mehrere Prozessoren hat, werden nebenläufige Operationen in JavaScript auf dem selben Prozessor atomar zerlegt, erfordern aber keine Synchronisierung wie in JAVA und C#, weil es in JS keinen parallel Zugriff auf Code-Segmente oder Variablen geben kann, in JAVA und C# aber durchaus.
parallele Datenverarbeitung
Parallel-Computing dt. parallele Datenverarbeitung, bedeutet, dass man eine Problemstellung dahingehend in Teilprobleme gliedert, welche anschließend in separaten Threads gleichzeitig im Sinne von «zeitlich parallel» ausgeführt werden.
Trennen und Zusammenfügen des Programmverlaufs
Infolge der Aufteilung der Verarbeitung von Problemstellungen in nebenläufige (engl.) Threads (eigentlich dt. «Faden») , dem sogenannten (engl.) Fork, besteht nun wiederum die Problemstellung, dass das Gesamtergebnis dieser Verarbeitung nur dann möglich ist, wenn man diese nebenläufig auf mehreren Prozessoren ablaufenden Sequenzen von Programmschritten an irgendeinem Punkt, Join bezeichnet, kontrolliert wieder zusammenführen kann.
JAVA verfügt über genau ein solches Fork-Join-Framework Framework.
Trennen und Zusammenfügen des Programmverlaufs
Infolge der Aufteilung der Verarbeitung von Problemstellungen in nebenläufige (engl.) Threads (eigentlich dt. «Faden») , dem sogenannten (engl.) Fork, besteht nun wiederum die Problemstellung, dass das Gesamtergebnis dieser Verarbeitung nur dann möglich ist, wenn man diese nebenläufig auf mehreren Prozessoren ablaufenden Sequenzen von Programmschritten an irgendeinem Punkt, Join bezeichnet, kontrolliert wieder zusammenführen kann.
Die Runtime übernimmt bei Aggregation-Operations die Kontrolle
Dieses Fork-Join-Framework kann die Einführung der parallelen Datenverarbeitung in Programmen stark vereinfachen. Man muss hierzu aber vorgeben, wie denn die Teilprobleme parallel verarbeitet und dahingehend zuerst einmal geteilt werden müssen und wie diese später wieder zusammengeführt werden.
Gegenüber einer herkömmlichen Programmierung von
Thread Instanzen
oder eben Klassen, die das
Runnable
Interface implementiert haben, und hierbei
der Entwickler die Kontrolle über das Entstehen
und das Beenden von Threads hatte, wird bei Verwendung
des Frameworks diese Kontrolle über die Runtime übernommen.
Thread-Interferenzen führen Daten-Inkonsistenzen
Wenn mehrere Threads zeitlich parallel auf die gleiche Collection zugreifen, kann es Fehler bei der Speicher-Konsistenz geben.
Mit Hilfe von synchronized { } Blöcken
oder synchronized (variable) { .. }
kann man zwar den zeitlichen Zugriff auf Programmcode-Sequenzen
und Variablen durch mehrere Prozessoren auf stets nur 1
Prozess limitieren.
Wenn die Verarbeitung von Daten in Collections sich in Abhängigkeit von der Reihenfolge, in welcher diese Arbeitsschritte durchgeführt werden, im Ergebnis sich verändert, führt das zu einem Sideeffect, dt. Nebeneffekt.
Thread-Interferenzen führen Daten-Inkonsistenzen
Das Collection-Framework stellt Synchronisations-Wrapper bereit. Mit diesen lässt sich einer jeden Collection eine «automatische Synchronisation» hinzügen.
Damit werden diese Collections thread-safe bzw. thread-sicher.
Dieses Verfahren kann dennoch aber zu Problemem führen: Threads können sich gegenseitig blockieren.
Thread-Blocking Konflikte
Wenn Threads auf den gleichen mit synchronized
synchronisierten Code-Block oder eine Variable zugreifen wollen, können
sich Threads gegenseitig blockieren, wenn Thread A in der Bearbeitung des Blocks a
auf die Variable b wartet, während Thread B im Block b gerade darauf wartet,
den Zugriff auf den Block a zu bekommen.
Dieses Prinzip kennt man schlussendlich auch im Straßenverkehr: Wenn in einer Straße mit zwei Richtungen ein Engpass besteht, durch den nur ein Fahrzeug passt, gilt im Straßenverkehr die Regel, dass derjenige warten möge, auf dessen Seite das Hindernis steht.
Besteht das Hindernis aber an beiden Seiten, hat keine Seite ein Vorrecht, dh. jeder kann seinen eigenen Bereich für den Zugang nur dann freigeben, wenn der jeweils andere seinen Bereich verlässt. Da dieses hier aber nicht möglich ist, blockieren sich nicht nur Fahrzeuge an Engpässen sondern auch Threads in der Programmierung.
parallelStream() mit Aggregation-Operations
Unter der Voraussetzung, dass eine Collection, die in einem Stream
über .parallelStream() nebenläufig
(statt sequenziell mit .stream()
verarbeitet wird, in diesem Zeitraum NICHT
verändert wird, ermöglichen die Aggregation-Operations eine
nebenläufige Verarbeitung von Daten und verhindern hierbei
das Thread-Blocking.
Serielle Ausführung kann schneller sein.
Die Voraussetzung dafür, dass eine parallele Verarbeitung schneller ist als eine sequenzielle herkömmliche Verarbeitung, besteht in der Verfügbarkeit mehrerer ProzessorKerne und auch einer gewissen Datenmenge.
Das über die Aggregation-Operations automatisierte Erzeugen von Threads, das Aufspalten in Thread-Forks und das Zusammenführen über Thread-Joins erfordert Zeit für das Anlegen von Objekten im Arbeitsspeicher.
Parallel Stream
Parallel Computing | Executing Streams in Parallel / StreamParallel01
Wir verwenden hier .parallelStream()
mit .filter(..) für die Reduzierung
der Elemente im Stream und anschließender üblicher Ermittlung
des Durchschnitts über IntStream.average()
als Aggregate-Operation.
In diesem Beispiel berechnen wir in paralleler Ausführung das Durschnittsalter der Frauen, Männer und derer, die keines von beiden sind.
Beispiel
package com.stuelken.java.b1.collections.parallelism;
import java.util.List;
import com.stuelken.java.b2.exampledata.Person;
public class Parallel01 {
public static void main(String[] args) {
/**
*
*/
List<Person> gruppe = Person.createGruppe();
// @formatter:off
double durchschnittsalterFrauen = gruppe
.parallelStream() // (1) Stream<Person>
.filter(p -> p.getGender() == Person.Sex.FEMALE) // (2) filter(Predicate<Person> predicate)
.mapToInt(Person::getAge) // (3) mapToInt(mapper): Stream<Integer)
.average() // (4) Aggregate-Operation IntStream.average(): OptionalDouble
.getAsDouble(); // (5) OptionalDouble.getAsDouble()
System.out.println("Frauen:"+durchschnittsalterFrauen);
double durchschnittsalterMannen = gruppe
.parallelStream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
System.out.println("Mannen:"+durchschnittsalterMannen);
double durchschnittsalterAndere = gruppe
.parallelStream()
.filter(p -> p.getGender() == Person.Sex.UNDEFINED)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
System.out.println("Andere:"+durchschnittsalterAndere);
// @formatter:on
}
}
// @formatter:off
/*
22.0
40.166666666666664
24.0
*/
22.0 (Beispiel)
40.166666666666664
24.0
Stream T für parallele Verarbeitung
Mit Einführung der Stream-API und des JAVA Concurrency-Frameworks
kam über die sequentielle Verarbeitung von Streams über
.stream()
auch Methode .parallelStream
hinzu.
Auch hier wird ein Stream<T> bzw.
Stream<Person> erzeugt.
Die nebenläufige Verarbeitung erfolgt hierbei automatisch über die Runtime. Wir brauchen also nicht mehr manuell Threads erzeugen.
Das Mappen auf einen anderen Typ
Erzeugt wird hier eine zweite Collection für Integer-Werte, dh. für jedes Element im Stream wird automatisch ein Element im zweiten Stream angelegt.
Die Besonderheit hierbei besteht darin, dass die Runtime selbst entscheidet, in welcher Reihenfolge die Elemente verarbeitet werden.
Wenn vermutlich ein double-Wert kommen wird
Die average() Methode von IntStream ist eine Aggregate-Operation und sorgt dafür, dass wir letztendlich unsere Collection, die wir da im Stream verarbeiten, schlussendlich wieder auf einen einzigen Wert zusammenfassen können.
Nun kann es aber passieren, dass die Collection keinen einzigen Wert hat; wir wissen also nicht, ob ein double-Wert herauskommt oder doch eher gar nichts. Hierfür ist OptionalDouble gedacht.
JAVADOC, Eclipse
Diese Anweisung dient nur der Formatierung des Quellcodes in Eclipse.
Executing Streams in Parallel
Man kann Streams entweder seriell oder # parallel ausführen.
Wenn ein Stream parallel ausgeführt wird, partitioniert die Java-Laufzeitumgebung den Stream in mehrere Teilstreams. Diesen Prozess bezeichnet man als Fork.
Aggregatsoperationen iterieren über diese Teilstreams, verarbeiten sie parallel und kombinieren anschließend die Ergebnisse. Dieses Zusammenfügen der parallen Prozesse bezeichnet man als Join
Standardmäßig wird ein Stream immer als serieller Stream erzeugt, sofern nicht anders angegeben.
Um einen parallelen Stream zu erstellen,
nutzt man die Operation Collection.parallelStream() Operation.
Alternativ kannst du die Methode BaseStream.parallel() verwenden,
java.util.stream.BaseStream
[3]
Concurrent Reduction
Parallel Computing | Concurrent Reduction
StreamParallel02
In diesem Beispiel berechnen wir in paralleler Ausführung das Durschnittsalter der Frauen, Männer und derer, die keines von beiden sind. Wirkt nicht wirklich spannend, ist es aber: Wir müssen nämlich nicht einen einzigen Thread manuell erzeugen, weil das alles uns die Stream-API abnimmt.
Wir verwenden hier .parallelStream()
mit .filter(..) für die Reduzierung
der Elemente im Stream und anschließender üblicher Ermittlung
des Durchschnitts über IntStream.average()
als Aggregate-Operation.
Beispiel
package com.stuelken.java.b1.collections.parallelism;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import com.stuelken.java.b2.exampledata.Person;
public class Parallel02_ConcurrentReductionToMap {
public static void main(String[] args) {
// Gruppe als Collection
List<Person> gruppe = Person.createGruppe();
// @formatter:off
// Sequentielles Verfahren mit .stream()
Map<Person.Sex, List<Person>> mapPersonenGruppiertNachGeschlecht_sequentiell =
gruppe
.stream()
.collect(
Collectors.groupingBy(Person::getGender));
// Paralleles Verfahren mit .parallelStream() und
// als Map eine java.util.concurrent.ConcurrentMap;
ConcurrentMap<Person.Sex, List<Person>> mapPersonenGruppiertNachGeschlecht_parallel =
gruppe
.parallelStream()
.collect(
Collectors.groupingByConcurrent(Person::getGender));
System.out.println(mapPersonenGruppiertNachGeschlecht_parallel);
System.out.println(mapPersonenGruppiertNachGeschlecht_sequentiell);
// @formatter:on
}
}
// @formatter:off
/*
{FEMALE=[
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:,
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com], MALE=[
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com,
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com,
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com,
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com,
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com,
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
{FEMALE=[
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com,
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:], MALE=[
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com,
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com,
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com,
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com,
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com,
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
*/
{FEMALE=[
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:,
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com], MALE=[
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com,
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com,
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com,
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com,
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com,
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
{FEMALE=[
Person-Objekt (name:Berta, num:0, geschlecht:FEMALE, geb.:1990-07-15, age:34, email:berta@example.com,
Person-Objekt (name:Ephigene, num:0, geschlecht:FEMALE, geb.:2015-01-04, age:10, email:], MALE=[
Person-Objekt (name:Anton, num:0, geschlecht:MALE, geb.:1980-06-20, age:45, email:anton@example.com,
Person-Objekt (name:Cesar, num:0, geschlecht:MALE, geb.:1991-08-13, age:33, email:cesar@example.com,
Person-Objekt (name:Dora, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:dora@example.com,
Person-Objekt (name:Excelsius, num:0, geschlecht:MALE, geb.:2000-09-12, age:24, email:emil@example.com,
Person-Objekt (name:Hakaan, num:0, geschlecht:MALE, geb.:1964-12-10, age:60, email:emil@example.com,
Person-Objekt (name:Torben, num:0, geschlecht:MALE, geb.:1969-09-10, age:55, email:torben@example.com], UNDEFINED=[
Person-Objekt (name:Neutrum, num:0, geschlecht:UNDEFINED, geb.:2000-09-12, age:24, email:neutrum@example.com]}
Beide Varianten
Das Beispiel zeigt beide Varianten, wie man sich
sequentiell eine Map
und parallel
eine ConcurrentMap
beschaffen kann.
Parallele Datenverarbeitung bei Maps
Dieses Beispiel verwendet die ConcurrentMap.
.groupingByConcurrent(..)
ist performanter als die normale groupingBy
Operation, wenn es um eine parallele Verarbeitung
mit .parallelStream() geht.
Das gilt sinngemäß auch für die Verwendung von
Collectors.toConcurrentMap()
Das Vereinen (engl. merge) von zweiter Map
Collections basierend auf dem jeweiligen Schlüssel (engl. Key) wird
für die parallele Verarbeitung optimiert.
Concurrent Reduction
JAVA führt eine sogenannte ConcurrentReduction dann durch, wenn für die aktuelle Pipeline eine Reihe von Bedinungen erfüllt sind.
parallele Datenverarbeitung
Der Stream ist parallel. Dieses wird über Collection.parallelStream() bewirkt.
Collector.Characteristics.CONCURRENT
Der characteristic-Parameter von collect(characteristic,..)
hat den einen
Collector.Characteristics.CONCURRENT
[4]
Wert.
Will man feststellen, welche Charakteristik ein java.util.function.Collector
hat, ruft man die Collector.characteristics() Methode.
Collector.Characteristics [5] ist eine Enumeration in java.util.stream.Collector.Characteristics.
Stream ungeordnet oder Collector mit ungeordneter Characteristic
Entweder der Stream ist unordnet (engl.unordered) oder aber
der Collector hat die
Characteristic
Collector.Characteristics.UNORDERED.
Will man sicherstellen, dass ein Stream ungeordnet sein soll, so kann man die
BaseStream.unordered() Operation,
BaseStream.unordered()
[6]
, aufrufen.
Geordnete Collections sind für Concurrent Reduction Verfahren ein Problem
Immer dann, wenn eine Reihenfolge oder Ordnung erhalten werden soll, muss man einen anderen Weg nutzen.
Ordered Sequentual
Parallel Computing | Ordered Sequentual Streams
Auch bei einer parallelen Verarbeitung lässt sich die Reihenfolge in Collections während der Verarbeitung sicherstellen. Hierbei gilt es aber Voraussetzungen zu schaffen.
In diesem Beispiel zeigen wir, wie man Arrays über
Collections.sort(..) in Verbindung mit einem
Comparator neu sortieren
kann. Wir sortieren hier Integer-Werte und zeigen,
wie wir über Methodenreferenzen auf Funktionale Interfaces
und Methoden diese Sortierung übergeben können.
Beispiel
package com.stuelken.java.b1.collections.parallelism;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
// @formatter:off
// @formatter:on
public class Parallel03_Sequential {
public static void main(String[] args) {
Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // Eine geordnete Liste von int-Werten.
List<Integer> zahlenListOfInteger = // Liste als Collection
new ArrayList<>(Arrays.asList(intArray)); // Umlesen des Array in ArrayList
Integer[] arrayClone = intArray.clone();
List<Integer> zahlenListOfInteger_Clone = new ArrayList<>(Arrays.asList(arrayClone));
/*
* Sequentiell mit .stream().forEach(..)
*/
System.out.println("\nlistOfIntegers:");
zahlenListOfInteger //
.stream() //
.forEach(e -> System.out.print(e + " ")); //
System.out.println("");
/*
* Methoden-Referenzen definieren
*/
System.out.println("\\nlistOfIntegers sorted in reverse order:");
Comparator<Integer> normalComparator = Integer::compare; // Methoden_Referenz auf java.lang.Integer.compare
Comparator<Integer> reversedComparator = normalComparator.reversed(); // Methodenreferenz auf
// java.util.Comparator.reversed
System.out.println("\nSEQUENTIELL");
/*
* Sortiertierte Ausgabe umgekehrt, ohne die Collection zu verändern
*/
Collections.sort(zahlenListOfInteger, reversedComparator); // Collection.sort(
zahlenListOfInteger //
.stream() //
.forEach(e -> System.out.print(e + " ")); //
System.out.println("");
}
}
// @formatter:off
/*
listOfIntegers:
0 1 2 3 4 5 6 7 8 9
\nlistOfIntegers sorted in reverse order:
SEQUENTIELL
9 8 7 6 5 4 3 2 1 0
*/
listOfIntegers:
0 1 2 3 4 5 6 7 8 9
\nlistOfIntegers sorted in reverse order:
SEQUENTIELL
9 8 7 6 5 4 3 2 1 0
PARALLEL
Parallel stream : 3 4 1 0 2 8 7 5 9 6
Another parallel stream: 7 8 3 4 1 5 6 0 2 9
With forEachOrdered (>): 9 8 7 6 5 4 3 2 1 0
PARALLEL (Clone) zahlenListOfInteger_Clone
Parallel stream : 6 5 2 4 3 9 7 8 0 1
Another parallel stream: 2 7 8 9 5 1 4 6 0 3
With forEachOrdered (>): 0 1 2 3 4 5 6 7 8 9
stream() ist sequentiell
Hier gibt es keine Überraschungen.
Umgekehrte Reihenfolge
Collections.sort(..) verändert hier das Original, dh. die ArrayList von int-Werten, welche wir über das List-Interfaces adressieren, hat anschließend alle Zahlen in der umgekehrte Reihenfolge erfasst.
Sequentielle Verarbeitung mit «forEach(..)»
Die sequentielle Verarbeitung über .stream() und .foreach(consumer) macht genau das, was diese Operation tun sol..
Der Comparator of Integer wird hier über eine
Methoden-Referenz auf .comparator()
der Wrapperklasse für int-Werte, Integer
beschafft.
Da jeder Comparator eine Reihe bestimmter Methoden wie mitunter .reverse() bietet, verfügt auch der von Integer.comparor erhalte Comparator über diese Methode.
Ordered Parallel
Parallel Computing | Ordered Parallel
Auch bei einer parallelen Verarbeitung lässt sich die Reihenfolge in Collections während der Verarbeitung sicherstellen. Hierbei gilt es aber Voraussetzungen zu schaffen, denn mit der üblichen forEach(..) Methode funktioniert es nicht.
In diesem Beispiel zeigen wir, wie man Arrays über
Collections.sort(..) in Verbindung mit einem
Comparator neu sortieren
kann. Wir sortieren hier Integer-Werte und zeigen,
wie wir über Methodenreferenzen auf Funktionale Interfaces
und Methoden diese Sortierung übergeben können.
Beispiel
package com.stuelken.java.b1.collections.parallelism;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
// @formatter:off
// @formatter:on
public class Parallel03_Sequential {
public static void main(String[] args) {
Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // Eine geordnete Liste von int-Werten.
List<Integer> zahlenListOfInteger = // Liste als Collection
new ArrayList<>(Arrays.asList(intArray)); // Umlesen des Array in ArrayList
Integer[] arrayClone = intArray.clone();
List<Integer> zahlenListOfInteger_Clone = new ArrayList<>(Arrays.asList(arrayClone));
/*
* Sequentiell mit .stream().forEach(..)
*/
System.out.println("\nlistOfIntegers:");
zahlenListOfInteger //
.stream() //
.forEach(e -> System.out.print(e + " ")); //
System.out.println("");
/*
* Methoden-Referenzen definieren
*/
System.out.println("\\nlistOfIntegers sorted in reverse order:");
Comparator<Integer> normalComparator = Integer::compare; // Methoden_Referenz auf java.lang.Integer.compare
Comparator<Integer> reversedComparator = normalComparator.reversed(); // Methodenreferenz auf
// java.util.Comparator.reversed
System.out.println("\nSEQUENTIELL");
/*
* Sortiertierte Ausgabe umgekehrt, ohne die Collection zu verändern
*/
Collections.sort(zahlenListOfInteger, reversedComparator); // Collection.sort(
zahlenListOfInteger //
.stream() //
.forEach(e -> System.out.print(e + " ")); //
System.out.println("");
}
}
// @formatter:off
/*
listOfIntegers:
0 1 2 3 4 5 6 7 8 9
\nlistOfIntegers sorted in reverse order:
SEQUENTIELL
9 8 7 6 5 4 3 2 1 0
*/
listOfIntegers:
0 1 2 3 4 5 6 7 8 9
\nlistOfIntegers sorted in reverse order:
SEQUENTIELL
9 8 7 6 5 4 3 2 1 0
PARALLEL
Parallel stream : 3 4 1 0 2 8 7 5 9 6
Another parallel stream: 7 8 3 4 1 5 6 0 2 9
With forEachOrdered (>): 9 8 7 6 5 4 3 2 1 0
PARALLEL (Clone) zahlenListOfInteger_Clone
Parallel stream : 6 5 2 4 3 9 7 8 0 1
Another parallel stream: 2 7 8 9 5 1 4 6 0 3
With forEachOrdered (>): 0 1 2 3 4 5 6 7 8 9
stream() ist sequentiell
Hier gibt es keine Überraschungen.
Umgekehrte Reihenfolge
Collections.sort(..) verändert hier das Original, dh. die ArrayList von int-Werten, welche wir über das List-Interfaces adressieren, hat anschließend alle Zahlen in der umgekehrte Reihenfolge erfasst.
Parallele Verarbeitung «forEachOrdered()»
Wir können auch bei einer parallen Verarbeitung die Ordnung erhalten,
indem nicht forEach(..) sondern forEachOrdered(..) nutzen.
Damit das allerdings funktionieren kann, sind ein paar Voraussetzungen erforderlich.
parallele Datenverarbeitung
Der Stream ist parallel. Dieses wird über Collection.parallelStream() bewirkt.
Collector.Characteristics.CONCURRENT
Der characteristic-Parameter von collect(characteristic,..) hat den einen Collector.Characteristics.CONCURRENT [7] Wert.
Will man feststellen, welche Charakteristik ein java.util.function.Collector
hat, ruft man die Collector.characteristics() Methode.
Collector.Characteristics [8] ist eine Enumeration in java.util.stream.Collector.Characteristics.
Stream ungeordnet oder Collector mit ungeordneter Characteristic
Entweder der Stream ist ungeordnet (engl.unordered) oder aber
der Collector hat die
Characteristic
Collector.Characteristics.UNORDERED.
Will man sicherstellen, dass ein Stream ungeordnet sein soll, so kann man die
BaseStream.unordered() Operation,
BaseStream.unordered()
[9]
, aufrufen.
Geordnete Collections sind für Concurrent Reduction Verfahren ein Problem
Immer dann, wenn eine Reihenfolge oder Ordnung erhalten werden soll, muss man einen anderen Weg nutzen.
Side Effects
Parallel Computing | Seiteneffekte
Seiteneffekte, Side Effects
Seitenffekte
Wann hat eine Methode/Ausruck einen Seiteneffekt?
Eine Methode oder ein Ausdruck hat einen Seiteneffekt, wenn er neben der Rückgabe bzw. Erzeugung eines Wertes auch den Zustand der Daten im Computer verändert.
Beispiele hierfür sind veränderbare Reduktionen (engl. Mutable Reductions), dh. Operationen, welche die collect-Operation verwenden.
Wann hat eine Methode/Ausruck einen Seiteneffekt?
Die Methode .collect(..) kann Site Effects bewirken. Sie wird im Kapitel de/uio3-docs/java/java-c2/collection-reduction erläutert.
Der Zugriff auf Daten während der Verarbeitung ist ein Seiteneffekt
Der Zugriff auf Daten während der Verarbeitung ist ein Seiteneffekt
JAVA API .parallelStream.collect(..) ist sicherer
Relativ sicher
Eine Reduzierung der Elemente des Streams auf ein Ergebnis mit der collect(..) Methode im Falle von .parallelStream() beinhandelt mögliche Seiteneffekte sehr gut.
Das bedeutet sinngemäß: Veränderungen an den Daten führen zu einer Exception.
Wert beschaffen
Man sollte aber immer Vorsicht walten lassen. In einer parallelen Verarbeitung kann es sein, dass ein übergebener Lambda-Ausdruck in mehreren Threads gleichzeitig ausgeführt wird.
Wert beschaffen
Man sollte aber immer Vorsicht walten lassen. In einer parallelen Verarbeitung kann es sein, dass ein übergebener Lambda-Ausdruck in mehreren Threads gleichzeitig ausgeführt wird.
Beispiele
Die nachfolgenden Abschnitte behandeln Interferencen und zustandsbehaftete Lambda-Ausdrücke, Statefull-Lambda-Expressions. In beiden Fällen können Seiteneffekte entstehen und damit zu inkonsistenten und unvorhersagbaren Ergebnissen führen.
Dies gilt insbesondere für parallele Datenverarbeitung mit .parallelStream().
Anmerkung: Das gilt auch dann, wenn man stattdessen mit Threads arbeitet. Die Packages und Klassen sind zwar erst einmal dann andere, aber das Prinzip ist das selbe.
Lazyness
Parallel Computing | Lazyness
Alle Zwischenoperationen in der sogenannten Pipeline auf einem Stream sind lazy. Dieses Prinzip man auch als Lazyness.
Ein Ausdruck, eine Methode oder ein Algorithmus ist lazy, wenn sein Wert nur dann berechnet wird, wenn er benötigt wird. (Ein Algorithmus ist eager, wenn er sofort ausgewertet oder verarbeitet wird.
Zwischenoperationen sind lazy, weil die Verarbeitung des Inhalts eines Streams erst dann beginnt, wenn die abschließende (terminal) Operation startet.
Durch das lazy Verarbeiten von Streams können der Java-Compiler und die Laufzeitumgebung die Stream-Verarbeitung optimal gestalten.
Siehe Terminal Operation, Aggregate-Operations, reduce(), collect()
Zum Beispiel könnte in einer Pipeline wie im Abschnitt Aggregate Operations am Beispiel filter-mapToInt-average die average-Operation zunächst einige Ganzzahlen aus dem von mapToInt erzeugten Stream übernehmen (der seinerseits Elemente aus der filter-Operation liefert). Dieser Vorgang wird solange wiederholt, bis alle erforderlichen Elemente aus dem Stream bezogen wurden, und erst danach wird der Durchschnitt berechnet.
Dieses Beispiel verkettet die in listOfStrings enthaltenen Zeichenketten zu einem
Optional <String>-Wert
mittels der reduce(..)-Operation, welche eine Terminal-Operation ist.
Allerdings ruft die Pipeline hier die Zwischenoperation peek auf, die versucht, während der Ausführung der Pipeline ein neues Element zu listOfStrings hinzuzufügen.
/*
Dieses Beispiel ist im Kapitel Aggregation-Operation enthalten.
Es fehlt aktuell allerdings der Link und/oder die Referenzierung.
*/
/* Beispiel fehlt. */
...
Das bedeutet, dass die Pipeline in diesem Beispiel erst gestartet wird, wenn die get-Operation aufgerufen wird, und die Ausführung erst endet, wenn die get-Operation abgeschlossen ist.
Interference
Interference
Parallel Computing | Interference
Intermedia Operations werden in der parallelen Datenverarbeitung infolge von .parallelStream() in einer völlig zufälligen, nicht-deteriministischen Reihenfolge verarbeitet. Jede Veränderung oder bereits ein Zugriff zur Ausgabe kann zu Interferenzen führen.
package com.stuelken.java.b1.collections.parallelism;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author t2m
*/
public class Parallel07_SideEffects_Interference {
public static void main(String[] args) {
try { // (1) Wirft ConcurrentModificationException aus
List<String> listOfStrings = new ArrayList<>( // (2)
Arrays.asList("eins", "zwei", "drei")); // (3)
// This will fail as the peek operation will attempt to add the
// string "three" to the source after the terminal operation has
// commenced.
String concatenatedString = listOfStrings.stream() // (4)
.peek(s -> listOfStrings.add("vier")) // (5) Interferenz!!!
.reduce((a, b) -> a + " " + b).get(); // (6)
System.out.println("Concatenated string: " + concatenatedString); // (7)
} catch (Exception e) { // (8)
System.out.println("Exception caught: " + e.toString()); // (9)
}
}
}
/*
* Output: Exception caught: java.util.ConcurrentModificationException
*/
/*
* Output: Exception caught: java.util.ConcurrentModificationException
*/
Direkt eine ArrayList erzeugen
Die Arrays Klasse bietet diverse statische praktische Methoden rund um das Arbeiten mit Arrays.
Das ist hier unzulässig
Die .peek Methode sorgt hier dafür, dass wir versuchen, dem Stream noch eine Zahl "vier" hinzufügen. .peek(comsumer) liefert neuen Stream und verändert den bisherigen.
Veränderungen werden unterbunden
Der praktische Aspekt dieser Ausnahme besteht darin, dass wir diesen Fehler kontrolliert abfangen können, allerdings beendet dieser Fehler auch die Durchführung der Verarbeitung.
... ...
Zwischenoperationen können zwar mal filtern oder mappen, aber Sie sollten dennoch nicht die Datenkonsistenz verändern.
Interference
Statefull-Lambda-Expressions
Parallel Computing | Statefull-Lambda-Expressions
Vermeide Sie es, zustandsbehaftete Lambda-Ausdrücke als Parameter in Stream-Operationen zu verwenden.
Ein zustandsbehafteter Lambda-Ausdruck ist einer, dessen Ergebnis von einem Zustand abhängt, der sich während der Ausführung einer Pipeline ändern kann.
Das folgende Beispiel fügt mit der Zwischenoperation map Elemente aus der Liste listOfIntegers zu einer neuen List-Instanz hinzu. Dies wird zweimal durchgeführt: zuerst mit einem seriellen Stream und danach mit einem parallelen Stream.
Die Operation forEachOrdered verarbeitet Elemente in der Reihenfolge, die im Stream festgelegt ist – unabhängig davon, ob der Stream seriell oder parallel ausgeführt wird.
Wird ein Stream jedoch parallel ausgeführt, verarbeitet die map-Operation die Elemente gemäß der Vorgaben der Java-Laufzeitumgebung und des Compilers.
Folglich kann sich die Reihenfolge, in der der Lambda-Ausdruck e -> { parallelStorage.add(e); return e; } Elemente zur Liste parallelStorage hinzufügt, bei jedem Programmdurchlauf ändern.
Für deterministische und vorhersehbare Ergebnisse sollte man darauf achten, dass Lambda-Ausdrücke in Stream-Operationen keinen zustandsbehafteten Code enthalten.
Hinweis: In diesem Beispiel wird die Methode synchronizedList aufgerufen, sodass die Liste parallelStorage threadsicher ist.
Bedenken Sie, dass Collections standardmäßig nicht threadsicher sind – das bedeutet, dass mehrere Threads nicht gleichzeitig auf dieselbe Collection zugreifen sollten.
Wenn wir beim Erzeugen von parallelStorage nicht die Methode synchronizedList aufrufen, List<Integer> parallelStorage = new ArrayList<>(); > > > verhält sich das Beispiel unvorhersehbar, weil mehrere Threads gleichzeitig auf parallelStorage zugreifen und diese modifizieren, ohne dass eine Synchronisierung erfolgt.
package com.stuelken.java.b1.collections.parallelism; // (1)
import java.lang.Integer; // (2)
import java.util.ArrayList; // (3)
import java.util.Arrays; // (4)
import java.util.Collections; // (5)
import java.util.List; // (6)
/**
*
* @author t2m
*
*/
public class Parallel08_SideEffects_StatefullLambdaExpressions { // (7)
public static void main(String[] args) { // (8)
List<Integer> serialStorage = new ArrayList<>(); // (9) Container für Werte
Integer[] intArray = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // (10) Werte
List<Integer> zahlenListOfInteger = new ArrayList<>(Arrays.asList(intArray)); // (11) Liste
/*
* ===
*/
System.out.println("(1) seriell: " //
+ "\nstream().map(e->serialStorage.add(e);return(e)}).foreachOrdered(..): "); // (12)
zahlenListOfInteger.stream() // (13) Sequenzieller Durchlauf
.map(e -> { // (14)
System.out.println(" .stream().map(e ) mit e:" + e); // (15) Debugging
serialStorage.add(e); // (16) Seriellen Storage Werte der ArrayList geordnet geben.
return e; // (17)
}) // (18)
.forEachOrdered(e -> System.out.print(e + " ")); // (19)
System.out.println("\n"); // (20)
serialStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); // (21) Geordnet.
System.out.println("\n"); // (22)
/*
* ===
*/
System.out.println("(2) Parallel stream "//
+ "\nmit parallelStorage, Collections.synchronizedList(..):"); // (23)
// WICHTIG: Thread-Sichere Liste erzeugen
List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>()); // (24)
zahlenListOfInteger.parallelStream() // (25) Paralleler Stream, nebenläufig
.map(e -> { // (26)
parallelStorage.add(e); // (27) Reihenfolge ist NICHT deteriministisch, also zufällig
return e; // (28)
}) // (29)
.forEachOrdered(e -> System.out.print(e + " ")); // (30)
System.out.println(""); // (31)
/*
* ===
*/
System.out.println("\n"); // (32)
System.out.println("(3) parallelStrorage mit .stream().foreachOrdered(..):"); // (33)
parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); // (34)
System.out.println(""); // (35)
} // (36)
} // (37)
(1) seriell:
stream().map(e->serialStorage.add(e);return(e)}).foreachOrdered(..):
.stream().map(e ) mit e:0
0 .stream().map(e ) mit e:1
1 .stream().map(e ) mit e:2
2 .stream().map(e ) mit e:3
3 .stream().map(e ) mit e:4
4 .stream().map(e ) mit e:5
5 .stream().map(e ) mit e:6
6 .stream().map(e ) mit e:7
7 .stream().map(e ) mit e:8
8 .stream().map(e ) mit e:9
9
0 1 2 3 4 5 6 7 8 9
(2) Parallel stream mit parallelStorage, Collections.synchronizedList(..):
0 1 2 3 4 5 6 7 8 9
(3) parallelStrorage mit .stream().foreachOrdered(..):
8 1 2 6 9 7 5 0 3 4
Sychronisiert für parallele Verarbeitung.
Wichtig zum Verständnis: Normalerweise sind Collections NICHT synchronisiert. Um also eine Collection thread-sicher verarbeiten zu können, müssen wir diese in eine synchronisierte Fassung übersetzen.
Dieses Pattern sorgt für die synchronisierte Verarbeitung
Wir können also parallel arbeiten, ... ABER ...
Die Reihenfolge geht hier verloren
Gibt man den parallelStorage also mit .stream() mal
in der Reihenfolge aus, die wir dort erfasst haben,
kann man gut erkennen, dass wir zwar die Elemente geordnet
im .parallelStream() letzendlich ausgeben, aber WÄHREND der Verarbeitung
mit der Intermedia Operation map
wir eine völlig willkürliche Reihenfolge haben.
Collection.synchronizedList(c) wandelt eine andere Collection wie eine ArrayList für Integer in eine List<Integer> um, womit diese Collecion thread-sicher verrabeitet werden kann.
Die Reihenfolge bei Intermedia Operations wie .map(..) ist aber innerhalb der Pipeline im Falle von .parallelStream() weiterhin völlig willkürlich.
Links
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
Quellen, Notes, Tags
- [1]↑ JAVA: Programmiersprache
- [2]↑ JAVASCRIPT: Programmiersprache, nicht JAVA
- [3]↑ java.util.stream.BaseStream: https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.htm
- [4]↑ Collector.Characteristics.CONCURRENT : https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html#CONCURRENT
- [5]↑ Collector.Characteristics: https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html
- [6]↑ BaseStream.unordered(): https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#unordered--
- [7]↑ Collector.Characteristics.CONCURRENT : https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html#CONCURRENT
- [8]↑ Collector.Characteristics: https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.Characteristics.html
- [9]↑ BaseStream.unordered(): https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#unordered--
UIO3 Es ist einfacher als Du denkst.
Stelle noch heute Deine Anfrage.
