DEV Community

Cover image for Pipeline orchestration with Mage: connecting to postgres.
Cris Crawford
Cris Crawford

Posted on

Pipeline orchestration with Mage: connecting to postgres.

The first time I took the free data engineering zoomcamp from DataTalksClub, we used airflow to orchestrate the data transformation. The second time, we used Prefect. This time, we're using Mage, a free open-source orchestrator. I learned that a good orchestrator handles:

  • Workflow management
  • Automation
  • Error handling and recovery
  • Monitoring and alerting
  • Resource optimization
  • Observability
  • Debugging
  • Compliance and Auditing

Mage is user-friendly. I'm only starting out learning to use it, and I can already see this. It has a variety of block templates that come pre-coded with information about what I have to do to make them run, accessible through a reasonable user interface. It did hang at one point, and I had to start over from scratch, but that was probably due to something I did.

Mage is organized by projects, which contain pipelines, which contain blocks. Blocks are functions that are strung together and can be reused. The main blocks are Load data, Transform data, and Export data. We read a compressed .csv file of taxi trip data into pandas (Load data), removed the data that had no passengers (Transform data), and wrote the file to a local Postgres database (Export data).

All of this was run on a VM instance on Google cloud. You can read about how to set that up in "Provision the VM instance on GCP" First I downloaded Mage files from the repo. I ran git clone https://github.com/mage-ai/mage-zoomcamp.git mage-zoomcamp. Since git does not store files named .env, I had to copy dev.env to .env. The file .env contains things like passwords and should not be pushed to a git repo. It should be included in .gitignore.

Next I ran docker-compose build and then docker-compose up. That started the services that were set up in the docker-compose.yml file. (If we get a message to update Mage, which happens often, we should run docker pull mageai/mageai:latest.)

I had a brief demo of a pipeline, "example_pipeline". I opened it up by choosing "pipelines" from the left side menu and then chose "example_pipeline" and edited it (also on the left side menu). It had three blocks, one of which was a python file that pulled a csv file from the web and read it into a pandas dataframe. This was the "data loader" block. There was a transformer block, and a data exporter block. I could run these one at a time, or I could choose "Execute with all upstream blocks" from the dot menu on the upper right.

In the next video, I tested the connection to postgres. The docker-compose.yml file contains two services - "Magic" and postgres. The postgres service uses the environment variables set up in .env - username, password, port, etc. In order to verify that I could connect to postgres, I went to the Mage GUI on localhost:6789 and selected "Files" from the menu on the left. (You need to have opened VSCode in the VM instance and mapped this port first.) I opened the io_config.yaml and added a new profile, "dev". In there, I cut and pasted the postgres definitions from the .default profile and edited them to take the variables from the .env file. I used a jinja format to define the variables.

dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
Enter fullscreen mode Exit fullscreen mode

I navigated to pipeline and created a standard batch pipeline. I selected "Edit->Pipeline Settings" from the tab menu and changed the pipeline name to "test_config". I clicked "Save pipeline settings" at the bottom. Then back to the pipeline, I made a "Load data" block and selected "SQL" as the data type. Within the block, I set configuration to "PostgreSQL", selected "dev" as the profile, and checked the "Use raw SQL" box.

Then I typed SELECT 1 as the contents of the Load data block and ran the block. I was able to see a message from Postgres, indicating that I connected successfully. Note: I originally left the single quotes out of the dev schema above, and had to ask about it on the Slack channel. I wasn't sidetracked for very long.

Using a pipeline to load a csv file into postgres

I used Mage to read a compressed .csv file of New York taxi data into pandas, remove the lines that had zero passengers, and then write the file into a local Postgres database.

I started by creating a new standard batch pipeline. I renamed it "api_to_postgres", and saved it. Back to edit pipeline view. I created a new Data loader block in python, and used an API template. I renamed it "load_api_data". Back in the editor, I used the url for yellow_taxi_data from the course repo, for January 2021. I deleted the line below the url since fetching a .csv doesn't require a response.

Next I created a "data_types" array for pandas to read the data from the .csv file. I copied the data types from the upload_data.ipynb notebook and replaced BIGINT with pd.Int64Dtype() and FLOAT(53) with float. This will reduce memory usage in pandas. I also told pandas that the file was zipped and which columns to parse as dates, with an array named "parse_dates". The whole function looks like this:

def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz'

    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),  
        'passenger_count': pd.Int64Dtype(), 
        'trip_distance': float, 
        'RatecodeID': pd.Int64Dtype(), 
        'store_and_fwd_flag': str, 
        'PULocationID': pd.Int64Dtype(), 
        'DOLocationID': pd.Int64Dtype(), 
        'payment_type': pd.Int64Dtype(), 
        'fare_amount': float, 
        'extra': float, 
        'mta_tax': float, 
        'tip_amount': float, 
        'tolls_amount': float, 
        'improvement_surcharge': float,
        'total_amount': float, 
        'congestion_surcharge': float
    }

    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    return pd.read_csv(url, sep=',', compression="gzip", dtype=taxi_dtypes, parse_dates=parse_dates)

Enter fullscreen mode Exit fullscreen mode

I ran the block. It printed a few rows of the familiar output.

Next I transformed the data. I created a generic Python Data transformer block and called it "transform_taxi_data". Here, I removed the rows that have 0 passenger count. I started with a row that prints how many rows have passenger_count equal to 0. print(f"Rows with passenger count = 0: {data['passenger_count'].isin([0]).sum()}")
Then I returned data[data['passenger_count'] > 0]. I added a test assertion. Any function with an @test decorator will be called, so I could just add one.

@test
def test_output(output, *args):
    assert output['passenger_count'].isin([0]).sum() == 0, 'There are rides with 0 passengers'
Enter fullscreen mode Exit fullscreen mode

Next I created a block to export the data. I made it a Python file named "taxi_data_to_postgres" and had it write the data to PostgreSQL. I edited the parameters in the function to have the name of the database (ny_taxi), the name of the table (yellow_taxi_data), and the config profile "dev" that I wrote. I ran that file. It took a few seconds.

Finally, to look at the data, I created a standalone SQL data loader. I set the connection to PostgreSQL and the profile to "dev". I checked the box for raw SQL. Then I typed the command SELECT * FROM ny_taxi.yellow_taxi_data LIMIT 10. I saw the familiar data that was now fetched from the postgres database.

Top comments (0)