======================
== OpenKnowledgeHub ==
======================

Stream Gatherers

java feature jse24

Wo kommen wir her?

Mit der Einführung der Stream API in Java SE 8 (insbesondere mit den JEP 107 Bulk Data Operations for Collections, JEP 109 Enhance Core Libraries with Lambda und JEP 126 Lambda Expressions & Virtual Extension Methods) haben sich die Möglichkeiten des “Wie” ein Java Projekt umgesetzt wird grundlegend erweitert. Durch die Annäherung und syntaktische Unterstützung von funktionaler Programmierung eröffnete sich ein ganz neuer Horizont für die Java Welt.

Doch seit dem Start von OpenJDK 8 am 18.03.2014 (Release Page) haben sich kaum nennenswerte Erweiterungen in der API ergeben. Lediglich kleinere Anpassungen wie zum Beispiel der JEP 269: Convenience Factory Methods for Collections in dem Java SE 9 Release haben kleinere Features in den Umgang mit Streams gebracht.

Aufbau Streams

Im Wesentlichen besteht ein Stream in Java aus drei Hauptteilen:

  • einer Quelle (source)
  • beliebig vielen Zwischenstufen (intermediate operation)
  • einem Endpunkt (terminal operation)
10public void runStream() {
11    Stream.of(1, 2, 3, 4, 5)                    // <- Quelle
12            .map(element -> element * element)  // <- Zwischenstufe
13            .forEach(System.out::print);        // <- Endpunkt
14    // Output: 1491625
15}

Java Streams unterliegen eine lazy evaluation - das heißt, dass erst dann Elemente in den Zwischenstufen (intermediate operation) verarbeitet werden, wenn das Ergebnis auch wirklich gebraucht wird, also wenn ein Endpunkt (terminal operation) angegeben ist.

Motivation für Gatherers

Wie weiter oben schon festgestellt wurde, hat sich der grundlegende Umfang von dem Stream API seit der Einführung nicht mehr wirklich verändert. Das betrifft insbesondere die Funktionen der intermediate operations, also der Zwischenstufen. Hier sind bis dato Java Entwicklungen auf die Angebote beschränkt, welche in java.util.stream.Stream definiert werden. Zu den bekannteren Methoden zählen:

  • filter
  • map
  • sorted
  • distinct

Gerade die Verknüpfung mehrerer dieser Methoden bieten durchaus vielseite Einsatzmöglichkeiten. Jedoch stoßen auch sie an ihre Grenzen. Wenn wir zum Beispiel eine Liste von Objekten haben und diese nach einem anderen Kriterium als die Objekt-eigenen equals und hashCode Implementierung eindeutig (distinct) filtern wollen, wäre dies nicht ohne Umwege möglich.

10public void distinctByCustom() {
11    Stream.of("maria", "phil", "anna", "jo")    // <- Quelle
12            .distinctBy(String::length)         // <- gibt es so nicht
13            .forEach(System.out::print);        // <- Endpunkt
14    // Erwarteter Output: mariaphiljo
15}

Oder auch die gerade aus der SQL bekannten window Funktion, ist nur mit erheblichem Mehraufwand implementierbar:

10public void windowFunction() {
11    Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)     // <- Quelle
12            .windowFixed(3)                     // <- gibt es so nicht
13            .limit(2)                           // <- Zwischenstufe
14            .forEach(System.out::print);        // <- Endpunkt
15    // Erwarteter Output: [0, 1, 2][3, 4, 5]
16}

Um nicht unzählige einzel Problem bezogene zusätzliche Funktionen in der Stream-Klasse zu ergänzen, wurde nach einer generischen Lösung gesucht - die Stream::gather(Gatherer) Zwischenstufe soll sich dieser Herausforderung annehmen.

Aufbau eines Gatherers

Stream::gather als intermediate operation und das Gatherer Interface kamen mit Java 22 als Preview in die Java Welt und werden mit Java 24 vollständig released. Wie auch schon bei den bestehenden Zwischenstufen ist ein Gatherer recht simple zusammengefasst:

A gatherer represents a transform of the elements of a stream (JEP 461)

Jedoch ist hierbei noch im Vorhinein festgelegt, wie genau die Transformation aussehen soll - genau hier liegen die Stärken der Gatherer.

Im Groben besteht ein Gatherer aus 4 Komponenten:

  • Integrator
  • Initializer
  • Finisher
  • Combiner

Nur der Integrator ist zwingend notwendig, um einen Gatherer zu implementieren. Die drei anderen Komponenten werden für komplexere Problemstellungen benötigt.

Integrator

Das Integrator Interface ist ein Functional Interface mit der zentralen Methode integrate:

10
11@FunctionalInterface
12public interface Integrator<A, T, R> {
13    boolean integrate(A state, T element, Downstream<? super R> downstream);
14    // . . .
15}

Der state ist der aktuelle Zustand des Gatherers und kann verwendet werden, um einen Zustand über die Verarbeitung einzelner Elemente hinaus zu teilen. Mehr dazu weiter unten. Das element ist vom Upstream des Streams kommende aktuelle Element, welches transformiert werden soll. Die Referenz auf den Downstream hat aus dem Functional Interface die Methode boolean push(T element), welche verwendet wird, um das transformierte Element an die nächste Ebene im Stream zu senden. Als Rückgabewert der integrate Methode wird ein boolean erwartet. Dieser zeigt an, ob ein Element an den Downstream gesendet wurde oder nicht. Die Downstream::push Methode gibt den booleschen Wert nach der gleichen Logik, nur bezogen auf die nächste Ebene des Streams, zurück.

Mit dem Wissen kann jetzt ein simpler Gatherer geschrieben werden:

10public Gatherer.Integrator<Void, Integer, Integer> squaredIntegrator() {
11    return Gatherer.Integrator.ofGreedy(
12            (state, element, downstream) -> {
13                int newElement = element * element;
14                return downstream.push(newElement);
15            });
16}
17
18public void runStreamWithIntegrator() {
19    Stream.of(1, 2, 3, 4, 5)
20            .gather(Gatherer.of(squaredIntegrator()))
21            .forEach(System.out::println);
22
23    // output: 1491625
24}

Ein besonderes Augenmerk sollte auf Gatherer.Integrator.ofGreedy geworfen werden. Ein greedy Integrator hat keine eigene Logik, ob er ein weiteres Element aus dem Stream verarbeitet oder nicht. Er verlässt sich dabei einzig und allein auf den Downstream. Diese Angabe ermöglicht der JVM unter der Haube etwas an Optimierungen vorzunehmen. Sollte der Integrator selbst entscheiden, ob ein Element vom Upstream verarbeitet wird oder nicht, kann ein simples Gatherer.Integrator.of genutzt werden.

Mit dem Aufrufen von .gather(Gatherer.of(integrator)) kann jetzt einfach ein neuer Gatherer mit dem eigens definierten Integrator für den Stream genutzt werden.

Initializer

Der optionale Intilizer wird verwendet, um dem Gatherer einen Zustand zu geben. Der im vorherigen Abschnitt gezeigte Gatherer ist zustandslos - er hat keinerlei Informationen neben dem aktuellen Element. Als Intializer erwartet das Gatherer Interface einen Supplier, welcher den initialen Status (Zustand) erzeugt. Um zum Beispiel einen Zähler zu implementieren, der nach x Elementen stoppt, kann einfach ein AtomicInteger genutzt werden:

10public Gatherer.Integrator<AtomicInteger, Integer, Integer> squaredIntegratorWithLimit(int limit) {
11    return Gatherer.Integrator.of(                                  // (A)
12            (state, element, downstream) -> {
13                if (state.getAndIncrement() < limit) {
14                    int newElement = element * element;
15                    return downstream.push(newElement);             // (B)
16                } else {
17                    return false;                                   // (C)
18                }
19            });
20}
21
22public void runStreamWithIntegrator() {
23    Supplier<AtomicInteger> initializer = AtomicInteger::new;
24    Gatherer.Integrator<AtomicInteger, Integer, Integer> integrator = squaredIntegratorWithLimit(3);
25
26    Stream.of(1, 2, 3, 4, 5)
27            .gather(Gatherer.ofSequential(initializer, integrator)) // (D)
28            .forEach(System.out::print);
29
30    // output: 149
31}

Von dem Grundaufbau hat sich zu dem vorherigen Beispiel nicht viel geändert. Besonders betrachtet werden sollten die Stellen:

  • (A): Da der Integrator jetzt selbst entscheidet, ob ein Element weiter verarbeitet wird oder nicht, sollte das ofGreedy durch of ersetzt werden.
  • (B): Wenn der aktuelle Zustand (state.getAndIncrement()) kleiner als das übergebene limit ist, wird das Element wie schon vorher einfach an den Downstream übergeben.
  • (C): Wenn der aktuelle Zustand größer oder gleich dem limit ist, wird die Verarbeitung des Elements abgelehnt.
  • (D): Da nun ein zustandsbehafteter Gatherer genutzt wird, welcher keinen Combiner hat, darf dieser nicht parallel in mehreren Threads laufen. Durch Gatherer.ofSequential wird dies sichergestellt.

Finisher

Wie der Name Finisher schon vermuten lässt, wird dieser Part zum Ende eines Gatherers benötigt. Doch wofür genau und was heißt am Ende? Um dies genauer zu beleuchten, schauen wir uns das Beispiel der windowFunction von oben genauer an. Eine windowFunction teilt eine Menge von x Elementen in eine Liste von Teilmengen von einer Länge von y. So wird zum Beispiel aus [1,2,3,4,5,6] ein [[1,2], [3,4], [5,6]]. Was passiert aber, wenn x kein Vielfaches von y ist? Also anders formuliert: Was passiert mit dem Zustand eines Gatherers nachdem das letzte Element eines Streams verarbeitet wurde?

Hier kommt der Finisher ins Spiel. Dieser ist eine Implementierung des BiConsumer Interfaces und erwartet als ersten Parameter A den state des Gatherers und als zweiten Parameter den Downstream des Streams:

10BiConsumer<A, Downstream<? super R>> finisher;

Mit diesem Wissen kann das Problem der “losen” Elemente bei einer windowFunction Implementierung gelöst werden.

10public Gatherer.Integrator<List<Integer>, Integer, List<Integer>> windowFunctionIntegrator(int windowSize) {
11    return Gatherer.Integrator.of(
12            ((state, element, downstream) -> {
13                state.add(element);
14
15                if (state.size() == windowSize) { // (A)
16                    boolean downstreamResult = downstream.push(List.copyOf(state));
17                    state.clear();
18                    return downstreamResult;
19                }
20
21                return true;
22            }));
23}
24
25public BiConsumer<List<Integer>, Gatherer.Downstream<? super List<Integer>>> windowFunctionFinisher() {
26    return (state, downstream) -> {
27        if (!state.isEmpty()) { // (B)
28            downstream.push(List.copyOf(state));
29            state.clear();
30        }
31    };
32}
33
34public void runGathererWithFinisher() {
35    int windowSize = 3;
36
37    Stream.of(1, 2, 3, 4, 5)
38            .gather(
39                    Gatherer.ofSequential(
40                            ArrayList::new, windowFunctionIntegrator(windowSize), windowFunctionFinisher())) // (C)
41            .forEach(System.out::print);
42    // output: [1, 2, 3][4, 5]
43}

Im Gegensatz zu den Beispielen weiter oben wurde nun in (C) ein Finisher dem Gatherer bei der Initialisierung übergeben. Integrator und Finisher sind dabei recht nah miteinander verzahnt, da der Finisher in der Regel auf der Logik des Integrators aufbaut. So auch in diesem Fall: In (A) wird geprüft, ob schon genügend Elemente im aktuellen state liegen, ob ein window zu füllen. Nur wenn dies der Fall ist, wird das Fenster in Gänze an den Downstream gepushed. Somit kann es passieren, dass am Ende alle Elemente in dem Zustand des Gatherers noch Elemente liegen, welche über den Finisher in (B) aufgesammelt und ebenfalls an den Downstream gesendet werden.

Combiner

Als letzte Komponente für einen Gatherer bleibt der Combiner übrig. In verschiedenen Endpunkten der Stream-Klasse, wie zum Beispiel der collect oder reduce Methode, kommt der Begriff combiner schon vor. Wie auch an diesen Stellen, dient der Combiner dazu, einen Gatherer für die parallele Verarbeitung in mehreren Threads fit zu machen. Ein Combiner wird dann notwendig, wenn ein zustandsbehafteter Gatherer in einem parallel ausgeführten Stream genutzt wird. Ist dies der Fall, behandelt die Implementierung des Combiner-Interface die Kombinierung der jeweiligen Zustände in den unterschiedlichen Threads.

Aus einer Liste von Namen soll der längste Name gefunden werden und für die weitere Verarbeitung in die Stream-Pipline geschickt werden. Hierzu hält der Zustand des Gatherers den aktuell längsten Namen. Der Finisher schickt den längsten Namen dann für die weitere Verarbeitung an den Downstream des Streams.

10public void runGathererWithCombiner() {
11    Stream.of("Anna", "Pete", "Suzanne", "Kay", "Mike")
12            .parallel()
13            .gather(
14                    Gatherer.of(
15                            AtomicReference<String>::new, //(A)
16                            Gatherer.Integrator.of(
17                                    (state, element, downstream) -> {
18                                        String currentLongestName = state.get();
19
20                                        if (Objects.nonNull(element)
21                                                && (Objects.isNull(currentLongestName)
22                                                || element.length() > currentLongestName.length())) {
23                                            state.set(element); //(B)
24                                        }
25
26                                        return true;
27                                    }),
28                            (stateA, stateB) -> { //(C)
29                                String longestNameFromA = stateA.get();
30                                String longestNameFromB = stateB.get();
31
32                                if (longestNameFromA == null) {
33                                    return stateB;
34                                } else if (longestNameFromB == null) {
35                                    return stateA;
36                                } else if (longestNameFromA.length() > longestNameFromB.length()) {
37                                    return stateA;
38                                } else {
39                                    return stateB;
40                                }
41                            },
42                            (state, downstream) -> { //(D)
43                                String longestName = state.get();
44
45                                if (Objects.nonNull(longestName)) {
46                                    downstream.push(longestName);
47                                }
48                            }))
49            .forEach(System.out::println);
50    // output: Suzanne
51}

In dem Code-Snippet oben ist ein Gatherer einmal in dem vollen Umfang und allen Unterkomponenten aufgebaut. Es wird ein paralleler Stream von Namen erstellt. Bei //(A) wird der initiale Zustand angelegt. Der Integrator in //(B) prüft, ob der aktuelle Namen länger ist, als der im Zustand befindliche. Wenn ja, wird der Zustand mit dem neuen längsten Namen ersetzt. Erwähnenswert hierbei ist, dass der Integrator kein Element in den Downstream sendet. Dies geschieht erst im Finisher. Da der Integrator alle Elemente annimmt, könnte er auch als greedy initialisiert werden. In //(C) wir der Combiner implementiert. Er bekommt die jeweiligen Zustände zweiter Threads als Parameter übergeben und ermittelt den Zustand mit dem längsten Namen. Dieser wird dann zurückgegeben. Schließlich wird im Finisher (//(D)) der am Ende aller Elemente im Zustand befindliche Name in den Downstream geschickt.

Zusammenfassung

Die neuen Funktionen der Gatherer geben den Stream-APIs von Java ein mächtiges Tool an die Hand. Insbesondere die Möglichkeiten der freien Konfigurationen bieten erstaunlich viel Freiräume und quasi keine Grenzen für die Anwendungsfälle. Gerade eine Kombination von Gatherern und Generics geben gerade für Framework-Entwicklungen viel Spielraum den Nutzer:innen der Stream API viel “Magie” anzubieten. Aber auch für die klassischen Applikationsanwendungen können clever genutzte Gatherer einen erheblichen Mehrwert im Bereich der Wartbarkeit durch Codereduzierungen bringen.

Quellen