Your AI-generated content looks great today, but in my experience it starts slipping in Google's rankings after roughly 60 days. Here's the Python script I use to auto-detect stale posts and trigger regeneration before traffic tanks.
I've been running an AI content pipeline for about 18 months. The first time I checked Search Console after a content drought, I found I had lost 40% of my organic traffic in six weeks. Every post was technically accurate when published, but "technically accurate when published" is not the same as "currently relevant." Topics shift, statistics expire, and Google's freshness signals are ruthless about it.
So I built a monitor. Here's exactly how it works.
Why AI Content Goes Stale Faster Than Hand-Written Posts
Human writers update posts instinctively. They remember writing about a framework, notice a new version dropped, and go back and edit. AI-generated content has no such feedback loop—it sits there at its original quality until someone manually decides to check it.
The decay patterns I've observed fall into three buckets:
- Data decay: Statistics, benchmark numbers, pricing. A post saying "GPT-4 costs $0.03 per 1K tokens" is already wrong.
- Terminology drift: The ecosystem renames things. "Serverless" became "edge functions" became "cloud-native." Same concept, different SEO target.
- Competitive decay: A post ranking for a keyword you owned six months ago now competes with 40 newer posts. Your content hasn't changed but the landscape has.
AI content is more susceptible because it's usually generated in bulk runs. You publish 200 posts in a month, then another 200. The first batch ages while you're generating the second.
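To make the data-decay bucket concrete, here is a rough sketch of a cheap regex pre-pass that flags text containing the kinds of claims that tend to expire. It is not part of the pipeline described in this post; the function name `find_decay_prone_claims` and the three patterns are assumptions chosen for illustration, and you would need to tune them for your niche.

```python
import re

# Hypothetical patterns for claims that tend to expire; tune these for your niche.
DECAY_PATTERNS = {
    "price": re.compile(r"\$\d+(?:\.\d+)?"),        # e.g. $0.03
    "year": re.compile(r"\b20\d{2}\b"),             # e.g. 2023
    "version": re.compile(r"\bv\d+(?:\.\d+)+\b"),   # e.g. v2.1 (requires a leading "v")
}


def find_decay_prone_claims(text: str) -> dict[str, list[str]]:
    """Return the matches for every pattern that appears in the text."""
    hits = {}
    for label, pattern in DECAY_PATTERNS.items():
        matches = pattern.findall(text)
        if matches:
            hits[label] = matches
    return hits


sample = "GPT-4 costs $0.03 per 1K tokens as of 2023."
claims = find_decay_prone_claims(sample)
print(claims)
```

A pass like this costs nothing to run, so it could be used to decide which posts are worth sending to a paid scoring step at all.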
Architecture Overview
The system has four components:
- Content crawler: reads your CMS or markdown files and extracts metadata
- Freshness scorer: calculates an age-weighted relevance score using the Claude API
- Threshold checker: compares scores against a configurable freshness threshold
- Queue writer: pushes stale content IDs to a refresh queue (Redis, SQS, whatever you use)
The scoring step is the interesting one. Raw publish date alone is a poor proxy for staleness—a post about sorting algorithms from 2019 is fine, but a post about LLM pricing from six months ago is outdated. You need semantic staleness, not just chronological staleness.
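A toy comparison shows why the two notions of staleness disagree. This is only an illustration, not the scoring formula used later in the post: the `Post` class, the hand-assigned `volatility` values, and the "age times volatility" weighting are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Post:
    title: str
    age_days: int
    volatility: float  # 0.0 = timeless, 1.0 = changes constantly (hand-assigned here)


def staleness_by_age(post: Post) -> float:
    """Chronological staleness: older always means staler."""
    return float(post.age_days)


def staleness_weighted(post: Post) -> float:
    """Illustrative semantic staleness: age scaled by how fast the topic moves."""
    return post.age_days * post.volatility


posts = [
    Post("Sorting algorithms explained", age_days=2000, volatility=0.05),
    Post("LLM API pricing compared", age_days=180, volatility=0.9),
]

stalest_by_age = max(posts, key=staleness_by_age)
stalest_weighted = max(posts, key=staleness_weighted)
print("By age alone:", stalest_by_age.title)
print("Volatility-weighted:", stalest_weighted.title)
```

Ranking by age alone would send the sorting-algorithms post to the front of the refresh queue, while the weighted version picks the pricing post, which matches the intuition in the paragraph above.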
Setting Up the Environment
pip install anthropic requests python-frontmatter redis python-dotenv numpy
You'll need an ANTHROPIC_API_KEY in your .env file, plus REDIS_URL if you're using the queue integration. The python-frontmatter library handles parsing markdown files with YAML headers, which covers most static site generators.
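If you want the script to fail fast when a setting is missing, a small check like the following can help. It is a sketch under my own assumptions: `missing_settings` is a hypothetical helper, and it takes the environment as a plain mapping so you can pass `os.environ` in real use.

```python
from typing import Mapping


def missing_settings(env: Mapping[str, str], use_queue: bool = False) -> list[str]:
    """Return the names of required settings that are absent or empty."""
    required = ["ANTHROPIC_API_KEY"]
    if use_queue:
        # REDIS_URL only matters when the queue integration is enabled
        required.append("REDIS_URL")
    return [name for name in required if not env.get(name)]


# Example with a plain dict; in the real script you would pass os.environ
example_env = {"ANTHROPIC_API_KEY": "sk-placeholder"}
print(missing_settings(example_env))
print(missing_settings(example_env, use_queue=True))
```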
The Content Crawler
This reads a directory of markdown posts and builds a structured list with publish dates, titles, and body text.
```python
import json
import os
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Optional

import frontmatter


def load_content_index(content_dir: str) -> list[dict]:
    """
    Walk a directory of markdown files and extract metadata + body text.
    Returns a list of content records ready for freshness scoring.
    """
    posts = []
    content_path = Path(content_dir)
    for md_file in content_path.rglob("*.md"):
        try:
            post = frontmatter.load(str(md_file))

            # Parse publish date: handle multiple common frontmatter key names
            pub_date = None
            for date_key in ["date", "published_at", "publishedAt", "created_at"]:
                if date_key in post.metadata:
                    raw_date = post.metadata[date_key]
                    if isinstance(raw_date, str):
                        raw_date = datetime.fromisoformat(raw_date)
                    elif isinstance(raw_date, date) and not isinstance(raw_date, datetime):
                        # YAML parses a bare 2024-01-15 as a date, not a datetime
                        raw_date = datetime(raw_date.year, raw_date.month, raw_date.day)
                    if isinstance(raw_date, datetime):
                        # Treat naive timestamps as UTC; keep an explicit offset if present
                        pub_date = raw_date if raw_date.tzinfo else raw_date.replace(tzinfo=timezone.utc)
                    break

            if pub_date is None:
                # Fall back to file modification time
                mtime = os.path.getmtime(md_file)
                pub_date = datetime.fromtimestamp(mtime, tz=timezone.utc)

            # Calculate age in days (clamped so future-dated posts count as brand new)
            age_days = max(0, (datetime.now(timezone.utc) - pub_date).days)

            # Truncate body to first 800 words for scoring; full text is expensive
            body_words = post.content.split()
            body_preview = " ".join(body_words[:800])

            posts.append({
                "file_path": str(md_file),
                "slug": md_file.stem,
                "title": post.metadata.get("title", md_file.stem),
                "tags": post.metadata.get("tags", []),
                "publish_date": pub_date.isoformat(),
                "age_days": age_days,
                "body_preview": body_preview,
                "word_count": len(body_words),
            })
        except Exception as e:
            print(f"Warning: skipping {md_file}: {e}")
            continue

    # Oldest posts first
    posts.sort(key=lambda x: x["age_days"], reverse=True)
    return posts


if __name__ == "__main__":
    posts = load_content_index("./content/posts")
    print(f"Loaded {len(posts)} posts")
    print(json.dumps(posts[:2], indent=2))
```
This function walks the directory recursively, handles four common frontmatter date key names, and falls back to file modification time if nothing's found. The age_days field is what drives the decay curve downstream.
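The date handling is the fiddliest part of the crawler, because YAML frontmatter can hand you a string, a `date`, or a `datetime` depending on how the value was written. Here is one way that logic could be factored into a standalone helper. The name `normalize_publish_date` is my own, and treating naive timestamps as UTC is an assumption you may want to change.

```python
from datetime import date, datetime, timezone
from typing import Optional, Union


def normalize_publish_date(raw: Union[str, date, datetime, None]) -> Optional[datetime]:
    """Convert a frontmatter date value to a timezone-aware UTC datetime, or None."""
    if raw is None:
        return None
    if isinstance(raw, str):
        try:
            raw = datetime.fromisoformat(raw)
        except ValueError:
            return None  # Not an ISO 8601 string this Python version understands
    elif isinstance(raw, date) and not isinstance(raw, datetime):
        # A bare YAML date such as 2024-01-15 arrives as datetime.date
        raw = datetime(raw.year, raw.month, raw.day)
    if not isinstance(raw, datetime):
        return None
    if raw.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC
        return raw.replace(tzinfo=timezone.utc)
    return raw.astimezone(timezone.utc)


print(normalize_publish_date("2024-01-15"))
print(normalize_publish_date(date(2024, 1, 15)))
print(normalize_publish_date("2024-01-15T10:00:00+02:00"))
```

Returning `None` for anything unparseable lets the caller decide whether to fall back to file modification time or skip the post.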
Scoring Freshness with Claude API
This is the core of the system. I send each post's title, tags, and a body preview to Claude with a prompt that asks it to score two things: how time-sensitive the topic is, and how likely the specific claims in the preview have drifted.
```python
import json
import os
import re
import time

import anthropic
from dotenv import load_dotenv

load_dotenv()
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

SCORING_PROMPT = """You are evaluating whether a blog post needs to be refreshed for SEO and accuracy.

Given the post metadata and a preview of the content, return a JSON object with these fields:
- topic_volatility: float 0.0-1.0, how fast does this topic area typically change?
  (0.0 = timeless like "what is recursion", 1.0 = highly volatile like "best AI tools this year")
- claim_risk: float 0.0-1.0, based on the content preview, how likely are specific claims,
  prices, versions, or statistics to be outdated now?
- freshness_recommendation: string, one of: "no_action", "light_update", "full_refresh", "rewrite"
- reasoning: string, 1-2 sentences explaining your scores

Return only valid JSON. No markdown, no explanation outside the JSON object.

Post metadata:
Title: {title}
Tags: {tags}
Age: {age_days} days old
Content preview: {body_preview}"""


def score_content_freshness(post: dict, model: str = "claude-opus-4-5") -> dict:
    """
    Send a post to Claude and get a structured freshness score back.
    Returns the original post dict with scoring fields added.
    """
    prompt = SCORING_PROMPT.format(
        title=post["title"],
        tags=", ".join(map(str, post["tags"])) if post["tags"] else "none",
        age_days=post["age_days"],
        # Hard cap for token budget (usually tighter than the 800-word preview)
        body_preview=post["body_preview"][:2000],
    )
    message = client.messages.create(
        model=model,
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    raw_response = message.content[0].text.strip()

    try:
        scores = json.loads(raw_response)
    except json.JSONDecodeError:
        # Claude occasionally wraps JSON in markdown, so grab the outermost {...}
        json_match = re.search(r"\{.*\}", raw_response, re.DOTALL)
        if json_match:
            scores = json.loads(json_match.group())
        else:
            raise ValueError(f"Could not parse Claude response: {raw_response}")

    claim_risk = float(scores["claim_risk"])
    topic_volatility = float(scores["topic_volatility"])

    # Composite freshness score: lower is staler
    # Weight: 40% age factor, 35% claim risk, 25% topic volatility
    age_factor = max(0.0, 1.0 - (post["age_days"] / 180))  # Decays to 0 over 6 months
    composite_score = (
        0.40 * age_factor
        + 0.35 * (1.0 - claim_risk)
        + 0.25 * (1.0 - topic_volatility)
    )

    return {
        **post,
        "topic_volatility": topic_volatility,
        "claim_risk": claim_risk,
        "freshness_recommendation": scores["freshness_recommendation"],
        "reasoning": scores["reasoning"],
        "composite_freshness_score": round(composite_score, 3),
    }


def batch_score_posts(posts: list[dict], max_posts: int = 50, delay_seconds: float = 0.5) -> list[dict]:
    """
    Score a list of posts with rate limiting. Prioritizes oldest content first
    (relies on the input already being sorted oldest-first by the crawler).
    """
    # Only score posts older than 30 days; younger posts are fine
    candidates = [p for p in posts if p["age_days"] >= 30][:max_posts]
    scored = []
    for i, post in enumerate(candidates):
        print(f"Scoring {i+1}/{len(candidates)}: {post['title'][:60]}")
        try:
            scored_post = score_content_freshness(post)
            scored.append(scored_post)
            time.sleep(delay_seconds)
        except Exception as e:
            print(f"  Error scoring {post['slug']}: {e}")
            continue

    # Stalest (lowest score) first
    return sorted(scored, key=lambda x: x["composite_freshness_score"])
```
The composite score formula is the part I tuned most. The age_factor decays linearly to zero at 180 days—you can adjust that window to match your content category. A post about Docker networking ages slower than a post about ChatGPT plugins.
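To see how the weights and the decay window interact, here is the same formula pulled out into a pure function with the window as a parameter. The parameter name `decay_window_days` and the example inputs are my own assumptions; the weights are the ones from the scoring code.

```python
def composite_freshness(
    age_days: int,
    claim_risk: float,
    topic_volatility: float,
    decay_window_days: int = 180,
) -> float:
    """Composite freshness: 40% age factor, 35% claim risk, 25% topic volatility. Lower is staler."""
    age_factor = max(0.0, 1.0 - age_days / decay_window_days)
    score = (
        0.40 * age_factor
        + 0.35 * (1.0 - claim_risk)
        + 0.25 * (1.0 - topic_volatility)
    )
    return round(score, 3)


# Two posts of the same age, with made-up risk scores and different decay windows
docker_post = composite_freshness(90, claim_risk=0.2, topic_volatility=0.3, decay_window_days=365)
plugins_post = composite_freshness(90, claim_risk=0.8, topic_volatility=0.9, decay_window_days=180)
print(f"Docker networking post: {docker_post}")
print(f"ChatGPT plugins post:   {plugins_post}")
```

With these inputs the Docker post lands well above a 0.45 threshold and the plugins post well below it, even though both are 90 days old. Note that once a post is older than the window, the age term contributes nothing and the score is capped at 0.60.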
The Bug I Hit
I ran into a json.JSONDecodeError because Claude was wrapping JSON responses in markdown code fences (```json) about 15% of the time, depending on how the prompt was phrased. The regex fallback in score_content_freshness handles this, but it took me an embarrassing number of failed runs to figure out I needed it. Always add defensive JSON parsing when you're expecting structured output from an LLM.
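If you want the defensive parsing as a reusable helper, something like this works as a starting point. It is a sketch, not the exact code from the scorer: `parse_llm_json` is a hypothetical name, and it tries a fence strip first before falling back to the outermost pair of braces.

```python
import json
import re

# Built this way so the example can itself sit inside a fenced code block
FENCE = "`" * 3
FENCE_RE = re.compile(rf"^{FENCE}(?:json)?\s*\n(.*?)\n{FENCE}\s*$", re.DOTALL)


def parse_llm_json(raw: str) -> dict:
    """Parse a JSON object from model output that may be fenced or wrapped in prose."""
    text = raw.strip()

    # Case 1: the whole response is a fenced block, optionally labeled json
    fenced = FENCE_RE.match(text)
    if fenced:
        text = fenced.group(1)

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Case 2: JSON embedded in surrounding prose, so take the outermost braces
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError(f"No JSON object found in response: {raw!r}")
    return json.loads(text[start:end + 1])


fenced_reply = FENCE + 'json\n{"claim_risk": 0.7}\n' + FENCE
print(parse_llm_json(fenced_reply))
print(parse_llm_json('Here you go: {"claim_risk": 0.2} Hope that helps.'))
```

The brace fallback is deliberately crude. It will still raise if the model returns two separate objects or malformed JSON, which is usually what you want: a loud failure you can log and retry.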
Integration: Pushing Stale Content to a Refresh Queue
Once posts are scored, anything below a threshold gets queued for regeneration. I use Redis sorted sets here—the score becomes the sort key, so your workers can pull the "most stale" content first.
```python
import os
from datetime import datetime, timezone
from typing import Optional

import redis


def push_to_refresh_queue(
    scored_posts: list[dict],
    freshness_threshold: float = 0.45,
    redis_url: Optional[str] = None,
) -> dict[str, int]:
    """
    Push posts below the freshness threshold into a Redis sorted set.
    Uses composite_freshness_score as the sort key (lower = higher priority to refresh).
    Returns a summary dict with queued / skipped / already_queued counts.
    """
    redis_url = redis_url or os.environ.get("REDIS_URL", "redis://localhost:6379")
    r = redis.from_url(redis_url)

    stale_posts = [p for p in scored_posts if p["composite_freshness_score"] < freshness_threshold]
    summary = {"queued": 0, "skipped": 0, "already_queued": 0}
    queue_key = "content:refresh_queue"

    pipe = r.pipeline()
    for post in stale_posts:
        metadata_key = f"content:refresh_meta:{post['slug']}"

        # Check if already in queue (this read goes to Redis immediately, outside the pipeline)
        existing_score = r.zscore(queue_key, post["slug"])
        if existing_score is not None:
            summary["already_queued"] += 1
            continue

        # Add to sorted set: score is freshness (lower = more urgent)
        pipe.zadd(queue_key, {post["slug"]: post["composite_freshness_score"]})

        # Store metadata so the worker knows what to do
        pipe.hset(metadata_key, mapping={
            "file_path": post["file_path"],
            "title": post["title"],
            "recommendation": post["freshness_recommendation"],
            "reasoning": post["reasoning"],
            "age_days": post["age_days"],
            "score": post["composite_freshness_score"],
            "queued_at": datetime.now(timezone.utc).isoformat(),
        })
        # Expire metadata after 7 days so old job details do not pile up
        pipe.expire(metadata_key, 604800)

        summary["queued"] += 1
        print(f"  Queued: {post['slug']} (score: {post['composite_freshness_score']}, action: {post['freshness_recommendation']})")

    pipe.execute()
    summary["skipped"] = len(scored_posts) - len(stale_posts)
    return summary


def get_next_refresh_job(redis_url: Optional[str] = None) -> Optional[dict]:
    """
    Worker-side function: pop the most urgent refresh job from the queue.
    Call this from your content regeneration worker.
    """
    redis_url = redis_url or os.environ.get("REDIS_URL", "redis://localhost:6379")
    r = redis.from_url(redis_url)

    # Get the slug with lowest freshness score (most stale)
    result = r.zpopmin("content:refresh_queue", count=1)
    if not result:
        return None

    slug, score = result[0]
    slug = slug.decode() if isinstance(slug, bytes) else slug

    metadata = r.hgetall(f"content:refresh_meta:{slug}")
    if not metadata:
        # Metadata expired (7-day TTL). The slug is already popped, so this job is dropped.
        return None

    return {k.decode(): v.decode() for k, v in metadata.items()} | {"slug": slug, "queue_score": score}
```
The push_to_refresh_queue function uses a Redis pipeline to batch all writes into a single round trip; the zscore membership checks are still sent one at a time. The zpopmin in get_next_refresh_job is atomic, so multiple workers can call it without double-processing the same post.
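On the worker side, the loop around the queue can stay very small. The sketch below is my own assumption about how a worker might be structured, not code from my production setup: `run_worker` is a hypothetical name, and it takes the job getter and the job handler as arguments so you can pass in `get_next_refresh_job` and whatever regeneration function you use.

```python
import time
from typing import Callable, Optional


def run_worker(
    get_job: Callable[[], Optional[dict]],
    handle_job: Callable[[dict], None],
    max_idle_polls: int = 3,
    poll_seconds: float = 5.0,
) -> int:
    """Pull jobs until the queue stays empty for max_idle_polls polls in a row.

    Returns the number of jobs handled successfully.
    """
    handled = 0
    idle_polls = 0
    while idle_polls < max_idle_polls:
        job = get_job()
        if job is None:
            idle_polls += 1
            time.sleep(poll_seconds)
            continue
        idle_polls = 0
        try:
            handle_job(job)
            handled += 1
        except Exception as exc:
            # The job was already popped, so log it; re-queueing is up to you
            print(f"Refresh failed for {job.get('slug')}: {exc}")
    return handled


# Dry run with an in-memory list standing in for Redis
fake_queue = [{"slug": "llm-pricing"}, {"slug": "chatgpt-plugins"}]
processed = []
count = run_worker(
    get_job=lambda: fake_queue.pop(0) if fake_queue else None,
    handle_job=processed.append,
    max_idle_polls=1,
    poll_seconds=0,
)
print(count, [job["slug"] for job in processed])
```

Because a popped job is gone from the sorted set, a handler that fails halfway loses that job unless you push it back yourself. Whether to re-queue is a design choice that depends on how expensive a duplicate regeneration is for you.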
Scheduling the Monitor
Run this on a cron job. I use GitHub Actions on a schedule for smaller sites, and a simple cron task on the server for larger ones.
```bash
# Run every Monday at 6am UTC
# Add to crontab with: crontab -e
0 6 * * 1 cd /app && python freshness_monitor.py --content-dir ./content/posts --threshold 0.45 >> /var/log/freshness_monitor.log 2>&1
```
Complete Working Script
Copy this into freshness_monitor.py and run it directly. It wires together everything above with argparse so you can configure it without editing code.
```python
#!/usr/bin/env python3
"""
freshness_monitor.py — Content Freshness Monitor
Usage: python freshness_monitor.py --content-dir ./posts --threshold 0.45 --max-posts 100
"""
import argparse
import json
import os
import sys

from dotenv import load_dotenv

load_dotenv()

# Import all functions defined in the sections above
# In production: move each section into its own module and import them


def main():
    parser = argparse.ArgumentParser(description="Score content freshness and queue stale posts for refresh")
    parser
```
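The listing above stops partway through main(). As a rough idea of how the remaining wiring could look, here is a sketch built only from the flags shown in the usage line and the cron entry (--content-dir, --threshold, --max-posts). Everything else is an assumption on my part: the `build_parser` and `run` names, the default values (borrowed from the function defaults earlier in the post), and passing the three pipeline stages in as arguments so the example runs without an API key or Redis.

```python
import argparse
from typing import Callable, Optional, Sequence


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Score content freshness and queue stale posts for refresh"
    )
    # Flag names come from the usage line; the defaults are assumptions
    parser.add_argument("--content-dir", default="./content/posts")
    parser.add_argument("--threshold", type=float, default=0.45)
    parser.add_argument("--max-posts", type=int, default=50)
    return parser


def run(
    argv: Optional[Sequence[str]],
    load: Callable[[str], list],
    score: Callable[..., list],
    push: Callable[..., dict],
) -> dict:
    """Parse CLI args, then run crawl -> score -> queue using the supplied stages."""
    args = build_parser().parse_args(argv)
    posts = load(args.content_dir)
    scored = score(posts, max_posts=args.max_posts)
    return push(scored, freshness_threshold=args.threshold)


# Dry run with stand-in stages; the real script would pass load_content_index,
# batch_score_posts, and push_to_refresh_queue instead.
summary = run(
    ["--content-dir", "./posts", "--threshold", "0.5", "--max-posts", "10"],
    load=lambda content_dir: [{"slug": "demo", "composite_freshness_score": 0.3}],
    score=lambda posts, max_posts: posts[:max_posts],
    push=lambda scored, freshness_threshold: {
        "queued": sum(p["composite_freshness_score"] < freshness_threshold for p in scored)
    },
)
print(summary)
```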