Tracey Onim for AppSignal

Originally published at blog.appsignal.com

How to Build a Memory-efficient Elixir App with Streams

We have all encountered collections of data at some point when working on Elixir applications. Collections are very handy for storing, retrieving, and manipulating data using different data structures, and they help keep code clean and efficient.

In this article, we'll go through the following:

  • What are collections in Elixir?
  • The problems likely to be faced when working with collections
  • The greedy approach when working with large datasets
  • Enumerables in Elixir, used to work with collections

Finally, we'll explore how to build a memory-efficient Elixir application using the lazy processing approach with streams.

What are Collections in Elixir?

You can view a collection as a container that groups multiple things/objects/elements into a single unit. This collection allows you to store, retrieve, and manipulate data.

A couple of examples of collections encountered in the real world include:

  • A mail folder that contains received and sent letters.
  • A phone directory mapping phone numbers to corresponding names.

In Elixir, lists, maps, ranges, files, and tuples are the most common types of collections. Some of these collections can be iterated through, with each element passed through once, in order, so we can call them enumerables. Anything that can be iterated in Elixir implements the Enumerable protocol. Lists, maps, and ranges are the most common datatypes used as enumerables.

Elixir has two modules with iteration functions that use the Enumerable protocol: Enum and Stream.
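To see the difference in miniature, here is a quick illustrative comparison (the exact #Function reference in the output will vary by Elixir version):

iex > Enum.map(1..3, &(&1 * 2))
[2, 4, 6]

iex > Stream.map(1..3, &(&1 * 2))
#Stream<[enum: 1..3, funs: [#Function<48.53678557/1 in Stream.map/2>]]>

Enum.map/2 returns a fully computed list, while Stream.map/2 returns a stream that merely describes the computation. We'll dig into what this means for memory usage below.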

The Challenges of Collections

Working with collections can be very efficient, especially when your collections aren't that large. However, manipulating these collections can be very difficult when dealing with large datasets.

For example, let's say we want to retrieve organization data from a CSV file and display it in our application. Typically, we have to load all the organizations from the CSV file into the application's memory and transform the retrieved data into the shape we want.

Our application will be fine if memory holds a predictable number of organizations that can be easily transformed or manipulated. However, if we retrieve extensive organization data from the CSV, managing it can slow down our application, because more memory is needed to hold it all.

So, how can we elegantly manage our large CSV file, keeping in mind some of the problems we might face while processing it?

We can process our CSV file using one of two approaches:

  1. The greedy approach that consumes more memory.
  2. The memory-efficient lazy processing approach.

Earlier, we mentioned that Elixir has two modules, Enum and Stream, with iteration functions. We'll use these modules to process our CSV file.

The Greedy Approach For Our Elixir Application

In the greedy approach, we are going to use the Enum module because:

  1. When given a collection, it will consume all the content of that collection. That means the whole collection should be loaded into memory before being processed.
  2. The returned result is another collection that will be loaded into memory, which might be inefficient if data is not needed at that time.

Let's process our CSV file greedily.

We shall use an organizations CSV file with 2,000,000 records, downloaded from here, for demonstration purposes. The size of the CSV file (after unzipping) is around 283.4 MB.

Our focus will be to get the records of all organizations and transform them into data we can easily display in the browser. The result should look like this:

[
  %{
    index: "1",
    organization_id: "1234",
    name: "PLC",
    website: "https://park.com/",
    country: "Kenya",
    description: "Focused for the best",
    founded: "2016",
    industry: "sports",
    num_of_employees: 300
  },
  ...
]

Before we begin, let's open the interactive shell to manipulate the CSV file. After that, start the Erlang Observer application by running:

iex > :observer.start

We shall use the Observer application to inspect memory allocators when processing the CSV file.
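If you prefer raw numbers to the Observer's charts, you can also query the VM directly with :erlang.memory/1 before and after each step. The value below is just an illustrative figure; yours will differ:

iex > :erlang.memory(:total)
27489136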

Step 1: Loading the CSV File Into Memory

Use File.read!("organization.csv") to load the whole CSV file into memory. The observer shows an extra 283.84 MB allocated to memory.

[Observer screenshot: memory allocation after opening the file]

iex > org_csv = File.read!("organization.csv")


    "Index,Organization Id,Name,Website,Country,Description,Founded,Industry,Number of employees\r\n1,391dAA77fea9EC1,Daniel-Mcmahon,https://stuart-rios.biz/,Cambodia,Focused eco-centric help-desk,2013,Sports,1878\r\n2,9FcCA4A23e6BcfA, Mcdowell,http://jacobs.biz/,Guyana,Front-line real-time portal,2018,Legal Services,9743\r\n3,DB23330238B7B3D,\"Roberts, Carson and Trujillo\",http://www.park.com/,Jordan,Innovative hybrid data-warehouse,1992,Hospitality,7537\r\n4,bbf18835CFbEee7,\"Poole, Jefferson and Merritt\",http://hayden.com/,Cocos (Keeling) Islands,Extended regional Graphic Interface,1991,Food Production,9974\r\n5,74ECD725ceaDfd9,\"Ritter, Patel and Cisneros\",https://www.mason-blackwell.info/,Ecuador,Re-contextualized actuating website,2019,Computer Networking,5050\r\n6,ea42e2FECDAdF0c,Stafford Ltd,http://www.fuller.biz/,Qatar,Multi-channeled optimizing customer loyalty,1979,Aviation / Aerospace,7506\r\n7,b8EE5AE2Df8BA46,Roach Ltd,https://www.oconnell.com/,Guatemala,Switchable explicit complexity,2020,Hospitality,109\r\n8,21829Cf5a968024,Gill PLC,https://www.berry.com/,Lesotho,Future-proofed systemic pricing structure,1983,Printing,1822\r\n9,f9C605c034f47cD,Summers-Jordan,https://le.net/,Luxembourg,Cross-platform bottom-line system engine,1976,Primary / Secondary Education,1603\r\n10,0fBa55ecDA6cb4B,\"Richard, Lane and Weaver\",https://bauer-hardy.com/,United States Minor Outlying Islands,Optimized system-worthy complexity,2006,Building Materials,3505\r\n11,d085befF19Be6fB,\"Harrington, Sutton and Wilkins\",http://henson.com/,Guinea-Bissau,Diverse scalable instruction set,1985,Airlines / Aviation,1926\r\n12,13B77fAF602E949,Evans LLC,http://harrington-powers.com/,Cook Islands,Re-contextualized stable flexibility,2017,Design,3338\r\n13,90234C8d0D7eB75,\"Patterson, Deleon and Donovan\",https://thornton.net/,Holy See (Vatican City State),Versatile encompassing migration,1970,Automotive,9312\r\n14,155A7db5D8Cce47,\"Carlson, Snyder and Holland\",http://foley.com/,Korea,Devolved optimal secured line,1979,International Trade / Development,2649\r\n15,77B7F7fb1Ac2A44,Duffy-Stark,https://www.morales.com/,Monaco,Re-contextualized 24/7 Graphical User Interface,1990,Aviation / Aerospace,1714\r\n16,1f6bABBA98cd33E,\"Frederick, Fry and Poole\",http://www.madden.org/,Vanuatu,Synchronized empowering structure,2008,Fishery,1862\r\n17,05dbf87Ee09b6BC,\"Walton, Proctor and Peters\",http://www.casey-bell.com/,Grenada,Diverse local collaboration,1979,Alternative Medicine,4678\r\n18,20ef50A2fdB3993,Oneal and Sons,http://munoz.org/,Fiji,Operative uniform definition,1987,Religious Institutions,8332\r\n19,a9C703D1A2B074C,Maynard-Bush,http://leblanc.com/,Senegal,Customizable zero tolerance functionalities,2011,Apparel / Fashion,1668\r\n20,1ea33Ac2face255,\"Rosario, Martin and White\",https://www.mueller-rhodes.net/,Isle of Man,Synchronized 6thgeneration flexibility,1994,Music,5134\r\n21,CDCAe4a6DA0eC6d,Brady Ltd,http://montgomery.biz/,Guyana,Self-enabling modular archive,1989,Professional Training,5153\r\n22,f41b9Ce917bDD28,Odom Group,http://www.riley-dalton.net/,Saint Kitts and Nevis,Grass-roots bandwidth-monitored projection,2003,Computer Hardware,4793\r\n23,B1181c8C3d43216,Stanton-Arroyo,http://baird.org/,Montserrat,Integrated empowering core,1976,Mental Health Care,6919\r\n24,b3E9C4E4cbC4a8e,Stafford-Hickman,https://love.net/,United States Virgin Islands,Virtual background application,1996,Hospital / Health Care,3855\r\n25,f92AacaEEcB6eC6,Ryan and Sons,http://koch-raymond.org/,Chad,Public-key maximized 
definition,2017,Airlines / Aviation,1564\r\n26,8eb91E6eeDBC2A5,Wolf Group,https://aguilar-solomon.net/,Cayman Islands,Public-key value-added alliance,1984,Package / Freight Delivery,4509\r\n27,fDaeD26dB6fd79a,\"Horne, Bailey and Oconnor\",https://cruz.com/,Western Sahara,Customer-focused needs-based service-desk,1973,Wholesale,2670\r\n28,19Bbb8dB90dFaFF,Cobb Inc,http://miles.com/,Netherlands,Assimilated local ability,2008,Political Organization,2494\r\n29,00AF7EacF3fEb91,Parrish-Peterson,https://strong.org/,France,Multi-lateral encompassing structure,2004,Political Organization,9059\r\n30,8Ec27fFD6b945A9,\"Barr" <> ...

Step 2: Create a List for Each Row In the CSV

The data in Step 1 is loaded text, which means we are still far from our end goal. In this step, let's take the loaded text data and split it into lines. This will result in a list of string elements, with each element representing a row of the CSV.

iex > [_header | rows] = org_csv |> String.split("\n")

     ["Index,Organization Id,Name,Website,Country,Description,Founded,Industry,Number of employees\r",
 "1,391dAA77fea9EC1,Daniel-Mcmahon,https://stuart-rios.biz/,Cambodia,Focused eco-centric help-desk,2013,Sports,1878\r",
 "2,9FcCA4A23e6BcfA,Mcdowell, http://jacobs.biz/,Guyana,Front-line real-time portal,2018,Legal Services,9743\r",
...]

The observer shows an increase in allocated memory:

[Observer screenshot: Step 2, greedy approach]

Step 3: Create a Column from Each Row

In the second step, we pattern-matched our result to [_header | rows]. We are not interested in the header of the CSV file, but in the body.

In this step, we will split each row into a list, with the elements in each list representing a column of the CSV file.

iex > org_records = rows |> Enum.map(&String.split(&1, ","))

  [
    ["1", "391dAA77fea9EC1", "Daniel-Mcmahon", "https://stuart-rios.biz/",
     "Cambodia", "Focused eco-centric help-desk", "2013", "Sports", "1878\r"],
    ["2", "9FcCA4A23e6BcfA", "Mcdowell",
     "http://jacobs.biz/", "Guyana", "Front-line real-time portal", "2018",
     "Legal Services", "9743\r"],
...]

This step alone takes almost 60 seconds to process. The observer also shows a spike increase in memory allocation:

[Observer screenshot: Step 3, greedy approach]

Step 4: Create a Keyword List of Each Row

Here, each column in each row will be zipped to its corresponding header name.

iex > org_records = org_records |> Enum.map(fn record -> Enum.zip([:index, :org_id, :name, :website, :country, :description, :founded, :industry, :number_of_employees], record) end)
    [
      [
       index: "1",
       org_id: "391dAA77fea9EC1",
       name: "Daniel-Mcmahon",
       website: "https://stuart-rios.biz/",
       country: "Cambodia",
       description: "Focused eco-centric help-desk",
       founded: "2013",
       industry: "Sports",
       number_of_employees: "1878"
      ],
      [
        index: "2",
        org_id: "9FcCA4A23e6BcfA",
        name: "Mcdowell",
        website: "http://jacobs.biz/",
        country: "Guyana",
        description: "Front-line real-time portal",
        founded: "2018",
        industry: "Legal Services",
        number_of_employees: "9743"
      ],
...]

In this case, we use Enum.map/2 and Enum.zip/2 to process the organization record results from step 3 (a list of lists, where each list element represents a row of records). Remember that each call to the Enum module takes a collection and returns a collection. Both Enum.map/2 and Enum.zip/2 will greedily process our data, which explains the spike increase in memory allocation as seen from the observer.

[Observer screenshot: Step 4, greedy approach]

Enum.map/2 takes in a list of lists, where each list element is a row of organization records. It iterates through each corresponding row and invokes Enum.zip/2 on each row to transform it into a keyword list.

The Enum.zip/2 function takes two enumerables. The first is the list of header names, and the second is the list of values for one row. The zip function zips corresponding elements from the two lists into a list of tuples, where each tuple has an atom as its first element (the header name) and the matching row value as its second element.

Elixir represents this kind of output as a keyword list. In your interactive shell, you will notice that each row is displayed as a keyword list rather than a list of tuples: in Elixir, a list of two-element tuples whose first elements are atoms is a keyword list.

Enum.map/2 returns a new list of keyword lists.
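Here is the zipping step in miniature, using a made-up two-column row for illustration:

iex > Enum.zip([:name, :country], ["Daniel-Mcmahon", "Cambodia"])
[name: "Daniel-Mcmahon", country: "Cambodia"]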

Step 5: Convert to Map

Remember, our end goal was to work with data that's easy to manipulate when we want to display a list of organizations. In this final step, we will convert the keyword list above into maps.

iex > org_records |> Enum.map(fn data -> Map.new(data) end)

    [
      %{
        index: "1",
        org_id: "391dAA77fea9EC1",
        name: "Daniel-Mcmahon",
        website: "https://stuart-rios.biz/",
        country: "Cambodia",
        description: "Focused eco-centric help-desk",
        founded: "2013",
        industry: "Sports",
        number_of_employees: "1878"
      },
      %{
        index: "2",
        org_id: "9FcCA4A23e6BcfA",
        name: "Mcdowell",
        website: "http://jacobs.biz/",
        country: "Guyana",
        description: "Front-line real-time portal",
        founded: "2018",
        industry: "Legal Services",
        number_of_employees: "9743"
      },
...]

Putting it all together:

iex > [_header | rows] =
  "organization.csv"
  |> File.read!()
  |> String.split("\n")
  |> Enum.map(&String.split(&1, ","))

iex > rows
  |> Enum.map(fn record -> Enum.zip([:index, :org_id, :name, :website, :country, :description, :founded, :industry, :number_of_employees], record) end)
  |> Enum.map(fn record -> Map.new(record) end)

Observation with Enum

  • In each Enum step, we generated an output before the final result. The intermediate result in each step is stored in memory as a full collection. That's because, with Enum implementation, the whole collection must be available for the next step in the pipeline to start processing.
  • With Enum, each step has to wait for its intermediate result to start processing, so it takes more time to process data in each step.
  • There is a spike in memory allocation at each step, since each intermediate output must be kept in memory.

The greedy approach, or working with Enum, works well with a predictable number of elements. With a small CSV file, we would hardly have noticed these problems.

However, this approach is very inefficient when we have to keep an intermediate result in memory in each step. This is a massive waste of memory that can lead to slow application performance.

Solution: Build a Memory-efficient Elixir App with Streams

First, what are our goals?

  1. Memory allocation that's only for the final result.
  2. To process records only when we need them.

Using streams, we can do these things. Streams are known to be lazy enumerables.

This is because streams:

  • Lazily load data into memory, one element at a time, instead of loading everything at once.
  • Only load data when it's needed.
  • Instead of eagerly applying a transformation to the given enumerable, return a stream value that describes the intended computation (demonstrated in the sketch after this list).
  • Process one piece of data at a time as it arrives, instead of waiting for the whole collection to be available.
  • Are composable enumerators that read and pipe one line at a time, without waiting for all the data to be processed at every step.
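Here is a small demonstration of that laziness (the #Function reference will vary): mapping over a million-element range costs virtually nothing until an Enum function asks for elements, and then only the requested elements are computed.

iex > stream = Stream.map(1..1_000_000, &(&1 * 2))
#Stream<[enum: 1..1000000, funs: [#Function<48.53678557/1 in Stream.map/2>]]>

iex > Enum.take(stream, 3)
[2, 4, 6]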

The Lazy Processing Approach with Streams in Elixir

We can leverage lazy processing to process our organization's CSV file.

The good news is that many Elixir modules support streams. Instead of opening the CSV file and loading its data as in step 1, let's use File.stream!/3 to create a stream without reading the file into memory.

iex > File.stream!("/org.csv")
    %File.Stream{
      line_or_bytes: :line,
      modes: [:raw, :read_ahead, :binary],
      path: "/org.csv",
      raw: true
    }

The File.stream!/3 function returns a %File.Stream{} struct describing the intended read, not the file's contents.

[Observer screenshot: memory after File.stream!/3]

The observer doesn't show our CSV file's extra memory size, meaning the file has not yet been opened and loaded into memory.

Streams are composable enumerables. We can pipe one stream to another. We will replace the Enum functions from steps 2-4 of the greedy approach with stream functions. We'll avoid processing data and loading it into memory until we decide to run the stream (compute/process the data as intended) by passing it to a function in the Enum module.

Let's compose our first part of the pipeline with streams:

iex > header = [:index, :org_id, :name, :website, :country, :description, :founded, :industry, :number_of_employees]
iex >
"/org.csv"
|> File.stream!()
|> Stream.drop(1)
|> Stream.map(&String.split(&1, ["\r\n", "\n", ","], trim: true))
|> Stream.map(&Stream.zip(header, &1))
|> Stream.map(&(Map.new(&1)))

  #Stream<[
  enum: %File.Stream{
    path:  "/org.csv",
    modes: [:raw, :read_ahead, :binary],
    line_or_bytes: :line,
    raw: true,
    node: :nonode@nohost
  },
  funs: [#Function<34.53678557/1 in Stream.drop/2>,
   #Function<48.53678557/1 in Stream.map/2>,
   #Function<48.53678557/1 in Stream.map/2>,
   #Function<48.53678557/1 in Stream.map/2>]
]>

The composed pipeline of streams above returns a stream that only stores the intended computation instead of applying the transformation.

Here is what is happening within our stream processing pipeline:

  • Instead of opening the file, we use File.stream!/3 to create a stream.
  • Stream.drop/2 lazily drops the first element, the header.
  • Stream.map/2:
    • splits each line into columns
    • lazily zips each column value with its corresponding header name using Stream.zip/2
    • transforms each row into a map

To process our desired result, we have to pass the stream value to a function in the Enum module.
First, let's take only the first 3 elements from our stream and observe what happens.

iex > csv
     |> File.stream!()
     |> Stream.drop(1)
     |> Stream.map(&String.split(&1, ["\r\n", "\n", ","], trim: true))
     |> Stream.map(&Stream.zip(header, &1))
     |> Stream.map(&(Map.new(&1)|> IO.inspect(label: "=============")))
     |> Enum.take(3)

Enum.take/2 receives the transformed map data from the stream and only gets the first 3 elements. Once it gets the 3 elements from the stream, there is no more processing.

Running this pipeline, you will notice that:

  • The result is returned instantaneously.
  • IO.inspect only prints the first 3 elements. This is proof that the elements are processed one at a time, and only as the final Enum function demands them.

Lazy Approach Using Streams Vs. Greedy Approach

Let's use the greedy approach to take the first 3 elements from our code:

iex > [_header | rows] =
  csv
  |> File.read!()
  |> String.split("\n")
  |> Enum.map(&String.split(&1, ","))

iex > rows
  |> Enum.map(fn record -> Enum.zip([:index, :org_id, :name, :website, :country, :description, :founded, :industry, :number_of_employees], record) end)
  |> Enum.map(fn record -> Map.new(record) |> IO.inspect(label: "+++++++++++++++++") end)
  |> Enum.take(3)

IO.inspect prints all 2,000,000 records. This shows that with Enum, the whole collection must be available before the next step can start. Using this approach, an intermediate list of 2,000,000 records has to be kept in memory at each step, even though, in the end, we only take the first 3 records with Enum.take/2.

Unlike the greedy approach, with the lazy approach there is no need to process the whole collection before the next step takes place. By calling Enum.take/2 to fetch the first 3 elements, we can see there isn't even a spike in memory usage.

With streams, even when processing 2,000,000 records, we can decide how many records to load into memory and ensure that memory is allocated only for the final result. This is not the case with Enum, where each step keeps a full intermediate collection in memory.
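For example, if we wanted to persist all 2,000,000 records without ever holding the full list in memory, we could combine the stream pipeline with Stream.chunk_every/2 to work in fixed-size batches. This is only a sketch, assuming the header list from earlier is still bound; save_batch/1 is a hypothetical function standing in for whatever you'd do with each batch, such as a database insert.

iex > "/org.csv"
     |> File.stream!()
     |> Stream.drop(1)
     |> Stream.map(&String.split(&1, ["\r\n", "\n", ","], trim: true))
     |> Stream.map(&Map.new(Enum.zip(header, &1)))
     |> Stream.chunk_every(1_000)   # hold at most 1,000 maps in memory at a time
     |> Enum.each(&save_batch/1)    # save_batch/1 is hypothetical, e.g. a batched DB insert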

We can also compare the greedy and lazy approaches by monitoring memory allocators using the Erlang observer.

Remember, in the greedy approach, we processed all records. We will do the same with our lazy approach by passing the stream pipeline to Enum.to_list/1.

iex > csv
     |> File.stream!()
     |> Stream.drop(1)
     |> Stream.map(&String.split(&1, ["\r\n", "\n", ","], trim: true))
     |> Stream.map(&Stream.zip(header, &1))
     |> Stream.map(&(Map.new(&1)|> IO.inspect(label: "=============")))
     |> Enum.to_list()

Here's the memory allocated to the lazy approach:

[Observer screenshot: memory allocation, lazy approach]

And to the greedy approach:

[Observer screenshot: memory allocation, greedy approach]

Wrapping Up

In this post, we first took a look at collections in Elixir and some of their associated challenges. We then explored two methods of processing large datasets: the greedy approach and the lazy approach using streams.

We've seen how working with streams is one of the best approaches to help create a memory-efficient Elixir application.

Happy coding!

P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!
