DEV Community

Wandering Animation
Wandering Animation

Posted on

Beauty Content Pipeline – Automated Social Media Content Collector

This project is an automation pipeline that finds, downloads, organizes, and prepares beauty-related video content from YouTube Shorts, Instagram, and TikTok for repurposing and posting. The script uses platform-specific scrapers to fetch videos by keywords or hashtags, ensures all downloads are saved as MP4 files (ready for upload), and organizes them into separate folders for each social media platform.

Once collected, the content can be optionally archived into a ZIP file for backup or uploaded to a Google Drive Shared Drive for storage. The idea is to save time and effort by automating the content collection process, ensuring that videos are already in the correct format and structure, ready to be posted or scheduled to multiple platforms.

Core Features:

Search & download videos from YouTube Shorts, Instagram, and TikTok.

Automatically convert and store videos as MP4.

Organize downloads into separate folders by platform and hashtag/query.

Optional ZIP archiving for backups.

Upload archives to Google Drive for easy sharing or cloud storage.

#!/usr/bin/env python

"""
Beauty Content Pipeline (fixed, single-file)

  • Download YouTube Shorts (MP4)
  • Download Instagram hashtag posts (needs IG login; video only)
  • (Optional) TikTok: gracefully skipped unless session set up
  • Zip data/ into beauty_content.zip
  • Upload ZIP to Google Drive (Shared Drive) via Service Account
  • Upload latest MP4 to YouTube (OAuth Installed App)
  • Publish Instagram Reel by hosting the MP4 on Drive (public URL) then using IG Graph API """

import os
import json
import argparse
import logging
import zipfile
import asyncio
import time
from pathlib import Path

# Third-party

import requests
import yt_dlp
import instaloader

# Google APIs

from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow

# Try SSL (some libs require it); fall back to None so the import of this
# module never fails on builds of Python compiled without SSL support.
try:
    import ssl  # noqa
except ImportError:
    ssl = None

# ───────────────────────────── Configuration ─────────────────────────────

# State file remembers what you've already downloaded
STATE_FILE = "state.json"

# Service Account JSON for Google Drive (Shared Drive upload)
SERVICE_ACCOUNT_FILE = r"C:\Users\jer93\OneDrive\Desktop\beauty-content-pipeline-67720b3e86da.json"

# A folder ID that lives inside a Shared Drive you created and shared with the service account
DRIVE_FOLDER_ID = "0ANcAitTsxK65Uk9PVA"  # <- update if you change folders

# OAuth client secret for YouTube uploads (Installed App).
# Download from Google Cloud Console (OAuth client) and point to it here:
YT_CLIENT_SECRET = r"C:\Path\to\client_secret_oauth.json"  # <-- CHANGE THIS
YT_TOKEN_FILE = "yt_token.json"
YT_SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]

# Instagram Graph API (Reels publishing)
IG_ACCESS_TOKEN = os.getenv("IG_ACCESS_TOKEN", "")  # long-lived token from FB developer
IG_USER_ID = os.getenv("IG_USER_ID", "")  # your Instagram Business numeric ID

# Google Drive file scopes
SCOPES = ["https://www.googleapis.com/auth/drive.file"]

# Instagram scraping login (Instaloader).
# SECURITY: the username/password were previously hard-coded here in plain
# text (and published) — rotate that password immediately. Credentials are
# now read from the environment, matching how IG_ACCESS_TOKEN is handled.
IG_LOGIN_USER = os.getenv("IG_LOGIN_USER", "")
IG_LOGIN_PASS = os.getenv("IG_LOGIN_PASS", "")

# ───────────────────────────── Helpers ─────────────────────────────

def load_state(path=STATE_FILE if "STATE_FILE" in globals() else "state.json"):
    """Return the persisted download state, or a fresh empty state.

    Args:
        path: JSON state-file location (defaults to STATE_FILE).

    Returns:
        dict mapping platform name -> list of already-downloaded IDs.
    """
    if os.path.isfile(path):
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    # No state yet: start with an empty list per platform.
    return {"youtube": [], "instagram": [], "tiktok": []}

def save_state(state, path=STATE_FILE if "STATE_FILE" in globals() else "state.json"):
    """Persist *state* as pretty-printed JSON at *path* (defaults to STATE_FILE)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)

# MP4 finder

def pick_latest_mp4(folder):
    """Return the path of the most recently modified .mp4 in *folder*.

    Returns None when the folder does not exist or contains no MP4 files.
    The extension check is case-insensitive.
    """
    if not os.path.isdir(folder):
        return None
    cands = [os.path.join(folder, f) for f in os.listdir(folder) if f.lower().endswith(".mp4")]
    if not cands:
        return None
    return max(cands, key=os.path.getmtime)

# ───────────────────────────── YouTube Download (MP4) ─────────────────────────────

def download_youtube_shorts(query, limit, seen_ids, save_path="data/YouTube"):
    """Search YouTube Shorts for *query* and download unseen results as MP4.

    Args:
        query: search text (" shorts" is appended to bias results).
        limit: maximum number of search results to consider.
        seen_ids: collection of video IDs already downloaded; these are skipped.
        save_path: destination folder, created if missing.

    Returns:
        list of newly downloaded video IDs (empty on error).
    """
    new_ids = []
    os.makedirs(save_path, exist_ok=True)
    opts = {
        "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4",
        "merge_output_format": "mp4",
        "outtmpl": os.path.join(save_path, "%(id)s.%(ext)s"),
        "noplaylist": True,
        "quiet": True,
        # NOTE: "preferedformat" (sic) is yt-dlp's actual option spelling.
        "postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}],
    }
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            # First fetch metadata only, so already-seen IDs can be skipped
            # without paying for the download.
            info = ydl.extract_info(f"ytsearch{limit}:{query} shorts", download=False)
            for entry in info.get("entries", []):
                vid = entry.get("id")
                if vid and vid not in seen_ids:
                    ydl.download([entry["webpage_url"]])
                    new_ids.append(vid)
        logging.info(f"YouTube: downloaded {len(new_ids)} new shorts")
    except Exception as e:
        # Best-effort: a failed search/download must not kill the pipeline.
        logging.error(f"YouTube error: {e}")
    return new_ids

# ───────────────────────────── Instagram Download (video only) ─────────────────────────────

def scrape_instagram_tag(tag, limit, seen_ids, save_base="data/Instagram"):
    """Download the latest video posts for a hashtag (login required).

    Args:
        tag: hashtag name without the leading '#'.
        limit: maximum number of new video posts to download.
        seen_ids: collection of post shortcodes already downloaded; skipped.
        save_base: base folder; posts are stored under save_base/<tag>/.

    Returns:
        list of newly downloaded post shortcodes (empty on error).
    """
    new_ids = []
    os.makedirs(save_base, exist_ok=True)
    save_path = os.path.join(save_base, tag)
    os.makedirs(save_path, exist_ok=True)

    loader = instaloader.Instaloader(download_comments=False, save_metadata=False, dirname_pattern=save_path)
    try:
        loader.login(IG_LOGIN_USER, IG_LOGIN_PASS)  # required to avoid 403 on hashtags
        ht = instaloader.Hashtag.from_name(loader.context, tag)
        count = 0
        for post in ht.get_posts():
            if count >= limit:
                break
            if not post.is_video:  # image/carousel posts are skipped
                continue
            sc = post.shortcode
            if sc not in seen_ids:
                loader.download_post(post, target=save_path)
                new_ids.append(sc)
                count += 1
        logging.info(f"Instagram: downloaded {len(new_ids)} new posts (video only)")
    except Exception as e:
        # Best-effort: login/scrape failures must not kill the pipeline.
        logging.error(f"Instagram error: {e}")
    return new_ids

# ───────────────────────────── TikTok (graceful skip) ─────────────────────────────

def download_tiktok_hashtag(tag, limit, seen_ids, save_path="data/TikTok"):
    """Download up to *limit* unseen TikTok videos for a hashtag.

    Gracefully returns [] when TikTokApi is not installed or no working
    session is configured, so the rest of the pipeline keeps running.
    """
    try:
        # Local import so the rest of the file still runs if the package is missing.
        from TikTokApi import TikTokApi
    except Exception:
        logging.warning("TikTok scraper unavailable—skipping.")
        return []

    new_ids = []
    os.makedirs(save_path, exist_ok=True)
    try:
        # Support both legacy (get_instance/by_hashtag) and newer
        # (hashtag(...).videos(...)) TikTokApi interfaces.
        api = getattr(TikTokApi, "get_instance", TikTokApi)()
        gen = api.by_hashtag(tag, count=limit) if hasattr(api, "by_hashtag") else api.hashtag(name=tag).videos(count=limit)

        async def collect_async(gen_):
            # Drain the async generator into a plain list.
            items = []
            async for item in gen_:
                items.append(item)
            return items

        videos = asyncio.run(collect_async(gen))
        for v in videos:
            vid = v.get("id")
            if not vid or vid in seen_ids:
                continue
            try:
                data = api.video(id=vid)  # raw MP4 bytes
                path = os.path.join(save_path, f"{vid}.mp4")
                with open(path, "wb") as f:
                    f.write(data)
                new_ids.append(vid)
            except Exception as e:
                logging.warning(f"Failed to save TikTok {vid}: {e}")
        logging.info(f"TikTok: downloaded {len(new_ids)} new videos")
    except Exception as e:
        # Any API/session failure downgrades to a skip, not a crash.
        logging.warning(f"TikTok skipped: {e}")
        return []
    return new_ids

# ───────────────────────────── ZIP Utility ─────────────────────────────

def zip_directory(src, dst):
    """Zip the entire folder *src* into archive *dst*.

    Entries are stored relative to *src* so the archive does not embed
    absolute paths; the destination's parent directory is created if needed.
    """
    target_dir = os.path.dirname(dst)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(dst, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _, files in os.walk(src):
            for fname in files:
                full_path = os.path.join(root, fname)
                # arcname relative to src so unzipping recreates the tree.
                archive.write(full_path, os.path.relpath(full_path, src))

#!/usr/bin/env python3

"""
Instagram (Graph API) Token Helper

  • Guides you through Facebook OAuth
  • Exchanges for a short-lived user token
  • Upgrades to long-lived token
  • Finds your IG Business/Creator user ID connected to your Facebook Page
  • Saves values to a .env line you can paste

Prereqs:
1) You must have a Facebook App (in "Live" or "Development" mode) with:

  • Valid OAuth Redirect URIs (add your REDIRECT_URI below to the app)
  • App permissions requested: pages_show_list, instagram_basic, instagram_content_publish, pages_read_engagement, pages_manage_metadata, instagram_manage_insights, business_management

2) Your Instagram account must be a Business or Creator account and connected to a Facebook Page.
"""

import os
import sys
import urllib.parse as urlparse
from urllib.parse import parse_qs
import webbrowser
import requests

FB_API_VERSION = "v19.0"

# Read credentials from env or prompt interactively.
APP_ID = os.getenv("FB_APP_ID") or input("Enter your Facebook APP_ID: ").strip()
APP_SECRET = os.getenv("FB_APP_SECRET") or input("Enter your Facebook APP_SECRET: ").strip()
REDIRECT_URI = os.getenv("FB_REDIRECT_URI") or input("Enter your OAuth REDIRECT_URI (must match App settings): ").strip()

# The scopes you need for IG publishing
SCOPES = [
    "pages_show_list",
    "instagram_basic",
    "instagram_content_publish",
    "pages_read_engagement",
    "pages_manage_metadata",
    "instagram_manage_insights",
    "business_management",
]

def build_auth_url():
    """Return the Facebook OAuth dialog URL requesting the IG publishing scopes."""
    base = f"https://www.facebook.com/{FB_API_VERSION}/dialog/oauth"
    params = {
        "client_id": APP_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        # Facebook expects a comma-separated scope list.
        "scope": ",".join(SCOPES),
    }
    return f"{base}?{urlparse.urlencode(params)}"

def exchange_code_for_short_token(code: str):
    """Exchange the OAuth *code* for a short-lived user access token.

    Raises requests.HTTPError on a non-2xx Graph API response.
    """
    token_url = f"https://graph.facebook.com/{FB_API_VERSION}/oauth/access_token"
    r = requests.get(token_url, params={
        "client_id": APP_ID,
        "redirect_uri": REDIRECT_URI,
        "client_secret": APP_SECRET,
        "code": code,
    }, timeout=30)
    r.raise_for_status()
    data = r.json()
    return data["access_token"]

def exchange_for_long_lived_token(short_token: str):
    """Upgrade a short-lived token to a long-lived one.

    Returns:
        (access_token, expires_in_seconds_or_None) — Facebook does not
        always include "expires_in", hence the .get().
    Raises requests.HTTPError on a non-2xx Graph API response.
    """
    token_url = f"https://graph.facebook.com/{FB_API_VERSION}/oauth/access_token"
    r = requests.get(token_url, params={
        "grant_type": "fb_exchange_token",
        "client_id": APP_ID,
        "client_secret": APP_SECRET,
        "fb_exchange_token": short_token,
    }, timeout=30)
    r.raise_for_status()
    data = r.json()
    return data["access_token"], data.get("expires_in")

def list_pages(long_token: str):
    """Return the Facebook Pages managed by the token's user (possibly empty list)."""
    url = f"https://graph.facebook.com/{FB_API_VERSION}/me/accounts"
    r = requests.get(url, params={"access_token": long_token}, timeout=30)
    r.raise_for_status()
    return r.json().get("data", [])

def get_ig_business_id_for_page(page_id: str, long_token: str):
    """Return the IG Business account ID connected to *page_id*, or None if none."""
    url = f"https://graph.facebook.com/{FB_API_VERSION}/{page_id}"
    r = requests.get(url, params={
        "fields": "instagram_business_account",
        "access_token": long_token,
    }, timeout=30)
    r.raise_for_status()
    data = r.json()
    ig = data.get("instagram_business_account")
    # The field is absent when no IG Business/Creator account is linked.
    return ig.get("id") if ig else None

def main():
    """Interactive walkthrough: OAuth code -> long-lived token -> IG user ID."""
    print("\n=== Instagram Token Helper ===\n")

    # Step 1: Send user to Facebook Login
    auth_url = build_auth_url()
    print("1) Opening browser for Facebook login/consent…")
    print("   If browser does not open, copy this URL manually:\n")
    print(auth_url, "\n")
    try:
        webbrowser.open(auth_url, new=1)
    except Exception:
        pass  # headless/odd environments: the URL was already printed

    # Step 2: After login, Facebook will redirect to your REDIRECT_URI with ?code=…
    print("2) After login, you will be redirected to your REDIRECT_URI.")
    print("   Copy the FULL redirect URL from your browser’s address bar and paste it here.")
    redirect_full = input("\nPaste FULL redirect URL: ").strip()
    if "code=" not in redirect_full:
        print("Did not find ?code= in the URL. Make sure you pasted the entire redirect URL.")
        sys.exit(1)

    parsed = urlparse.urlparse(redirect_full)
    qs = parse_qs(parsed.query)
    code = qs.get("code", [None])[0]
    if not code:
        print("No 'code' found in querystring. Aborting.")
        sys.exit(1)

    # Step 3: Exchange code -> short-lived user token
    print("\n3) Exchanging code for a short-lived user token…")
    try:
        short_token = exchange_code_for_short_token(code)
        print("   Short-lived token acquired.")
    except requests.HTTPError as e:
        print("   ERROR exchanging code for token:", e.response.text)
        sys.exit(1)

    # Step 4: Exchange short token -> long-lived user token
    print("\n4) Exchanging short-lived token for a long-lived token…")
    try:
        long_token, expires_in = exchange_for_long_lived_token(short_token)
        print(f"   Long-lived token acquired (expires_in ~ {expires_in} seconds).")
    except requests.HTTPError as e:
        print("   ERROR upgrading token:", e.response.text)
        sys.exit(1)

    # Step 5: Find your Page(s), then the connected IG Business account
    print("\n5) Looking up your Facebook Pages and connected Instagram account…")
    try:
        pages = list_pages(long_token)
        if not pages:
            print("   No pages found. Make sure your FB user manages a Page linked to your IG account.")
            sys.exit(1)

        # Probe each managed Page until one has a linked IG Business account.
        ig_user_id = None
        for p in pages:
            pid = p.get("id")
            name = p.get("name")
            print(f"   Checking page: {name} ({pid})")
            ig_user_id = get_ig_business_id_for_page(pid, long_token)
            if ig_user_id:
                print(f"   ✔ Found IG Business Account ID: {ig_user_id} for page: {name}")
                break

        if not ig_user_id:
            print("   Could not find any Instagram Business account connected to your pages.")
            print("   Make sure your IG is a Business/Creator account and linked to one of these Facebook Pages.")
            sys.exit(1)

    except requests.HTTPError as e:
        print("   ERROR while fetching pages/IG account:", e.response.text)
        sys.exit(1)

    # Output result + ready-to-paste .env lines
    print("\n=== SUCCESS ===")
    print("Add the following to your .env (or set as environment variables):\n")
    print(f"IG_ACCESS_TOKEN={long_token}")
    print(f"IG_USER_ID={ig_user_id}")
    print("\nStore your APP secrets safely; do not commit them to Git.")

# Run the interactive helper only when executed as a script
# (the original `if name == "main"` raised NameError and never ran).
if __name__ == "__main__":
    main()

Top comments (0)