
Elixir Stream and large HTTP responses: processing text

Alvise Susmel. Originally published at poeticoding.com ・13 min read

Are you passionate about Elixir and Phoenix? Subscribe to the Poeticoding newsletter and join happy regular readers and receive new posts by email.

You can find the code for this and the previous article in the poeticoding/httpstream_articles GitHub repo. The code in this repo is not meant for production use; it is just part of the experiments for these articles.

In the last article we put together the concepts of async HTTP responses and large-file processing with Elixir Streams, so that we could easily download and save a large file using our HTTPStream module.

The stream we've built around HTTPoison is able to emit the chunks of an HTTP chunked response. In this way we avoid the huge memory impact we could have saving the whole response into memory.

In the last example of the previous article, each chunk is streamed down through the pipeline's functions, saved into our local file and then garbage collected. It was then easy to add compression, just by including the StreamGzip.gzip function in the pipeline.

Text response

Previously we were treating the response just as a binary, delegating the compression to the StreamGzip library and saving the result into a file.

In this part we want to process the lines of a large text file. To make it fun and a bit more realistic, we are going to get some inspiration from the first day of the Advent of Code challenge. There are also some cool videos made by José Valim, where he uses Elixir to solve the challenges of the Advent of Code.

I've generated and uploaded a 125MB text file, at this link, with 30 million lines. Each line contains a random integer from -1000 to 1000.


We want to:

  1. process this text file on the fly, line by line, while downloading it;
  2. avoid big memory spikes, like we did in the first part (we cannot load the full response into memory);
  3. last but not least, download only the first few chunks needed when we want to process just the first 30 lines of text.

To run some quick tests, you can also find a smaller 4.5MB version with 1 million lines, which is the same file you find in the poeticoding/httpstream_articles repo.

What we are going to build

To process an HTTP response line by line, we need something that works like

File.stream! "numbers.txt" #, [], :line

which creates a stream that opens a file and, by default, emits each line. We want to do something similar with our HTTP async response.
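To make the target behaviour concrete, here is a minimal sketch of what File.stream! emits by default. The temporary file name numbers_sketch.txt and its contents are made up for illustration; they are not part of the article's repo.

```elixir
# Write a tiny sample file so the example is self-contained.
# (The file name is hypothetical, chosen only for this sketch.)
path = Path.join(System.tmp_dir!(), "numbers_sketch.txt")
File.write!(path, "767\n138\n-701\n")

# File.stream!/1 is lazy: the file is only read when the stream is enumerated.
# By default it emits one element per line, keeping the trailing newline.
lines = path |> File.stream!() |> Enum.to_list()

IO.inspect(lines)
# prints ["767\n", "138\n", "-701\n"]

File.rm!(path)
```

Note that each emitted line keeps its trailing \n, which is the behaviour we try to mimic with the HTTP response.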

Instead of changing the current HTTPStream module implementation, we want to take full advantage of the Elixir Stream's composability, writing a function we just add to the pipeline, that converts the chunks to lines.

Chunks to Lines

Ok, but why can't we just use the implementation we've done already for the binary file?

The problem with the previous implementation is that the stream we've built emits chunks without distinguishing the lines. We need an Elixir Stream that emits lines instead of chunks.

Unfortunately this is not as easy as it seems: splitting chunks into lines is not enough. That would work only in an easy and specific case:


where each element of the list is a chunk and each chunk ends with a newline. In this easy case we just need to split each chunk, filter out the empty strings, and emit the result.

iex> String.split("767\n138\n")
["767", "138"]

The problem, though, is that we have no guarantee that all the chunks will be in this form. It's actually more than likely that parts of the same line will be spread over different chunks.


You see how in this case the first two chunks don't end with a newline character \n and the second chunk starts with the final part of the line started in the previous chunk.
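A quick sketch of why naive splitting breaks in this situation. The chunk list is the same one used in the iex session later in the article; splitting each chunk independently is the naive approach we want to avoid, not something HTTPStream does.

```elixir
# Chunks whose boundaries fall in the middle of a line.
chunks = ["767\n13", "8\n-701\n98\n-", "504\n22"]

# Naive approach: split every chunk on its own and drop empty strings.
naive_lines =
  chunks
  |> Stream.flat_map(&String.split(&1, "\n", trim: true))
  |> Enum.to_list()

IO.inspect(naive_lines)
# prints ["767", "13", "8", "-701", "98", "-", "504", "22"]
```

The numbers 138 and -504 never appear: they are cut into "13" and "8", and into "-" and "504", because nothing carries the unfinished line over to the next chunk.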


We are now going to implement a function HTTPStream.lines(chunks_enum) which takes a stream of chunks as an input and returns a stream of lines.


We'll then use this function as part of our pipeline in this way:

chunks_enum
|> HTTPStream.lines()
|> Stream.map(fn line -> ... end)

We don't want to use Enum.reduce, since it is eager and would hold all the lines in memory. Instead, we can use Stream.transform, which is lazy:

Stream.transform(enum, initial_accumulator, fn element, acc ->
  {list_of_elements_to_emit, new_accumulator}
end)

The first element of the returned tuple is a list of elements we are going to emit, in our case lines.
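As a small illustration of how the emitted list and the accumulator interact, here is a sketch unrelated to HTTP: it buffers integers and emits them in pairs. The pairing logic is just an example made up for this purpose.

```elixir
pairs =
  1..5
  |> Stream.transform([], fn
    # A buffered element is waiting: emit the pair and empty the buffer.
    n, [prev] -> {[{prev, n}], []}
    # Nothing buffered yet: emit nothing and keep n in the accumulator.
    n, [] -> {[], [n]}
  end)
  |> Enum.to_list()

IO.inspect(pairs)
# prints [{1, 2}, {3, 4}]
```

The reducer can emit zero, one or many elements per input. Notice also that the leftover 5 stays in the accumulator and is never emitted, because the reducer is not told when the input ends.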

[Figure: accumulating the final part of a chunk and using it when processing the next chunk]

We see from the image that the accumulator acc is the last part of the first chunk and is prepended to the second chunk.

We need a function, which we'll call next_lines, that splits a chunk into separate lines and returns a tuple with two elements: the lines we want to emit and the last part of the chunk.

Stream.transform(chunks_enum, "", fn chunk, prev ->
  {lines, last_part} = next_lines(chunk, prev)
  {lines, last_part}
end)

Our initial accumulator is an empty string. This empty string will be passed as prev while processing the first chunk.

Recursive implementation

We now need to write the next_lines(chunk,prev) function. We can implement it using recursion, going through each single UTF-8 character looking for newlines. Remember that to behave like File.stream! we need to preserve the newlines \n.

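# https://github.com/poeticoding/httpstream_articles/blob/36bc2167b7024a990b04b28f9447fb9bc0e0310e/lib/http_stream.ex#L78

# helper: starts the recursion with an empty list of lines
def next_lines(chunk, prev \\ ""), do: next_lines(chunk, prev, [])

# newline found: the accumulated line is complete
def next_lines(<<"\n"::utf8, rest::binary>>, prev, lines) do
  next_lines(rest, "", [prev <> "\n" | lines])
end

# any other character: append it to the current line
def next_lines(<<c::utf8, rest::binary>>, prev, lines) do
  next_lines(rest, <<prev::binary, c::utf8>>, lines)
end

# end of the chunk: return the lines in order and the unfinished part
def next_lines(<<>>, prev, lines), do: {Enum.reverse(lines), prev}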

Ok, there is a lot happening here. Let's start from the beginning

  • next_lines(chunk,prev\\"")
    The first clause is just a helper. We pass a chunk, and the accumulator prev. The function calls next_lines/3 , passing an empty list of lines as third argument.

  • next_lines(<<"\n"::utf8, rest::binary>>, prev, lines)
    We are pattern matching a sequence of UTF-8 characters. This clause matches only when the next character is a newline. We then call next_lines recursively, passing the rest of the chunk we still need to process, resetting the accumulator to an empty string "", and passing the list of lines with the accumulated line, prev, prepended to it.

  • next_lines(<<c::utf8, rest::binary>>,prev,lines)
    Since every time c is a newline the clause above is matched, in this clause c != "\n" so we just need to append it to prev and recursively call next_lines going through the rest of the chunk.

  • next_lines(<<>>,prev,lines)
    <<>> is an empty binary and means we've reached the end of the chunk. For performance reasons we've prepended the lines:
    [prev <> "\n" | lines] is faster than lines ++ [prev <> "\n"], especially when the lines list is big. When we reach the end of our recursion, we need to reverse the list of lines.
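The prepend-then-reverse idiom mentioned in the last point can be sketched in isolation. PrependSketch and its two functions are hypothetical names used only to compare the two ways of building a list.

```elixir
defmodule PrependSketch do
  # Prepending is O(1) per element; a single reverse at the end restores the order.
  def collect(items) do
    items
    |> Enum.reduce([], fn item, acc -> [item | acc] end)
    |> Enum.reverse()
  end

  # Appending with ++ copies the whole accumulator on every step.
  def collect_slow(items) do
    Enum.reduce(items, [], fn item, acc -> acc ++ [item] end)
  end
end

IO.inspect(PrependSketch.collect(["767\n", "138\n", "-701\n"]))
# prints ["767\n", "138\n", "-701\n"]
```

Both functions return the same list; the difference is only in how much copying happens while the list grows.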

Let's try this function on iex

# ["767\n13","8\n-701\n98\n-","504\n22"]
iex> {lines, prev} = HTTPStream.next_lines "767\n13"
{["767\n"], "13"}
iex> {lines, prev} = HTTPStream.next_lines "8\n-701\n98\n-", prev
{["138\n", "-701\n", "98\n"], "-"}
iex> {lines, prev} = HTTPStream.next_lines "504\n22", prev
{["-504\n"], "22"}
iex> prev
"22"

Perfect, exactly what we need 👍. We go through the list of chunks, passing the prev we obtained to the next call.


next_lines returns the same tuple we need to return in the reducer function passed to Stream.transform/3. We can then write HTTPStream.lines/1 in a nice and compact way

def lines(chunks_enum) do
  chunks_enum
  |> Stream.transform("", &next_lines/2)
end

Let's try it on iex

iex> ["767\n13","8\n-701\n98\n-","504\n22"] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)

Mmm 🤔 ... there is something wrong here. The last line "22" is missing.

Emitting last line

The reason it's not emitted is that it doesn't end with a newline, so it remains stuck in the accumulator (prev). We have to emit it when the stream has ended, but with Stream.transform/3 the reducer function doesn't know when the stream is going to end! (Please let me know in the comments if you know a way to catch the end of a stream.)

A workaround we can use, to let next_lines/2 know when the stream has reached the end, is to add an :end atom at the end of our stream of chunks. next_lines/2 then has to handle this case with a specific clause

def next_lines(:end,prev), do: { [prev], ""}

which emits the final line. The accumulator is set to an empty string but it could be anything at this point.

Let's try it again on iex

iex> ["767\n13","8\n-701\n98\n-","504\n22", :end] \
...> |> HTTPStream.lines() \
...> |> Enum.each(&IO.inspect/1)

Great, it works! 🎉

But now how can we easily add a :end atom at the end of our HTTP chunked response stream?

Emitting :end at the end of the streamed HTTP response

If you have an alternative way of doing this, please share it in the comments section below! 👩‍💻👨‍💻

We need to make a small but significant change to our HTTPStream.get(url) function.

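# HTTPStream.get/1
def get(url) do
  # ... (rest of the function unchanged from the previous article)

  # next_fun
  fn
    # first clause
    %HTTPoison.AsyncResponse{id: id} = resp ->
      receive do
        # ... (other messages handled as before)
        %HTTPoison.AsyncEnd{id: ^id} ->
          # emitting :end
          {[:end], {:end, resp}}
      end

    # second clause
    {:end, resp} ->
      {:halt, resp}
  end

  # ... (rest of the function unchanged from the previous article)
end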

  1. When we receive the %HTTPoison.AsyncEnd{} message we know that we've reached the end of the HTTP response. Instead of just halting the stream, we emit :end and set a new accumulator {:end, resp}, where resp is the %HTTPoison.AsyncResponse{} struct.

  2. After emitting :end, next_fun is called again. This time the accumulator is the one we've just set, {:end, resp}, which matches the second clause of our next_fun.
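The emit-then-halt pattern in these two steps can be tried without any HTTP request. The sketch below assumes a plain list as the source of chunks and uses Stream.resource/3 directly; EndMarkerSketch and the :done accumulator are hypothetical names, standing in for the real async response handling.

```elixir
defmodule EndMarkerSketch do
  # Emits every chunk of the list, then :end, and only then halts.
  def with_end(chunks) do
    Stream.resource(
      # start_fun: the list of remaining chunks is the initial accumulator
      fn -> chunks end,
      fn
        # still chunks left: emit one and continue with the rest
        [chunk | rest] -> {[chunk], rest}
        # source exhausted: emit :end and switch to a "finished" accumulator
        [] -> {[:end], :done}
        # called once more after :end was emitted: now we can halt
        :done -> {:halt, :done}
      end,
      # after_fun: nothing to clean up in this sketch
      fn _acc -> :ok end
    )
  end
end

IO.inspect(Enum.to_list(EndMarkerSketch.with_end(["767\n13", "8\n"])))
# prints ["767\n13", "8\n", :end]
```

The key point is the same as in get/1: the accumulator changes shape after the last element, so the next call can recognise that :end has already been emitted.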

[Figure: on AsyncEnd, next_fun emits :end and then halts]

Something I don't like about this change is that now we always have to handle the final :end, especially when saving the stream into a file.

|> Stream.reject(&match?(:end,&1))
|> Stream.into(File.stream!("image.tiff.gz"))
|> Stream.run()

The Stream.reject/2 call pattern matches each element and filters out the :end atom.

It's probably better to enable or disable the final :end via an option passed to HTTPStream.get(url, emit_end), like the version you see on GitHub.
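Another possibility, sketched here as an assumption and not as what the repo does, is to leave the source stream untouched and add or remove the marker inside the pipeline. Stream.concat/2 appends :end lazily to any enumerable of chunks and Stream.reject/2 drops it again. EndMarkerPipeline is a hypothetical helper module.

```elixir
defmodule EndMarkerPipeline do
  # Lazily appends the :end marker after the last chunk.
  def with_end(chunks), do: Stream.concat(chunks, [:end])

  # Drops the marker, e.g. before writing the chunks to a file.
  def without_end(chunks), do: Stream.reject(chunks, &match?(:end, &1))
end

chunks = ["767\n13", "8\n"]

marked = chunks |> EndMarkerPipeline.with_end() |> Enum.to_list()
IO.inspect(marked)
# prints ["767\n13", "8\n", :end]

unmarked = marked |> EndMarkerPipeline.without_end() |> Enum.to_list()
IO.inspect(unmarked)
# prints ["767\n13", "8\n"]
```

With this approach only the pipelines that need the marker, like the one converting chunks to lines, have to ask for it.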

Sum numbers in 30M lines remote file

Let's use what we've implemented to process a 125MB remote text file with 30M numbers, each one separated by a newline \n. While processing the lines on the fly, we sum the numbers to get the total.

# url is the address of the 125MB numbers file
url
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line ->
  case Integer.parse(line) do
    {num, _} -> num
    :error -> 0
  end
end)
|> Enum.sum()
|> IO.puts()

STATUS: : 200
  {"Content-Length", "131732144"},
  {"Accept-Ranges", "bytes"},
  {"Content-Type", "text/plain"},


Fantastic, we got the result: 12468816! 🎉
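The parsing step of this pipeline can be checked on its own with a handful of in-memory lines. SumSketch is a hypothetical module; the sample lines are made up, and include an empty string and a final line without a newline to show how Integer.parse/1 treats them.

```elixir
defmodule SumSketch do
  # Integer.parse/1 reads the leading integer and ignores the rest ("\n" here).
  # Lines that don't start with an integer count as 0.
  def to_number(line) do
    case Integer.parse(line) do
      {num, _rest} -> num
      :error -> 0
    end
  end

  def sum(lines), do: lines |> Stream.map(&to_number/1) |> Enum.sum()
end

IO.inspect(SumSketch.sum(["767\n", "138\n", "-701\n", "", "22"]))
# prints 226
```

Because the trailing newline is simply left in the unparsed rest, the same function works whether or not the lines stream preserves the \n.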

With the Erlang Observer I've sometimes seen a memory spike (still below 100MB), while at other times the line is almost flat. I think this could be related to how big the chunks are.

[Figure: memory spike]

In the GitHub repo you'll find a memory_test.exs script you can play with, to see the HTTPStream.lines memory allocation with different chunk sizes. Even with a 4.5MB file, if we exaggerate the chunk size (like 2_000_000) we get a huge memory spike. With 2_000 the line is almost flat.

[Figure: memory usage with 2MB vs 2kB chunks]

It would be great to be able to set a maximum chunk size in the HTTPoison options; unfortunately I didn't find any option to do that.


Let's see another way of writing the HTTPStream.lines(chunks) function. In the previous implementation we've used recursion to go through each single character of the chunk and to find newlines.

If we don't need to preserve newlines, we can use String.split/2 along with Stream.transform/3.

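def lines(enum) do
  enum
  |> Stream.transform("", fn
    # end of the stream: emit what is left in the accumulator
    :end, prev ->
      {[prev], ""}
    chunk, prev ->
      [last_line | lines] =
        String.split(prev <> chunk, "\n")
        |> Enum.reverse()
      {Enum.reverse(lines), last_line}
  end)
end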

The idea is similar to what we did before. We split the chunk into lines and the last element of the list becomes the accumulator, which is concatenated to the next chunk.

See how we extract the last item of the list of lines.

lines = String.split(prev <> chunk, "\n")
[last_line | rev_lines] = Enum.reverse(lines)
{Enum.reverse(rev_lines), last_line}

  • We split the concatenated string prev <> chunk, obtaining a list of lines. We now need to get the last element of the list.
  • We reverse the list, which creates a new list. Now the head of Enum.reverse(lines) is the last element of lines.
  • rev_lines is the list of lines we want to emit, but in the wrong order, so we emit Enum.reverse(rev_lines) and set last_line as the next accumulator.
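The three steps above can be wrapped into a small standalone function and tried on a couple of chunks. SplitSketch.split_chunk/2 is a hypothetical name for this sketch; it is not the function used in the repo.

```elixir
defmodule SplitSketch do
  # Returns {lines_to_emit, unfinished_last_part} for one chunk.
  def split_chunk(chunk, prev) do
    [last_line | rev_lines] =
      (prev <> chunk)
      |> String.split("\n")
      |> Enum.reverse()

    {Enum.reverse(rev_lines), last_line}
  end
end

{lines, rest} = SplitSketch.split_chunk("767\n138\n-5", "")
IO.inspect({lines, rest})
# prints {["767", "138"], "-5"}

# The next chunk ends exactly on a newline, so nothing is left over.
IO.inspect(SplitSketch.split_chunk("04\n22\n", rest))
# prints {["-504", "22"], ""}
```

When a chunk ends with a newline, String.split/2 returns an empty string as its last element, so the accumulator naturally becomes "".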

[Figure: splitting the chunk and extracting the last element]

Let's see an example

iex> chunks = ["767\n138\n-701\n98\n-5", "04\n22\n375"]
iex> [chunk | remaining_chunks] = chunks
iex> chunk
"767\n138\n-701\n98\n-5"
iex> lines = String.split(chunk,"\n")
["767", "138", "-701", "98", "-5"]
iex> [last_line | rev_lines] = Enum.reverse(lines)
["-5", "98", "-701", "138", "767"]
iex> last_line
"-5"
iex> lines_to_emit = Enum.reverse(rev_lines)
["767", "138", "-701", "98"]

#let's process another chunk
iex> [next_chunk | remaining_chunks] = remaining_chunks
iex> next_chunk
"04\n22\n375"
# we need to prepend last_line
iex> chunk = last_line <> next_chunk
"-504\n22\n375"
iex> lines = String.split(chunk,"\n")
["-504", "22", "375"]

It turns out that this implementation is much faster than the previous one. Let's see some benchmarking

Benchmark HTTPStream.lines

If you want to run this benchmark on your computer, you'll find everything on poeticoding/httpstream_articles.

Let's consider the stream created by File.stream!, which by default emits the lines of a file. We want to compare the speed of this function with HTTPStream.lines.

Instead of using a remote file, we are going to use the smaller ~4MB version, numbers_small.txt, which you can find in the GitHub repo.

We now need to find a way to simulate the stream of chunks made by HTTPStream.get.

chunk_stream = File.stream!("numbers_small.txt",[],16_000)

Passing a chunk size as the third argument of File.stream!/3 makes the stream emit chunks (in this case of 16kB) instead of lines.
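If you want to simulate chunks without touching the file system, one option is to slice an in-memory binary. This is only a sketch: ChunkSketch.chunk_binary/2 is a hypothetical helper, and it slices by bytes, which is fine for our ASCII numbers but could cut a multi-byte UTF-8 character in half.

```elixir
defmodule ChunkSketch do
  # Lazily emits consecutive slices of `size` bytes; the last one may be shorter.
  def chunk_binary(bin, size) when is_binary(bin) and size > 0 do
    Stream.unfold(bin, fn
      # nothing left: stop the stream
      <<>> -> nil
      # at least `size` bytes left: emit a full chunk
      <<chunk::binary-size(size), rest::binary>> -> {chunk, rest}
      # fewer than `size` bytes left: emit them as the final chunk
      rest -> {rest, <<>>}
    end)
  end
end

IO.inspect(Enum.to_list(ChunkSketch.chunk_binary("767\n138\n-701\n", 5)))
# prints ["767\n1", "38\n-7", "01\n"]
```

The resulting chunks cut lines at arbitrary positions, just like the chunks of a real HTTP response.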

In the script bench_lines.exs we use Benchee

# bench_lines.exs
chunk_stream = File.stream!("numbers_small.txt",[],16_000)
lines_stream = File.stream!("numbers_small.txt", [], :line)

# parses each line and sums the numbers
stream_sum = fn enum ->
  enum
  |> Stream.map(fn line ->
    case Integer.parse(line) do
      {num, _} -> num
      :error -> 0
    end
  end)
  |> Enum.sum()
end

Benchee.run(%{
  "File.stream! :line" => fn ->
    lines_stream
    |> stream_sum.()
  end,
  "with next_lines" => fn ->
    chunk_stream
    |> HTTPStream.lines(:next_lines)
    |> stream_sum.()
  end,
  "with String.split" => fn ->
    chunk_stream
    |> HTTPStream.lines(:string_split)
    |> stream_sum.()
  end
},
  time: 10
)

$ mix run bench_lines.exs

Name                        ips        average
with String.split          3.35      298.30 ms
File.stream! :line          2.08      481.22 ms
with next_lines            1.14      875.01 ms

with String.split          3.35
File.stream! :line          2.08 - 1.61x slower +182.93 ms
with next_lines            1.14 - 2.93x slower +576.71 ms

The interesting thing is that the version "with String.split" is even faster than "File.stream! :line", while the first implementation we did is the slowest.

Honestly, I don't know why the version "with String.split" is the fastest one. Maybe some optimisation in the String.split/2 function? If you are interested in these details, I've opened a topic about this on the Elixir Forum: Streaming lines from an enum of chunks.

Reducing the chunk size from 16_000 to 2_000 we see how both "with String.split" and "with next_lines" are a bit faster

chunk_stream = File.stream!("numbers_small.txt",[],2000)

Name                        ips        average  
with String.split          3.79      263.67 ms
File.stream! :line          2.06      484.98 ms
with next_lines            1.42      706.48 ms

with String.split          3.79
File.stream! :line          2.06 - 1.84x slower +221.31 ms
with next_lines            1.42 - 2.68x slower +442.81 ms

With a smaller chunk all the split, reverse and concatenation operations are faster.

Sum the first 30 numbers

The stream of lines we've built is lazy: this means we can take just the first 30 lines and halt the stream, without downloading the whole HTTP response.

To take the first 30 lines we use Enum.take/2.

# url is the address of the 125MB numbers file
url
|> HTTPStream.get()
|> HTTPStream.lines()
|> Stream.map(fn line ->
  case Integer.parse(line) do
    {num, _} -> num
    :error -> 0
  end
end)
|> Enum.take(30)
|> Enum.sum()
|> IO.puts()

You'll find this code in sum_30_lines.exs

$ mix run sum_30_lines.exs
STATUS: : 200
  {"Content-Length", "131732144"},
  {"Accept-Ranges", "bytes"},
  {"Content-Type", "text/plain"},


It should be really quick. Once 30 lines have been taken, the stream is halted and the HTTP connection is closed.
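The laziness can be observed without a network connection. In this sketch a Stream.map over a range stands in for the HTTP stream, and every element pulled from the source sends a message to the calling process, so we can count how many were actually produced. LazySketch and its functions are hypothetical names.

```elixir
defmodule LazySketch do
  # Stand-in for the remote source: 1_000 lines, each one notifying
  # `parent` at the moment it is actually produced.
  def lines(parent) do
    Stream.map(1..1_000, fn i ->
      send(parent, :pulled)
      "#{i}\n"
    end)
  end

  # Counts (and removes) the :pulled messages waiting in the mailbox.
  def pulled_count(count \\ 0) do
    receive do
      :pulled -> pulled_count(count + 1)
    after
      0 -> count
    end
  end
end

first_three = LazySketch.lines(self()) |> Enum.take(3)
IO.inspect(first_three)
# prints ["1\n", "2\n", "3\n"]

IO.inspect(LazySketch.pulled_count())
# prints 3
```

Only the elements that Enum.take/2 asked for were produced; the remaining 997 were never computed. With the HTTP stream the granularity is the chunk rather than the line, so a few more lines than requested may be downloaded, but not the whole file.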



Alvise Susmel


CTO in a London-based Hedge Fund. Loves learning new technologies, paradigms and architectures.

