DEV Community

Cover image for To `Gather` or not to `Gather`? That is the question.
Kosmik for Onepoint

Posted on • Edited on

4 2 2

To `Gather` or not to `Gather`? That is the question.

Mais de quoi va-t-on parler ?

De Java. Oui je préfère le dire dès le début. Cet article va parler de Java !
logo java

Les gatherers sont le premier ajout d'importance à l’API java.util.Streams depuis sa sortie, et on parle de 2014. Cela signifie qu’il ne s’était pas passé grand-chose depuis environ longtemps.
Arrivés en preview en Java 22, il sont finalement standards en Java 24.

Table des matières

Remise à niveau

Même si on ne va pas faire une revue complète de l'API java.util.Streams, je vous propose de commencer par un petit quizz, tranquilles, posés.

Shakespeare posant une question

Quiz 1/3

1 public class Quizz {
2    public static void main(String[] args) {
3        getPeople()
4            .map(person -> person.getName())
5            .toList();
6    }
7 }
Enter fullscreen mode Exit fullscreen mode

Admettons que la méthode getPeople renvoie une instance de Stream de 10 objets de type Person.

Ici, le code est simple, on applique la méthode map qui prend en paramètre une Function, qui extraie une String à partir d'une instance de Person en renvoyant le résultat de la méthode Person.getName().

La question est :

Combien de fois la ligne 4 est-elle invoquée ?

① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.
It depends!

spoiler
Vous êtes-vous dit qu’il y avait plusieurs bonnes réponses ?

La bonne réponse est:

It depends!

En réalité, la question était mal posée. La question, aurait du être :

"Est-ce que la méthode map est invoquée une fois, 10 fois, ainsi de suite ?" ou "Est-ce que la fonction qui est passée en paramètre de la méthode map est invoquée ?"

La méthode map, est bien appelée une seule fois, par contre, la fonction qui lui est passée en paramètre, elle, est bien invoquée 10 fois.

Il faut systématiquement différencier les méthodes de l’API Stream, qui ne font que de la configuration de pipeline, et les fonctions/predicats/... qu’on leur passe.

Quiz 2/3

Bon, ok, on a compris. À partir de maintenant, la question portera systématiquement sur le nombre d’invocations de la Function passée en paramètre de la méthode map.

1 public class Quizz {
2   public static void main(String[] args) {
3        getPeople()
4            .map(person -> person.getName());
5    }
6 }
Enter fullscreen mode Exit fullscreen mode

La question est :

Combien de fois la ligne 4 est-elle invoquée ?
① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.

spoiler

La bonne réponse est :

Non Jérôme, elle n’est pas invoquée la ligne 4.

Dans un Stream, il y a deux types d’opérations :

  • Des opérations intermédiaires.
  • Des opérations finales.

Les opérations intermédiaires ne font que configurer un pipeline d’exécution, elles ne déclenchent rien. Tant qu’on n’a pas appelé une méthode finale sur un stream, il ne se passe rien du tout.

Quiz 3/3

1 public class Quiz {
2     public static void main(String[] args) {
3         getPeople()
4             .map(person -> person.getName())
5             .count();
6     }
7 }
Enter fullscreen mode Exit fullscreen mode

La question est :

Combien de fois la ligne 4 est-elle invoquée ?
① 1 fois ?
② 10 fois ?
③ Non Jérôme, elle n'est pas invoquée la ligne 4.

spoiler

La bonne réponse est :

Non Jérôme, elle n’est pas invoquée la ligne 4.

Eeeeeeet oui, l’API stream est intelligente, et parmi les opérations terminales, il y en a certaines qui possèdent des raccourcis. Et c'est le cas de la méthode count qui est capable d’évaluer si toutes les opérations qui ont été exécutées avant elle peuvent avoir un impact sur la cardinalité de ce qu’il y a en sortie.

Ici, une seule opération map n’aura aucun impact sur la cardinalité, et donc, inutile de l’invoquer, ça n’a aucun intérêt. On n’a pas besoin de transformer des personnes en chaînes de caractères pour savoir qu’il y en à 10.

C'est assez important de comprendre que vous n'avez pas de garantie d'invocation. Et si, par exemple, vous faites partie de la team peek-
💀 ARRÊTEZ-CA MAINTENANT 💀-, et que vous utilisez un count, votre peek risquerait bien de ne jamais être invoqué.

Rappels

  • Il y a des méthodes intermédiaires : map, filter, etc.
  • Il y a des méthodes terminales : anyMatch, toList, count, etc.

    L’invocation d’une opération terminale est le seul déclencheur de l’utilisation d’un pipeline. Je peux enchaîner autant d’opérations intermédiaires que je veux, il ne se passera jamais rien.

    Et dès que j’appelle une opération terminale, je consomme mon stream, et je ne peux plus rien en faire. Et évidemment, un peu tautologique, je ne peux pas avoir deux méthodes terminales qui s’enchaînent.

  • Certaines opérations terminales possèdent des courts-circuits.

  • On peut implémenter un nombre infini ♾️ d'opérations terminales grâce à l'API Collector.

  • Il existe un nombre fini d'opérations intermédiaires fournies pas l'API

Évidemment, l’API gatherer est là pour rattraper cette terrible injustice et nous laisser la capacité de coder toutes les opérations intermédiaires que l’on veut.

Collectors vs Gatherers

Nous sommes habitués à l'API Collectors, mais tout de même rafraîchissons nous la mémoire.

Les Collectors

Collector collector;

Collectors.groupingBy(...);

stream.collect(collector);
Enter fullscreen mode Exit fullscreen mode

① L'interface Collector que nous devons implémenter.
② La classe Collectors, qui fourni un certain nombre de collectors déjà implémentés
③ L'utilisation d'un collector via la méthode Stream#collect

Et maintenant leurs jumeaux

Les Gatherers

Gatherer gatherer;

Gatherers.windowFixed(...);

stream.gather(gatherer);
Enter fullscreen mode Exit fullscreen mode

① L'interface Gatherer que nous devons implémenter.
② La classe Gatherers, qui fourni un certain nombre de gatherers déjà implémentés. Ici windowFixed qui accumule n éléments avant de les pousser dans le stream sous forme de liste.
③ L'utilisation d'un gatherer via la méthode Stream#gather

Code utilisé pour les exemples

À partir de maintenant, tous les exemples suivront le code suivant


public void main() throws IOException {
    Stream<Oeuvre> oeuvres = Reader.read().stream();
    prettyPrint(
       oeuvres.gather(
          filter(oeuvre -> oeuvre.titre().contains("N")) 
       )
    );
}

public record Oeuvre(
   String titre, 
   Integer anneeParution, 
   boolean perdue) {
}
Enter fullscreen mode Exit fullscreen mode

① C'est ici que nous placerons les gatherer custom

L'interface Gatherer

💡
Le code que nous allons regarder n'est pas le vrai code, mais une version épurée de l'interface.
Si vous voulez voir le vrai code, je rappelle que tout ceci est libre d'accès

package java.util.stream;

public interface Gatherer<T, A, R> {

    default Supplier<A> initializer(); 

    Integrator<A, T, R> integrator(); 

    default BinaryOperator<A> combiner(); 

    default BiConsumer<A, Downstream<? super R>> finisher(); 
}
Enter fullscreen mode Exit fullscreen mode

On peut déjà constater que l'interface fait une utilisation massive des génériques.

Pour les illustrer, prenons l'exemple d'un gatherer qui réimplémente l'opération intermédaire map (d'une instance d'Oeuvre vers une String en utilisant la méthode Oeuvre#titre()).

Le type T représente le type de l'objet entrant, ici Oeuvre. Le type A représente le type de l'état du gatherer (on y reviendra). Le type R représente le type de retour du gatherer, ici String.

① La méthode initializer

Elle permet d'initialiser l'état, si besoin. Elle possède une implémentation par défaut et renvoie un Supplier d'état.

⚠️
Attention, elle ne renvoie pas un nouvel état, mais un Supplier d'état.

② La méthode integrator

Son rôle est de retourner un Integrator. C'est l'objet qui va intervenir sur le stream, et sur lequel nous allons revenir tout au long des exemples.

⚠️
Attention, encore une fois, elle n'implémente pas la méthode qui agit. C'est une factory.

③ La méthode combiner

Elle renvoie un BinaryOperator<X>, c'est à dire, une BiFunction<X,X,X>. Son rôle est de combiner les états en cas d'exécutions parallèles.

④ La méthode finisher

Elle renvoie un BinaryConsumer, qui permet en cas de besoin d'exécuter une action en fin de traitement.

Let's code !

Oui, ok, t'es mignon, mais c'est quand même super abstrait ce que tu nous racontes là.

Et c'est vrai !

Je vous propose donc de redévelopper la méthode filter dont vous connaissez déjà le fonctionnement.

On recode la méthode filter

C'est un gatherer simple, donc nous n'aurons besoin que d'implémenter la méthode integrator.

Nous allons l'implémenter à base d'anonymous inner class. À l'ancienne.

package org.github.jtama.gatherornot;

import java.util.function.Predicate;
import java.util.stream.Gatherer;

public class Filter implements Gatherer<Oeuvre, Object, Oeuvre> {

    private final Predicate<Oeuvre> filter;

    Filter(Predicate<Oeuvre> filter) {
        this.filter = filter;
    }

    @Override
    public Integrator<Object, Oeuvre, Oeuvre> integrator() {
        return new Integrator<Object, Oeuvre, Oeuvre>() {
            @Override
            public boolean integrate( 
              Object state, 
              Oeuvre oeuvre, 
              Downstream<? super Oeuvre> downstream) { 
                if (filter.test(oeuvre)) {
                    return downstream.push(oeuvre);
                }
                return true;
            }
        };
    }
}
Enter fullscreen mode Exit fullscreen mode

① L'état que nous ignorons pour l'instant.
② L'instance d'Oeuvre en cours de traitement dans le stream.
③ Le downstream représente ce qui vient après dans le stream.
④ La méthode retourne un boolean qui permet d'indiquer à l'API stream si le gatherer accepte d'autres éléments. Comme, il s'agit d'un filtre, nous renvoyons toujours true ou la propagation du résultat de la méthode downstream.push.

Ici, pas vraiment de difficulté, mais un code vraiment verbeux que l'on va pouvoir simplifier.

L'implémentation de la classe Integrator ne contient qu'une méthode. On peut donc écrire une lambda.

package org.github.jtama.gatherornot;

import java.util.function.Predicate;
import java.util.stream.Gatherer;

public class Filter implements Gatherer<Oeuvre, Object, Oeuvre> {

    private final Predicate<Oeuvre> filter;

    Filter(Predicate<Oeuvre> filter) {
        this.filter = filter;
    }

    @Override
    public Integrator<Object, Oeuvre, Oeuvre> integrator() {
        return (_, oeuvre, downstream) -> { 
            if (filter.test(oeuvre)) {
                return downstream.push(oeuvre);
            }
            return true;
        };
    }
}
Enter fullscreen mode Exit fullscreen mode

① La variable state n'étant pas utilisée, on peux utiliser un _.

L'implémentation de la classe Filter ne contient également qu'une méthode. Donc rebelote, transformation en lambda.

package org.github.jtama.gatherornot;

import java.util.function.Predicate;
import java.util.stream.Gatherer;

public class Filter {
    public static Gatherer<Oeuvre, ?, Oeuvre> filter(Predicate<Oeuvre> filter) {
        return () -> (_, oeuvre, downstream) -> {
            if (filter.test(oeuvre)) {
                return downstream.push(oeuvre);
            }
            return true;
        };
    }
}
Enter fullscreen mode Exit fullscreen mode

Et voilà. C'est plus court. C'est mieux.

Non.

On a beaucoup perdu en lisibilité, mais les interfaces Gatherer et Integrator offrent des méthodes utilitaires pour la regagner.

package org.github.jtama.gatherornot;

import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;

public class Filter {
    public static Gatherer<Oeuvre, ?, Oeuvre> filter(Predicate<Oeuvre> filter) {
        return Gatherer.of(
                Integrator.ofGreedy(
                    (_, oeuvre, downstream) -> {
                        if (filter.test(oeuvre)) {
                            return downstream.push(oeuvre);
                        }
                        return true;
                    }));
    }
}
Enter fullscreen mode Exit fullscreen mode

① On utilise ici la méthode ofGreedy (pour ceux qui ne parlent pas couramment anglais, greedy veut dire avide, allez tout de suite regarder le film Se7en), qui permet de dire à l'API stream que ce Gatherer n'interrompra jamais de lui même la consommation du stream, et qui permet à l'API stream de faire des optimisations.

À partir de maintenant, nous utiliserons toujours cette façon d'écrire le code.

Un stream avec un index ?

N'avez-vous déjà pas eu envie d'accéder à l'index de l'élément en cours de traitement ? N'avez vous pas déjà essayé l'implémentation suivante ?

Stream<Oeuvre> oeuvres = Reader.read().stream();
AtomicInteger index = new AtomicInteger(0);
oeuvres.map(value -> new Tuple<>(index.getAndIncrement(), value)));
Enter fullscreen mode Exit fullscreen mode

① On utilise un AtomicInteger pour conserver/incrémenter l'index
② En admettant que la classe Tuple existe

C'est une approche qui fonctionne très bien jusqu'à ce que quelqu'un ait la bonne idée d'ajouter un petit .parallel() avant.

Et là :
C'est le drame

Bon je me permets d'ajouter qu'une bonne vieille boucle for est certainement ce qu'il vous faut.

Mais, pas d'inquiétude, on va pouvoir arranger ça.

Commençons par noter, que pour la première fois depuis le début de l'article nous allons avoir besoin de quelque chose pour maintenir l'état.

Et comme l'API n'est pas trop mal faite, dans un Gatherer, le nom de ce concept est state. ¯\(ツ)

package org.github.jtama.gatherornot;

import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;

public class WithIndex {

  public static Gatherer<Oeuvre, Counting, Tuple<Integer, Oeuvre>> withIndex() {

    return Gatherer.ofSequential( 
            () -> new Counting(), 
            Integrator.ofGreedy(
             (state, oeuvre, downstream) -> downstream.push(new Tuple<>(state.index++, oeuvre)))); 
  }

  static class Counting {
      int index;
  }
}
Enter fullscreen mode Exit fullscreen mode

① Notre Supplier d'état, ici une instance de la classe counting qui contient l'index
② L'implémentation est exactement la même
③ On utilise la méthode ofSequential, qui permet d'interdire l'exécution du gatherer en parallel, même si le développeur le demande.

Un groupingBy, mais pas terminal.

Pour rappel, la méthode groupingBy est une opération finale, je vous encourage à aller lire la doc si vous voulez en savoir plus.

Ce que l'on cherche à implémenter c'est une opération intermédiaire qui va regrouper un ensemble cohérent d'élément dans une liste avant de les relacher dans le stream.

Par exemple avec un stream contenant les œuvres de Shakespeare classées par date de parution, j'aimerai pouvoir regrouper les oeuvres par année. Et tant qu'on y est, j'aimerais pouvoir l'utiliser pour d'autres objets, avec d'autres critères de regroupement.

Cela signifie que nous allons faire un Gatherer générique.

Pour une fois, on va commencer en regardant l'utilisation !

Stream<Oeuvre> oeuvres = Reader.read().stream();
oeuvres.gather(series(Oeuvre::anneeParution)));
Enter fullscreen mode Exit fullscreen mode

① Je passe à ma factory de gatherer un extracteur de clef.

Et maintenant l'implémentation :

package org.github.jtama.gatherornot;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;

public class Serie {

    public static <K,V> Gatherer<V, State, List<V>> series(Function<V,K> keyExtractor) {
        return Gatherer.ofSequential(
                State::new,
                Gatherer.Integrator.ofGreedy((state, value, downstream) -> {
                    // First invocation or the same key value
                    if (state.key == null || 
                            keyExtractor.apply(value).equals(state.key)) { 
                        state.values.add(value);
                        state.key = keyExtractor.apply(value);
                        return true;
                    }
                    var more = downstream.push(state.values);  
                    state.values = new ArrayList<>();
                    state.key = keyExtractor.apply(value);
                    state.values.add(value);
                    return more;
                }),
                (state, downstream) -> downstream.push(state.values));
    }

    public static class State<K,V> {
        private K key;
        private List<V> values = new ArrayList<>();
    }
}

Enter fullscreen mode Exit fullscreen mode

① Oui on va rester en séquentiel, sinon il faudra s'asseoir sur le fait d'avoir de la donnée ordonnée.
② L'état va maintenir la clef de regroupement et la liste pour l'accumulation des valeurs regroupées.
③ Si c'est le premier tour de boucle ou que la valeur de regroupement est égale à celle de l'état, on accumule et réclame plus d'éléments.
④ Sinon, on pousse les valeurs déjà accumulées en conservant , et puis on reinitialise l'état et on propage le retour de l'invocation de la méthode downstream.push.

Je me rend bien compte que ce gatherer est un peu plus compliqué, mais l'avantage, c'est que si je veux regrouper mes éléments en fonction de la première lettre du titre, je peux.

Stream<Oeuvre> oeuvres = Reader.read().stream();
oeuvres.sorted(Comparator.comparing(Oeuvre::titre))
        .gather(series(oeuvre -> oeuvre.titre().substring(0,1)));
Enter fullscreen mode Exit fullscreen mode

Et ça fonctionnerait même avec une hypothétique classe Person! Si je veux regrouper un stream de personne par année de naissance :

Stream<Person> persons = Reader.read().stream();
persons.gather(series(Person::birthDate);
Enter fullscreen mode Exit fullscreen mode

Et maintenant, on fusionne des streams !

Il n'est pas possible, simplement, à ce jour de fusionner des stream. Ce que je veux je veux obtenir est l'équivalent du join de RxJava

Mais en plus strict.

Illustration du résultat souhaité

Voilà comme ça.

Plus précisément je ne veux permettre que des paires complètes.

On va reprendre le principe de commencer par l'utilisation.

Stream<Oeuvre> oeuvres = Reader.readUnordered().stream();
prettyPrint(oeuvres.gather(merge(streamToBeMerged)));
Enter fullscreen mode Exit fullscreen mode

Le code ci-dessus devrait produire :
┌──────────────────────────┬─────────────────────────┐
│Revue de presse           │Titre                    │
├──────────────────────────┼─────────────────────────┤
│Beaucoup  de  bruits  pour│Peines d amour gagnées   │
│rien                      │                         │
├──────────────────────────┼─────────────────────────┤
│Je ne m en  souviens  même│Cardenio                 │
│plus.                     │                         │
├──────────────────────────┼─────────────────────────┤
│Jamais entendu parler     │La Tempête               │
├──────────────────────────┼─────────────────────────┤
│Numéro  1  sept   semaines│Les Deux Gentilshommes de│
│d affilées                │Vérone                   │
├──────────────────────────┼─────────────────────────┤
│Meilleure pièce de l année│Les Joyeuses Commères  de│
│                          │Windsor                  │
├──────────────────────────┼─────────────────────────┤
│Un chef d oeuvre          │Mesure pour mesure       │
└──────────────────────────┴─────────────────────────┘

Enter fullscreen mode Exit fullscreen mode

Et maintenant l'implémentation :

package org.github.jtama.gatherornot;

import java.util.Iterator;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Merge {

  public static <T,Y> Gatherer<Y, Iterator<T>, Tuple<T, Y>> merge(Stream<T> stream) {
    return Gatherer.ofSequential(
            stream::iterator, 
            Gatherer.Integrator.of(
             (state, item, downstream) -> {
                if (state.hasNext()) 
                   return downstream.push(new Tuple<>(state.next(), item));
                return false; 
              }));
    }
}
Enter fullscreen mode Exit fullscreen mode

① Pour savoir si il me reste quelque chose dans mon stream "à fusionner" je ne peux directement faire un Stream.hasNext, ou Stream.next (Cette notion n'est pas cohérente la philosophie Stream). Je dois avoir un accès direct aux éléments de la collection. Je dois passer par un Iterator.
② Si mon itérateur "à fusionner" en a encore dans le ventre, on pousse au dowstream.
③ Sinon on interrompt la consommation du stream. Je rappelle qu'on a dit qu'on ne voulait que des paires complètes. Et puis c'est mon code, et je fais ce que je veux. Si vous voulez une autre implémentation, je ne vous empêche pas.

On y va ou pas ?

Alors que cet article touche à sa fin, j'espère vous avoir montré qu'il existe en effet des cas pour lesquels les Gatherer vont nous permettre de répondre à de réels besoins. J'aimerais aussi attirer votre attention sur le fait que si ils ont mis autant de temps à arriver, c'est certainement parce qu'on peut déjà faire beaucoup avec l'existant, pourvu qu'on prenne le temps de regarder ce que l'on a déjà à disposition.

Vous trouverez dans le dépôt github joint, tout le code présenté et même plus.

To Gather, or not to Gather, that is the question.

title image


Run locally

Use JBang !

Jbang logo


Slides

Readable version available on Github Pages → ici

Generate

jbang qrcode@maxandersen -i slides/images/qr_inlay.png <open feedback url> --qr-colo="linear-gradient(90deg, rgba(36,14,0,1) 0%, rgba(9,121,105,1) 35%, rgba(0,212,255,1) 100%);"
jbang qrcode@maxandersen -i slides/image/github.png https://github.com/jtama/to-gather-or-not-to-gather
podman container run --rm -v $(pwd)/slides:/documents -w /documents asciidoctor/docker-asciidoctor:1.80.0 asciidoctor-revealjs -r asciidoctor-diagram index.adoc
Enter fullscreen mode Exit fullscreen mode

Run locally

podman container run --name prez --rm -d -p 8080:80 -v $(pwd)/slides:/usr/share/nginx/html nginx
podman container run --name coder --rm -d -p 8443:8443 -v $(pwd)/app:/config/workspace ghcr.io/jtama/java_jbang_codeserver:latest
Enter fullscreen mode Exit fullscreen mode








Et surtout n'oubliez pas :

Rester curieux

Image of Datadog

Create and maintain end-to-end frontend tests

Learn best practices on creating frontend tests, testing on-premise apps, integrating tests into your CI/CD pipeline, and using Datadog’s testing tunnel.

Download The Guide

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

If you found this post helpful, please leave a ❤️ or a friendly comment below!

Okay