DEV Community

Bruno Clemente

Stream: a library to lazily process data with PHP.

First of all, when we talk about streams in the PHP universe, the first thing that probably comes to mind is PHP's streamable resources. That is not what I will talk about today. Stream is a PHP library that helps us define a pipeline of data transformations ergonomically and lazily. It is named "stream" because it was heavily inspired by Elixir's Stream module.

So, let's take a look at it. First, add it to your project using Composer: composer require ebanx/stream. Then, we can create a very simple example to illustrate how it works.

use EBANX\Stream\Stream;

$result = Stream::rangeInt(0, 5)
    ->map(function (int $value): int {
        return $value + 3;
    })
    ->filter(function (int $value): bool {
        return $value % 2 === 0;
    })
    ->collect();
print_r($result);

The output of this example should be:

Array
(
    [0] => 4
    [1] => 6
    [2] => 8
)

Ok, now we need to understand what is happening here. The first thing we do is create a Stream. There are four methods that allow us to create a stream: Stream::of, Stream::ofKeyValueMap, Stream::rangeInt, and Stream::rangeFloat. The last two create a stream from a numeric range. Note that the range is lazy, which means that each element is only created when you consume it. We could create the following Stream and it would not use a crazy amount of memory: Stream::rangeInt(PHP_INT_MIN, PHP_INT_MAX).
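To get a feel for why such a huge range stays cheap, here is a minimal sketch using a plain PHP generator. The lazyRangeInt function is a hypothetical helper written for this illustration, not the library's implementation, and the inclusive upper bound is an assumption based on the output of the first example.

```php
<?php
// Hypothetical helper (not part of the Stream library): a lazy, inclusive
// integer range. Each value is produced only when the consumer asks for it.
function lazyRangeInt(int $start, int $end): Generator
{
    if ($start > $end) {
        return; // empty range
    }
    for ($i = $start; ; $i++) {
        yield $i;
        if ($i === $end) {
            break; // stop here so $i++ never overflows past PHP_INT_MAX
        }
    }
}

// Consume only the first three values of an enormous range.
$firstThree = [];
foreach (lazyRangeInt(PHP_INT_MIN, PHP_INT_MAX) as $value) {
    $firstThree[] = $value;
    if (count($firstThree) === 3) {
        break;
    }
}
print_r($firstThree);
```

Only three integers are ever created here, because the loop body of the generator runs once per value that the foreach requests.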

The other two creators build a stream from an iterable. Stream::of ignores any key that the elements might have. Stream::ofKeyValueMap transforms each element into an array tuple where the first item is the key and the second is the element's value.
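The difference between the two can be sketched with plain generators. The functions valuesOnly and keyValueTuples below are hypothetical stand-ins that mimic the behavior described above; they are not the library's code.

```php
<?php
// Hypothetical stand-in for the behavior described for Stream::of:
// keys are dropped, only the values flow through.
function valuesOnly(iterable $items): Generator
{
    foreach ($items as $value) {
        yield $value;
    }
}

// Hypothetical stand-in for the behavior described for Stream::ofKeyValueMap:
// each element becomes a [key, value] tuple.
function keyValueTuples(iterable $items): Generator
{
    foreach ($items as $key => $value) {
        yield [$key, $value];
    }
}

$prices = ['apple' => 3, 'pear' => 5];

$values = iterator_to_array(valuesOnly($prices), false);
$tuples = iterator_to_array(keyValueTuples($prices), false);

print_r($values); // the values 3 and 5, without their string keys
print_r($tuples); // the tuples ['apple', 3] and ['pear', 5]
```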

Now, in the example, the stream creation is followed by two transformation methods. The first one is map, which takes a callable. The callable is called for each element being processed, and the value it returns is passed on to the next step of the stream chain. So, in our example, we add 3 to each element of the stream.

The next transformation method is filter. Here we also pass a callable that receives each element of the stream as a parameter, but this callable needs to return a boolean. If it returns true, the element moves on to the next step of the stream. If it returns false, the element is filtered out. So in this case, we are only keeping the even numbers.
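If you are curious how a lazy map and filter could work, the two steps can be approximated with generators. This is only a sketch in plain PHP: lazyMap and lazyFilter are hypothetical names, and the library may be implemented quite differently.

```php
<?php
// Hypothetical lazy map: applies $fn to an element only when it is requested.
function lazyMap(iterable $items, callable $fn): Generator
{
    foreach ($items as $item) {
        yield $fn($item);
    }
}

// Hypothetical lazy filter: lets an element through only if $predicate is true.
function lazyFilter(iterable $items, callable $predicate): Generator
{
    foreach ($items as $item) {
        if ($predicate($item)) {
            yield $item;
        }
    }
}

// Same pipeline as the first example: add 3, then keep the even numbers.
$plusThree = lazyMap(range(0, 5), function (int $value): int {
    return $value + 3;
});
$evens = lazyFilter($plusThree, function (int $value): bool {
    return $value % 2 === 0;
});

// Nothing has run yet; iterator_to_array is what pulls the elements through.
$result = iterator_to_array($evens, false);
print_r($result); // contains 4, 6 and 8
```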

The last method in the chain is collect. This is the only eager method that we have seen thus far. It consumes the entire stream by collecting each element into an array, and then returns that array. After calling this method, the original stream goes into an invalid state and you cannot work with it anymore. All methods that consume the stream have comments that point that out.
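Plain PHP generators have a similar single-use nature, which makes for a handy analogy. The sketch below does not use the library, and how Stream itself reacts to being reused is not shown here; it only demonstrates that a generator cannot be traversed again once it has been consumed.

```php
<?php
// A tiny generator standing in for a stream of three elements.
function numbers(): Generator
{
    yield 1;
    yield 2;
    yield 3;
}

$stream = numbers();

// Eagerly consume everything into an array, similar in spirit to collect.
$collected = iterator_to_array($stream, false);

// A second traversal of the same, already finished generator fails.
$secondAttemptFailed = false;
try {
    iterator_to_array($stream, false);
} catch (Exception $e) {
    $secondAttemptFailed = true;
}

print_r($collected);
var_dump($secondAttemptFailed);
```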

A little more focus on the Lazy part.

The Stream tries to be as lazy as it can. Because of that, the following code will not output anything.

use EBANX\Stream\Stream;

$result = Stream::of([1, 2])
    ->map(function (int $value): int {
        echo "Original Element $value\n";
        return $value * 5;
    })
    ->map(function (int $value): int {
        echo "After first map $value\n";
        // intdiv keeps the result an int, matching the declared return type
        return intdiv($value, 2);
    });

This happens because map is lazy. If no one tries to use/consume the stream, we will never apply the map. So, let's try again by adding a 'sum' to the end.

use EBANX\Stream\Stream;

$result = Stream::of([1, 2])
    ->map(function (int $value): int {
        echo "Original Element $value\n";
        return $value * 5;
    })
    ->map(function (int $value): int {
        echo "After first map $value\n";
        // intdiv keeps the result an int, matching the declared return type
        return intdiv($value, 2);
    })
    ->sum();
echo "The sum is $result";

This should output the following:

Original Element 1
After first map 5
Original Element 2
After first map 10
The sum is 7

Note the order. We don't apply the first map to all elements and then apply the second one. We apply all transformations to each element as soon as they are consumed. This allows us to execute fewer transformations in case we are using something like take, where we can take only N elements of the stream.
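The saving is easy to see with a small generator-based sketch. The helpers lazyMap and lazyTake are hypothetical and written in plain PHP, so this illustrates the idea rather than the library's take method itself.

```php
<?php
// Hypothetical lazy map: applies $fn to an element only when it is requested.
function lazyMap(iterable $items, callable $fn): Generator
{
    foreach ($items as $item) {
        yield $fn($item);
    }
}

// Hypothetical lazy take: stops pulling from the source after $limit elements.
function lazyTake(iterable $items, int $limit): Generator
{
    if ($limit <= 0) {
        return;
    }
    $taken = 0;
    foreach ($items as $item) {
        yield $item;
        if (++$taken >= $limit) {
            return;
        }
    }
}

$calls = 0; // counts how many times the map callable actually runs
$doubled = lazyMap(range(1, 1000), function (int $value) use (&$calls): int {
    $calls++;
    return $value * 2;
});

$firstTwo = iterator_to_array(lazyTake($doubled, 2), false);

echo "Mapped $calls of 1000 elements\n"; // prints "Mapped 2 of 1000 elements"
```

Note that range(1, 1000) is an ordinary eager array here; the point is that the map callable runs only twice, once for each element that take lets through.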

The reduce.

There are a lot of different transformations and ways to consume your streams, but I want to focus a little bit on reduce. The reduce method is eager, so it consumes the stream. It receives an initial accumulator value and a callable. Then, for each element of the stream, the callable receives the current value of the accumulator and the current element. Whatever the callable returns becomes the new value of the accumulator. After the last iteration, reduce returns whatever is in the accumulator.
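The mechanics described above fit in a few lines of plain PHP. The function reduceIterable is a hypothetical name for this sketch; its argument order (initial value first, callable second) mirrors how reduce is called in this article, but the body is an assumption about the general technique, not the library's source.

```php
<?php
// Hypothetical reduce over any iterable: fold every element into $acc.
function reduceIterable(iterable $items, $initial, callable $fn)
{
    $acc = $initial;
    foreach ($items as $item) {
        // The callable's return value becomes the new accumulator.
        $acc = $fn($acc, $item);
    }
    return $acc;
}

// Finding the maximum: the accumulator holds the largest value seen so far.
$max = reduceIterable([3, 9, 4], PHP_INT_MIN, function (int $acc, int $value): int {
    return max($acc, $value);
});

// Counting elements: the accumulator ignores the element and adds one.
$count = reduceIterable(['a', 'b', 'c'], 0, function (int $acc, string $value): int {
    return $acc + 1;
});

echo "max = $max, count = $count\n"; // prints "max = 9, count = 3"
```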

The powerful thing about reduce is that all other methods that consume the stream could be written using it. The only reason we added the other methods is ease of use. For example, here we implement sum using reduce.

$result = Stream::rangeInt(0, 5)
    ->reduce(0, function(int $acc, int $value): int {
        return $acc + $value;
    });

Arrays in the parameters.

Let's try to implement the collect method using the reduce:

$result = Stream::rangeInt(0, 5)
    ->reduce([], function(array $acc, int $value): array {
        $acc[] = $value;
        return $acc;
    });

If you execute it, you will see that it works as expected. But what about performance? Let's create a range of 100,000 elements and compare reduce against collect. These are the results on my machine:

Using collect:

real    0m0.128s
user    0m0.076s
sys     0m0.032s

Using reduce:

real    1m29.543s
user    0m46.033s
sys     0m43.108s

Wow, that is a BIG difference. Why does it take so much longer using reduce? Well, that is because of how PHP handles arrays.

PHP passes arrays by value in function parameters. What this means is that every time we change an array that we received as a parameter, PHP copies the entire array to a new memory location. In our example, we are doing this 100k times, and every time the array is a little bit longer.

A way around this issue is to receive the array as a reference. Here is the same example using references, along with the time it took to process the same number of elements.

use EBANX\Stream\Stream;

$result = Stream::rangeInt(0, 100000)
    ->reduce([], function(array &$acc, int $value): array {
        $acc[] = $value;
        return $acc;
    });

Using reduce with a reference:

real    0m0.136s
user    0m0.085s
sys     0m0.033s
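The difference between the two ways of receiving an array can be seen without the library at all. The functions appendByValue and appendByReference below are hypothetical helpers for this sketch; they show the semantics only and do not measure performance.

```php
<?php
// Receives a copy-on-write view of the caller's array. The write below
// forces PHP to duplicate the array, so the caller's array is untouched.
function appendByValue(array $items, int $value): array
{
    $items[] = $value;
    return $items;
}

// Receives a reference, so the write happens in place on the caller's array.
function appendByReference(array &$items, int $value): void
{
    $items[] = $value;
}

$original = [1, 2];

$copy = appendByValue($original, 3);
$originalAfterByValue = $original;     // still [1, 2]

appendByReference($original, 3);
$originalAfterByReference = $original; // now [1, 2, 3]

print_r($copy);
print_r($originalAfterByValue);
print_r($originalAfterByReference);
```

In the by-value case the duplication cost grows with the size of the array, which is why repeating it for every element of a long stream adds up so quickly.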

To finish it all up

Stream is a very useful and addictive library. You can get more details about all of its methods by taking a look at our unit tests.

It is under an MIT license, so feel free to go wild with it. Cheers.
