DEV Community

adrienshen
adrienshen

Posted on

Creating a Data Pipeline from Airtable to Backend

I often work with e-commerce systems and we frequently need to get product data into whatever e-commerce system we are using whether that might be Saleor, Magento, Wordpress, Shopify, or whatever comes next. One tool we find useful is Airtable for administration staff to fill in all the product info first in an organize manner appropriate for their team. Then afterwards, we could write a script in Python to import the rows through the Airtable API into our application database. The Airtable API is straightforward to use, but there are some considerations and issues we came across. I will try to outline below to make it easier for other developers.

Some technologies and tools we will be covering in this article:

  • Airtable and Airtable API
  • Python and Django (Saleor ecommerce framework)
  • GraphQL queries and mutations

For those not familiar with Airtable, it’s a user friendly database spreadsheet tool that has a nicely documented API that’s customized to each base you have. It has a generous free tier and I find it useful for various tasks from lightweight CMS systems to offering easy configuration settings to clients.

GraphQL is REST alternative and allows developers to query for exactly the data they need. One of the beat points of GraphQL is that it is self documenting and typed query language. Example of GraphQL query:

import gql from 'graphql-tag';
import { useQuery } from '@apollo/react-hooks';

const GET_DOGS = gql`
  {
    dogs {
      id
      breed
    }
  }
`;

Preparing Airtable columns and API Key

Airtable is really intuitive and simple to use. That’s why it’s a good tool for non-technical folks like office admins and data entry. As an example we have a product base like that contains the columns:

  • sku: Text
  • name: Text
  • description: Long Text
  • base_price: Number
  • publication_date: Date
  • is_published: Boolean
  • weight: Decimal
  • quantity_opt: Number
  • images_opt: Attachments (will explain more later)
  • category: SingleSelect
  • product_pk_auto: Text (ID)
  • has_been_imported: Boolean

There are some optional parameters for the GraphQL API that we’ll be posting the mutation to, so we can mark that with _opt suffix to let the administrators know the field is optional.

Airtable custom api screenshot

After all the columns and some example data is setup in the Airtable base, AirTable will provide us with a custom API filled in with our columns and example data.

We need to grab the BASE_ID and API_KEY to use in our GET request. Our API_KEY is on the left panel, just check “show API key”. For the purpose of importing a list of products to our database, we will need to use the “List collection records” endpoint. The example curl requests looks like this:

    curl "https://api.airtable.com/v0/appC7p39pnmKYOJ17/Collections?maxRecords=3&view=Grid%20view" \
      -H "Authorization: Bearer YOUR_API_KEY"

The general import logic that we want to is:

  1. Grab the list of all rows from Airtable
  2. Get authentication token from our e-commerce API if there is one
  3. Make sure to get any mappings like category → category_ids from our system
  4. Map each row from Airtable into our systems request object
  5. Create the POST request to update our system (GraphQL mutation in my case)
  6. Get the product_id and update Airtable with it
  7. Download and process any images in our application
  8. Repeat with each row until all rows are exhausted

In our case, we like to implement a custom Django command to initiate the imports task. It’s quite nice and possible later to run on a schedule cron job if we desire. Here is the handler function of the script to get the list of all rows from Airtable, authentication, sending mutation, and updating back to Airtable. In subsequent steps, we will fill in the sub function logic.

    from django.core.management.base import BaseCommand, CommandError
    import requests
    from pprint import pprint
    import json
    import os

    API_KEY = 'airtable-api-key';

    class Command(BaseCommand):

      def handle(self, *args, **options):
        # pull records from airtable
        response = requests.get('https://api.airtable.com/v0/appC7p39pnmKYOJ17/Products?maxRecords=1000&view=Grid%20view', headers={'Authorization': 'Bearer ' + API_KEY})

        product_rows = response.json().get('records')
        pprint('product rows >> ', product_rows)

        json = self.get_permission_token()
        if len(json['data']['tokenCreate']['errors']) is not 0:
          print('Abort importing, Authentication failed.')
          return
        print('New token created: {} for import user: {} \n\n'.format(json['data']['tokenCreate']['token'], json['data']['tokenCreate']['user']['email']))
        headers = {
          'Authorization': 'JWT {}'.format(json['data']['tokenCreate']['token'])
        }

        category_mappings = self.get_categories();

        for pr in product_rows:
          fields = pr.get('fields', None)

          if not fields or fields.get('has_been_imported') or fields.get('product_id_pk'):
            print('No fields or row has already been imported: sku={}, name={}'.format(fields.get('sku'), fields.get('name')))
            continue
          create_input = self.prepare_input(fields=fields, collections_mappings=collections_mappings)

          results = self.send_mutation(create_input, headers)
          if not results:
            self.handle_error()

          self.update_airtable(pr['id'], results)

      def prepare_input():
        pass

      def get_permission_token():
        pass

      def get_categories(self):
        pass

      def send_mutation(self):
        pass

      def update_airtable(self):
        pass

      def handle_error(self):
        pass

In the next sections we will implement each stage of the logic by filling in the functions. We are going to have a main for loop that goes through each record, so if we have an authentication token for our app which mostly likely you will, we need to get this token before entering into the loop. An example that works for our case is shown below:

 def get_permission_token():
   mutation = '''
     mutation TokenCreateMutation($email: String!, $password: String!) {
       tokenCreate(email: $email, password: $password) {
         token
         errors{
           field
           message
         }
         user{
           id
           email
           }
         }
       }
   '''
   input = {"email": auth_email, "password": auth_password}
   response = requests.post('http://localhost:8000/graphql/', json={'query': mutation, "variables": input})

Now that we have the auth token, we can use it every time we need to update anything on our e-commerce system. The next step would need to make sure the data is mapped correctly to the format our API will accept. We also have to make sure that any columns that are not optional needs to be provided and have the record skipped with optional error handling if required fields are missing. Every API and system is different, so you will have to adjust it so it works for you. I will show my implementation below for Saleor ProductCreate API:

def prepare_input(self, fields):
  if (fields.get('name') and fields.get('base_price') and fields.get('category') and fields.get('description') and fields.get('publication_date') and fields.get('weight')):
    input = {
      'publicationDate': fields.get('publication_date'),
      'name': fields.get('name'),
      'description': fields.get('description'),
      'isPublished': fields.get('is_published'),
      'weight': fields.get('weight'),
      'basePrice': fields.get('base_price'),
      'sku': fields.get('sku'),
      'weight': fields.get('weight'),
      # just pass the first image
      'airtableImageUrl': fields.get('images_opt')[0]['url']
    }
 # handle mappings for category

We also need to make sure relational mappings are correct during the imports process. So Product: wall cabinet A might belong to the Category: “Kitchen Sets”. In APIs this is usually done through passing a list of IDs like {'category': 'Q2F0ZWdvcnk6MTA'} to the create mutation. On the Airtable interface, we want the ids to be human readable inputs like: Kitchen Sets. We can keep the mappings as a hard-coded dictionary in the imports scripts or we can dynamically get the list of products from our application.

 def get_categories(self):
     query = '''
       query categories($first: Int) {
           categories(first: $first) {
               edges {
                   node {
                       id
                       name
                   }
                   cursor
               }
               totalCount
           }
       }
   '''
   response = requests.post(GRAPHQL_BASE, json={'query': query, 'variables': { 'first': 20 }})
   json = response.json()
   category_mappings_to_id = {}
   for category in json['data']['categories']['edges']:
     category_mappings_to_id[category['node']['name']] = category['node']['id']
   return category_mappings_to_id

   def prepare_input(self, fields, category_mappings):
     # basic input fields...
     # handle mappings for category and any other related models
     if fields.get('category'):
       input['category'] = category_mappings[fields.get('category')]

Now that the input is how we want it, we need to actually send the mutation through to our API to update the rows in the database. We will implement the send_mutation function now. We handle the error case if results from API is empty and return the product to the handler function.

 def send_mutation(self, input, headers):
     mutation = '''
       mutation ProductCreateMutation($input: ProductCreateInput!) {
           productCreate(input: $input) {
               errors {
                   field
                   message
               }
               product {
                   id
                   name
                   description
               }
           }
       }
   '''

   response = requests.post(GRAPHQL_BASE, json={'query': mutation, "variables": { "input": input }}, headers=headers)
   json = response.json()
   print('Product updated :: ', json['data']['productCreate'])
   if len(json['data']['productCreate']['errors']) is not 0:
     print('Import of {} failed'.format(input['name']))
     return None
   else:
     return json['data']['productCreate']['product']

Once, we are sure that the product has been updated and there were no errors in our application, we will want to update the Airtable row to reflect that the product now exists in our application database. This also ensures that the same script can be run multiple times as more products are added to the Airtable sheet as we can then skip the records that have already been imported.

 def update_airtable(self, id, updated):
   patch_payload = {
     "records": [{
       "id": id,
       "fields": {
         "has_been_imported_auto": True,
         "product_id_pk_auto": updated['id'],
       }
     }]
   }

   response = requests.patch('https://api.airtable.com/v0/<replace-with-your-baseid>/Products', json=patch_payload, headers={'Authorization': 'Bearer ' + api_key})
   data = response.json()
   print('Airtable id updated: {} \n'.format(data['records'][0]['id']))

So one way to do this is to have the application model save the airtable_image in another column, then use a background job to download the image attachment from the url saved. The specifics of background jobs are outside the scope of this tutorial, but the image download logic might look something like this:

 # background tasks file in Django
import requests
import tempfile

 @app.task
 def save_airtable_product_image(pk):
   instance = Product.objects.get(pk=pk)
   temp = download_airtable_attachment_image(instance.airtable_image_url)
   file_name = instance.airtable_image_url.split('/')[-1]
   pi = ProductImage()
   pi.product = instance
   pi.image.save(file_name, files.File(temp))
   create_product_thumbnails(pi.pk)

 def download_airtable_attachment_image(image_url):
   '''Download airtable (https://dl.airtable.com/.attachments) image and save to filesystem, then save reference in product_collection.background_image, then generate thumbnails for various sizes
   '''
   request.get(image_url, stream=True)
   if request.status_code != requests.codes.ok:
     print('Could not download image {}'.format(image_url))

   temp = tempfile.NamedTemporaryFile()
   for block in request.iter_content(1024 * 8):
     if not block:
       break
     temp.write(block)

   return temp

That sums up the main tutorial on creating a pipeline from Airtable to your E-commerce application. Some further improvements to try might be to add Slack alerts for team members with import summary or to automate the task on a daily schedule. Will leave this to the reader to try.

I enjoy building e-commerce applications on a variety of technology stacks including Django, Node.js, and React. Even more enjoyable is helping merchants succeed and and scale with technology. I hope my writing benefits developers and if you know merchants who could use digital technologies to enhance their business,feel free to reach out to to chat!

Top comments (1)

Collapse
 
sairina profile image
Sairina

Thanks for writing this! I'm new to this world, but I'm wondering if it's possible to use the airtable-python-wrapper SDK (airtable-python-wrapper.readthedoc...) in this case to pull all the records from Airtable, too?