If you need to bring a lot of data into your app, a data pipeline can help. This article describes how to build a simple data pipeline with Ruby on Rails, Sidekiq, Redis, and Postgres.
What's a data pipeline?
A data pipeline takes information from one place and puts it somewhere else. It likely transforms that data along the way. In enterprise circles, this is called ETL, or extract-transform-load.
Generally, data pipelines:
- Ingest a lot of data at once
- Ingest new or updated data on a schedule
Splitting data into small jobs
A reliable pipeline starts big and reduces data into small slices that can be independently processed. Bigger jobs "fan out" into small ones. The smallest jobs, handling one record each, do most of the work.
Breaking work into smaller jobs helps handle failure. If the pipeline ingests 1,000,000 records, chances are that a few of them are invalid. If those invalid records are processing in their own jobs, then they can fail without impacting other records.
Lobby Focus, a project of mine, ingests United States lobbying data from the U.S. Senate. There are about 1,000,000 lobbying records from 2010-2020. Processing them all in a single job would be risky, as the entire job would fail if even one record triggered an error. Instead, Lobby Focus breaks down data into small slices and ends up processing each record in its own job. To bring in 1,000,000 lobbying records, it takes 1,001,041 jobs.
Pipelines can handle a ton of jobs with ease. Splitting up work into many small jobs offers advantages:
- Most jobs deal with only one record. If a job fails, it's easy to pin down the problematic record, fix it, and retry it. The rest of the pipeline continues uninterrupted.
- The pipeline can ingest data in parallel, with many small jobs running at once.
- You can track progress by looking at how many jobs succeeded and how many remain.
A Ruby on Rails pipeline stack
This stack, used in the example below, makes for a solid data pipeline:
- Ruby on Rails interfaces with all the pieces below and has a strong ecosystem of libraries.
- Sidekiq manages data pipeline jobs, providing visibility into progress and errors.
- Redis backs Sidekiq and stores the job queue.
- Postgres stores intermediate and final data.
- Amazon S3 serves as a staging area for source data.
Managed platforms like Heroku or Cloud 66 are good hosting options for this stack.
Real-world example: U.S. Senate lobbying data
Lobby Focus ingests lobbying data from the U.S. Senate and makes it accessible to users. Here's a peek into its data pipeline, which breaks data into small jobs and uses the stack above.
Step 1: Scrape
1 Sidekiq job deals with 1,000,000 records
- Gathers a list of ZIP files from the U.S. Senate, using HTTParty to retrieve the webpage and Nokogiri to parse it
- Launches a download job for each ZIP file. To ingest lobbying data from 2010 to 2020, it will launch 40 download jobs for 40 ZIP's.
Step 2: Download
40 Sidekiq jobs deal with 25,000 records each
Parameter: ZIP file URL
- Checks the last-modified header to see if the current version was already processed. If so, aborts the job.
- Downloads the ZIP and unzips its contents into Amazon S3, using rubyzip and aws-s3-sdk.
- Launches a process job for each XML file in the ZIP.
Step 3: Process
1,000 Sidekiq jobs deal with 1,000 records each
Parameter: XML file URL (S3)
- Parses the XML with Nokogiri
- Adds the content of each record in the XML file to Postgres, as a raw XML string.
- Launches a load job for each record in the XML file, passing the Postgres record ID.
Postgres can bear a high volume of record inserts. The pipeline uses it as an intermediate data store. The next job, load, receives the Postgres record ID and accesses it to get the XML string. Using Postgres to pass the XML avoids passing giant parameters between jobs. Sidekiq recommends small job parameters to keep the queue lean and preserve Redis memory.
Step 4: Load
1,000,000 Sidekiq jobs deal with 1 record each
Parameter: Postgres record ID
- Retrieves the XML string from Postgres and parses it with Nokogiri
- Validates data and aborts the job with an error if there is an issue
- Upserts final data record into Postgres
Breaking out data loads into small jobs is key to a resilient pipeline. Here are some more tips:
Make jobs idempotent
Any job should be able to run twice without duplicating data. In other words, jobs should be idempotent.
Look for a unique key in the source data and use it to avoid writing duplicate data.
For instance, you can set a Postgres unique index on the key. Then, use Rails' create_or_find_by method to create or update a record, depending on whether the key already exists.
Use constraints to reject invalid data
Set constraints to stop invalid data from coming in. If a field should always have a value, use a Postgres constraint to require that field. Use Ruby's strict conversion methods to throw an error and abort a job if you're expecting one data type (say, an integer like 3) but get another (say, a string like "n/a").
Data is never perfect. If you catch invalid data errors early, you can adjust your pipeline to handle unexpected values.
What to read next
A job management library like Sidekiq is critical to a data pipeline. Read more about Sidekiq, the backbone of this example. Its best practices docs are small and useful. If you're not using Ruby, find the top job management libraries for your language.
To practice creating a pipeline, find a public dataset that interests you and build a pipeline for it. The United States Government, for instance, lists its open government data on data.gov. Datasets range from physician comparison data and a database of every dam in the U.S.
Finally, check out managed solutions like Google Dataflow for particularly large pipelines.
Have questions or want advice? Send me a tweet @coreybeep
Originally posted on my blog
Photo by Victor Garcia on Unsplash
Top comments (0)