A while back I wrote about how I automate blog image creation using Python. It worked, but I had to open a CSV file, type in the title, and then open the terminal to run the script.
Now using Airtable or nocoDB, I’ve automated the process completely.
Here’s how to automate blog image using Python and n8n: Open a form URL on your computer or phone, type your blog post name, select your category, and hit submit. A webhook triggers n8n, which fetches your background image and calls a Python script running in a self-hosted Docker container. It processes the image and sends it back to Airtable via webhook callback.
This post uses Airtable, but I also have a nocoDB workflow available, which is a free self-hosted alternative.
The Airtable Form
The form is simple – just two fields:
- Title– Your blog post name
- Category– Select from your blog categories (Automation, Homelab, Cybersecurity, etc.) Submit the form, and behind the scenes, the entire automation kicks off.
When submitted it creates a new entry in Airtable, but the processed image is empty.
A few seconds later when n8n has processed the image it will appear in Airtable again with the given blog post title and blog category.
How It Works
The workflow is straightforward:
- Airtable form submissiontriggers a webhook to n8n
- Get Record IDfetches the full record details from Airtable (including background image URL)
- Build Python Payloadprepares the Python script with your actual values (title, category, image URL)
- Call Image Generation APIsends everything to my self-hosted Python API
- Python processes the imageand uploads it to MinIO (also self-hosted)
- Airtable updateswith the final image via webhook callback ## Airtable Automation Script
When a record is created and enters the “Ready for Processing” view, this script runs:
// 1. Fetch and normalize Airtable variables
let table = base.getTable('Variables');
let query = await table.selectRecordsAsync();
let variables = {};
for (let rec of query.records) {
let name = rec.getCellValue('Name');
let value = rec.getCellValue('Value');
if (name && value) {
let key = name.trim().toUpperCase().replace(/[^A-Z0-9]+/g, "_").replace(/^_|_$/g, "");
variables[key] = value;
}
}
// 2. Strip "-test" if in PRODUCTION mode
let production = String(variables["PRODUCTION"]).toLowerCase() === "true";
if (production) {
for (let key in variables) {
if (typeof variables[key] === "string" && variables[key].includes("-test")) {
variables[key] = variables[key].replace("-test", "");
}
}
}
// 3. Separate into webhook outputs and NCA endpoints
let ncaEndpoints = {};
let webhookUrl = null;
for (const [key, value] of Object.entries(variables)) {
const isWebhook = value.includes("webhook");
// camelCase conversion
const camelKey = key
.toLowerCase()
.split("_")
.map((w, i) => i === 0 ? w : w[0].toUpperCase() + w.slice(1))
.join("");
// Webhook outputs
if (isWebhook) {
output.set(`${camelKey}`, value);
if (!webhookUrl) webhookUrl = value;
}
// Grouped APIs
else if (key.startsWith("NCA_")) {
const ncaKey = key.replace("NCA_", "").toLowerCase();
ncaEndpoints[ncaKey] = value;
}
}
// 4. Get input params
let params = input.config();
// 5. Build the payload
let requestBody = {
recordId: params.recordId,
nca: ncaEndpoints,
};
// 6. Send the POST request to n8n
await fetch(webhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(requestBody)
});
output.set("ncaEndpoints", JSON.stringify(ncaEndpoints));
// 1. Fetch and normalize Airtable variables
let table = base.getTable('Variables');
let query = await table.selectRecordsAsync();
let variables = {};
for (let rec of query.records) {
let name = rec.getCellValue('Name');
let value = rec.getCellValue('Value');
if (name && value) {
let key = name.trim().toUpperCase().replace(/[^A-Z0-9]+/g, "_").replace(/^_|_$/g, "");
variables[key] = value;
}
}
// 2. Strip "-test" if in PRODUCTION mode
let production = String(variables["PRODUCTION"]).toLowerCase() === "true";
if (production) {
for (let key in variables) {
if (typeof variables[key] === "string" && variables[key].includes("-test")) {
variables[key] = variables[key].replace("-test", "");
}
}
}
// 3. Separate into webhook outputs and NCA endpoints
let ncaEndpoints = {};
let webhookUrl = null;
for (const [key, value] of Object.entries(variables)) {
const isWebhook = value.includes("webhook");
// camelCase conversion
const camelKey = key
.toLowerCase()
.split("_")
.map((w, i) => i === 0 ? w : w[0].toUpperCase() + w.slice(1))
.join("");
// Webhook outputs
if (isWebhook) {
output.set(`${camelKey}`, value);
if (!webhookUrl) webhookUrl = value;
}
// Grouped APIs
else if (key.startsWith("NCA_")) {
const ncaKey = key.replace("NCA_", "").toLowerCase();
ncaEndpoints[ncaKey] = value;
}
}
// 4. Get input params
let params = input.config();
// 5. Build the payload
let requestBody = {
recordId: params.recordId,
nca: ncaEndpoints,
};
// 6. Send the POST request to n8n
await fetch(webhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(requestBody)
});
output.set("ncaEndpoints", JSON.stringify(ncaEndpoints));
This script reads myVariables table(where I store all API endpoints and webhook URLs), normalizes the variable names, and sends the record ID + endpoints to n8n.
The screenshot above shows the Airtable script that handles orchestration.
Build Python Payload (The Tricky Part)
This is where template variables get interpolated.
Here’s the key function:
// Get data from the previous node (Get Record Id1 from Airtable)
const title = $json.Title || 'Untitled';
const backgroundImageUrl = $json['Background Image (from Category)']?.[0]?.url || '';
const category = $json['Category (from Category)']?.[0] || 'General';
// Create the Python code WITH ACTUAL VALUES (not n8n template variables)
const pythonCode = `from PIL import Image, ImageDraw, ImageFont
import requests
from io import BytesIO
import sys
import os
import subprocess
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
try:
import boto3
from botocore.client import Config
except ImportError:
print('Installing boto3...', file=sys.stderr)
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'boto3', '--break-system-packages', '--quiet'])
import boto3
from botocore.client import Config
print('boto3 installed', file=sys.stderr)
WIDTH, HEIGHT = 1200, 628
FONT_PATH = '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf'
FONT_SIZE = 50
SMALL_FONT_SIZE = 24
TEXT_COLOR = 'white'
SHADOW_COLOR = 'black'
CATEGORY_COLOR = '#94a3b8'
S3_ENDPOINT = os.getenv('S3_ENDPOINT_URL')
S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY')
S3_SECRET_KEY = os.getenv('S3_SECRET_KEY')
S3_BUCKET = os.getenv('S3_BUCKET_NAME', 'nca-toolkit')
background_url = '${backgroundImageUrl}'
title = '''${title}'''
category = '${category}'
try:
response = requests.get(background_url, timeout=30)
response.raise_for_status()
bg = Image.open(BytesIO(response.content)).resize((WIDTH, HEIGHT)).convert('RGBA')
overlay = Image.new('RGBA', bg.size, (0, 0, 0, 100))
img = Image.alpha_composite(bg, overlay)
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
small_font = ImageFont.truetype(FONT_PATH, SMALL_FONT_SIZE)
except Exception as e:
print(f'Font loading failed: {e}', file=sys.stderr)
font = ImageFont.load_default()
small_font = ImageFont.load_default()
draw.text((40, 30), category, font=small_font, fill=CATEGORY_COLOR)
words = title.split()
lines, line = [], ''
for word in words:
test = f'{line} {word}'.strip()
if draw.textlength(test, font=font) < WIDTH - 100:
line = test
else:
lines.append(line)
line = word
lines.append(line)
y_start = (HEIGHT - len(lines) * FONT_SIZE) // 2
for i, l in enumerate(lines):
text_width = draw.textlength(l, font=font)
x = (WIDTH - text_width) // 2
y = y_start + i * FONT_SIZE
draw.text((x + 2, y + 2), l, font=font, fill=SHADOW_COLOR)
draw.text((x, y), l, font=font, fill=TEXT_COLOR)
output_path = '/tmp/blog-image.png'
img.convert('RGB').save(output_path, 'PNG', quality=95)
if S3_ACCESS_KEY and S3_ENDPOINT:
safe_name = title.lower().replace(' ', '-').replace('/', '-')
safe_name = ''.join(c for c in safe_name if c.isalnum() or c == '-')
object_name = f'blog-images/{safe_name}.png'
s3_client = boto3.client('s3', endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, config=Config(signature_version='s3v4'))
s3_client.upload_file(output_path, S3_BUCKET, object_name, ExtraArgs={'ContentType': 'image/png'})
public_url = f'{S3_ENDPOINT}/{S3_BUCKET}/{object_name}'
print(public_url)
else:
print(output_path)
print('WARNING: S3 credentials not found', file=sys.stderr)
except Exception as e:
print(f'ERROR: {str(e)}', file=sys.stderr)
sys.exit(1)`;
// Build the API payload with the interpolated Python code
return [{
json: {
code: pythonCode,
timeout: 120,
webhook_url: "https://hooks.airtable.com/workflows/v1/genericWebhook/app4Jxxx",
id: $json.id || $execution.id
}
}];
// Get data from the previous node (Get Record Id1 from Airtable)
const title = $json.Title || 'Untitled';
const backgroundImageUrl = $json['Background Image (from Category)']?.[0]?.url || '';
const category = $json['Category (from Category)']?.[0] || 'General';
// Create the Python code WITH ACTUAL VALUES (not n8n template variables)
const pythonCode = `from PIL import Image, ImageDraw, ImageFont
import requests
from io import BytesIO
import sys
import os
import subprocess
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
try:
import boto3
from botocore.client import Config
except ImportError:
print('Installing boto3...', file=sys.stderr)
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'boto3', '--break-system-packages', '--quiet'])
import boto3
from botocore.client import Config
print('boto3 installed', file=sys.stderr)
WIDTH, HEIGHT = 1200, 628
FONT_PATH = '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf'
FONT_SIZE = 50
SMALL_FONT_SIZE = 24
TEXT_COLOR = 'white'
SHADOW_COLOR = 'black'
CATEGORY_COLOR = '#94a3b8'
S3_ENDPOINT = os.getenv('S3_ENDPOINT_URL')
S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY')
S3_SECRET_KEY = os.getenv('S3_SECRET_KEY')
S3_BUCKET = os.getenv('S3_BUCKET_NAME', 'nca-toolkit')
background_url = '${backgroundImageUrl}'
title = '''${title}'''
category = '${category}'
try:
response = requests.get(background_url, timeout=30)
response.raise_for_status()
bg = Image.open(BytesIO(response.content)).resize((WIDTH, HEIGHT)).convert('RGBA')
overlay = Image.new('RGBA', bg.size, (0, 0, 0, 100))
img = Image.alpha_composite(bg, overlay)
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
small_font = ImageFont.truetype(FONT_PATH, SMALL_FONT_SIZE)
except Exception as e:
print(f'Font loading failed: {e}', file=sys.stderr)
font = ImageFont.load_default()
small_font = ImageFont.load_default()
draw.text((40, 30), category, font=small_font, fill=CATEGORY_COLOR)
words = title.split()
lines, line = [], ''
for word in words:
test = f'{line} {word}'.strip()
if draw.textlength(test, font=font) < WIDTH - 100:
line = test
else:
lines.append(line)
line = word
lines.append(line)
y_start = (HEIGHT - len(lines) * FONT_SIZE) // 2
for i, l in enumerate(lines):
text_width = draw.textlength(l, font=font)
x = (WIDTH - text_width) // 2
y = y_start + i * FONT_SIZE
draw.text((x + 2, y + 2), l, font=font, fill=SHADOW_COLOR)
draw.text((x, y), l, font=font, fill=TEXT_COLOR)
output_path = '/tmp/blog-image.png'
img.convert('RGB').save(output_path, 'PNG', quality=95)
if S3_ACCESS_KEY and S3_ENDPOINT:
safe_name = title.lower().replace(' ', '-').replace('/', '-')
safe_name = ''.join(c for c in safe_name if c.isalnum() or c == '-')
object_name = f'blog-images/{safe_name}.png'
s3_client = boto3.client('s3', endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, config=Config(signature_version='s3v4'))
s3_client.upload_file(output_path, S3_BUCKET, object_name, ExtraArgs={'ContentType': 'image/png'})
public_url = f'{S3_ENDPOINT}/{S3_BUCKET}/{object_name}'
print(public_url)
else:
print(output_path)
print('WARNING: S3 credentials not found', file=sys.stderr)
except Exception as e:
print(f'ERROR: {str(e)}', file=sys.stderr)
sys.exit(1)`;
// Build the API payload with the interpolated Python code
return [{
json: {
code: pythonCode,
timeout: 120,
webhook_url: "https://hooks.airtable.com/workflows/v1/genericWebhook/app4Jxxx",
id: $json.id || $execution.id
}
}];
The key here: extract actual values first (const title = $json.Title), then embed them into the Python code string using template literals (${title}).
const title = $json.Title
${title}
This way, Python receives real values, not n8n syntax.
What the Python Script Does
- Downloads the background imagefrom the URL you selected
- Adds a dark overlayso text is readable
- Renders the categoryin the top-left corner (gray text)
- Wraps and centers the titlewith a shadow effect for contrast
- Uploads to MinIO/S3with a safe filename (lowercase, hyphens)
- Returns the public URLso Airtable can store it The image is 1200×628 pixels (standard for LinkedIn, Twitter, and blog featured images).
HTTP Request to Python API
The HTTP node sends a POST request with the Python code and configuration:
{
"code": "...",
"timeout": 120,
"webhook_url": "...",
"id": "..."
}
{
"code": "...",
"timeout": 120,
"webhook_url": "...",
"id": "..."
}
The Python API runs the code and returns the image URL.
Ready to Automate?
Ready to build more automations like this? Get the complete n8n workflow, Airtable base, and community support:https://www.skool.com/build-automate/about
Related Posts
- Automate Blog Image Creation Using Python
- Automate WordPress Blog Publishing with Airtable and n8n
- Automate Blog to Social Media Using AI and Make
Originally published at https://kjetilfuras.com/automate-blog-image-using-python-and-n8n/
Top comments (0)