DEV Community

pan0x12

Batch Create Viral AI Videos with the Sora 2 API

Transform your content ideas into professional videos automatically with Python and the Sora 2 API. This comprehensive guide walks you through building an automated video generation pipeline that turns text descriptions into high-quality visual content without manual intervention.

Why Automate Video Production?

Creating video content for platforms like TikTok, YouTube, Instagram, and Facebook can drain both time and resources. Traditional video production requires extensive effort, from scripting to filming to editing. The Sora 2 API changes this paradigm by enabling:

  • Scalable Content Creation: Submit dozens of video concepts in a single batch run
  • Real-time Progress Monitoring: Track each video's generation status from submission to completion
  • Streamlined Workflow: Automatically save each result (video URL, status, cost, and creation time) to a JSON file for easy access
  • Watermark-Free Output: All generated videos come without watermarks, ready for direct publishing

What you'll build: A Python automation tool that accepts creative text prompts, submits them to the API, monitors generation progress, and saves the results to a JSON file, all without human supervision during execution.

Getting Started

System Requirements

  • Python version 3.7 or newer installed on your system
  • A DefAPI API key

API Credentials Setup

To access the Sora 2 video generation service:

  1. Navigate to DefAPI Platform and complete registration
  2. Generate your personal API credentials through the dashboard
  3. Security Note: Never embed API keys directly in source code or commit them to version control systems

Store the key in an environment variable instead:

# For Unix-based systems (Linux/macOS)
export SORA_API_KEY="your-api-key-here"

# For Windows Command Prompt
set SORA_API_KEY=your-api-key-here

# For Windows PowerShell users
$env:SORA_API_KEY="your-api-key-here"

Required Python Packages

The implementation needs only one external library for HTTP communication:

pip install requests

Understanding the API Architecture

Video Creation Endpoint

API Path: POST https://api.defapi.org/api/sora2/gen

Authorization Method: Include Authorization: Bearer YOUR_API_KEY in request headers

Request Body Structure:

{
  "prompt": "Descriptive text for video generation",
  "images": ["https://example.com/reference.jpg"],  // Optional reference image (limit: 1)
  "callback_url": "https://example.com/callback"    // Optional webhook for completion notification
}
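
As a minimal sketch of the request body above, the helper below assembles the payload in Python and rejects more than one reference image before anything is sent. The name `build_payload` and the empty-prompt check are assumptions made for illustration; only the three field names come from the structure shown above.

```python
from typing import Dict, List, Optional


def build_payload(prompt: str,
                  images: Optional[List[str]] = None,
                  callback_url: Optional[str] = None) -> Dict:
    """Assemble a request body for the generation endpoint (hypothetical helper)."""
    # Assumption: an empty prompt is never useful, so fail early on the client.
    if not prompt.strip():
        raise ValueError("prompt must not be empty")

    payload: Dict = {"prompt": prompt}

    # The article states a limit of one reference image.
    if images:
        if len(images) > 1:
            raise ValueError("at most one reference image is allowed")
        payload["images"] = list(images)

    # Only include the webhook field when a URL was actually supplied.
    if callback_url:
        payload["callback_url"] = callback_url

    return payload


print(build_payload("Sunrise over a quiet harbor",
                    images=["https://example.com/reference.jpg"]))
```

Raising on a second image, rather than silently dropping it, makes a mistaken call visible; truncating the list is an equally valid choice if you prefer lenient behavior.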

API Response Format:

{
  "code": 0,
  "message": "ok",
  "data": {
    "task_id": "ta12345678-1234-1234-1234-123456789abc"
  }
}
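
To make the response envelope concrete, here is a small sketch that pulls `task_id` out of a parsed response. It assumes, as the rest of this guide does, that `code` equal to 0 means success; the `ApiError` class and the `extract_task_id` name are hypothetical and exist only for this example.

```python
from typing import Dict


class ApiError(Exception):
    """Raised when the response envelope does not describe a usable task (hypothetical)."""


def extract_task_id(response_data: Dict) -> str:
    """Return data.task_id from a creation response, or raise ApiError."""
    # Assumption: any non-zero code signals an error and "message" explains it.
    if response_data.get("code") != 0:
        raise ApiError(response_data.get("message", "unknown error"))

    # Guard against a missing or null "data" object.
    data = response_data.get("data") or {}
    task_id = data.get("task_id")
    if not task_id:
        raise ApiError("response did not include a task_id")
    return task_id


sample = {
    "code": 0,
    "message": "ok",
    "data": {"task_id": "ta12345678-1234-1234-1234-123456789abc"},
}
print(extract_task_id(sample))
```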

Status Polling Endpoint

API Path: GET https://api.defapi.org/api/task/query?task_id=xxx

Status Values Explained:

  • pending: Task queued, awaiting processing
  • submitted: Request acknowledged by system
  • in_progress: Video generation underway
  • success: Video successfully created
  • failed: Generation encountered an error
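
One way to use these values in code is to split them into states that end polling and states that do not. The sketch below does that; the constant and function names are hypothetical, and treating an unrecognized status as "still running" is an assumption, not documented API behavior.

```python
# Statuses after which the task will not change again.
TERMINAL_STATUSES = {"success", "failed"}

# Statuses that mean the task is still moving through the pipeline.
ACTIVE_STATUSES = {"pending", "submitted", "in_progress"}


def is_finished(status: str) -> bool:
    """True once a task has reached success or failed."""
    return status in TERMINAL_STATUSES


def should_keep_polling(status: str) -> bool:
    """True for active statuses and, by assumption, for unknown ones too."""
    return status not in TERMINAL_STATUSES


for value in ["pending", "in_progress", "success", "failed"]:
    print(value, "->", "done" if is_finished(value) else "keep polling")
```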

Successful Completion Response:

{
  "code": 0,
  "message": "ok",
  "data": {
    "task_id": "ta823dfb-eaac-44fd-aec2-3e2c7ba8e071",
    "status": "success",
    "result": {
      "video": "https://example.com/generated-video.mp4"
    },
    "consumed": "0.00100000",
    "created_at": "2025-08-03T10:22:20.010Z"
  }
}
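
If you want typed values rather than raw strings, the `data` object above can be converted along these lines. This is a sketch under assumptions: the `TaskResult` class and `parse_task` function are hypothetical, and the timestamp is assumed to always use the UTC format with fractional seconds shown in the sample.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, Optional


@dataclass
class TaskResult:
    task_id: str
    status: str
    video_url: Optional[str]
    consumed: Optional[Decimal]
    created_at: Optional[datetime]


def parse_task(data: Dict) -> TaskResult:
    """Convert the "data" object of a status response into typed fields."""
    result = data.get("result") or {}  # tolerate a missing or null result
    consumed = data.get("consumed")
    created = data.get("created_at")
    return TaskResult(
        task_id=data["task_id"],
        status=data["status"],
        video_url=result.get("video"),
        # "consumed" arrives as a string; Decimal keeps the value exact.
        consumed=Decimal(consumed) if consumed is not None else None,
        # Assumes the "...T...Z" layout from the sample response (UTC).
        created_at=(
            datetime.strptime(created, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
            if created else None
        ),
    )


sample = {
    "task_id": "ta823dfb-eaac-44fd-aec2-3e2c7ba8e071",
    "status": "success",
    "result": {"video": "https://example.com/generated-video.mp4"},
    "consumed": "0.00100000",
    "created_at": "2025-08-03T10:22:20.010Z",
}
parsed = parse_task(sample)
print(parsed.video_url, parsed.consumed, parsed.created_at.isoformat())
```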

Building the Automation System

Phase 1: Module Imports

import os
import time
import json
import requests
from typing import List, Dict, Optional

Phase 2: Authentication Configuration

# Retrieve credentials from environment (secure method - never hardcode secrets)
SORA_API_KEY = os.getenv("SORA_API_KEY")
if not SORA_API_KEY:
    raise ValueError("SORA_API_KEY environment variable must be configured")

# Define API connection settings
API_BASE_URL = "https://api.defapi.org"
REQUEST_HEADERS = {
    "Authorization": f"Bearer {SORA_API_KEY}",
    "Content-Type": "application/json"
}

Phase 3: Building Core Functions

def submit_video_generation(text_prompt: str, ref_images: Optional[List[str]] = None) -> Optional[str]:
    """
    Initiates video creation request

    Parameters:
        text_prompt: Textual description for the video
        ref_images: Optional list of reference image URLs (maximum 1)

    Returns:
        Unique task identifier for tracking, or None if the submission failed
    """
    endpoint = f"{API_BASE_URL}/api/sora2/gen"

    request_body = {"prompt": text_prompt}
    if ref_images:
        request_body["images"] = ref_images[:1]  # Limit to single image

    try:
        api_response = requests.post(endpoint, headers=REQUEST_HEADERS, json=request_body, timeout=30)
        api_response.raise_for_status()

        response_data = api_response.json()
        if response_data.get("code") == 0:
            job_id = response_data["data"]["task_id"]
            print(f"✓ Submitted successfully: {text_prompt[:30]}... [ID: {job_id}]")
            return job_id
        else:
            print(f"✗ Submission error: {response_data.get('message')}")
            return None

    except requests.exceptions.RequestException as error:
        print(f"✗ Network error: {error}")
        return None


def fetch_task_status(job_id: str) -> Dict:
    """
    Retrieves current task information

    Parameters:
        job_id: Unique task identifier

    Returns:
        Dictionary containing task details, or an empty dict if the query failed
    """
    endpoint = f"{API_BASE_URL}/api/task/query"
    query_params = {"task_id": job_id}

    try:
        api_response = requests.get(endpoint, headers=REQUEST_HEADERS, params=query_params, timeout=30)
        api_response.raise_for_status()

        response_data = api_response.json()
        if response_data.get("code") == 0:
            return response_data["data"]
        else:
            print(f"✗ Status check failed: {response_data.get('message')}")
            return {}

    except requests.exceptions.RequestException as error:
        print(f"✗ Query error: {error}")
        return {}


def monitor_until_complete(job_id: str, timeout_seconds: int = 600) -> Dict:
    """
    Continuously polls task until completion or timeout

    Parameters:
        job_id: Task identifier to monitor
        timeout_seconds: Maximum waiting duration (default: 10 minutes)

    Returns:
        Final task data, or {"status": "timeout", "task_id": ...} if the timeout is reached
    """
    start_timestamp = time.time()
    check_interval = 5  # Poll every 5 seconds

    while True:
        time_elapsed = time.time() - start_timestamp
        if time_elapsed > timeout_seconds:
            print(f"✗ Timeout reached [{job_id}]")
            return {"status": "timeout", "task_id": job_id}

        task_data = fetch_task_status(job_id)
        if not task_data:
            time.sleep(check_interval)
            continue

        current_status = task_data.get("status")

        if current_status == "success":
            # `or {}` also covers a "result" field that is present but null
            output_url = (task_data.get("result") or {}).get("video")
            print(f"✓ Completed successfully [{job_id}]")
            print(f"  Download link: {output_url}")
            return task_data

        elif current_status == "failed":
            # Same guard for "status_reason", which may be missing or null
            error_details = (task_data.get("status_reason") or {}).get("message", "Unspecified error")
            print(f"✗ Generation failed [{job_id}]: {error_details}")
            return task_data

        elif current_status in ["pending", "submitted", "in_progress"]:
            print(f"⏳ Processing... [{job_id}] ({int(time_elapsed)}s elapsed)")
            time.sleep(check_interval)

        else:
            print(f"? Unexpected status: {current_status}")
            time.sleep(check_interval)
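
The `monitor_until_complete` function above waits a fixed 5 seconds between checks. For long-running jobs, a growing interval sends fewer requests. The generator below is a sketch of that alternative, not part of the original script; the name `backoff_intervals` and its default values are assumptions chosen for illustration.

```python
import itertools
from typing import Iterator


def backoff_intervals(initial: float = 5.0,
                      factor: float = 1.5,
                      maximum: float = 30.0) -> Iterator[float]:
    """Yield wait times that grow geometrically until they reach `maximum`."""
    delay = initial
    while True:
        yield min(delay, maximum)
        # Cap the stored value as well so it cannot grow without bound.
        delay = min(delay * factor, maximum)


# First six waits with the defaults:
print(list(itertools.islice(backoff_intervals(), 6)))
# → [5.0, 7.5, 11.25, 16.875, 25.3125, 30.0]
```

To use it, create the generator once before the polling loop and call `time.sleep(next(intervals))` wherever the loop currently sleeps for `check_interval`.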

Phase 4: Orchestrating Batch Operations

def process_video_batch(prompt_list: List[str], output_file: str = "video_results.json"):
    """
    Executes bulk video generation workflow

    Parameters:
        prompt_list: Collection of video descriptions
        output_file: Destination file for results
    """
    print(f"\n{'='*60}")
    print(f"Initiating batch job: {len(prompt_list)} videos queued")
    print(f"{'='*60}\n")

    # Phase A: Submit all generation requests
    job_queue = []
    for index, description in enumerate(prompt_list, 1):
        print(f"[{index}/{len(prompt_list)}] Queuing request...")
        job_id = submit_video_generation(description)
        if job_id:
            job_queue.append({"task_id": job_id, "prompt": description})
        time.sleep(1)  # Rate limiting protection

    print(f"\n{len(job_queue)} tasks successfully queued\n")

    # Phase B: Monitor all jobs to completion
    final_results = []
    for index, job_entry in enumerate(job_queue, 1):
        print(f"\n[{index}/{len(job_queue)}] Monitoring task progress...")
        print(f"Description: {job_entry['prompt'][:50]}...")

        completion_data = monitor_until_complete(job_entry["task_id"])
        final_results.append({
            "prompt": job_entry["prompt"],
            "task_id": job_entry["task_id"],
            "status": completion_data.get("status"),
            # `or {}` keeps this safe when "result" is missing or null (failed or timed-out jobs)
            "video_url": (completion_data.get("result") or {}).get("video"),
            "consumed": completion_data.get("consumed"),
            "created_at": completion_data.get("created_at")
        })

    # Phase C: Persist results to disk
    with open(output_file, "w", encoding="utf-8") as output:
        json.dump(final_results, output, ensure_ascii=False, indent=2)

    # Generate summary statistics
    successful_jobs = sum(1 for result in final_results if result["status"] == "success")
    print(f"\n{'='*60}")
    print(f"Batch processing complete! {successful_jobs}/{len(final_results)} succeeded")
    print(f"Output saved to: {output_file}")
    print(f"{'='*60}\n")

    return final_results
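
Phase B above waits for each job in turn, so the 10-minute timeout for a later job only starts once the earlier ones have finished. As a sketch of an alternative, the function below checks every unfinished task once per round under one shared deadline. It is not part of the original script: `monitor_all` is a hypothetical name, and `fetch` stands for any callable with the same contract as `fetch_task_status` (task ID in, dict out, empty dict on error).

```python
import time
from typing import Callable, Dict, List


def monitor_all(task_ids: List[str],
                fetch: Callable[[str], Dict],
                timeout_seconds: float = 600,
                check_interval: float = 5,
                sleep: Callable[[float], None] = time.sleep) -> Dict[str, Dict]:
    """Poll every unfinished task once per round until all finish or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    pending = list(task_ids)
    finished: Dict[str, Dict] = {}

    while pending and time.monotonic() < deadline:
        still_running = []
        for task_id in pending:
            data = fetch(task_id)
            if data.get("status") in ("success", "failed"):
                finished[task_id] = data
            else:
                # Active, unknown, or failed-to-fetch: try again next round.
                still_running.append(task_id)
        pending = still_running
        if pending:
            sleep(check_interval)

    # Anything left over gets the same timeout marker the article's code uses.
    for task_id in pending:
        finished[task_id] = {"status": "timeout", "task_id": task_id}
    return finished
```

The `sleep` parameter exists only so the function can be exercised without real waiting; in normal use you would call `monitor_all(ids, fetch_task_status)` and leave the defaults alone.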

Full Implementation Script

#!/usr/bin/env python3
"""
Automated Video Generation Platform
Leveraging Sora 2 API for scalable content production
"""

import os
import time
import json
import requests
from typing import List, Dict, Optional

# ==================== Setup & Configuration ====================

# Secure credential loading from environment
SORA_API_KEY = os.getenv("SORA_API_KEY")
if not SORA_API_KEY:
    raise ValueError("SORA_API_KEY environment variable must be configured")

API_BASE_URL = "https://api.defapi.org"
REQUEST_HEADERS = {
    "Authorization": f"Bearer {SORA_API_KEY}",
    "Content-Type": "application/json"
}

# ==================== API Integration Layer ====================

def submit_video_generation(text_prompt: str, ref_images: Optional[List[str]] = None) -> Optional[str]:
    """Initiate video creation workflow"""
    endpoint = f"{API_BASE_URL}/api/sora2/gen"
    request_body = {"prompt": text_prompt}
    if ref_images:
        request_body["images"] = ref_images[:1]

    try:
        api_response = requests.post(endpoint, headers=REQUEST_HEADERS, json=request_body, timeout=30)
        api_response.raise_for_status()
        response_data = api_response.json()

        if response_data.get("code") == 0:
            job_id = response_data["data"]["task_id"]
            print(f"✓ Submitted: {text_prompt[:30]}... [ID: {job_id}]")
            return job_id
        else:
            print(f"✗ Error: {response_data.get('message')}")
            return None
    except requests.exceptions.RequestException as error:
        print(f"✗ Network issue: {error}")
        return None


def fetch_task_status(job_id: str) -> Dict:
    """Retrieve task progress information"""
    endpoint = f"{API_BASE_URL}/api/task/query"
    query_params = {"task_id": job_id}

    try:
        api_response = requests.get(endpoint, headers=REQUEST_HEADERS, params=query_params, timeout=30)
        api_response.raise_for_status()
        response_data = api_response.json()

        if response_data.get("code") == 0:
            return response_data["data"]
        else:
            print(f"✗ Fetch error: {response_data.get('message')}")
            return {}
    except requests.exceptions.RequestException as error:
        print(f"✗ Status query failed: {error}")
        return {}


def monitor_until_complete(job_id: str, timeout_seconds: int = 600) -> Dict:
    """Poll task status until completion"""
    start_timestamp = time.time()
    check_interval = 5

    while True:
        time_elapsed = time.time() - start_timestamp
        if time_elapsed > timeout_seconds:
            print(f"✗ Timeout [{job_id}]")
            return {"status": "timeout", "task_id": job_id}

        task_data = fetch_task_status(job_id)
        if not task_data:
            time.sleep(check_interval)
            continue

        current_status = task_data.get("status")

        if current_status == "success":
            # `or {}` also covers a "result" field that is present but null
            output_url = (task_data.get("result") or {}).get("video")
            print(f"✓ Success [{job_id}]")
            print(f"  Link: {output_url}")
            return task_data
        elif current_status == "failed":
            # Same guard for "status_reason", which may be missing or null
            error_details = (task_data.get("status_reason") or {}).get("message", "Unspecified")
            print(f"✗ Failed [{job_id}]: {error_details}")
            return task_data
        elif current_status in ["pending", "submitted", "in_progress"]:
            print(f"⏳ Processing [{job_id}] ({int(time_elapsed)}s)")
            time.sleep(check_interval)
        else:
            print(f"? Unknown state: {current_status}")
            time.sleep(check_interval)


def process_video_batch(prompt_list: List[str], output_file: str = "video_results.json"):
    """Execute batch video generation pipeline"""
    print(f"\n{'='*60}")
    print(f"Batch job initiated: {len(prompt_list)} videos")
    print(f"{'='*60}\n")

    # Queue all generation tasks
    job_queue = []
    for index, description in enumerate(prompt_list, 1):
        print(f"[{index}/{len(prompt_list)}] Submitting...")
        job_id = submit_video_generation(description)
        if job_id:
            job_queue.append({"task_id": job_id, "prompt": description})
        time.sleep(1)

    print(f"\n{len(job_queue)} tasks in queue\n")

    # Monitor each task to completion
    final_results = []
    for index, job_entry in enumerate(job_queue, 1):
        print(f"\n[{index}/{len(job_queue)}] Monitoring...")
        print(f"Content: {job_entry['prompt'][:50]}...")

        completion_data = monitor_until_complete(job_entry["task_id"])
        final_results.append({
            "prompt": job_entry["prompt"],
            "task_id": job_entry["task_id"],
            "status": completion_data.get("status"),
            # `or {}` keeps this safe when "result" is missing or null (failed or timed-out jobs)
            "video_url": (completion_data.get("result") or {}).get("video"),
            "consumed": completion_data.get("consumed"),
            "created_at": completion_data.get("created_at")
        })

    # Write results to file
    with open(output_file, "w", encoding="utf-8") as output:
        json.dump(final_results, output, ensure_ascii=False, indent=2)

    successful_jobs = sum(1 for r in final_results if r["status"] == "success")
    print(f"\n{'='*60}")
    print(f"Job complete! {successful_jobs}/{len(final_results)} successful")
    print(f"Saved to: {output_file}")
    print(f"{'='*60}\n")

    return final_results


# ==================== Application Entry Point ====================

if __name__ == "__main__":
    # Sample creative prompts for video generation
    creative_scripts = [
        "Developer deep in concentration at a cafe workspace, urban skyline twinkling beyond glass windows, ambient lighting casting warm shadows",
        "Culinary content creator assembling a gourmet pastry, macro lens catching rich chocolate drizzle in slow motion",
        "Senior citizen performing tai chi movements at dawn in the park, morning sun creating patterns through foliage",
        "Innovation team collaborating in a modern workspace, idea-filled whiteboard dominating the frame",
        "Romantic silhouettes strolling along shoreline at golden hour, ocean waves rhythmically meeting sand"
    ]

    # Execute batch processing
    output_data = process_video_batch(creative_scripts, "my_videos.json")

    # Display successful outputs
    print("\nGenerated content URLs:")
    for index, entry in enumerate(output_data, 1):
        if entry["status"] == "success" and entry["video_url"]:
            print(f"{index}. {entry['video_url']}")
            print(f"   Description: {entry['prompt'][:40]}...")
            print(f"   Cost: {entry['consumed']}\n")
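
The script above records video URLs but does not download the files. If you want local copies, a follow-up step could look like the sketch below. It uses only the standard library and rests on two assumptions: the result URL can be fetched with a plain GET without extra headers, and `.mp4` is a reasonable fallback extension. The names `filename_for` and `download_video` are hypothetical.

```python
import os
import shutil
import urllib.request
from urllib.parse import urlparse


def filename_for(task_id: str, video_url: str) -> str:
    """Build a local file name from the task ID and the URL's extension."""
    # Assumption: fall back to .mp4 when the URL path has no extension.
    extension = os.path.splitext(urlparse(video_url).path)[1] or ".mp4"
    return f"{task_id}{extension}"


def download_video(video_url: str, task_id: str, directory: str = "videos") -> str:
    """Stream one video to disk and return the path it was written to."""
    os.makedirs(directory, exist_ok=True)
    destination = os.path.join(directory, filename_for(task_id, video_url))
    # copyfileobj streams in chunks, so the whole file is never held in memory.
    with urllib.request.urlopen(video_url, timeout=60) as response, \
            open(destination, "wb") as output:
        shutil.copyfileobj(response, output)
    return destination


print(filename_for("ta823dfb-eaac-44fd-aec2-3e2c7ba8e071",
                   "https://example.com/generated-video.mp4"))
```

A typical use would be to load `my_videos.json`, loop over the entries whose `status` is `success`, and call `download_video(entry["video_url"], entry["task_id"])` for each.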
