A practical guide to implementing Extract, Transform, Load workflows with full type safety
Introduction
ETL (Extract, Transform, Load) is a fundamental pattern in data engineering. Whether you're syncing CRM data to a warehouse, migrating between databases, or integrating third-party APIs, ETL pipelines are everywhere. But building them in a type-safe, maintainable way has traditionally been challenging.
In this article, we'll explore how to implement ETL pipelines in TypeScript using OpenETL, a modern framework that brings type safety and abstraction to data integration.
What is ETL?
ETL consists of three phases:
- Extract: Pull data from a source (database, API, file)
- Transform: Clean, reshape, or enrich the data
- Load: Write the data to a destination
```
┌─────────┐    ┌───────────┐    ┌─────────┐
│ Extract │ ── │ Transform │ ── │  Load   │
└─────────┘    └───────────┘    └─────────┘
     │               │               │
     ▼               ▼               ▼
 Database         Rename           Data
 API              Filter           Warehouse
 File             Merge            API
```
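Conceptually, a pipeline is just these three steps composed. Here is a minimal, framework-agnostic sketch in TypeScript; the extract, transform, and load functions are illustrative placeholders, not OpenETL APIs:

```typescript
// Framework-agnostic ETL skeleton; function bodies are illustrative placeholders.
type RawUser = { ID: number; EMAIL: string };
type User = { id: number; email: string };

async function extract(): Promise<RawUser[]> {
  // Extract: query a database, call an API, or read a file
  return [{ ID: 1, EMAIL: 'Ada@Example.com' }];
}

function transform(rows: RawUser[]): User[] {
  // Transform: rename keys, normalize casing, filter, merge
  return rows.map(r => ({ id: r.ID, email: r.EMAIL.toLowerCase() }));
}

async function load(rows: User[]): Promise<void> {
  // Load: write to a warehouse, an API, or a file
  console.log(`loading ${rows.length} rows`);
}

async function run(): Promise<void> {
  const raw = await extract();
  const clean = transform(raw);
  await load(clean);
}
```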
The Challenge with Traditional ETL
Traditional ETL implementations often suffer from:
- Tight coupling: Source-specific code mixed with business logic
- No type safety: Runtime errors from schema mismatches
- Code duplication: Similar patterns repeated for each integration
- Hard to test: Database connections embedded in business logic
The Adapter Pattern Solution
OpenETL solves these problems with the adapter pattern. Each data source implements a common interface, allowing you to swap sources without changing your pipeline logic.
```typescript
// The adapter interface - all sources implement this
interface AdapterInstance {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  download(options: PageOptions): Promise<{ data: any[] }>;
  upload(data: any[]): Promise<void>;
}
```
This abstraction means your pipeline code doesn't care whether data comes from PostgreSQL, MongoDB, or a REST API.
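To see why this matters, here is a rough sketch of a copy routine written purely against the AdapterInstance interface above (the limit/offset shape of PageOptions is an assumption for illustration):

```typescript
// Works the same whether the adapters wrap PostgreSQL, MongoDB, a REST API,
// or an in-memory mock used in tests. The PageOptions shape is assumed here.
async function copyFirstPage(source: AdapterInstance, target: AdapterInstance): Promise<number> {
  await source.connect();
  await target.connect();
  try {
    const { data } = await source.download({ limit: 100, offset: 0 });
    await target.upload(data);
    return data.length;
  } finally {
    await source.disconnect();
    await target.disconnect();
  }
}
```

Because the routine depends only on the interface, a unit test can pass in an in-memory mock adapter instead of a real database connection.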
Getting Started
Installation
```bash
npm install openetl
npm install @openetl/postgresql  # or any adapter you need
```
Basic Pipeline
Here's a simple pipeline that downloads data from PostgreSQL:
```typescript
import { Orchestrator, Vault, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

// 1. Define credentials in a vault
const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

// 2. Register adapters
const adapters = { postgresql };

// 3. Define the pipeline
const pipeline: Pipeline = {
  id: 'export-users',
  source: {
    id: 'source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-database',
    fields: ['id', 'email', 'name', 'created_at'],
    config: {
      schema: 'public',
      table: 'users',
    },
  },
};

// 4. Execute
const etl = Orchestrator(vault, adapters);
const result = await etl.runPipeline(pipeline);
console.log(`Exported ${result.data.length} users`);
```
Working with Multiple Data Sources
The power of abstraction becomes clear when working with multiple sources. The same pipeline structure works regardless of the source:
```typescript
import { postgresql } from '@openetl/postgresql';
import { mysql } from '@openetl/mysql';
import { mongodb } from '@openetl/mongodb';
import { hubspot } from '@openetl/hubspot';

const adapters = { postgresql, mysql, mongodb, hubspot };

// PostgreSQL source
const postgresSource = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  config: { schema: 'public', table: 'customers' },
};

// MySQL source - same structure, different adapter
const mysqlSource = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  config: { database: 'sales', table: 'customers' },
};

// MongoDB source - still the same pattern
const mongoSource = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_query',
  config: { table: 'customers' },
};

// HubSpot API - REST API, same interface
const hubspotSource = {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
};
```
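Because every source speaks the same configuration language, you can generate pipelines from a list and run them with one orchestrator. A sketch, assuming `etl` was created with these adapters registered and with matching vault entries (the credential IDs here are made up):

```typescript
// Run the same export against several sources; credential_id values are hypothetical.
const sources = [
  { ...postgresSource, id: 'pg-customers', credential_id: 'pg-creds' },
  { ...mysqlSource, id: 'mysql-customers', credential_id: 'mysql-creds' },
  { ...mongoSource, id: 'mongo-customers', credential_id: 'mongo-creds' },
];

for (const source of sources) {
  const result = await etl.runPipeline({ id: `export-${source.id}`, source });
  console.log(`${source.adapter_id}: ${result.data.length} rows`);
}
```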
Data Transformations
OpenETL includes 12 built-in transformation types, applied in sequence during pipeline execution:
```typescript
const pipeline: Pipeline = {
  id: 'transform-contacts',
  source: {
    // ... source config
    transform: [
      // Combine first and last name
      {
        type: 'concat',
        options: {
          properties: ['first_name', 'last_name'],
          glue: ' ',
          to: 'full_name',
        },
      },
      // Normalize email to lowercase
      {
        type: 'lowercase',
        options: { field: 'email' },
      },
      // Remove whitespace
      {
        type: 'trim',
        options: { field: 'full_name' },
      },
      // Extract domain from email
      {
        type: 'extract',
        options: {
          field: 'email',
          pattern: '@(.+)$',
          to: 'email_domain',
        },
      },
    ],
  },
};
```
Available Transformations
| Type | Description | Example |
|---|---|---|
| concat | Combine multiple fields | ['first', 'last'] → 'full_name' |
| renameKey | Copy/rename a field | 'old_field' → 'new_field' |
| uppercase | Convert to uppercase | 'hello' → 'HELLO' |
| lowercase | Convert to lowercase | 'HELLO' → 'hello' |
| trim | Remove whitespace | ' text ' → 'text' |
| split | Split into array | 'a,b,c' → ['a','b','c'] |
| replace | Regex replacement | 'foo' → 'bar' |
| addPrefix | Add prefix | '123' → 'ID-123' |
| addSuffix | Add suffix | 'file' → 'file.txt' |
| toNumber | Parse as number | '42' → 42 |
| extract | Extract substring | 'user@example.com' → 'example.com' |
| mergeObjects | Combine fields into object | {a, b} → {merged: {a, b}} |
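To tie the table back to the earlier transform chain, here is the intended effect on a single record (an illustration of the transforms' semantics, not the framework's exact output shape):

```typescript
// Input record
const before = { first_name: 'Ada', last_name: 'Lovelace', email: 'Ada@Example.COM' };

// Expected result after concat → lowercase → trim → extract from the example above
const after = {
  first_name: 'Ada',
  last_name: 'Lovelace',
  email: 'ada@example.com',      // lowercase
  full_name: 'Ada Lovelace',     // concat + trim
  email_domain: 'example.com',   // extract with pattern '@(.+)$'
};
```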
Filtering and Sorting
Apply filters and sorting at the source level for efficient data retrieval:
```typescript
const pipeline: Pipeline = {
  id: 'active-premium-users',
  source: {
    // ...
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'plan', operator: '=', value: 'premium' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    sort: [
      { field: 'created_at', type: 'desc' },
      { field: 'name', type: 'asc' },
    ],
    limit: 1000,
  },
};
```
Filter Groups
For complex conditions, use filter groups with AND/OR logic:
```typescript
filters: [
  {
    op: 'OR',
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'status', operator: '=', value: 'pending' },
    ],
  },
  { field: 'email_verified', operator: '=', value: 'true' },
]
// SQL: WHERE (status = 'active' OR status = 'pending') AND email_verified = true
```
Source to Target Pipelines
Move data between systems by defining both source and target:
```typescript
const pipeline: Pipeline = {
  id: 'sync-crm-to-warehouse',
  source: {
    id: 'hubspot-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot-api',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'postgres-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'warehouse-db',
    fields: ['email', 'first_name', 'last_name', 'company'],
    config: {
      schema: 'analytics',
      table: 'crm_contacts',
    },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} contacts to warehouse`);
```
Pagination Strategies
Different APIs use different pagination methods. OpenETL supports three strategies:
```typescript
// Offset-based (SQL databases)
pagination: { type: 'offset', itemsPerPage: 100 }

// Cursor-based (modern APIs like HubSpot, Stripe)
pagination: { type: 'cursor', itemsPerPage: 100 }

// Page-based (traditional REST APIs)
pagination: { type: 'page', itemsPerPage: 50 }
```
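Under the hood, a paginated extract amounts to calling the adapter's download method until a page comes back short. A simplified sketch of the offset strategy, not OpenETL's actual internal loop:

```typescript
// Illustrative offset-pagination loop; assumes download accepts { limit, offset }.
async function downloadAll(source: AdapterInstance, itemsPerPage: number): Promise<any[]> {
  const rows: any[] = [];
  let offset = 0;
  while (true) {
    const { data } = await source.download({ limit: itemsPerPage, offset });
    rows.push(...data);
    if (data.length < itemsPerPage) break; // a short page means we reached the end
    offset += itemsPerPage;
  }
  return rows;
}
```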
Error Handling
Configure retry behavior for resilient pipelines:
```typescript
const pipeline: Pipeline = {
  id: 'resilient-sync',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000, // ms between retries
    fail_on_error: true,  // stop on first error
  },
};
```
OpenETL uses exponential backoff with jitter to prevent thundering herd problems when retrying.
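The library's exact backoff parameters are internal, but the general shape of exponential backoff with jitter looks roughly like this (a sketch, assuming retry_interval is used as the base delay):

```typescript
// Illustrative only; OpenETL's actual backoff formula may differ.
function retryDelay(attempt: number, baseMs: number): number {
  const exponential = baseMs * 2 ** attempt;        // 1000, 2000, 4000, ...
  const jitter = Math.random() * exponential * 0.5; // spread concurrent retries apart
  return exponential + jitter;
}
```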
Pipeline Validation
Validate pipelines before execution to catch configuration errors early:
```typescript
import { validatePipeline } from 'openetl';

const result = validatePipeline(pipeline, adapters, vault);

if (!result.valid) {
  console.error('Pipeline validation failed:');
  result.errors.forEach(err => console.error(`  - ${err}`));
} else {
  // Safe to run
  await etl.runPipeline(pipeline);
}
```
Building Custom Adapters
Create adapters for any data source by implementing the adapter interface:
```typescript
import { Adapter, AdapterInstance, Connector, AuthConfig } from 'openetl';

const MyCustomAdapter = {
  id: 'my-adapter',
  name: 'My Custom Adapter',
  type: 'http',
  action: ['download', 'upload'],
  credential_type: 'api_key',
  base_url: 'https://api.example.com',
  endpoints: [
    {
      id: 'users',
      path: '/users',
      method: 'GET',
      description: 'Fetch users',
      supported_actions: ['download'],
    },
  ],
};

function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  return {
    async connect() {
      // Validate connection
    },
    async disconnect() {
      // Cleanup
    },
    async download({ limit, offset }) {
      const response = await fetch(
        `https://api.example.com/users?limit=${limit}&offset=${offset}`,
        { headers: { 'Authorization': `Bearer ${auth.credentials.api_key}` } }
      );
      const data = await response.json();
      return { data };
    },
    async upload(data) {
      await fetch('https://api.example.com/users', {
        method: 'POST',
        body: JSON.stringify(data),
      });
    },
  };
}

export { myAdapter, MyCustomAdapter };
```
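Registering and using the custom adapter should then look like any built-in one. A sketch, assuming the vault contains an api_key credential and that the registration key matches the adapter id:

```typescript
// Hypothetical registration and use of the custom adapter defined above.
const adapters = { 'my-adapter': myAdapter };

const pipeline: Pipeline = {
  id: 'export-from-custom-api',
  source: {
    id: 'custom-source',
    adapter_id: 'my-adapter',
    endpoint_id: 'users',
    credential_id: 'example-api-key', // assumed api_key entry in the vault
    pagination: { type: 'offset', itemsPerPage: 100 },
  },
};

const etl = Orchestrator(vault, adapters);
const result = await etl.runPipeline(pipeline);
```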
Type Safety Benefits
TypeScript provides compile-time checking for your pipelines:
```typescript
// Type error: 'invalid' is not a valid pagination type
const pipeline: Pipeline = {
  source: {
    pagination: { type: 'invalid' }, // ❌ TypeScript error
  },
};

// Type error: missing required field
const auth: BasicAuth = {
  id: 'db',
  type: 'basic',
  credentials: {
    username: 'user',
    // password: missing // ❌ TypeScript error
  },
};
```
Conclusion
OpenETL brings the benefits of TypeScript to ETL development:
- Type safety catches configuration errors at compile time
- Adapter abstraction decouples pipelines from data sources
- Built-in transformations handle common data manipulation
- Flexible pagination works with any API pattern
- Error handling with exponential backoff for resilience
By using these patterns, you can build maintainable, testable ETL pipelines that scale with your data integration needs.
Resources
- OpenETL GitHub Repository
- API Documentation
- Available Adapters
OpenETL is open source under the MIT license. Contributions welcome!