This post is a part of a serie:
- Introduction to transducers
- Anatomy of a transducer
- Stateful transducers
- Early termination in transducers (this post)
- Functions which are using transducers
- Transducer exercises and solutions
Early termination and reduced?
values in transducers
Suppose that you want to write a transducer that uses some of the elements in the input stream and then ignore anything after it. It would be a waste to have to iterate on the remaining values of the stream if we do not use them.
(defn tired-map-transducer [func energy tired-answer]
(fn [rf]
(let [state (volatile! energy)]
(fn ([] (rf))
([result] (rf result))
([result input]
(rf result
(let [energy @state]
(if (pos? energy)
(do
(vreset! state (dec energy))
(func input))
tired-answer))))))))
(into []
(tired-map-transducer inc 5 :whatever)
(list 1 2 3 4 5 6 7 8 9 10))
; => [2 3 4 5 6 :whatever :whatever :whatever :whatever :whatever]
We certainly want to find a way to let the function calling the transducers to stop calling them.
We could do that in multiple ways, like sending a special value :enough
after the last data. But what if that special data :enough
appears in some input data stream? Sending it to the output would be misleadingly interpreted as the transducer's message to early terminate.
The Clojure core team came up with something that is very unlikely to appear in the input data stream by luck (or we could call it a bug and blame the user instead :-) ), and established it as a convention for transducers to inform their calling function about a value being the last one their would like to return.
That convention is: (reduced last-value)
(defn responsible-map-transducer [func energy]
(fn [rf]
(let [state (volatile! energy)]
(fn ([] (rf))
([result] (rf result))
([result input]
(let [energy @state]
(vreset! state (dec energy))
(cond-> (rf result (func input))
(<= energy 1) reduced)))))))
(into []
(responsible-map-transducer inc 5)
(list 1 2 3 4 5 6 7 8 9 10))
; => [2 3 4 5 6]
reduced
is simply returning a wrapped value. A value can be tested by the calling function as being wrapped or not by using the predicate reduced?
.
(reduced? 7)
; => false
(reduced? (reduced 7))
; => true
Are we done, then?
There is one more thing to check before calling it done.
Let's test our transducer a little more. Is it still going to work in those cases?
(into []
(comp
(responsible-map-transducer inc 5) ; Step 1
(responsible-map-transducer inc 3)) ; Step 2
(list 1 2 3 4 5 6 7 8 9 10))
; => [3 4 5]
; It works fine.
(into []
(comp
(responsible-map-transducer inc 3) ; Step 1
(responsible-map-transducer inc 5)) ; Step 2
(list 1 2 3 4 5 6 7 8 9 10))
; => [3 4 5]
; It works fine.
It all works fine. So why did I ask? Because you have to know why it still works.
The implementation of responsible-map-transducer
only took care of what it returns when it has no energy. Nothing special was done with the result of (rf result ...)
in the normal case, it was simply returned as is. If it was a reduced?
value, it means that the rf
function wanted an early termination. Our transducer has to make sure that in this case the returned value is reduced
.
It happens to be the case here because we simply returned what the rf
function returned, so we are in luck. This may not always be the case, so always pay attention and watch in both ways:
- Handle the case where your transducer wants an early termination,
- Handle the case where
rf
wants an early termination.
The devil is in the details
Did you think about testing that?
(into []
(comp
(responsible-map-transducer inc 3) ; Step 1
(responsible-map-transducer inc 3)) ; Step 2
(list 1 2 3 4 5 6 7 8 9 10))
You didn't? Too bad because you should have: It triggers a bug and you get an extremely unclear error message now.
; => ClassCastException clojure.lang.Reduced cannot be cast to clojure.lang.ITransientCollection clojure.core/persistent! (core.clj:3240)
Something happened when we returned a value which was wrapped at the same time by both transducers: (reduced (reduced last-value))
The function which is calling the transducers (i.e. into
) is supporting early termination by expecting either a value or a reduced
value, in which case it is getting the unwrapped value via (unreduced reduced-value)
. But instead of getting the value, it still gets a reduced value.
(reduced? (unreduced (reduced (reduced 7))))
; => true
How to fix it? Either we ask the Clojure core team to change all the functions which are using transducers to support multi-wrapped values, or either we avoid wrapping the values more than once.
"There is a function for that" --Rich Hickey
We can use the function ensure-reduced
from the standard library. Its implementation is very simple:
(defn ensure-reduced [x]
(if (reduced? x) x (reduced x)))
Now let's correct our transducer.
(defn correct-map-transducer [func energy]
(fn [rf]
(let [state (volatile! energy)]
(fn ([] (rf))
([result] (rf result))
([result input]
(let [energy @state]
(vreset! state (dec energy))
(cond-> (rf result (func input))
(<= energy 1) ensure-reduced))))))) ; <- The difference is here
(into []
(comp
(correct-map-transducer inc 3) ; Step 1
(correct-map-transducer inc 3)) ; Step 2
(list 1 2 3 4 5 6 7 8 9 10))
; => [3 4 5]
; Idiomatic way:
; (into []
; (comp
; (take 3) ; Step 1
; (map inc) ; Step 2
; (map inc)) ; Step 3
; (range 1 11))
What's next
You already know everything you need to implement your own transducers. I encourage you to try some ideas on your own. If you encounter blocking problems in your adventure, feel free to leave a comment and I will try to help you (when I have time).
In the next part, I talk about functions like into
which are using the transducers and how to write one.
Top comments (0)