Daniele Frasca for AWS Community Builders

Originally published at dfrasca.hashnode.dev

Rust: CSV processing

This blog post is about CSV processing. It sounds boring, but I want to share my experience of reading and writing a CSV file of almost 1 GB in just a few seconds.

As part of the Serverless Rust series, you can check out the other parts:

Parts 1 and 2 describe how to set up Rust and VS Code.

Part 3 shows how to process AWS SQS messages in parallel.

Part 4 shows how to execute an AWS Step Functions workflow for each AWS SQS message received.

Part 5 shows how to inject configuration with AWS AppConfig.

Problem

I want to read the IMDb Datasets and process title.basics.tsv.gz so that I can play with Amazon Neptune.

I will not discuss Amazon Neptune in this blog post but focus on CSV processing.

Because the files in this dataset are massive, I already knew from past experience with other languages that they could be a problem. So I asked Nicolas Moutschen for advice, leveraging his experience with Rust, and he pointed me straight away to the libraries and methods to use.

CSV

I use the csv crate.
There is excellent documentation around; the crate's own docs include a tutorial and a cookbook that are a good starting point.

Processing - take 1

As I said, I started by following Nicolas Moutschen's suggestions for reading and processing data from streams, and Nicolas was kind enough to show me an example.

I needed to change the original CSV into something I could use with Amazon Neptune, so I replaced this line:

println!("{}", String::from_utf8_lossy(&buffer[..len]));

with the code needed to write a new CSV.

Cargo.toml dependencies:

[dependencies]
async-compression = { version = "0.3.12", features = ["all", "tokio"] }
csv = "1.1.6"
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
serde =  { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-util = { version = "0.6.9", features = ["full"] } 

The complete code is the following:

use csv;
use hyper::Client;
use hyper_tls::HttpsConnector;
use std::io;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
const LINK: &str = "https://datasets.imdbws.com/title.basics.tsv.gz";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, hyper::Body>(https);
    let res = client.get(LINK.parse()?).await?;
    let body = res
        .into_body()
        .map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
    let body = StreamReader::new(body);
    let mut decoder = async_compression::tokio::bufread::GzipDecoder::new(body);
    let mut buffer = [0; 1024];
    let mut wtr = csv::Writer::from_path("./export/title.csv")?;
    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;
    loop {
        // Read the next chunk of decompressed bytes into the fixed-size buffer.
        let len = decoder.read(&mut buffer).await?;
        if len == 0 {
            break;
        }
        // Convert the chunk to a string and split it on tab characters.
        let line = String::from_utf8_lossy(&buffer[..len]);
        let line: Vec<&str> = line.split("\t").collect();
        wtr.write_record(&[
            line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
            line[8],
        ])?;
    }
    wtr.flush()?;
    Ok(())
}

This part of the code (please do not mind the quality):

let line = String::from_utf8_lossy(&buffer[..len]);
let line: Vec<&str> = line.split("\t").collect();
wtr.write_record(&[
    line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
    line[8],
])?;

This code has a problem when I convert the bytes to strings.
Because the read buffer has a fixed size that does not align with line boundaries, the rows ended up mixed after the conversion.
If, for example, we assume that the original CSV looks like this:

tconst  titleType   primaryTitle    originalTitle   isAdult startYear   endYear runtimeMinutes  genres
tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
tt0000002   short   Le clown et ses chiens  Le clown et ses chiens  0   1892    \N  5   Animation,Short
tt0000003   short   Pauvre Pierrot  Pauvre Pierrot  0   1892    \N  4   Animation,Comedy,Romance
tt0000004   short   Un bon bock Un bon bock 0   1892    \N  12  Animation,Short
tt0000005   short   Blacksmith Scene    Blacksmith Scene    0   1893    \N  1   Comedy,Short
tt0000006   short   Chinese Opium Den   Chinese Opium Den   0   1894    \N  1   Short
tt0000007   short   Corbett and Courtney Before the Kinetograph Corbett and Courtney Before the Kinetograph 0   1894    \N  1   Short,Sport
Enter fullscreen mode Exit fullscreen mode

A chunk read into the buffer might look like this:

//the beginning of the next row is appended to the previous one
tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
tt0000002

or
//incomplete
Courtney Before the Kinetograph
Enter fullscreen mode Exit fullscreen mode

The problem was probably how I used the libraries, or maybe I missed something, but I could not find a solution, so I moved on.
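
In hindsight, one way around the split rows would be to buffer the decompressed stream and read it line by line, so every record is complete before splitting it on tabs. Below is a minimal sketch of that idea, assuming the same decoder and wtr as in the code above; I have not benchmarked it:

use tokio::io::{AsyncBufReadExt, BufReader};

// Wrap the gzip decoder so we can read complete lines instead of raw chunks.
let reader = BufReader::new(decoder);
let mut lines = reader.lines();
// Skip the header row of the source file.
let _header = lines.next_line().await?;
while let Some(line) = lines.next_line().await? {
    let fields: Vec<&str> = line.split('\t').collect();
    if fields.len() < 9 {
        continue; // skip malformed rows instead of panicking on a missing column
    }
    wtr.write_record(&[
        fields[0], "movies", fields[1], fields[2], fields[3], fields[4], fields[5],
        fields[6], fields[7], fields[8],
    ])?;
}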

Processing - take 2

I tried to force the stream chunks into the CSV ReaderBuilder:

loop {
    let len = decoder.read(&mut buffer).await?;
    if len == 0 {
        break;
    }

    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .flexible(true)
        .from_reader(&buffer[..len]);

    for result in rdr.records() {
        let record = result?;
        wtr.serialize(Record {
            id: record[0].to_string(),
            label: "movies".to_string(),
            title_type: record[1].to_string(),
            primary_title: record[2].to_string(),
            original_title: record[3].to_string(),
            // "isAdult" is "1"/"0" in the source data, so map it to a bool here
            is_adult: &record[4] == "1",
            start_year: record[5].parse::<u16>().unwrap_or_default(),
            end_year: record[6].parse::<u16>().unwrap_or_default(),
            runtime_minutes: record[7].parse::<u16>().unwrap_or_default(),
            genres: record[8].to_string(),
        })?;
    }
}

But the problem was the same incomplete data, just with some extra conditions around it. I made it work, but I was unhappy because the code was not attractive, so I decided to move on.

Processing - final take...for now

For this part of the code, I downloaded the compressed file and extracted the CSV into a local folder.
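
As an aside, here is a minimal sketch of how that one-off decompression step could also be done in Rust, assuming the flate2 crate is added to Cargo.toml and the download is saved as ./import/title.basics.tsv.gz:

use flate2::read::GzDecoder;
use std::fs::File;
use std::io;

fn main() -> io::Result<()> {
    let input = File::open("./import/title.basics.tsv.gz")?;
    let mut decoder = GzDecoder::new(input);
    let mut output = File::create("./import/title.basics.tsv")?;
    // Stream the decompressed bytes straight to disk.
    io::copy(&mut decoder, &mut output)?;
    Ok(())
}

With the decompressed TSV in place, the complete program is: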

use csv;
use serde::{Deserialize, Deserializer, Serialize};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut wtr = csv::WriterBuilder::default()
        .has_headers(false)
        .from_path("./export/title.csv")?;

    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;

    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .double_quote(false)
        .escape(Some(b'\\'))
        .flexible(true)
        //.comment(Some(b'#'))
        .from_path("./import/title.basics.tsv")?;

    for result in rdr.deserialize() {
        let record: Record = result?;
        wtr.serialize(record)?;
    }

    wtr.flush()?;

    Ok(())
}

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,

    #[serde(default = "default_label")]
    label: String,

    #[serde(alias = "titleType")]
    title_type: String,

    #[serde(alias = "primaryTitle")]
    primary_title: String,

    #[serde(alias = "originalTitle")]
    original_title: String,

    #[serde(alias = "isAdult")]
    #[serde(deserialize_with = "bool_from_string")]
    is_adult: bool,

    #[serde(alias = "startYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    start_year: Option<u16>,

    #[serde(alias = "endYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    end_year: Option<u16>,

    #[serde(alias = "runtimeMinutes")]
    #[serde(deserialize_with = "csv::invalid_option")]
    runtime_minutes: Option<u16>,

    #[serde(alias = "genres")]
    #[serde(deserialize_with = "csv::invalid_option")]
    genres: Option<String>,
}

fn default_label() -> String {
    "movies".to_string()
}

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}


I decided to use csv::ReaderBuilder to read the CSV file.
To read this CSV data, I set the following:

  • Enable headers, so the first line is treated as the header row rather than as a record.
  • Change the delimiter from "," to tab.
  • Set the escape character to the backslash.
  • Permit flexible-length records, since some rows are in a strange format.

Instead of dealing with arbitrary records, I use Serde to deserialize records into a specific type. I have applied Serde annotations to the struct fields to map the original CSV columns to mine.

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,
  .....

Because the "isAdult" column in the original CSV is in the form of "1" and "0" and not pure boolean, I need to convert them, so I wrote an extension:

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}

Finally, because some fields are not mandatory in the original CSV and appear as \N, I must handle the failed deserialization when the target is a typed value like u16.

For this, I can use csv::invalid_option:

#[serde(deserialize_with = "csv::invalid_option")]

It will tell Serde to convert any deserialization errors on this field to a None value.

The outcome of the serialization will be a CSV with empty values (note the ",,"):

~id,~label,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,movies,short,Carmencita,Carmencita,false,1894,,1,"Documentary,Short"

Conclusion

Currently, AWS Lambda has 512 MB of temporary storage, so this use case will not fit because of the file size. However, if we get larger temporary storage one day, we could run it inside AWS Lambda.

One alternative is to use Amazon EFS, a fully managed, flexible, shared file system designed to be consumed by other AWS services. EFS support for AWS Lambda was announced on Jun 16, 2020: Lambda will automatically mount the file system and provide a local path to read and write data. If you want to read more, there is an excellent article here.

Another alternative is to use AWS Batch with spot instances in conjunction with AWS Step Functions leveraging the service integration Run a Job (.sync) pattern. After calling AWS Batch submitJob, the workflow pauses. When the job is complete, Step Functions progresses to the next state.

The csv crate does a fantastic job, and it is unbelievably fast. I can run this program on my computer in release mode (cargo run --release) and process it all in around 10 seconds.

Top comments (2)

SVGator

Following Nicolas Moutschen! Great post!

Daniele Frasca

Nicolas is the best