I built TinyWorkflow to solve a problem I kept running into: developers who want to learn workflow orchestration concepts get bogged down by production-grade infrastructure. After watching colleagues struggle with setup when they only wanted to experiment with workflow patterns, I decided to build something simpler.
TinyWorkflow is a Python-first workflow library designed specifically for learning, prototyping, and lightweight automation. It's ~2,000 lines of readable code that you can understand in an afternoon.
What is TinyWorkflow?
TinyWorkflow is a workflow orchestration library that lets you:
- Define workflows using Python decorators
- Chain activities with automatic retry logic
- Run tasks in parallel with fan-out/fan-in patterns
- Persist state to SQLite, PostgreSQL, or MySQL
- Schedule workflows with cron expressions
- Monitor execution through a built-in web UI
- Handle failures with exponential backoff
All without external services, Docker containers, or complex configurations.
A Quick Example
Here's a complete workflow in TinyWorkflow:
import asyncio
from tinyworkflow import workflow, activity, WorkflowContext, TinyWorkflowClient

@activity(name="fetch_data")
async def fetch_data(url: str):
    # Your data fetching logic
    return {"data": "example_data"}

@activity(name="process_data")
async def process_data(data: dict):
    # Your processing logic
    return {"result": "processed"}

@workflow(name="data_pipeline")
async def data_pipeline(ctx: WorkflowContext):
    url = ctx.get_input("url")
    # Execute activities in sequence
    data = await ctx.execute_activity(fetch_data, url)
    result = await ctx.execute_activity(process_data, data)
    return result

# Run the workflow
async def main():
    async with TinyWorkflowClient() as client:
        run_id = await client.start_workflow(
            "data_pipeline",
            input_data={"url": "https://api.example.com"}
        )
        print(f"Workflow started: {run_id}")

asyncio.run(main())
Installation is straightforward:
pip install tinyworkflow
python your_workflow.py
No additional setup required.
Core Features
1. Pure Python API
TinyWorkflow uses Python decorators to define workflows and activities. No YAML, no XML, no DSL. Just Python functions:
from tinyworkflow import workflow, activity, RetryPolicy, WorkflowContext

@activity(
    name="api_call",
    retry_policy=RetryPolicy(max_retries=3, initial_delay=1.0),
    timeout=30.0
)
async def api_call(endpoint: str):
    # Your API call logic goes here; this placeholder stands in for a real response
    response = {"endpoint": endpoint}
    return response

@workflow(name="my_workflow")
async def my_workflow(ctx: WorkflowContext):
    result = await ctx.execute_activity(api_call, "/users")
    return result
2. Automatic State Persistence
Every workflow execution is automatically persisted to a database. TinyWorkflow supports:
- SQLite (default, zero configuration)
- PostgreSQL (for team environments)
- MySQL (if that's your preference)
# SQLite (default)
async with TinyWorkflowClient() as client:
    pass

# PostgreSQL
async with TinyWorkflowClient(
    database_url="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
) as client:
    pass
State persistence includes:
- Workflow inputs and outputs
- Activity execution results
- Event logs (complete audit trail)
- Retry attempts and failure reasons
3. Retry Logic with Exponential Backoff
Activities can automatically retry on failure with configurable backoff:
@activity(
    name="flaky_service",
    retry_policy=RetryPolicy(
        max_retries=5,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_multiplier=2.0,
        jitter=True
    )
)
async def flaky_service():
    # Nominal waits before each of the 5 retries (before jitter): 1s → 2s → 4s → 8s → 16s
    result = {"status": "ok"}  # placeholder for the real service call
    return result
The retry system includes:
- Exponential backoff
- Jitter to prevent thundering herd
- Configurable delay limits
- Per-activity timeout settings
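To make the backoff arithmetic concrete, here is a minimal standalone sketch of how such a schedule can be computed. It is not TinyWorkflow's internal code: the function name `backoff_delays` is hypothetical, the parameter names simply mirror the `RetryPolicy` arguments shown above, and the "full jitter" strategy (a uniform draw between 0 and the capped delay) is an assumption about one common way to add jitter.

```python
import random


def backoff_delays(max_retries, initial_delay=1.0, max_delay=60.0,
                   backoff_multiplier=2.0, jitter=False, rng=None):
    """Return the wait in seconds before each retry attempt (hypothetical helper)."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Grow geometrically from initial_delay, then cap at max_delay.
        delay = min(initial_delay * backoff_multiplier ** attempt, max_delay)
        if jitter:
            # Assumed "full jitter": draw uniformly between 0 and the capped delay
            # so that many failing callers do not all retry at the same instant.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(5))
```

With jitter disabled, five retries yield waits of 1, 2, 4, 8, and 16 seconds; the 60-second cap only starts to matter from the seventh retry onward.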
4. Parallel Execution
Execute multiple activities concurrently using the parallel execution pattern:
@workflow(name="parallel_pipeline")
async def parallel_pipeline(ctx: WorkflowContext):
    user_id = ctx.get_input("user_id")
    # Run three activities in parallel
    profile, orders, preferences = await ctx.execute_parallel(
        (fetch_user_profile, (user_id,), {}),
        (fetch_user_orders, (user_id,), {}),
        (fetch_user_preferences, (user_id,), {})
    )
    return {
        "profile": profile,
        "orders": orders,
        "preferences": preferences
    }
This is useful when activities are independent and can run concurrently.
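If the fan-out/fan-in idea is new to you, the sketch below shows the underlying pattern using only `asyncio` from the standard library. It is an illustration, not TinyWorkflow's implementation: `run_parallel`, `fetch_profile`, and `fetch_orders` are hypothetical names, and the `(function, args, kwargs)` tuple shape is borrowed from the `execute_parallel` call above.

```python
import asyncio


async def fetch_profile(user_id):
    # Stand-in for an I/O-bound activity.
    await asyncio.sleep(0.01)
    return {"user_id": user_id, "name": "example"}


async def fetch_orders(user_id):
    await asyncio.sleep(0.01)
    return [{"order_id": 1}]


async def run_parallel(*calls):
    """Fan out (func, args, kwargs) tuples, then fan in the results in call order."""
    coros = [func(*args, **kwargs) for func, args, kwargs in calls]
    # gather() runs the coroutines concurrently and preserves argument order.
    return await asyncio.gather(*coros)


async def demo():
    profile, orders = await run_parallel(
        (fetch_profile, (42,), {}),
        (fetch_orders, (42,), {}),
    )
    return profile, orders
```

Because `asyncio.gather` returns results in the order the coroutines were passed, the tuple unpacking stays correct even when the second call finishes first.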
5. Human-in-the-Loop Workflows
Workflows can pause and wait for human approval:
@workflow(name="approval_required")
async def approval_workflow(ctx: WorkflowContext):
    amount = ctx.get_input("amount")
    if amount > 1000:
        # Pause and wait for approval
        approved = await ctx.wait_for_approval(
            "manager_approval",
            timeout=3600  # 1 hour timeout
        )
        if not approved:
            return {"status": "rejected"}
    # Continue processing
    result = await ctx.execute_activity(process_request, amount)
    return {"status": "approved", "result": result}
Approvals can be managed through the CLI or web UI.
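Conceptually, waiting for approval means suspending a coroutine until someone records a decision or a timeout expires. The sketch below shows that idea with plain `asyncio`. The `ApprovalGate` class is hypothetical and is not how TinyWorkflow stores approvals, and treating a timeout as a rejection is my assumption for the example rather than documented behavior.

```python
import asyncio


class ApprovalGate:
    """Hypothetical in-memory gate that a workflow can wait on."""

    def __init__(self):
        self._event = asyncio.Event()
        self._approved = False

    def decide(self, approved):
        # Called by a reviewer (for example from a CLI or web handler).
        self._approved = approved
        self._event.set()

    async def wait(self, timeout):
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
        except asyncio.TimeoutError:
            # Assumption for this sketch: no answer in time counts as "not approved".
            return False
        return self._approved


async def demo(decision, timeout=0.5):
    gate = ApprovalGate()
    if decision is not None:
        # Simulate a reviewer responding shortly after the workflow pauses.
        asyncio.get_running_loop().call_later(0.01, gate.decide, decision)
    return await gate.wait(timeout)
```

An in-memory gate like this disappears when the process exits, which is one reason a real system would persist pending approvals to a database.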
6. Workflow Scheduling
Schedule workflows to run on a cron schedule:
async with TinyWorkflowClient() as client:
    # Run daily at 9 AM
    await client.schedule_workflow("daily_report", "0 9 * * *")
    # Run every hour
    await client.schedule_workflow("hourly_sync", "0 * * * *")
Or schedule one-time delayed execution:
async with TinyWorkflowClient() as client:
    # Run after 5 minutes
    await client.schedule_delayed_workflow(
        "cleanup_job",
        delay_seconds=300,
        input_data={"resource_id": "abc123"}
    )
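If cron syntax is unfamiliar, the five fields are minute, hour, day of month, month, and day of week. The toy matcher below checks whether a timestamp satisfies an expression. It is a learning aid only: `cron_matches` is a hypothetical helper that supports just `*` and plain integers (no ranges, lists, or steps), and it is not the parser TinyWorkflow uses.

```python
from datetime import datetime


def cron_matches(expr, when):
    """Return True if `when` satisfies a simple 5-field cron expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    actual = (
        when.minute,
        when.hour,
        when.day,
        when.month,
        (when.weekday() + 1) % 7,  # cron uses 0 = Sunday; Python uses 0 = Monday
    )
    # A field matches if it is the wildcard or equals the actual value.
    return all(f == "*" or int(f) == a for f, a in zip(fields, actual))


print(cron_matches("0 9 * * *", datetime(2024, 1, 1, 9, 0)))
```

So `"0 9 * * *"` matches only at minute 0 of hour 9 on any day, while `"0 * * * *"` matches at minute 0 of every hour.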
7. Built-in Web UI
TinyWorkflow includes a web interface for monitoring and management:
tinyworkflow server --import-workflows your_module.workflows --port 8080
The web UI provides:
- Dashboard with workflow statistics
- List of all workflow executions
- Detailed event logs for each workflow
- Ability to start new workflows
- Schedule management
- Approval queue for human-in-the-loop workflows
- Registry of available workflows and activities
Built with FastAPI, the UI is fast and can be extended if needed.
8. Command-Line Interface
Manage workflows from the terminal:
# Start a workflow
tinyworkflow start my_workflow --input '{"key": "value"}'
# Check workflow status
tinyworkflow status <run_id>
# List all workflows
tinyworkflow list
# View workflow events
tinyworkflow events <run_id>
# Schedule a workflow
tinyworkflow schedule my_workflow "0 9 * * *"
# Approve a workflow
tinyworkflow approve <run_id> --approve
# Start the web server
tinyworkflow server --import-workflows examples.workflows
9. Event Sourcing
Every state change is recorded as an event, providing a complete audit trail:
events = await client.get_workflow_events(run_id)
for event in events:
    print(f"{event.timestamp}: {event.event_type}")
This is useful for:
- Debugging workflow failures
- Understanding execution history
- Compliance and audit requirements
- Analyzing workflow performance
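The core idea of event sourcing fits in a few lines: append events to a log and derive the current state by replaying them. The sketch below is a simplified illustration. The `Event` and `EventLog` classes and the event type names (`workflow_started` and so on) are hypothetical and chosen for the example; only the `timestamp` and `event_type` attribute names come from the snippet above.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Event:
    event_type: str
    payload: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class EventLog:
    """Append-only log; state is derived from events, never stored directly."""

    def __init__(self):
        self._events = []

    def append(self, event_type, **payload):
        self._events.append(Event(event_type, payload))

    def events(self):
        return list(self._events)

    def current_status(self):
        # Replay the log from the start to compute the latest status.
        status = "pending"
        transitions = {
            "workflow_started": "running",
            "workflow_completed": "completed",
            "workflow_failed": "failed",
        }
        for event in self._events:
            status = transitions.get(event.event_type, status)
        return status


log = EventLog()
log.append("workflow_started", name="data_pipeline")
log.append("activity_completed", activity="fetch_data")
log.append("workflow_failed", reason="timeout")
print(log.current_status())
```

Because nothing is overwritten, the same log answers both "what is the status now?" and "how did it get there?".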
Practical Use Cases
AI/ML Workflows
TinyWorkflow is well-suited for multi-step AI pipelines:
@workflow(name="ai_content_pipeline")
async def ai_content_pipeline(ctx: WorkflowContext):
    prompt = ctx.get_input("prompt")
    # Generate content with retry on API failures
    content = await ctx.execute_activity(generate_ai_content, prompt)
    # Analyze in parallel
    sentiment, moderation = await ctx.execute_parallel(
        (analyze_sentiment, (content,), {}),
        (moderate_content, (content,), {})
    )
    # Conditional logic
    if moderation["flagged"]:
        return {"status": "rejected"}
    return {"status": "approved", "content": content}
This pattern is useful for:
- Content generation and moderation
- Document processing and analysis
- Multi-step model inference
- Prompt experimentation and testing
Data Processing Pipelines
Build ETL workflows with automatic retries:
@workflow(name="etl_pipeline")
async def etl_pipeline(ctx: WorkflowContext):
    # Extract
    data = await ctx.execute_activity(extract_from_source)
    # Transform
    transformed = await ctx.execute_activity(transform_data, data)
    # Load
    await ctx.execute_activity(load_to_destination, transformed)
    return {"records_processed": len(transformed)}
Scheduled Automation
Run periodic tasks with cron scheduling:
@workflow(name="nightly_backup")
async def nightly_backup(ctx: WorkflowContext):
    databases = ctx.get_input("databases")
    # Backup each database
    results = await ctx.execute_parallel(
        *[(backup_database, (db,), {}) for db in databases]
    )
    return {"backups_completed": len(results)}

# Schedule it
async with TinyWorkflowClient() as client:
    await client.schedule_workflow("nightly_backup", "0 2 * * *")
Approval Workflows
Implement business processes requiring human intervention:
@workflow(name="purchase_approval")
async def purchase_approval(ctx: WorkflowContext):
    order = ctx.get_input("order")
    # Create order
    order_id = await ctx.execute_activity(create_order, order)
    # Large orders need approval
    if order["total"] > 5000:
        approved = await ctx.wait_for_approval("finance_approval")
        if not approved:
            await ctx.execute_activity(cancel_order, order_id)
            return {"status": "rejected"}
    # Process approved order
    await ctx.execute_activity(process_order, order_id)
    return {"status": "completed", "order_id": order_id}
Design Philosophy
TinyWorkflow is built with specific goals:
1. Learning-First Design
The codebase is intentionally small (~2,000 lines) and readable. You can:
- Read the entire source code to understand implementation
- Learn workflow patterns through clear examples
- Experiment without infrastructure overhead
- Understand trade-offs in workflow design
2. Minimal Dependencies
TinyWorkflow requires only:
- Python 3.9+
- SQLAlchemy (database abstraction)
- APScheduler (cron scheduling)
- FastAPI (web UI)
- Python's standard asyncio library (async/await)
No external services, containers, or orchestrators needed.
3. Database-Centric State Management
State is persisted to a database (SQLite by default). This means:
- No external state management service
- Simple backup and recovery (database dumps)
- Easy migration between databases
- Familiar tools for debugging (SQL queries)
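As an example of the "familiar tools" point, the sketch below runs an ordinary SQL query over workflow runs with the standard `sqlite3` module. The table name `workflow_runs` and its columns are invented for illustration and are not TinyWorkflow's actual schema, so inspect the real database before writing queries against it.

```python
import sqlite3

# In-memory database with a hypothetical schema, purely for illustration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workflow_runs (run_id TEXT PRIMARY KEY, name TEXT, status TEXT)"
)
conn.executemany(
    "INSERT INTO workflow_runs VALUES (?, ?, ?)",
    [
        ("run-1", "data_pipeline", "completed"),
        ("run-2", "data_pipeline", "failed"),
        ("run-3", "nightly_backup", "completed"),
    ],
)


def count_by_status(connection):
    """Summarize runs per status with a plain GROUP BY query."""
    rows = connection.execute(
        "SELECT status, COUNT(*) FROM workflow_runs GROUP BY status ORDER BY status"
    ).fetchall()
    return dict(rows)


print(count_by_status(conn))
```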
4. Progressive Disclosure
Start simple, add complexity as needed:
- Basic workflow: Just activities in sequence
- Add retries: Configure RetryPolicy
- Add parallelism: Use execute_parallel
- Add approvals: Use wait_for_approval
- Add scheduling: Use cron expressions
Understanding the Limitations
TinyWorkflow makes specific trade-offs for simplicity. Here's what it doesn't provide:
No Workflow Replay
If a workflow fails, it restarts from the beginning, not from the failure point. This is acceptable for:
- Short workflows (under 30 minutes)
- Workflows where activities are idempotent
- Learning and experimentation scenarios
This limitation exists because implementing replay would require:
- Deterministic execution constraints
- Complex state management
- Significantly increased codebase complexity
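Because a failed workflow starts over from the first activity, it helps if repeating an activity is harmless. One common way to get there is an idempotency key: remember the result for a given key and return it instead of redoing the work. The sketch below is a minimal in-memory version with a hypothetical `IdempotentRunner` class. In practice the results would need to live in durable storage, and nothing here is part of TinyWorkflow's API.

```python
class IdempotentRunner:
    """Runs a function at most once per key and caches its result (hypothetical)."""

    def __init__(self):
        self._results = {}
        self.calls = 0  # counts real executions, for demonstration only

    def run(self, key, func, *args):
        if key in self._results:
            # Already done for this key: return the stored result, skip side effects.
            return self._results[key]
        self.calls += 1
        result = func(*args)
        self._results[key] = result
        return result


def charge(amount):
    # Stand-in for a side-effecting call such as charging a card.
    return {"charged": amount}


runner = IdempotentRunner()
first = runner.run("charge:order-1", charge, 50)
second = runner.run("charge:order-1", charge, 50)  # simulated re-run after a restart
print(first == second, runner.calls)
```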
No Deterministic Execution
You can use datetime.now(), random.random(), and uuid.uuid4() freely in workflows. This provides flexibility, but it also means a workflow may produce different results when it is re-run after a failure.
No Durable Timers
Using asyncio.sleep() in workflows means timer state is lost if the process crashes. For production systems requiring durable timers, use external schedulers or production-grade orchestrators.
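To show what "durable" means here, the sketch below persists an absolute wake-up time to a file, so a restarted process can work out how long is left instead of starting the wait again. This illustrates the concept only and is not something TinyWorkflow does: `start_timer` and `remaining_seconds` are hypothetical helpers, and a JSON file stands in for a real database.

```python
import json
import time


def start_timer(path, delay_seconds, now=None):
    """Persist the absolute wake-up time so it survives a process restart."""
    now = time.time() if now is None else now
    with open(path, "w") as f:
        # Wall-clock time is used because monotonic clocks reset between processes.
        json.dump({"wake_at": now + delay_seconds}, f)


def remaining_seconds(path, now=None):
    """After a restart, compute how much of the original delay is still left."""
    now = time.time() if now is None else now
    with open(path) as f:
        wake_at = json.load(f)["wake_at"]
    return max(0.0, wake_at - now)
```

By contrast, a bare `asyncio.sleep(300)` keeps the deadline only in process memory, so a crash after 100 seconds loses track of the remaining 200.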
No Signal System
Running workflows cannot receive external events or signals. Communication with running workflows is limited to the approval mechanism.
No Automatic Compensation
There are no saga patterns or automatic rollback. If you need compensation logic, implement it explicitly in your activities.
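Explicit compensation can be as simple as pairing each step with an undo action and running the undo actions in reverse order when a later step fails. The sketch below shows that shape in plain synchronous Python. `run_with_compensation` is a hypothetical helper, and in a real workflow each callable would be an activity call rather than a lambda.

```python
def run_with_compensation(steps):
    """Run (action, compensate) pairs; on failure, undo completed steps in reverse."""
    completed = []
    try:
        for action, compensate in steps:
            action()
            # Register the undo only once the action has succeeded.
            completed.append(compensate)
    except Exception:
        for compensate in reversed(completed):
            compensate()
        raise


log = []


def fail_shipping():
    raise RuntimeError("shipping service unavailable")


steps = [
    (lambda: log.append("reserve_stock"), lambda: log.append("release_stock")),
    (lambda: log.append("charge_card"), lambda: log.append("refund_card")),
    (fail_shipping, lambda: log.append("cancel_shipping")),
]

try:
    run_with_compensation(steps)
except RuntimeError:
    pass

print(log)
```

The failed step's own compensation never runs, since the step did not complete; only the two earlier steps are undone, most recent first.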
Performance Boundaries
TinyWorkflow is tested and works well with:
- Up to 100 concurrent workflows
- Workflows under 1 hour duration
- Moderate throughput (not high-frequency trading)
For higher scales, production-grade orchestrators are more appropriate.
When to Use TinyWorkflow
Ideal Scenarios:
Learning and Education
- Understanding workflow orchestration concepts
- Teaching in courses or workshops
- Experimenting with workflow patterns
- Reading source code to learn implementation
Rapid Prototyping
- Testing workflow ideas quickly
- Building MVPs without infrastructure
- Weekend projects and hackathons
- Proof-of-concept implementations
AI/ML Experimentation
- Multi-step LLM pipelines
- Testing different prompts or models
- Document processing workflows
- Content generation and moderation
Lightweight Automation
- Scheduled data pipelines (< 1 hour)
- Internal tools and scripts
- Personal automation projects
- Small-scale document processing
Not Recommended For:
Production-Critical Systems
- Systems where downtime is costly
- Workflows requiring guaranteed execution
- Long-running processes (multiple hours/days)
- High-scale data processing (TB+)
Complex Enterprise Requirements
- Advanced workflow patterns (sagas, compensation)
- Strict compliance and audit requirements
- High-availability guarantees
- Enterprise support contracts
Getting Started
Installation
pip install tinyworkflow
Try the Examples
The repository includes 20 example workflows:
git clone https://github.com/scionoftech/tinyworkflow
cd tinyworkflow
pip install -e .
# Try examples
python examples/simple_workflow.py
python examples/parallel_workflow.py
python examples/approval_workflow.py
python examples/ai_content_pipeline.py
Start the Web UI
tinyworkflow server --import-workflows examples.workflows --port 8080
Open http://localhost:8080 to explore the interface.
Build Your First Workflow
Create a file my_workflow.py:
import asyncio
from tinyworkflow import workflow, activity, WorkflowContext, TinyWorkflowClient

@activity(name="hello")
async def hello(name: str):
    return f"Hello, {name}!"

@workflow(name="greeting")
async def greeting_workflow(ctx: WorkflowContext):
    name = ctx.get_input("name")
    message = await ctx.execute_activity(hello, name)
    return {"message": message}

async def main():
    async with TinyWorkflowClient() as client:
        run_id = await client.start_workflow(
            "greeting",
            input_data={"name": "World"}
        )
        # Wait a moment for completion
        await asyncio.sleep(1)
        # Check status
        workflow = await client.get_workflow_status(run_id)
        print(f"Status: {workflow.status}")
        print(f"Result: {workflow.result}")

asyncio.run(main())
Run it:
python my_workflow.py
Documentation and Resources
GitHub Repository: github.com/scionoftech/tinyworkflow
The repository includes:
- Complete API documentation
- 20 example workflows
- Quick start guide
- Workflow registration guide
- Limitations documentation
Getting Help:
- GitHub Issues for bug reports
- GitHub Discussions for questions
- Example code in the repository
The Development Approach
I built TinyWorkflow with a few principles:
Readable Code
The entire codebase is ~2,000 lines. That is intentional, not a limitation: every decision favors clarity over cleverness.
Documented Trade-offs
The LIMITATIONS.md file explicitly documents what TinyWorkflow doesn't do and why. This helps users make informed decisions.
Real Examples
All 20 example workflows are complete, working code that demonstrates actual use cases, not toy problems.
Progressive Learning
Examples start simple and gradually introduce more complex patterns:
- Simple sequential workflow
- Workflow with retries
- Parallel execution
- Human-in-the-loop
- Scheduled workflows
- AI pipelines
Contributing
TinyWorkflow is open source (MIT License) and welcomes contributions:
Ways to Contribute:
- Report bugs or suggest features (GitHub Issues)
- Submit pull requests with improvements
- Share your use cases and examples
- Improve documentation
- Write tutorials or blog posts
Development:
git clone https://github.com/scionoftech/tinyworkflow
cd tinyworkflow
pip install -e ".[dev]"
pytest tests/
What's Next
Future development focuses on:
- More example workflows from real use cases
- Enhanced web UI features
- Better error messages and debugging tools
- Integration guides for popular libraries
- Performance optimizations
TinyWorkflow will remain focused on its core mission: making workflow orchestration concepts accessible through hands-on experimentation.
Conclusion
TinyWorkflow is a learning-focused workflow library that prioritizes simplicity and readability. It provides the core workflow patterns (activities, retries, parallel execution, and state persistence) without the complexity of production-grade orchestrators.
If you're learning workflow orchestration, prototyping AI pipelines, or building lightweight automation, TinyWorkflow offers a straightforward path to getting started.
Install and try it:
pip install tinyworkflow
Explore the code:
github.com/scionoftech/tinyworkflow
Share your experience:
If you build something with TinyWorkflow, I'd love to hear about it. Open an issue or start a discussion on GitHub.
License: MIT
Python Version: 3.9+
Status: Active development, open to contributions