Stream Gatherers
java feature jse24Wo kommen wir her?
Mit der Einführung der Stream API
in Java SE 8 (insbesondere mit
den JEP 107 Bulk Data Operations for Collections, JEP 109 Enhance Core Libraries with Lambda
und JEP 126 Lambda Expressions & Virtual Extension Methods) haben sich die Möglichkeiten
des “Wie” ein Java Projekt umgesetzt wird grundlegend erweitert. Durch die Annäherung und syntaktische Unterstützung von
funktionaler Programmierung eröffnete sich ein ganz neuer Horizont für die Java Welt.
Doch seit dem Start von OpenJDK 8 am 18.03.2014 (Release Page) haben sich kaum nennenswerte Erweiterungen in der API ergeben. Lediglich kleinere Anpassungen wie zum Beispiel der JEP 269: Convenience Factory Methods for Collections in dem Java SE 9 Release haben kleinere Features in den Umgang mit Streams gebracht.
Aufbau Streams
Im Wesentlichen besteht ein Stream in Java aus drei Hauptteilen:
- einer Quelle (source)
- beliebig vielen Zwischenstufen (intermediate operation)
- einem Endpunkt (terminal operation)
10public void runStream() {
11 Stream.of(1, 2, 3, 4, 5) // <- Quelle
12 .map(element -> element * element) // <- Zwischenstufe
13 .forEach(System.out::print); // <- Endpunkt
14 // Output: 1491625
15}
Java Streams unterliegen eine lazy evaluation - das heißt, dass erst dann Elemente in den Zwischenstufen (intermediate operation) verarbeitet werden, wenn das Ergebnis auch wirklich gebraucht wird, also wenn ein Endpunkt (terminal operation) angegeben ist.
Motivation für Gatherers
Wie weiter oben schon festgestellt wurde, hat sich der grundlegende Umfang von dem Stream API seit der Einführung nicht
mehr wirklich verändert. Das betrifft insbesondere die Funktionen der intermediate operations, also der
Zwischenstufen. Hier sind bis dato Java Entwicklungen auf die Angebote beschränkt, welche in java.util.stream.Stream
definiert werden. Zu den bekannteren Methoden zählen:
filter
map
sorted
distinct
Gerade die Verknüpfung mehrerer dieser Methoden bieten durchaus vielseite Einsatzmöglichkeiten. Jedoch stoßen auch sie
an ihre Grenzen. Wenn wir zum Beispiel eine Liste von Objekten haben und diese nach einem anderen Kriterium als die
Objekt-eigenen equals
und hashCode
Implementierung eindeutig (distinct
) filtern wollen, wäre dies nicht ohne
Umwege möglich.
10public void distinctByCustom() {
11 Stream.of("maria", "phil", "anna", "jo") // <- Quelle
12 .distinctBy(String::length) // <- gibt es so nicht
13 .forEach(System.out::print); // <- Endpunkt
14 // Erwarteter Output: mariaphiljo
15}
Oder auch die gerade aus der SQL
bekannten window
Funktion, ist nur mit erheblichem Mehraufwand implementierbar:
10public void windowFunction() {
11 Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) // <- Quelle
12 .windowFixed(3) // <- gibt es so nicht
13 .limit(2) // <- Zwischenstufe
14 .forEach(System.out::print); // <- Endpunkt
15 // Erwarteter Output: [0, 1, 2][3, 4, 5]
16}
Um nicht unzählige einzel Problem bezogene zusätzliche Funktionen in der Stream
-Klasse zu ergänzen, wurde nach einer
generischen Lösung gesucht - die Stream::gather(Gatherer)
Zwischenstufe soll sich dieser Herausforderung annehmen.
Aufbau eines Gatherers
Stream::gather
als intermediate operation und das Gatherer
Interface kamen mit Java 22 als Preview in die Java
Welt und werden mit Java 24 vollständig released. Wie auch schon bei den bestehenden Zwischenstufen ist ein Gatherer
recht simple zusammengefasst:
A gatherer represents a transform of the elements of a stream (JEP 461)
Jedoch ist hierbei noch im Vorhinein festgelegt, wie genau die Transformation aussehen soll - genau hier liegen die
Stärken der Gatherer
.
Im Groben besteht ein Gatherer
aus 4 Komponenten:
Integrator
Initializer
Finisher
Combiner
Nur der Integrator
ist zwingend notwendig, um einen Gatherer
zu implementieren. Die drei anderen Komponenten werden
für komplexere Problemstellungen benötigt.
Integrator
Das Integrator
Interface ist ein Functional Interface
mit der zentralen Methode integrate
:
10
11@FunctionalInterface
12public interface Integrator<A, T, R> {
13 boolean integrate(A state, T element, Downstream<? super R> downstream);
14 // . . .
15}
Der state
ist der aktuelle Zustand des Gatherers
und kann verwendet werden, um einen Zustand über die Verarbeitung
einzelner Elemente hinaus zu teilen. Mehr dazu weiter unten. Das element
ist vom Upstream
des Streams
kommende
aktuelle Element, welches transformiert werden soll. Die Referenz auf den Downstream
hat aus dem
Functional Interface
die Methode boolean push(T element)
, welche verwendet wird, um das transformierte Element an
die nächste Ebene im Stream
zu senden. Als Rückgabewert der integrate
Methode wird ein boolean
erwartet. Dieser
zeigt an, ob ein Element an den Downstream
gesendet wurde oder nicht. Die Downstream::push
Methode gibt den
booleschen Wert nach der gleichen Logik, nur bezogen auf die nächste Ebene des Streams, zurück.
Mit dem Wissen kann jetzt ein simpler Gatherer geschrieben werden:
10public Gatherer.Integrator<Void, Integer, Integer> squaredIntegrator() {
11 return Gatherer.Integrator.ofGreedy(
12 (state, element, downstream) -> {
13 int newElement = element * element;
14 return downstream.push(newElement);
15 });
16}
17
18public void runStreamWithIntegrator() {
19 Stream.of(1, 2, 3, 4, 5)
20 .gather(Gatherer.of(squaredIntegrator()))
21 .forEach(System.out::println);
22
23 // output: 1491625
24}
Ein besonderes Augenmerk sollte auf Gatherer.Integrator.ofGreedy
geworfen werden. Ein greedy
Integrator
hat keine eigene Logik, ob er ein weiteres Element aus dem Stream verarbeitet oder nicht. Er verlässt sich
dabei einzig und allein auf den Downstream
. Diese Angabe ermöglicht der JVM unter der Haube etwas an Optimierungen
vorzunehmen. Sollte der Integrator selbst entscheiden, ob ein Element vom Upstream
verarbeitet wird oder nicht, kann
ein simples Gatherer.Integrator.of
genutzt werden.
Mit dem Aufrufen von .gather(Gatherer.of(integrator))
kann jetzt einfach ein neuer Gatherer
mit dem eigens
definierten Integrator
für den Stream genutzt werden.
Initializer
Der optionale Intilizer
wird verwendet, um dem Gatherer
einen Zustand zu geben. Der im vorherigen Abschnitt gezeigte
Gatherer
ist zustandslos - er hat keinerlei Informationen neben dem aktuellen Element. Als Intializer
erwartet das
Gatherer
Interface einen Supplier
, welcher den initialen Status (Zustand) erzeugt. Um zum Beispiel einen Zähler zu
implementieren, der nach x
Elementen stoppt, kann einfach ein AtomicInteger
genutzt werden:
10public Gatherer.Integrator<AtomicInteger, Integer, Integer> squaredIntegratorWithLimit(int limit) {
11 return Gatherer.Integrator.of( // (A)
12 (state, element, downstream) -> {
13 if (state.getAndIncrement() < limit) {
14 int newElement = element * element;
15 return downstream.push(newElement); // (B)
16 } else {
17 return false; // (C)
18 }
19 });
20}
21
22public void runStreamWithIntegrator() {
23 Supplier<AtomicInteger> initializer = AtomicInteger::new;
24 Gatherer.Integrator<AtomicInteger, Integer, Integer> integrator = squaredIntegratorWithLimit(3);
25
26 Stream.of(1, 2, 3, 4, 5)
27 .gather(Gatherer.ofSequential(initializer, integrator)) // (D)
28 .forEach(System.out::print);
29
30 // output: 149
31}
Von dem Grundaufbau hat sich zu dem vorherigen Beispiel nicht viel geändert. Besonders betrachtet werden sollten die Stellen:
(A)
: Da derIntegrator
jetzt selbst entscheidet, ob ein Element weiter verarbeitet wird oder nicht, sollte dasofGreedy
durchof
ersetzt werden.(B)
: Wenn der aktuelle Zustand (state.getAndIncrement()
) kleiner als das übergebenelimit
ist, wird das Element wie schon vorher einfach an denDownstream
übergeben.(C)
: Wenn der aktuelle Zustand größer oder gleich demlimit
ist, wird die Verarbeitung des Elements abgelehnt.(D)
: Da nun ein zustandsbehafteterGatherer
genutzt wird, welcher keinenCombiner
hat, darf dieser nicht parallel in mehrerenThreads
laufen. DurchGatherer.ofSequential
wird dies sichergestellt.
Finisher
Wie der Name Finisher
schon vermuten lässt, wird dieser Part zum Ende eines Gatherers
benötigt. Doch wofür genau und
was heißt am Ende? Um dies genauer zu beleuchten, schauen wir uns das Beispiel der windowFunction
von oben genauer an.
Eine windowFunction
teilt eine Menge von x Elementen in eine Liste von Teilmengen von einer Länge von y. So wird
zum Beispiel aus [1,2,3,4,5,6]
ein [[1,2], [3,4], [5,6]]
. Was passiert aber, wenn x kein Vielfaches von y ist?
Also anders formuliert: Was passiert mit dem Zustand eines Gatherers
nachdem das letzte Element eines Streams
verarbeitet wurde?
Hier kommt der Finisher
ins Spiel. Dieser ist eine Implementierung des BiConsumer
Interfaces und erwartet als ersten
Parameter A
den state
des Gatherers
und als zweiten Parameter den Downstream
des Streams:
10BiConsumer<A, Downstream<? super R>> finisher;
Mit diesem Wissen kann das Problem der “losen” Elemente bei einer windowFunction
Implementierung gelöst werden.
10public Gatherer.Integrator<List<Integer>, Integer, List<Integer>> windowFunctionIntegrator(int windowSize) {
11 return Gatherer.Integrator.of(
12 ((state, element, downstream) -> {
13 state.add(element);
14
15 if (state.size() == windowSize) { // (A)
16 boolean downstreamResult = downstream.push(List.copyOf(state));
17 state.clear();
18 return downstreamResult;
19 }
20
21 return true;
22 }));
23}
24
25public BiConsumer<List<Integer>, Gatherer.Downstream<? super List<Integer>>> windowFunctionFinisher() {
26 return (state, downstream) -> {
27 if (!state.isEmpty()) { // (B)
28 downstream.push(List.copyOf(state));
29 state.clear();
30 }
31 };
32}
33
34public void runGathererWithFinisher() {
35 int windowSize = 3;
36
37 Stream.of(1, 2, 3, 4, 5)
38 .gather(
39 Gatherer.ofSequential(
40 ArrayList::new, windowFunctionIntegrator(windowSize), windowFunctionFinisher())) // (C)
41 .forEach(System.out::print);
42 // output: [1, 2, 3][4, 5]
43}
Im Gegensatz zu den Beispielen weiter oben wurde nun in (C)
ein Finisher
dem Gatherer
bei der Initialisierung
übergeben. Integrator
und Finisher
sind dabei recht nah miteinander verzahnt, da der Finisher
in der Regel auf der
Logik des Integrators
aufbaut. So auch in diesem Fall: In (A)
wird geprüft, ob schon genügend Elemente im aktuellen
state
liegen, ob ein window
zu füllen. Nur wenn dies der Fall ist, wird das Fenster in Gänze an den Downstream
gepushed. Somit kann es passieren, dass am Ende alle Elemente in dem Zustand des Gatherers
noch Elemente liegen,
welche über den Finisher
in (B)
aufgesammelt und ebenfalls an den Downstream
gesendet werden.
Combiner
Als letzte Komponente für einen Gatherer
bleibt der Combiner
übrig. In verschiedenen Endpunkten der Stream
-Klasse,
wie zum Beispiel der collect
oder reduce
Methode, kommt der Begriff combiner schon vor. Wie auch an diesen
Stellen, dient der Combiner
dazu, einen Gatherer
für die parallele Verarbeitung in mehreren Threads fit zu machen.
Ein Combiner
wird dann notwendig, wenn ein zustandsbehafteter Gatherer
in einem parallel ausgeführten Stream genutzt
wird. Ist dies der Fall, behandelt die Implementierung des Combiner
-Interface die Kombinierung der jeweiligen Zustände
in den unterschiedlichen Threads.
Aus einer Liste von Namen soll der längste Name gefunden werden und für die weitere Verarbeitung in die Stream-Pipline
geschickt werden. Hierzu hält der Zustand des Gatherers
den aktuell längsten Namen. Der Finisher
schickt den
längsten Namen dann für die weitere Verarbeitung an den Downstream
des Streams.
10public void runGathererWithCombiner() {
11 Stream.of("Anna", "Pete", "Suzanne", "Kay", "Mike")
12 .parallel()
13 .gather(
14 Gatherer.of(
15 AtomicReference<String>::new, //(A)
16 Gatherer.Integrator.of(
17 (state, element, downstream) -> {
18 String currentLongestName = state.get();
19
20 if (Objects.nonNull(element)
21 && (Objects.isNull(currentLongestName)
22 || element.length() > currentLongestName.length())) {
23 state.set(element); //(B)
24 }
25
26 return true;
27 }),
28 (stateA, stateB) -> { //(C)
29 String longestNameFromA = stateA.get();
30 String longestNameFromB = stateB.get();
31
32 if (longestNameFromA == null) {
33 return stateB;
34 } else if (longestNameFromB == null) {
35 return stateA;
36 } else if (longestNameFromA.length() > longestNameFromB.length()) {
37 return stateA;
38 } else {
39 return stateB;
40 }
41 },
42 (state, downstream) -> { //(D)
43 String longestName = state.get();
44
45 if (Objects.nonNull(longestName)) {
46 downstream.push(longestName);
47 }
48 }))
49 .forEach(System.out::println);
50 // output: Suzanne
51}
In dem Code-Snippet oben ist ein Gatherer
einmal in dem vollen Umfang und allen Unterkomponenten aufgebaut. Es wird
ein paralleler Stream von Namen erstellt. Bei //(A)
wird der initiale Zustand angelegt. Der Integrator
in //(B)
prüft, ob der aktuelle Namen länger ist, als der im Zustand befindliche. Wenn ja, wird der Zustand mit dem neuen
längsten Namen ersetzt. Erwähnenswert hierbei ist, dass der Integrator
kein Element in den Downstream
sendet. Dies
geschieht erst im Finisher
. Da der Integrator
alle Elemente annimmt, könnte er auch als greedy
initialisiert
werden. In //(C)
wir der Combiner
implementiert. Er bekommt die jeweiligen Zustände zweiter Threads als Parameter
übergeben und ermittelt den Zustand mit dem längsten Namen. Dieser wird dann zurückgegeben. Schließlich wird im
Finisher
(//(D)
) der am Ende aller Elemente im Zustand befindliche Name in den Downstream
geschickt.
Zusammenfassung
Die neuen Funktionen der Gatherer
geben den Stream-APIs von Java ein mächtiges Tool an die Hand. Insbesondere die
Möglichkeiten der freien Konfigurationen bieten erstaunlich viel Freiräume und quasi keine Grenzen für die
Anwendungsfälle. Gerade eine Kombination von Gatherern
und Generics
geben gerade für Framework-Entwicklungen viel
Spielraum den Nutzer:innen der Stream API viel “Magie” anzubieten. Aber auch für die klassischen Applikationsanwendungen
können clever genutzte Gatherer
einen erheblichen Mehrwert im Bereich der Wartbarkeit durch Codereduzierungen bringen.