Cris Crawford

API to GCP with Mage

I'm going to use Mage to read a file, make some changes to the data, and then write the result to GCP. I'm going to partition the data as well.

First we will reuse some of the blocks that we wrote before. Start a new standard batch pipeline. In the file menu on the left, find the data_loaders directory and drag the file "load_api_data.py" into the main window. Next, find the transformers directory and drag "transform_taxi_data.py" into the main window. I couldn't get transform_taxi_data.py to appear below load_api_data.py, so I ended up dragging them in the opposite order. Now connect the two in the Tree view on the right: select the dot on the bottom of "load_api_data" and drag it to the dot on top of "transform_taxi_data". It should look like this:

Tree view of preexisting Mage blocks

Next, add a data exporter. For this, create a Python data exporter to Google Cloud Storage and call it "taxi_to_gcs_parquet". Edit the file so that it has your bucket name and the object_key "nyc_taxi_data.parquet", which will be the target. Execute this block with all upstream blocks. A parquet file should appear in the bucket.

Next, I created a partitioned dataset. I partitioned it by date, which speeds up queries that filter by date. To do this, I added a new Python data exporter using the generic template and called it "taxi_to_gcs_partitioned_parquet".

It should look like this:

Tree view of Mage blocks with exporters
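To see why partitioning by date helps, here is a minimal sketch in plain Python. It assumes a common layout where each date gets its own "column=value" directory. The sample rows and the helper names partition_dir and rows_for_date are made up for illustration and are not part of Mage or of the pipeline above.

```python
from collections import defaultdict
from datetime import date, datetime


def partition_dir(root, column, value):
    """Build a key=value directory name such as root/column=value."""
    return f"{root}/{column}={value}"


# Hypothetical sample rows standing in for taxi trips.
trips = [
    {"tpep_pickup_datetime": datetime(2021, 1, 1, 0, 30), "fare_amount": 8.0},
    {"tpep_pickup_datetime": datetime(2021, 1, 1, 23, 59), "fare_amount": 12.5},
    {"tpep_pickup_datetime": datetime(2021, 1, 2, 7, 15), "fare_amount": 5.5},
]

# Group rows by pickup date; each group would become its own directory.
layout = defaultdict(list)
for trip in trips:
    pickup_date = trip["tpep_pickup_datetime"].date()
    key = partition_dir("nyc_taxi_data", "tpep_pickup_date", pickup_date)
    layout[key].append(trip)


def rows_for_date(layout, wanted):
    """Return only the rows stored under the directory for `wanted`."""
    key = partition_dir("nyc_taxi_data", "tpep_pickup_date", wanted)
    return layout.get(key, [])


# A query for one day only has to look inside one directory.
jan_first = rows_for_date(layout, date(2021, 1, 1))
```

A query for January 1 only touches the "tpep_pickup_date=2021-01-01" directory and never opens the data for other days, which is where the speedup comes from.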

I defined my credentials manually. I used the pyarrow library to handle the partitioning. Otherwise I would have to divide the data myself and write it to the bucket in chunks.

import pyarrow as pa
import pyarrow.parquet as pq
import os

Then set up the credentials and variables:

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/src/keys.json"

bucket_name = 'mage-zoomcamp-cris-crawford'
project_id = 'data-engineering-2024-411821'

table_name = 'nyc_taxi_data'

root_path = f'{bucket_name}/{table_name}'
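If the key file path is wrong, the export fails later with a less obvious error, so it can help to check it up front. This is only a sketch: prepare_export is a hypothetical helper of my own, not something Mage provides, and the temporary file just stands in for keys.json.

```python
import os
import tempfile


def prepare_export(key_path, bucket_name, table_name):
    """Point Google's client libraries at the key file and return the dataset root path.

    Raises FileNotFoundError if the key file does not exist.
    """
    if not os.path.isfile(key_path):
        raise FileNotFoundError(f"Service account key not found: {key_path}")
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
    return f"{bucket_name}/{table_name}"


# Usage with a throwaway file standing in for the real keys.json.
with tempfile.NamedTemporaryFile(suffix=".json") as fake_key:
    root_path = prepare_export(fake_key.name, "my-bucket", "nyc_taxi_data")
```

In the real block you would pass "/home/src/keys.json" and your own bucket name instead of the placeholders.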

In the export_data function, I added a new column for the date, because the New York taxi dataset only has the pickup datetime and I'm partitioning by date. pyarrow works on its own Table type, so the DataFrame has to be converted first, and it also needs a filesystem object that tells it where to write.

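@data_exporter
def export_data(data, *args, **kwargs):
    # Derive the partition column: keep only the date part of the pickup datetime.
    data['tpep_pickup_date'] = data['tpep_pickup_datetime'].dt.date

    # Convert the pandas DataFrame into a pyarrow Table.
    table = pa.Table.from_pandas(data)

    # Picks up the key file from GOOGLE_APPLICATION_CREDENTIALS.
    gcs = pa.fs.GcsFileSystem()

    # Write one directory per distinct tpep_pickup_date under root_path.
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['tpep_pickup_date'],
        filesystem=gcs
    )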

Now run this block. The Google Cloud Storage bucket should now contain a dataset called "nyc_taxi_data" with one folder per pickup date, each holding the parquet file for that day. I'm not sure why project_id was needed, since the code above never uses it, but it's time to move on to the next module, Google Cloud to Google BigQuery.