Wo kommen wir her?
Mit der Einführung der Stream API in Java SE 8 (insbesondere mit
den JEP 107 Bulk Data Operations for Collections [1], JEP 109 Enhance Core Libraries with Lambda [2]
und JEP 126 Lambda Expressions & Virtual Extension Methods [3]) haben sich die Möglichkeiten
des “Wie” ein Java Projekt umgesetzt wird grundlegend erweitert. Durch die Annäherung und syntaktische Unterstützung von
funktionaler Programmierung eröffnete sich ein ganz neuer Horizont für die Java Welt.
Doch seit dem Start von OpenJDK 8 am 18.03.2014 [4] haben sich kaum nennenswerte Erweiterungen in der API ergeben. Lediglich kleinere Anpassungen wie zum Beispiel der JEP 269: Convenience Factory Methods for Collections [5] in dem Java SE 9 Release haben kleinere Features in den Umgang mit Streams gebracht.
Aufbau Streams
Im Wesentlichen besteht ein Stream in Java aus drei Hauptteilen:
- einer Quelle (source)
- beliebig vielen Zwischenstufen (intermediate operation)
- einem Endpunkt (terminal operation)
10public void runStream() {
11 Stream.of(1, 2, 3, 4, 5) // <- Quelle
12 .map(element -> element * element) // <- Zwischenstufe
13 .forEach(System.out::print); // <- Endpunkt
14 // Output: 1491625
15}
Java Streams unterliegen eine lazy evaluation - das heißt, dass erst dann Elemente in den Zwischenstufen (intermediate operation) verarbeitet werden, wenn das Ergebnis auch wirklich gebraucht wird, also wenn ein Endpunkt (terminal operation) angegeben ist.
Motivation für Gatherers
Wie weiter oben schon festgestellt wurde, hat sich der grundlegende Umfang von dem Stream API seit der Einführung nicht
mehr wirklich verändert. Das betrifft insbesondere die Funktionen der intermediate operations, also der
Zwischenstufen. Hier sind bis dato Java Entwicklungen auf die Angebote beschränkt, welche in java.util.stream.Stream
definiert werden. Zu den bekannteren Methoden zählen:
filtermapsorteddistinct
Gerade die Verknüpfung mehrerer dieser Methoden bieten durchaus vielseite Einsatzmöglichkeiten. Jedoch stoßen auch sie
an ihre Grenzen. Wenn wir zum Beispiel eine Liste von Objekten haben und diese nach einem anderen Kriterium als die
Objekt-eigenen equals und hashCode Implementierung eindeutig (distinct) filtern wollen, wäre dies nicht ohne
Umwege möglich.
10public void distinctByCustom() {
11 Stream.of("maria", "phil", "anna", "jo") // <- Quelle
12 .distinctBy(String::length) // <- gibt es so nicht
13 .forEach(System.out::print); // <- Endpunkt
14 // Erwarteter Output: mariaphiljo
15}
Oder auch die gerade aus der SQL bekannten window Funktion, ist nur mit erheblichem Mehraufwand implementierbar:
10public void windowFunction() {
11 Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) // <- Quelle
12 .windowFixed(3) // <- gibt es so nicht
13 .limit(2) // <- Zwischenstufe
14 .forEach(System.out::print); // <- Endpunkt
15 // Erwarteter Output: [0, 1, 2][3, 4, 5]
16}
Um nicht unzählige einzel Problem bezogene zusätzliche Funktionen in der Stream-Klasse zu ergänzen, wurde nach einer
generischen Lösung gesucht - die Stream::gather(Gatherer) Zwischenstufe soll sich dieser Herausforderung annehmen.
Aufbau eines Gatherers
Stream::gather als intermediate operation und das Gatherer Interface kamen mit Java 22 als Preview [6] in die Java
Welt und werden mit Java 24 [8] vollständig released [7]. Wie auch schon bei den bestehenden Zwischenstufen ist ein Gatherer
recht simple zusammengefasst:
A gatherer represents a transform of the elements of a stream [6]
Jedoch ist hierbei noch im Vorhinein festgelegt, wie genau die Transformation aussehen soll - genau hier liegen die
Stärken der Gatherer.
Im Groben besteht ein Gatherer aus 4 Komponenten:
IntegratorInitializerFinisherCombiner
Nur der Integrator ist zwingend notwendig, um einen Gatherer zu implementieren. Die drei anderen Komponenten werden
für komplexere Problemstellungen benötigt.
Integrator
Das Integrator Interface ist ein Functional Interface mit der zentralen Methode integrate:
10
11@FunctionalInterface
12public interface Integrator<A, T, R> {
13 boolean integrate(A state, T element, Downstream<? super R> downstream);
14 // . . .
15}
Der state ist der aktuelle Zustand des Gatherers und kann verwendet werden, um einen Zustand über die Verarbeitung
einzelner Elemente hinaus zu teilen. Mehr dazu weiter unten. Das element ist vom Upstream des Streams kommende
aktuelle Element, welches transformiert werden soll. Die Referenz auf den Downstream hat aus dem
Functional Interface die Methode boolean push(T element), welche verwendet wird, um das transformierte Element an
die nächste Ebene im Stream zu senden. Als Rückgabewert der integrate Methode wird ein boolean erwartet. Dieser
zeigt an, ob ein Element an den Downstream gesendet wurde oder nicht. Die Downstream::push Methode gibt den
booleschen Wert nach der gleichen Logik, nur bezogen auf die nächste Ebene des Streams, zurück.
Mit dem Wissen kann jetzt ein simpler Gatherer geschrieben werden:
10public Gatherer.Integrator<Void, Integer, Integer> squaredIntegrator() {
11 return Gatherer.Integrator.ofGreedy(
12 (state, element, downstream) -> {
13 int newElement = element * element;
14 return downstream.push(newElement);
15 });
16}
17
18public void runStreamWithIntegrator() {
19 Stream.of(1, 2, 3, 4, 5)
20 .gather(Gatherer.of(squaredIntegrator()))
21 .forEach(System.out::println);
22
23 // output: 1491625
24}
Ein besonderes Augenmerk sollte auf Gatherer.Integrator.ofGreedy geworfen werden. Ein greedy
Integrator hat keine eigene Logik, ob er ein weiteres Element aus dem Stream verarbeitet oder nicht. Er verlässt sich
dabei einzig und allein auf den Downstream. Diese Angabe ermöglicht der JVM unter der Haube etwas an Optimierungen
vorzunehmen. Sollte der Integrator selbst entscheiden, ob ein Element vom Upstream verarbeitet wird oder nicht, kann
ein simples Gatherer.Integrator.of genutzt werden.
Mit dem Aufrufen von .gather(Gatherer.of(integrator)) kann jetzt einfach ein neuer Gatherer mit dem eigens
definierten Integrator für den Stream genutzt werden.
Initializer
Der optionale Intilizer wird verwendet, um dem Gatherer einen Zustand zu geben. Der im vorherigen Abschnitt gezeigte
Gatherer ist zustandslos - er hat keinerlei Informationen neben dem aktuellen Element. Als Intializer erwartet das
Gatherer Interface einen Supplier, welcher den initialen Status (Zustand) erzeugt. Um zum Beispiel einen Zähler zu
implementieren, der nach x Elementen stoppt, kann einfach ein AtomicInteger genutzt werden:
10public Gatherer.Integrator<AtomicInteger, Integer, Integer> squaredIntegratorWithLimit(int limit) {
11 return Gatherer.Integrator.of( // (A)
12 (state, element, downstream) -> {
13 if (state.getAndIncrement() < limit) {
14 int newElement = element * element;
15 return downstream.push(newElement); // (B)
16 } else {
17 return false; // (C)
18 }
19 });
20}
21
22public void runStreamWithIntegrator() {
23 Supplier<AtomicInteger> initializer = AtomicInteger::new;
24 Gatherer.Integrator<AtomicInteger, Integer, Integer> integrator = squaredIntegratorWithLimit(3);
25
26 Stream.of(1, 2, 3, 4, 5)
27 .gather(Gatherer.ofSequential(initializer, integrator)) // (D)
28 .forEach(System.out::print);
29
30 // output: 149
31}
Von dem Grundaufbau hat sich zu dem vorherigen Beispiel nicht viel geändert. Besonders betrachtet werden sollten die Stellen:
(A): Da derIntegratorjetzt selbst entscheidet, ob ein Element weiter verarbeitet wird oder nicht, sollte dasofGreedydurchofersetzt werden.(B): Wenn der aktuelle Zustand (state.getAndIncrement()) kleiner als das übergebenelimitist, wird das Element wie schon vorher einfach an denDownstreamübergeben.(C): Wenn der aktuelle Zustand größer oder gleich demlimitist, wird die Verarbeitung des Elements abgelehnt.(D): Da nun ein zustandsbehafteterGatherergenutzt wird, welcher keinenCombinerhat, darf dieser nicht parallel in mehrerenThreadslaufen. DurchGatherer.ofSequentialwird dies sichergestellt.
Finisher
Wie der Name Finisher schon vermuten lässt, wird dieser Part zum Ende eines Gatherers benötigt. Doch wofür genau und
was heißt am Ende? Um dies genauer zu beleuchten, schauen wir uns das Beispiel der windowFunction von oben genauer an.
Eine windowFunction teilt eine Menge von x Elementen in eine Liste von Teilmengen von einer Länge von y. So wird
zum Beispiel aus [1,2,3,4,5,6] ein [[1,2], [3,4], [5,6]]. Was passiert aber, wenn x kein Vielfaches von y ist?
Also anders formuliert: Was passiert mit dem Zustand eines Gatherers nachdem das letzte Element eines Streams
verarbeitet wurde?
Hier kommt der Finisher ins Spiel. Dieser ist eine Implementierung des BiConsumer Interfaces und erwartet als ersten
Parameter A den state des Gatherers und als zweiten Parameter den Downstream des Streams:
10BiConsumer<A, Downstream<? super R>> finisher;
Mit diesem Wissen kann das Problem der “losen” Elemente bei einer windowFunction Implementierung gelöst werden.
10public Gatherer.Integrator<List<Integer>, Integer, List<Integer>> windowFunctionIntegrator(int windowSize) {
11 return Gatherer.Integrator.of(
12 ((state, element, downstream) -> {
13 state.add(element);
14
15 if (state.size() == windowSize) { // (A)
16 boolean downstreamResult = downstream.push(List.copyOf(state));
17 state.clear();
18 return downstreamResult;
19 }
20
21 return true;
22 }));
23}
24
25public BiConsumer<List<Integer>, Gatherer.Downstream<? super List<Integer>>> windowFunctionFinisher() {
26 return (state, downstream) -> {
27 if (!state.isEmpty()) { // (B)
28 downstream.push(List.copyOf(state));
29 state.clear();
30 }
31 };
32}
33
34public void runGathererWithFinisher() {
35 int windowSize = 3;
36
37 Stream.of(1, 2, 3, 4, 5)
38 .gather(
39 Gatherer.ofSequential(
40 ArrayList::new, windowFunctionIntegrator(windowSize), windowFunctionFinisher())) // (C)
41 .forEach(System.out::print);
42 // output: [1, 2, 3][4, 5]
43}
Im Gegensatz zu den Beispielen weiter oben wurde nun in (C) ein Finisher dem Gatherer bei der Initialisierung
übergeben. Integrator und Finisher sind dabei recht nah miteinander verzahnt, da der Finisher in der Regel auf der
Logik des Integrators aufbaut. So auch in diesem Fall: In (A) wird geprüft, ob schon genügend Elemente im aktuellen
state liegen, ob ein window zu füllen. Nur wenn dies der Fall ist, wird das Fenster in Gänze an den Downstream
gepushed. Somit kann es passieren, dass am Ende alle Elemente in dem Zustand des Gatherers noch Elemente liegen,
welche über den Finisher in (B) aufgesammelt und ebenfalls an den Downstream gesendet werden.
Combiner
Als letzte Komponente für einen Gatherer bleibt der Combiner übrig. In verschiedenen Endpunkten der Stream-Klasse,
wie zum Beispiel der collect oder reduce Methode, kommt der Begriff combiner schon vor. Wie auch an diesen
Stellen, dient der Combiner dazu, einen Gatherer für die parallele Verarbeitung in mehreren Threads fit zu machen.
Ein Combiner wird dann notwendig, wenn ein zustandsbehafteter Gatherer in einem parallel ausgeführten Stream genutzt
wird. Ist dies der Fall, behandelt die Implementierung des Combiner-Interface die Kombinierung der jeweiligen Zustände
in den unterschiedlichen Threads.
Aus einer Liste von Namen soll der längste Name gefunden werden und für die weitere Verarbeitung in die Stream-Pipline
geschickt werden. Hierzu hält der Zustand des Gatherers den aktuell längsten Namen. Der Finisher schickt den
längsten Namen dann für die weitere Verarbeitung an den Downstream des Streams.
10public void runGathererWithCombiner() {
11 Stream.of("Anna", "Pete", "Suzanne", "Kay", "Mike")
12 .parallel()
13 .gather(
14 Gatherer.of(
15 AtomicReference<String>::new, //(A)
16 Gatherer.Integrator.of(
17 (state, element, downstream) -> {
18 String currentLongestName = state.get();
19
20 if (Objects.nonNull(element)
21 && (Objects.isNull(currentLongestName)
22 || element.length() > currentLongestName.length())) {
23 state.set(element); //(B)
24 }
25
26 return true;
27 }),
28 (stateA, stateB) -> { //(C)
29 String longestNameFromA = stateA.get();
30 String longestNameFromB = stateB.get();
31
32 if (longestNameFromA == null) {
33 return stateB;
34 } else if (longestNameFromB == null) {
35 return stateA;
36 } else if (longestNameFromA.length() > longestNameFromB.length()) {
37 return stateA;
38 } else {
39 return stateB;
40 }
41 },
42 (state, downstream) -> { //(D)
43 String longestName = state.get();
44
45 if (Objects.nonNull(longestName)) {
46 downstream.push(longestName);
47 }
48 }))
49 .forEach(System.out::println);
50 // output: Suzanne
51}
In dem Code-Snippet oben ist ein Gatherer einmal in dem vollen Umfang und allen Unterkomponenten aufgebaut. Es wird
ein paralleler Stream von Namen erstellt. Bei //(A) wird der initiale Zustand angelegt. Der Integrator in //(B)
prüft, ob der aktuelle Namen länger ist, als der im Zustand befindliche. Wenn ja, wird der Zustand mit dem neuen
längsten Namen ersetzt. Erwähnenswert hierbei ist, dass der Integrator kein Element in den Downstream sendet. Dies
geschieht erst im Finisher. Da der Integrator alle Elemente annimmt, könnte er auch als greedy initialisiert
werden. In //(C) wir der Combiner implementiert. Er bekommt die jeweiligen Zustände zweiter Threads als Parameter
übergeben und ermittelt den Zustand mit dem längsten Namen. Dieser wird dann zurückgegeben. Schließlich wird im
Finisher (//(D)) der am Ende aller Elemente im Zustand befindliche Name in den Downstream geschickt.
Zusammenfassung
Die neuen Funktionen der Gatherer geben den Stream-APIs von Java ein mächtiges Tool an die Hand. Insbesondere die
Möglichkeiten der freien Konfigurationen bieten erstaunlich viel Freiräume und quasi keine Grenzen für die
Anwendungsfälle. Gerade eine Kombination von Gatherern und Generics geben gerade für Framework-Entwicklungen viel
Spielraum den Nutzer:innen der Stream API viel “Magie” anzubieten. Aber auch für die klassischen Applikationsanwendungen
können clever genutzte Gatherer einen erheblichen Mehrwert im Bereich der Wartbarkeit durch Codereduzierungen bringen.
Weiterführende Beispiele und Erklärungen finden sich in [9].