Alper Ortac

A Data Pipeline for 1 million movies and 10 million streaming links

Feb 2023: I wanted to see all scores for movies + tv shows and where to stream them on one page but couldn't find an aggregator that included all sources that were relevant for me.

Mar 2023: So, I built an MVP that grabbed scores on the fly and put the site online. It worked, but was slow (10 seconds to display scores).

Oct 2023: Realizing that storing data on my side is a necessity, I discovered windmill.dev. It eclipses similar orchestration engines easily - at least for my needs.


Fast forward to today and after 12 months of continuous data munching, I want to share how the pipeline works in detail. You'll learn how to build a complex system that grabs data from many different sources, normalizes data and combines it into an optimized format for querying.

Pics or didn't happen!

GoodWatch Flow Runs on Windmill

This is the Runs view. Every dot represents a flow run. A flow can be anything, for example a simple one-step script:

GoodWatch Flow to fetch Daily Data Dump from TMDB

The block in the center contains a script like this (simplified):

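def main():
    # Entry point that Windmill calls when this step runs
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    # Download each daily dump and store it in MongoDB
    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    # Return the dump infos in their MongoDB representation as the step result
    return [info.to_mongo() for info in daily_dump_infos]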

[...]

The following beast is also a flow (remember, this is only one of the green dots):

GoodWatch Priority Flow to fetch and store all related data to a movie or show

(higher resolution image: https://i.imgur.com/LGhTUGG.png)

Let's break this one down:

  1. Get the next prioritized movie or tv show (see next section)
  2. Get up-to-date data from TMDB
  3. Scrape IMDb, Metacritic and Rotten Tomatoes for current scores
  4. Scrape TV Tropes for... tropes
  5. Call the Huggingface API to gather DNA data (explained below)
  6. Store high dimensional vectors for DNA data
  7. Store relational data for movies, shows and streaming links

Each of those steps is more or less complex and involves async processes.
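To illustrate the async part, independent steps such as the three score scrapers can run concurrently instead of one after another. This is only a minimal sketch, not the actual GoodWatch flow: `fetch_scores`, `refresh_scores` and the source names are hypothetical, and the network request is simulated with a short sleep.

```python
import asyncio

# Hypothetical stand-in for a real scraper: it only simulates network latency.
async def fetch_scores(source: str, title_id: str) -> dict:
    await asyncio.sleep(0.01)  # pretend to wait for an HTTP response
    return {"source": source, "title_id": title_id, "score": None}

async def refresh_scores(title_id: str) -> list[dict]:
    sources = ["imdb", "metacritic", "rotten_tomatoes"]
    # gather() runs all scrapers concurrently and returns results in input order
    return await asyncio.gather(*(fetch_scores(s, title_id) for s in sources))

results = asyncio.run(refresh_scores("tt0133093"))
```

In Windmill itself, parallel branches of a flow can take over this role, so a script does not necessarily need its own event loop.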

Where do you start? Priority Queue

To determine which titles to pick next, there are two lanes that are processed in parallel. This is another area where Windmill shines: parallelization and orchestration work flawlessly with its architecture.

The two lanes to pick the next item are:

Lane 1: Flows for each data source separately

First of all, titles that don't have any data attached are selected for each data source. That means if the Metacritic pipeline has a movie that wasn't scraped yet, it will be selected next. This makes sure that every title is processed at least once, including new ones.

Once every title has attached data, the pipeline selects those with the least recent data.
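The selection rule for this lane can be sketched in a few lines. This is a minimal illustration under assumptions: the record layout and the `updated_at` field are hypothetical, and the real pipeline would express the same ordering as a database query rather than in memory.

```python
from datetime import datetime
from typing import Optional

def pick_next(items: list[dict]) -> Optional[dict]:
    """Pick the item to fetch next for one data source."""
    if not items:
        return None
    # Items without data (updated_at is None) come first;
    # all others are ordered by their oldest update.
    return min(
        items,
        key=lambda item: (
            item["updated_at"] is not None,
            item["updated_at"] or datetime.min,
        ),
    )

# Hypothetical records from one per-source collection
items = [
    {"imdb_id": "tt0000001", "updated_at": datetime(2024, 5, 1)},
    {"imdb_id": "tt0000002", "updated_at": None},
    {"imdb_id": "tt0000003", "updated_at": datetime(2024, 1, 15)},
]

next_item = pick_next(items)
```

Here the title that was never scraped is picked first. Once it has data, the one with the oldest `updated_at` follows.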

Here is an example of such a flow run, in this case with an error because the rate limit was hit:

Flow to grab scores from Metacritic

Windmill allows you to define retries for each step in the flow easily. In this case, the logic is to retry three times in case of errors, unless the rate limit was hit (which usually shows up as a different status code or error message). In that case, the flow stops immediately.
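In Windmill the retry policy is configured on the flow step, so none of this has to be written by hand. Purely to make the logic explicit, here is a plain Python sketch. `RateLimitError` and `run_with_retries` are hypothetical names, and treating a rate limit as its own exception type (for example raised on HTTP 429) is an assumption.

```python
class RateLimitError(Exception):
    """Hypothetical error for a source that signals its rate limit (e.g. HTTP 429)."""

def run_with_retries(step, max_retries: int = 3):
    """Run step(); retry on ordinary errors, stop at once on a rate limit."""
    attempt = 0
    while True:
        try:
            return step()
        except RateLimitError:
            # Retrying would only send more requests, so give up immediately
            raise
        except Exception:
            attempt += 1
            if attempt > max_retries:
                raise

calls = {"count": 0}

def flaky_step():
    # Fails twice, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

result = run_with_retries(flaky_step)
```

A real implementation would usually also wait between attempts, which is left out here to keep the sketch short.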

Lane 2: Priority Flow for each movie/show separately

The above works, but has a serious issue: recent releases are not updated quickly enough. It can take weeks or even months until every data aspect has been fetched successfully. For example, a movie can have a recent IMDb score while its other scores are outdated and its streaming links are missing completely. Especially for scores and streaming availability, I wanted much better accuracy.

To solve this problem, the second lane uses a different prioritization strategy: the most popular and trending movies and shows are selected for a complete data refresh across all data sources. This is the flow shown earlier, the one I called a beast.

Titles that are shown more often in the app get a priority boost as well. That means that every time a movie or show comes up in the top search results or its details view is opened, it will likely be refreshed soon.

Every title can only be refreshed once per week using the priority lane to ensure that we don't fetch data that likely hasn't changed in the meantime.
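The combination of popularity, view boost and weekly cooldown could look roughly like this. It is a sketch under assumptions: the field names, the `VIEW_BOOST` weight and the additive scoring formula are hypothetical and not taken from the GoodWatch code.

```python
from datetime import datetime, timedelta

COOLDOWN = timedelta(days=7)  # a title is refreshed at most once per week
VIEW_BOOST = 0.5              # hypothetical weight for views in the app

def priority(title: dict) -> float:
    # Popular titles rank high; views in the app add a boost on top
    return title["popularity"] + VIEW_BOOST * title["views"]

def pick_priority_title(titles: list[dict], now: datetime):
    # Skip everything that was already refreshed within the cooldown window
    eligible = [
        t for t in titles
        if t["last_refresh"] is None or now - t["last_refresh"] >= COOLDOWN
    ]
    return max(eligible, key=priority, default=None)

now = datetime(2024, 6, 10)
titles = [
    {"name": "A", "popularity": 90.0, "views": 10, "last_refresh": datetime(2024, 6, 8)},
    {"name": "B", "popularity": 50.0, "views": 40, "last_refresh": datetime(2024, 5, 1)},
    {"name": "C", "popularity": 60.0, "views": 0, "last_refresh": None},
]

chosen = pick_priority_title(titles, now)
```

Title A has the highest raw popularity but was refreshed two days ago, so it is skipped. B wins over C because of its view boost.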

Are you allowed to do this? Scraping Considerations

You might ask: Is scraping legal? The act of grabbing the data is normally fine. What you do with the data needs careful consideration though. As soon as you make a profit from a service that uses scraped data, you are probably violating the source's terms and conditions (see The Legal Landscape of Web Scraping and ‘Scraping’ Is Just Automated Access, and Everyone Does It).

Scraping and related laws are new and often untested and there is a lot of legal gray area. I'm determined to cite every source accordingly, respect rate limits and avoid unnecessary requests to minimize impact on their services.

The fact is, the data will not be used to make a profit. GoodWatch will be free to use for everyone forever.

More Work? Yes, Milord

Windmill uses workers to distribute code execution across multiple processes. Each step in a flow is sent to a worker, which keeps the workers independent of the actual business logic. Only the main app orchestrates the jobs; workers just receive input data and code to execute, and return the result.

It's an efficient architecture that scales nicely. Currently, there are 12 workers splitting the work. They're all hosted on Hetzner.

Each worker has a maximum resource consumption of 1 vCPU and 2 GB of RAM. Here is an overview:

Workers Dashboard

Windmill Editor

Windmill offers an in-browser, IDE-like editor experience with linting, auto-formatting, an AI assistant and even collaborative editing (the last one is a paid feature). The best thing is this button though:

Windmill Script Editor with highlighted Test Button

It allows me to quickly iterate and test scripts before deploying them. I usually edit and test files in the browser and push them to git when I'm finished.

The only thing missing for an optimal coding environment is debugging tools (breakpoints and variable context). Currently, I debug scripts in my local IDE to work around this weakness.

Numbers. I like Numbers

Me too!

Currently GoodWatch requires around 100 GB of persistent data storage:

  • 15 GB for raw preprocessing data (MongoDB)
  • 23 GB for processed relational data (Postgres)
  • 67 GB for vector data (Postgres)

Every day 6,500 flows run through Windmill's orchestration engine. This results in a daily volume of:

  • 30,000 IMDb pages
  • 9,000 TV Tropes pages
  • 5,000 Rotten Tomatoes pages
  • 1,500 Huggingface prompts
  • 600 Metacritic pages

These numbers differ so much because each source has a different rate limit policy.
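To get a feeling for what these daily volumes mean per request, the numbers can be converted into an average gap between two requests. This is a back-of-the-envelope calculation that assumes the load is spread evenly over 24 hours, which the real schedules don't necessarily do.

```python
SECONDS_PER_DAY = 24 * 60 * 60

daily_pages = {
    "IMDb": 30_000,
    "TV Tropes": 9_000,
    "Rotten Tomatoes": 5_000,
    "Huggingface": 1_500,
    "Metacritic": 600,
}

# Average gap between two requests if the load were spread evenly over a day
avg_gap_seconds = {
    source: SECONDS_PER_DAY / pages for source, pages in daily_pages.items()
}

for source, gap in avg_gap_seconds.items():
    print(f"{source}: one request every {gap:.1f} s")
```

Under that assumption, IMDb sees roughly one request every 3 seconds, while Metacritic gets one about every 144 seconds.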

Once per day, data is cleaned up and combined into the final data format. Currently the database that powers the GoodWatch webapp stores:

  • 10 million streaming links
  • 1 million movies
  • 300k DNA values
  • 200k tv shows
  • 70k movies/shows with DNA

What's that DNA you keep talking about?

Imagine you could only distinguish movies by their genre. Extremely limiting, right?

That's why I started the DNA project. It allows categorizing movies and shows by other attributes like Mood, Plot Elements, Character Types, Dialog or Key Props.

Here are the top 10 of all DNA values over all items:

Top 10 DNA values on GoodWatch

It allows two things:

  1. Filter by DNA values (using relational data)
  2. Search by similarity (using vector data)

Examples:

There will be a dedicated blog post about the DNA with many more details in the future.
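As a rough idea of how "search by similarity" works on vector data: every title is represented by a vector, and the closest vectors belong to the most similar titles. The sketch below uses cosine similarity, which is one common metric. The original does not say which metric GoodWatch uses, and the tiny three-dimensional vectors and movie names are made up for illustration.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical 3-dimensional vectors; real embeddings have far more dimensions
vectors = {
    "Movie A": [0.9, 0.1, 0.0],
    "Movie B": [0.8, 0.2, 0.1],
    "Movie C": [0.0, 0.1, 0.9],
}

def most_similar(name: str) -> str:
    """Return the other title whose vector is closest to the given one."""
    query = vectors[name]
    others = (n for n in vectors if n != name)
    return max(others, key=lambda n: cosine_similarity(query, vectors[n]))

closest = most_similar("Movie A")
```

With millions of vectors, a linear scan like this is too slow, which is why such data is typically queried through a vector index in the database.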

Deeper Dive into the Data Pipeline

To fully understand how the data pipeline works, here is a breakdown of what happens for each data source:

1. Once a day, a MongoDB collection is updated with all required input data

For each data source there is an init flow that prepares a MongoDB collection with all required data. For IMDb, that's just the imdb_id. For Rotten Tomatoes, the title and release_year are required. That's because the ID is unknown and the correct URL has to be guessed based on the name.
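Guessing a URL from a title usually means building a few candidate slugs and trying them in order. The following sketch shows the idea. The URL pattern, the slug rules and both function names are assumptions made for illustration, not the logic GoodWatch actually uses.

```python
import re

# Assumption: movie pages live under /m/ with a lowercase, underscore-separated slug
BASE_URL = "https://www.rottentomatoes.com/m/"

def slugify(title: str) -> str:
    slug = title.lower().replace("&", "and")
    slug = re.sub(r"[^a-z0-9]+", "_", slug)  # collapse everything else into underscores
    return slug.strip("_")

def candidate_urls(title: str, release_year: int) -> list[str]:
    slug = slugify(title)
    # Try the plain slug first, then the variant with the year appended
    return [f"{BASE_URL}{slug}", f"{BASE_URL}{slug}_{release_year}"]

urls = candidate_urls("The Matrix", 1999)
```

A scraper would request the candidates one by one and keep the first page whose title and year match the expected movie.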

2. Continuously fetch data and write it into the MongoDB collection

Based on the priority selection explained above, items in the prepared collections are updated with the data that is fetched. Each data source has its own collection, which gets more and more complete over time.

3. Once a day, various flows collect the data from the MongoDB collections and write it into Postgres

There is a flow for movies, one for tv shows and another one for streaming links. They collect all necessary data from various collections and store them in their respective Postgres tables, which are then queried by the web application.

Here is an excerpt of the copy movies flow and script:

Copy movies script

Some of these flows take a long time to execute, sometimes even longer than 6 hours. This can be optimized by flagging all items that were updated and only copying those instead of batch processing the whole data set. One of many TODO items on my list 😅
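The optimization mentioned above could be sketched like this: remember when the last copy ran and only pick items that changed since then. The `updated_at` field, the `select_changed` helper and the sample records are hypothetical. In practice the filter would be part of the MongoDB query.

```python
from datetime import datetime

def select_changed(items: list[dict], last_copy_at: datetime) -> list[dict]:
    """Return only the items that were updated after the previous copy run."""
    return [item for item in items if item["updated_at"] > last_copy_at]

# Hypothetical source records; in practice these would come from MongoDB
items = [
    {"tmdb_id": 1, "updated_at": datetime(2024, 6, 1, 8, 0)},
    {"tmdb_id": 2, "updated_at": datetime(2024, 6, 2, 9, 30)},
    {"tmdb_id": 3, "updated_at": datetime(2024, 6, 3, 7, 15)},
]
last_copy_at = datetime(2024, 6, 2, 0, 0)

to_copy = select_changed(items, last_copy_at)
```

Only the two items touched after the last run end up in `to_copy`, so the amount of work scales with the number of changes instead of the size of the data set.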

Scheduling

Scheduling is as easy as defining cron expressions for each flow or script that needs to be executed automatically:

Schedule definition for Priority flow

Here is an excerpt of all schedules that are defined for GoodWatch:

Schedules Overview

In total there are around 50 schedules defined.

Challenges

With great data comes great responsibility. Lots can go wrong. And it did.

Very slow processing

Early versions of my scripts were taking ages to update all entries in a collection or table. That was because I upserted every item individually. That causes a lot of overhead and slows down the process significantly.

A much better approach is to collect data to be upserted and batch the database queries. Here is an example for MongoDB:

    # Collect all upsert operations first instead of writing items one by one
    movie_operations = []

    # Process movies in batches
    for start in range(0, total_movies, BATCH_SIZE):
        end = min(start + BATCH_SIZE, total_movies)
        print(f"Processing movies {start} to {end}")

        tmdb_movie_cursor = (
            tmdb_movie_collection
                .find({"imdb_id": {"$ne": None}})
                .sort("_id", 1)  # stable order so skip/limit pages don't overlap
                .skip(start)
                .limit(BATCH_SIZE)
        )

        for tmdb_movie in tmdb_movie_cursor:
            operation = build_operation(tmdb_entry=tmdb_movie)
            movie_operations.append(operation)

    # Do all operations in bulk (bulk_write raises on an empty list)
    if movie_operations:
        bulk_result = collection.bulk_write(movie_operations)

Memory hungry scripts

Even with batch processing, some scripts consumed so much memory that the workers crashed. The solution was to carefully fine-tune the batch size for every use case.

Some batches are fine to run in steps of 5000, others store much more data in memory and run better with 500.
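One way to keep memory bounded is to flush each batch as soon as it is full instead of collecting all operations first. The following sketch shows that pattern with a tunable batch size. `chunked` and `write_in_batches` are hypothetical helpers, and the `write` callback stands in for a real bulk write against the database.

```python
from typing import Callable, Iterable, Iterator

def chunked(items: Iterable, size: int) -> Iterator[list]:
    """Yield lists of at most `size` items without loading everything at once."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # the last, possibly smaller batch

def write_in_batches(operations: Iterable, write: Callable[[list], None], size: int) -> int:
    """Flush each batch as soon as it is full so memory use stays bounded."""
    written = 0
    for batch in chunked(operations, size):
        write(batch)  # for example a bulk write against the database
        written += len(batch)
    return written

batch_sizes = []
total = write_in_batches(range(1200), lambda b: batch_sizes.append(len(b)), size=500)
```

With `size=500`, the 1200 operations are written in three batches, and at no point are more than 500 of them held in memory.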

Windmill has a great feature to observe the memory while a script is running:

Script run memory display

Key Takeaways

Windmill is a great asset in any developer's toolkit for automating tasks. It's been an invaluable productivity booster for me, allowing me to focus on the flow structure and business logic while outsourcing the heavy lifting of task orchestration, error handling, retries and caching.

Handling large volumes of data is still challenging, and optimizing the pipeline is an ongoing process - but I'm really happy with how everything has turned out so far.

Okay, okay. That's enough

Thought so. Just let me link a few resources and we're finished:

Did you know that GoodWatch is open-source? You can take a look at all scripts and flow definitions in this repository: https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f

Let me know if you have any questions.


This is a series of blog posts:

  1. What do you want to watch next? This is why I built GoodWatch.
  2. A Data Pipeline for 1 million movies and 10 million streaming links (you are here)
