Transform your content ideas into professional videos automatically with Python and the Sora 2 API. This comprehensive guide walks you through building an automated video generation pipeline that turns text descriptions into high-quality visual content without manual intervention.
Why Automate Video Production?
Creating video content for platforms like TikTok, YouTube, Instagram, and Facebook can drain both time and resources. Traditional video production requires extensive effort, from scripting to filming to editing. The Sora 2 API changes this paradigm by enabling:
- Scalable Content Creation: Process dozens of video concepts simultaneously without quality compromise
- Real-time Progress Monitoring: Track each video's generation status from submission to completion
- Streamlined Workflow: Automatically store all generated content with organized metadata for easy access
- Watermark-Free Output: All generated videos come without watermarks, ready for direct publishing
What you'll build: A Python automation tool that accepts creative text prompts, manages API interactions, monitors generation progress, and organizes output files—all without requiring human supervision during execution.
Getting Started
System Requirements
- Python version 3.7 or newer installed on your system
- A DefAPI API key
API Credentials Setup
To access the Sora 2 video generation service:
- Navigate to DefAPI Platform and complete registration
- Generate your personal API credentials through the dashboard
- Security Note: Never embed API keys directly in source code or commit them to version control systems
Best practice for credential management involves environment variables:
# For Unix-based systems (Linux/macOS)
export SORA_API_KEY="your-api-key-here"
# For Windows Command Prompt
set SORA_API_KEY=your-api-key-here
# For Windows PowerShell users
$env:SORA_API_KEY="your-api-key-here"
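If you'd rather keep credentials in a local file, the third-party python-dotenv package (pip install python-dotenv) is a common option; this is an optional alternative, not something this guide requires. Just remember to add .env to your .gitignore so the key never reaches version control:
# Optional alternative: load SORA_API_KEY from a local .env file
# (requires: pip install python-dotenv)
import os
from dotenv import load_dotenv

load_dotenv()  # copies key=value pairs from .env into os.environ
api_key = os.getenv("SORA_API_KEY")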
Required Python Packages
The implementation needs only one external library for HTTP communication:
pip install requests
Understanding the API Architecture
Video Creation Endpoint
API Path: POST https://api.defapi.org/api/sora2/gen
Authorization Method: Include Authorization: Bearer YOUR_API_KEY in the request headers
Request Body Structure:
{
"prompt": "Descriptive text for video generation",
"images": ["https://example.com/reference.jpg"], // Optional reference image (limit: 1)
"callback_url": "https://example.com/callback" // Optional webhook for completion notification
}
API Response Format:
{
"code": 0,
"message": "ok",
"data": {
"task_id": "ta12345678-1234-1234-1234-123456789abc"
}
}
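Putting the two together, a minimal submission sketch using the requests library might look like this (the prompt text is illustrative):
import os
import requests

headers = {
    "Authorization": f"Bearer {os.environ['SORA_API_KEY']}",
    "Content-Type": "application/json",
}
payload = {"prompt": "A timelapse of clouds rolling over a mountain ridge"}

resp = requests.post("https://api.defapi.org/api/sora2/gen",
                     headers=headers, json=payload, timeout=30)
resp.raise_for_status()
body = resp.json()
if body["code"] != 0:
    raise RuntimeError(body.get("message", "submission failed"))
task_id = body["data"]["task_id"]  # keep this for status polling
print(task_id)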
Status Polling Endpoint
API Path: GET https://api.defapi.org/api/task/query?task_id=xxx
Status Values Explained:
- pending: Task queued, awaiting processing
- submitted: Request acknowledged by system
- in_progress: Video generation underway
- success: Video successfully created
- failed: Generation encountered an error
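In a polling loop it helps to treat the first three values as "keep waiting" and the last two as terminal; a small sketch (the helper and set names are my own, not part of the API):
# In-flight statuses mean the task is still moving through the pipeline;
# terminal statuses end the wait loop.
IN_FLIGHT_STATUSES = {"pending", "submitted", "in_progress"}
TERMINAL_STATUSES = {"success", "failed"}

def is_terminal(status: str) -> bool:
    return status in TERMINAL_STATUSES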
Successful Completion Response:
{
"code": 0,
"message": "ok",
"data": {
"task_id": "ta823dfb-eaac-44fd-aec2-3e2c7ba8e071",
"status": "success",
"result": {
"video": "https://example.com/generated-video.mp4"
},
"consumed": "0.00100000",
"created_at": "2025-08-03T10:22:20.010Z"
}
}
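The result.video URL points at the finished MP4. If you want a local copy rather than just the link, a streaming download sketch (the function name is illustrative, not part of the API):
import requests

def download_video(url: str, dest_path: str) -> None:
    # Stream the file to disk in chunks so large videos never sit in memory
    with requests.get(url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)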
Building the Automation System
Phase 1: Module Imports
import os
import time
import json
import requests
from typing import List, Dict, Optional
Phase 2: Authentication Configuration
# Retrieve credentials from environment (secure method - never hardcode secrets)
SORA_API_KEY = os.getenv("SORA_API_KEY")
if not SORA_API_KEY:
raise ValueError("SORA_API_KEY environment variable must be configured")
# Define API connection settings
API_BASE_URL = "https://api.defapi.org"
REQUEST_HEADERS = {
"Authorization": f"Bearer {SORA_API_KEY}",
"Content-Type": "application/json"
}
Phase 3: Building Core Functions
def submit_video_generation(text_prompt: str, ref_images: Optional[List[str]] = None) -> Optional[str]:
"""
Initiates video creation request
Parameters:
text_prompt: Textual description for the video
ref_images: Optional list of reference image URLs (maximum 1)
Returns:
Unique task identifier for tracking
"""
endpoint = f"{API_BASE_URL}/api/sora2/gen"
request_body = {"prompt": text_prompt}
if ref_images:
request_body["images"] = ref_images[:1] # Limit to single image
try:
api_response = requests.post(endpoint, headers=REQUEST_HEADERS, json=request_body, timeout=30)
api_response.raise_for_status()
response_data = api_response.json()
if response_data.get("code") == 0:
job_id = response_data["data"]["task_id"]
print(f"✓ Submitted successfully: {text_prompt[:30]}... [ID: {job_id}]")
return job_id
else:
print(f"✗ Submission error: {response_data.get('message')}")
return None
except requests.exceptions.RequestException as error:
print(f"✗ Network error: {error}")
return None
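A quick usage example (the prompt is illustrative):
task_id = submit_video_generation(
    "A drone shot gliding over a foggy pine forest at sunrise"
)
if task_id:
    print(f"Track this job with ID: {task_id}")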
def fetch_task_status(job_id: str) -> Dict:
"""
Retrieves current task information
Parameters:
job_id: Unique task identifier
Returns:
Dictionary containing task details
"""
endpoint = f"{API_BASE_URL}/api/task/query"
query_params = {"task_id": job_id}
try:
api_response = requests.get(endpoint, headers=REQUEST_HEADERS, params=query_params, timeout=30)
api_response.raise_for_status()
response_data = api_response.json()
if response_data.get("code") == 0:
return response_data["data"]
else:
print(f"✗ Status check failed: {response_data.get('message')}")
return {}
except requests.exceptions.RequestException as error:
print(f"✗ Query error: {error}")
return {}
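Called on its own (assuming task_id came from submit_video_generation above), it returns the raw data object from the query endpoint:
details = fetch_task_status(task_id)
print(details.get("status"))  # e.g. "in_progress"
print(details.get("result"))  # populated once status is "success"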
def monitor_until_complete(job_id: str, timeout_seconds: int = 600) -> Dict:
"""
Continuously polls task until completion or timeout
Parameters:
job_id: Task identifier to monitor
timeout_seconds: Maximum waiting duration (default: 10 minutes)
Returns:
Final task status and results
"""
start_timestamp = time.time()
check_interval = 5 # Poll every 5 seconds
while True:
time_elapsed = time.time() - start_timestamp
if time_elapsed > timeout_seconds:
print(f"✗ Timeout reached [{job_id}]")
return {"status": "timeout", "task_id": job_id}
task_data = fetch_task_status(job_id)
if not task_data:
time.sleep(check_interval)
continue
current_status = task_data.get("status")
if current_status == "success":
output_url = task_data.get("result", {}).get("video")
print(f"✓ Completed successfully [{job_id}]")
print(f" Download link: {output_url}")
return task_data
elif current_status == "failed":
error_details = task_data.get("status_reason", {}).get("message", "Unspecified error")
print(f"✗ Generation failed [{job_id}]: {error_details}")
return task_data
elif current_status in ["pending", "submitted", "in_progress"]:
print(f"⏳ Processing... [{job_id}] ({int(time_elapsed)}s elapsed)")
time.sleep(check_interval)
else:
print(f"? Unexpected status: {current_status}")
time.sleep(check_interval)
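These three functions already cover the single-video case end to end; a minimal sketch:
task_id = submit_video_generation("A barista pouring latte art in slow motion")
if task_id:
    final_state = monitor_until_complete(task_id, timeout_seconds=900)
    if final_state.get("status") == "success":
        print(final_state["result"]["video"])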
Phase 4: Orchestrating Batch Operations
def process_video_batch(prompt_list: List[str], output_file: str = "video_results.json"):
"""
Executes bulk video generation workflow
Parameters:
prompt_list: Collection of video descriptions
output_file: Destination file for results
"""
print(f"\n{'='*60}")
print(f"Initiating batch job: {len(prompt_list)} videos queued")
print(f"{'='*60}\n")
# Phase A: Submit all generation requests
job_queue = []
for index, description in enumerate(prompt_list, 1):
print(f"[{index}/{len(prompt_list)}] Queuing request...")
job_id = submit_video_generation(description)
if job_id:
job_queue.append({"task_id": job_id, "prompt": description})
time.sleep(1) # Rate limiting protection
print(f"\n{len(job_queue)} tasks successfully queued\n")
# Phase B: Monitor all jobs to completion
final_results = []
for index, job_entry in enumerate(job_queue, 1):
print(f"\n[{index}/{len(job_queue)}] Monitoring task progress...")
print(f"Description: {job_entry['prompt'][:50]}...")
completion_data = monitor_until_complete(job_entry["task_id"])
final_results.append({
"prompt": job_entry["prompt"],
"task_id": job_entry["task_id"],
"status": completion_data.get("status"),
"video_url": completion_data.get("result", {}).get("video"),
"consumed": completion_data.get("consumed"),
"created_at": completion_data.get("created_at")
})
# Phase C: Persist results to disk
with open(output_file, "w", encoding="utf-8") as output:
json.dump(final_results, output, ensure_ascii=False, indent=2)
# Generate summary statistics
successful_jobs = sum(1 for result in final_results if result["status"] == "success")
print(f"\n{'='*60}")
print(f"Batch processing complete! {successful_jobs}/{len(final_results)} succeeded")
print(f"Output saved to: {output_file}")
print(f"{'='*60}\n")
return final_results
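A note on the design: because Phase A submits every request before Phase B starts polling, the videos generate server-side in parallel even though the monitoring loop is sequential, so later queue entries are often already finished by the time the loop reaches them. If you want the polling itself to run concurrently as well, here is a sketch using the standard library's ThreadPoolExecutor (an optional variation, not part of the pipeline above):
from concurrent.futures import ThreadPoolExecutor

def monitor_batch_concurrently(job_queue, max_workers: int = 5):
    # Each worker runs the same blocking poll loop for one task
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(
            lambda job: monitor_until_complete(job["task_id"]),
            job_queue,
        ))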
Full Implementation Script
#!/usr/bin/env python3
"""
Automated Video Generation Platform
Leveraging Sora 2 API for scalable content production
"""
import os
import time
import json
import requests
from typing import List, Dict, Optional
# ==================== Setup & Configuration ====================
# Secure credential loading from environment
SORA_API_KEY = os.getenv("SORA_API_KEY")
if not SORA_API_KEY:
raise ValueError("SORA_API_KEY environment variable must be configured")
API_BASE_URL = "https://api.defapi.org"
REQUEST_HEADERS = {
"Authorization": f"Bearer {SORA_API_KEY}",
"Content-Type": "application/json"
}
# ==================== API Integration Layer ====================
def submit_video_generation(text_prompt: str, ref_images: Optional[List[str]] = None) -> Optional[str]:
"""Initiate video creation workflow"""
endpoint = f"{API_BASE_URL}/api/sora2/gen"
request_body = {"prompt": text_prompt}
if ref_images:
request_body["images"] = ref_images[:1]
try:
api_response = requests.post(endpoint, headers=REQUEST_HEADERS, json=request_body, timeout=30)
api_response.raise_for_status()
response_data = api_response.json()
if response_data.get("code") == 0:
job_id = response_data["data"]["task_id"]
print(f"✓ Submitted: {text_prompt[:30]}... [ID: {job_id}]")
return job_id
else:
print(f"✗ Error: {response_data.get('message')}")
return None
except requests.exceptions.RequestException as error:
print(f"✗ Network issue: {error}")
return None
def fetch_task_status(job_id: str) -> Dict:
"""Retrieve task progress information"""
endpoint = f"{API_BASE_URL}/api/task/query"
query_params = {"task_id": job_id}
try:
api_response = requests.get(endpoint, headers=REQUEST_HEADERS, params=query_params, timeout=30)
api_response.raise_for_status()
response_data = api_response.json()
if response_data.get("code") == 0:
return response_data["data"]
else:
print(f"✗ Fetch error: {response_data.get('message')}")
return {}
except requests.exceptions.RequestException as error:
print(f"✗ Status query failed: {error}")
return {}
def monitor_until_complete(job_id: str, timeout_seconds: int = 600) -> Dict:
"""Poll task status until completion"""
start_timestamp = time.time()
check_interval = 5
while True:
time_elapsed = time.time() - start_timestamp
if time_elapsed > timeout_seconds:
print(f"✗ Timeout [{job_id}]")
return {"status": "timeout", "task_id": job_id}
task_data = fetch_task_status(job_id)
if not task_data:
time.sleep(check_interval)
continue
current_status = task_data.get("status")
if current_status == "success":
output_url = task_data.get("result", {}).get("video")
print(f"✓ Success [{job_id}]")
print(f" Link: {output_url}")
return task_data
elif current_status == "failed":
error_details = task_data.get("status_reason", {}).get("message", "Unspecified")
print(f"✗ Failed [{job_id}]: {error_details}")
return task_data
elif current_status in ["pending", "submitted", "in_progress"]:
print(f"⏳ Processing [{job_id}] ({int(time_elapsed)}s)")
time.sleep(check_interval)
else:
print(f"? Unknown state: {current_status}")
time.sleep(check_interval)
def process_video_batch(prompt_list: List[str], output_file: str = "video_results.json"):
"""Execute batch video generation pipeline"""
print(f"\n{'='*60}")
print(f"Batch job initiated: {len(prompt_list)} videos")
print(f"{'='*60}\n")
# Queue all generation tasks
job_queue = []
for index, description in enumerate(prompt_list, 1):
print(f"[{index}/{len(prompt_list)}] Submitting...")
job_id = submit_video_generation(description)
if job_id:
job_queue.append({"task_id": job_id, "prompt": description})
time.sleep(1)
print(f"\n{len(job_queue)} tasks in queue\n")
# Monitor each task to completion
final_results = []
for index, job_entry in enumerate(job_queue, 1):
print(f"\n[{index}/{len(job_queue)}] Monitoring...")
print(f"Content: {job_entry['prompt'][:50]}...")
completion_data = monitor_until_complete(job_entry["task_id"])
final_results.append({
"prompt": job_entry["prompt"],
"task_id": job_entry["task_id"],
"status": completion_data.get("status"),
"video_url": completion_data.get("result", {}).get("video"),
"consumed": completion_data.get("consumed"),
"created_at": completion_data.get("created_at")
})
# Write results to file
with open(output_file, "w", encoding="utf-8") as output:
json.dump(final_results, output, ensure_ascii=False, indent=2)
successful_jobs = sum(1 for r in final_results if r["status"] == "success")
print(f"\n{'='*60}")
print(f"Job complete! {successful_jobs}/{len(final_results)} successful")
print(f"Saved to: {output_file}")
print(f"{'='*60}\n")
return final_results
# ==================== Application Entry Point ====================
if __name__ == "__main__":
# Sample creative prompts for video generation
creative_scripts = [
"Developer deep in concentration at a cafe workspace, urban skyline twinkling beyond glass windows, ambient lighting casting warm shadows",
"Culinary content creator assembling a gourmet pastry, macro lens catching rich chocolate drizzle in slow motion",
"Senior citizen performing tai chi movements at dawn in the park, morning sun creating patterns through foliage",
"Innovation team collaborating in a modern workspace, idea-filled whiteboard dominating the frame",
"Romantic silhouettes strolling along shoreline at golden hour, ocean waves rhythmically meeting sand"
]
# Execute batch processing
output_data = process_video_batch(creative_scripts, "my_videos.json")
# Display successful outputs
print("\nGenerated content URLs:")
for index, entry in enumerate(output_data, 1):
if entry["status"] == "success" and entry["video_url"]:
print(f"{index}. {entry['video_url']}")
print(f" Description: {entry['prompt'][:40]}...")
print(f" Cost: {entry['consumed']}\n")