Paul H

Building Type-Safe ETL Pipelines in TypeScript

A practical guide to implementing Extract, Transform, Load workflows with full type safety


Introduction

ETL (Extract, Transform, Load) is a fundamental pattern in data engineering. Whether you're syncing CRM data to a warehouse, migrating between databases, or integrating third-party APIs, ETL pipelines are everywhere. But building them in a type-safe, maintainable way has traditionally been challenging.

In this article, we'll explore how to implement ETL pipelines in TypeScript using OpenETL, a modern framework that brings type safety and abstraction to data integration.

What is ETL?

ETL consists of three phases:

  1. Extract: Pull data from a source (database, API, file)
  2. Transform: Clean, reshape, or enrich the data
  3. Load: Write the data to a destination
┌─────────┐    ┌───────────┐    ┌─────────┐
│ Extract │ ── │ Transform │ ── │  Load   │
└─────────┘    └───────────┘    └─────────┘
    │                │               │
    ▼                ▼               ▼
 Database         Rename           Data
   API            Filter         Warehouse
   File           Merge             API

The Challenge with Traditional ETL

Traditional ETL implementations often suffer from:

  • Tight coupling: Source-specific code mixed with business logic
  • No type safety: Runtime errors from schema mismatches
  • Code duplication: Similar patterns repeated for each integration
  • Hard to test: Database connections embedded in business logic

The Adapter Pattern Solution

OpenETL solves these problems with the adapter pattern. Each data source implements a common interface, allowing you to swap sources without changing your pipeline logic.

// The adapter interface - all sources implement this
interface AdapterInstance {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  download(options: PageOptions): Promise<{ data: any[] }>;
  upload(data: any[]): Promise<void>;
}

This abstraction means your pipeline code doesn't care whether data comes from PostgreSQL, MongoDB, or a REST API.
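
To see this in action, here's a minimal sketch of pipeline logic written purely against the AdapterInstance interface (a hypothetical helper, not part of the OpenETL API):

// Hypothetical helper: it works with ANY adapter, because it only
// depends on the AdapterInstance interface shown above.
async function copyFirstPage(source: AdapterInstance, target: AdapterInstance) {
  await source.connect();
  await target.connect();
  try {
    const { data } = await source.download({ limit: 100, offset: 0 });
    await target.upload(data);
  } finally {
    await source.disconnect();
    await target.disconnect();
  }
}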

Getting Started

Installation

npm install openetl
npm install @openetl/postgresql  # or any adapter you need

Basic Pipeline

Here's a simple pipeline that downloads data from PostgreSQL:

import { Orchestrator, Vault, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

// 1. Define credentials in a vault
const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

// 2. Register adapters
const adapters = { postgresql };

// 3. Define the pipeline
const pipeline: Pipeline = {
  id: 'export-users',
  source: {
    id: 'source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-database',
    fields: ['id', 'email', 'name', 'created_at'],
    config: {
      schema: 'public',
      table: 'users',
    },
  },
};

// 4. Execute
const etl = Orchestrator(vault, adapters);
const result = await etl.runPipeline(pipeline);

console.log(`Exported ${result.data.length} users`);

Working with Multiple Data Sources

The power of abstraction becomes clear when working with multiple sources. The same pipeline structure works regardless of the source:

import { postgresql } from '@openetl/postgresql';
import { mysql } from '@openetl/mysql';
import { mongodb } from '@openetl/mongodb';
import { hubspot } from '@openetl/hubspot';

const adapters = { postgresql, mysql, mongodb, hubspot };

// PostgreSQL source
const postgresSource = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  config: { schema: 'public', table: 'customers' },
};

// MySQL source - same structure, different adapter
const mysqlSource = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  config: { database: 'sales', table: 'customers' },
};

// MongoDB source - still the same pattern
const mongoSource = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_query',
  config: { table: 'customers' },
};

// HubSpot API - REST API, same interface
const hubspotSource = {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
};

Data Transformations

OpenETL includes 12 built-in transformation types, applied in sequence during pipeline execution:

const pipeline: Pipeline = {
  id: 'transform-contacts',
  source: {
    // ... source config
    transform: [
      // Combine first and last name
      {
        type: 'concat',
        options: {
          properties: ['first_name', 'last_name'],
          glue: ' ',
          to: 'full_name',
        },
      },
      // Normalize email to lowercase
      {
        type: 'lowercase',
        options: { field: 'email' },
      },
      // Remove whitespace
      {
        type: 'trim',
        options: { field: 'full_name' },
      },
      // Extract domain from email
      {
        type: 'extract',
        options: {
          field: 'email',
          pattern: '@(.+)$',
          to: 'email_domain',
        },
      },
    ],
  },
};
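
Given an input record, the chain above produces something like this (illustrative values):

// Input:
// { first_name: 'Ada', last_name: 'Lovelace ', email: 'ADA@Example.com' }
//
// After concat → lowercase → trim → extract:
// {
//   first_name: 'Ada',
//   last_name: 'Lovelace ',
//   email: 'ada@example.com',
//   full_name: 'Ada Lovelace',
//   email_domain: 'example.com',
// }

Note that ordering matters: trim runs after concat, so the trailing space inherited from last_name is stripped out of full_name.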

Available Transformations

Type           Description                   Example
concat         Combine multiple fields       ['first', 'last'] → 'full_name'
renameKey      Copy/rename a field           'old_field' → 'new_field'
uppercase      Convert to uppercase          'hello' → 'HELLO'
lowercase      Convert to lowercase          'HELLO' → 'hello'
trim           Remove whitespace             ' text ' → 'text'
split          Split into array              'a,b,c' → ['a','b','c']
replace        Regex replacement             'foo' → 'bar'
addPrefix      Add prefix                    '123' → 'ID-123'
addSuffix      Add suffix                    'file' → 'file.txt'
toNumber       Parse as number               '42' → 42
extract        Extract substring             'user@example.com' → 'example.com'
mergeObjects   Combine fields into object    {a, b} → {merged: {a, b}}

Filtering and Sorting

Apply filters and sorting at the source level for efficient data retrieval:

const pipeline: Pipeline = {
  id: 'active-premium-users',
  source: {
    // ...
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'plan', operator: '=', value: 'premium' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    sort: [
      { field: 'created_at', type: 'desc' },
      { field: 'name', type: 'asc' },
    ],
    limit: 1000,
  },
};

Filter Groups

For complex conditions, use filter groups with AND/OR logic:

filters: [
  {
    op: 'OR',
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'status', operator: '=', value: 'pending' },
    ],
  },
  { field: 'email_verified', operator: '=', value: 'true' },
]
// SQL: WHERE (status = 'active' OR status = 'pending') AND email_verified = true
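
To make the semantics concrete, here's a simplified sketch of how a filter list like this could compile to a WHERE clause (illustrative only, not OpenETL's internals; a real adapter would use parameterized queries rather than string interpolation):

type Filter = { field: string; operator: string; value: string };
type FilterGroup = { op: 'AND' | 'OR'; filters: (Filter | FilterGroup)[] };

function toSql(f: Filter | FilterGroup): string {
  if ('op' in f) {
    // A group joins its children with its own operator, parenthesized
    return '(' + f.filters.map(toSql).join(` ${f.op} `) + ')';
  }
  return `${f.field} ${f.operator} '${f.value}'`;
}

// Top-level filters are implicitly AND-ed together:
const where = filters.map(toSql).join(' AND ');
// → (status = 'active' OR status = 'pending') AND email_verified = 'true'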

Source to Target Pipelines

Move data between systems by defining both source and target:

const pipeline: Pipeline = {
  id: 'sync-crm-to-warehouse',
  source: {
    id: 'hubspot-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot-api',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'postgres-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'warehouse-db',
    fields: ['email', 'first_name', 'last_name', 'company'],
    config: {
      schema: 'analytics',
      table: 'crm_contacts',
    },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} contacts to warehouse`);
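
One detail to watch: the source field names (firstname, lastname) differ from the target columns (first_name, last_name). If the target adapter matches fields by name rather than by position, renameKey transforms on the source make the mapping explicit. The option names below follow the field/to convention used by the other transforms; check the docs for the exact shape:

transform: [
  { type: 'renameKey', options: { field: 'firstname', to: 'first_name' } },
  { type: 'renameKey', options: { field: 'lastname', to: 'last_name' } },
],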

Pagination Strategies

Different APIs use different pagination methods. OpenETL supports three strategies:

// Offset-based (SQL databases)
pagination: { type: 'offset', itemsPerPage: 100 }

// Cursor-based (modern APIs like HubSpot, Stripe)
pagination: { type: 'cursor', itemsPerPage: 100 }

// Page-based (traditional REST APIs)
pagination: { type: 'page', itemsPerPage: 50 }
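
Under the hood, the orchestrator keeps requesting pages until the source is exhausted. A simplified sketch of offset-based paging (illustrative, not the actual implementation):

// Keep fetching until a short page signals the end of the data,
// assuming download() returns fewer than `limit` items on the last page.
async function downloadAll(source: AdapterInstance, limit = 100) {
  const rows: any[] = [];
  let offset = 0;
  while (true) {
    const { data } = await source.download({ limit, offset });
    rows.push(...data);
    if (data.length < limit) break; // final (short or empty) page
    offset += limit;
  }
  return rows;
}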

Error Handling

Configure retry behavior for resilient pipelines:

const pipeline: Pipeline = {
  id: 'resilient-sync',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,  // ms between retries
    fail_on_error: true,   // stop on first error
  },
};

OpenETL uses exponential backoff with jitter to prevent thundering herd problems when retrying.
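
"Exponential backoff with jitter" means each retry waits roughly twice as long as the previous one, plus a random offset so that many failing clients don't all retry at the same instant. A minimal sketch of the idea (not the library's actual code):

// delay = retry_interval * 2^attempt, plus up to 50% random jitter
function backoffDelay(attempt: number, baseMs: number): number {
  const exponential = baseMs * 2 ** attempt;
  return exponential + Math.random() * exponential * 0.5;
}

// With retry_interval = 1000:
// attempt 0 → 1000–1500 ms, attempt 1 → 2000–3000 ms, attempt 2 → 4000–6000 ms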

Pipeline Validation

Validate pipelines before execution to catch configuration errors early:

import { validatePipeline } from 'openetl';

const result = validatePipeline(pipeline, adapters, vault);

if (!result.valid) {
  console.error('Pipeline validation failed:');
  result.errors.forEach(err => console.error(`  - ${err}`));
} else {
  // Safe to run
  await etl.runPipeline(pipeline);
}

Building Custom Adapters

Create adapters for any data source by implementing the adapter interface:

import { Adapter, AdapterInstance, Connector, AuthConfig } from 'openetl';

const MyCustomAdapter: Adapter = {
  id: 'my-adapter',
  name: 'My Custom Adapter',
  type: 'http',
  action: ['download', 'upload'],
  credential_type: 'api_key',
  base_url: 'https://api.example.com',
  endpoints: [
    {
      id: 'users',
      path: '/users',
      method: 'GET',
      description: 'Fetch users',
      supported_actions: ['download'],
    },
  ],
};

function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  return {
    async connect() {
      // Validate connection
    },

    async disconnect() {
      // Cleanup
    },

    async download({ limit, offset }) {
      const response = await fetch(
        `https://api.example.com/users?limit=${limit}&offset=${offset}`,
        { headers: { 'Authorization': `Bearer ${auth.credentials.api_key}` } }
      );
      const data = await response.json();
      return { data };
    },

    async upload(data) {
      await fetch('https://api.example.com/users', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${auth.credentials.api_key}`,
        },
        body: JSON.stringify(data),
      });
    },
  };
}

export { myAdapter, MyCustomAdapter };
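
Register the custom adapter the same way as the built-ins; the key in the adapters map must match the adapter_id you reference from pipelines, as in the earlier examples:

const adapters = { postgresql, 'my-adapter': myAdapter };

const source = {
  id: 'custom-source',
  adapter_id: 'my-adapter',
  endpoint_id: 'users',
  credential_id: 'example-api',  // an api_key credential in your vault
};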

Type Safety Benefits

TypeScript provides compile-time checking for your pipelines:

// Type error: 'invalid' is not a valid pagination type
const pipeline: Pipeline = {
  source: {
    pagination: { type: 'invalid' }, // ❌ TypeScript error
  },
};

// Type error: missing required field
const auth: BasicAuth = {
  id: 'db',
  type: 'basic',
  credentials: {
    username: 'user',
    // password: missing  // ❌ TypeScript error
  },
};

Conclusion

OpenETL brings the benefits of TypeScript to ETL development:

  • Type safety catches configuration errors at compile time
  • Adapter abstraction decouples pipelines from data sources
  • Built-in transformations handle common data manipulation
  • Flexible pagination works with any API pattern
  • Error handling with exponential backoff for resilience

By using these patterns, you can build maintainable, testable ETL pipelines that scale with your data integration needs.

OpenETL is open source under the MIT license. Contributions welcome!
