DEV Community

Muhammad Arslan

Building Production ETL Pipelines in Node.js with HazelJS Data

A comprehensive guide to the HazelJS Data Starter—decorator-based ETL, Schema validation, and data quality in TypeScript


Introduction

Data pipelines are the backbone of modern applications. Whether you're ingesting e-commerce orders, processing user profiles, or streaming events to analytics, you need reliable ETL (Extract, Transform, Load) with validation, quality checks, and a clean API.

HazelJS is a decorator-first Node.js framework that provides @hazeljs/data—a module for pipeline orchestration, Schema validation, batch and stream processing, and data quality. In this post, we'll walk through the HazelJS Data Starter—a real-world example with order processing, user ingestion, and quality checks.


Why ETL in Node.js?

Node.js excels at I/O-bound workloads. ETL pipelines often involve:

  • REST APIs receiving and responding to data
  • Database reads/writes for persistence
  • Stream processing for real-time events
  • Validation before data enters your system

Keeping ETL in the same runtime as your API simplifies deployment, reduces latency, and lets you reuse TypeScript types and validation logic. @hazeljs/data provides decorators (@Pipeline, @Transform, @Validate, @Stream) and services (ETLService, StreamService, QualityService) for exactly this pattern.


What is @hazeljs/data?

@hazeljs/data extends HazelJS with:

  • @Pipeline – marks a class as an ETL pipeline with sequential step execution
  • @Transform – data transformation steps with explicit ordering
  • @Validate – Schema validation at specific steps
  • @Stream – streaming pipelines for Kafka/Flink-style processing
  • Schema – fluent validation API (string, number, object, array, email, oneOf)
  • ETLService – executes pipeline steps sequentially
  • StreamService – batch and stream processing
  • QualityService – data quality checks (completeness, notNull, custom)
  • PipelineBuilder – programmatic pipeline composition
  • FlinkService – Apache Flink job deployment (optional)

You can explore the Data package source and the HazelJS homepage for the full API.


Architecture of the HazelJS Data Starter

The HazelJS Data Starter implements two production-ready pipelines plus supporting infrastructure:

  • OrderProcessingPipeline – e-commerce order ETL: normalize → validate → enrich (subtotal, tax, total)
  • UserIngestionPipeline – user profile ETL: normalize → validate → sanitize
  • DataModule – registers ETLService, StreamService, QualityService, and more
  • DataController – REST endpoints for pipeline execution and quality checks
  • DataBootstrap – registers quality checks on startup
  • SchemaValidator – validates data against Schema definitions

The starter repository includes sample data and a CLI script for programmatic execution.


Getting Started

Prerequisites

You'll need Node.js and npm installed.

Installation

# Clone or navigate to the starter
cd hazeljs-data-starter

# Install dependencies
npm install

# Build
npm run build

# Start the server
npm start

The API is available at http://localhost:3001. For development with hot reload, use npm run dev. Full setup is in the starter README.


The @Pipeline, @Transform, and @Validate Decorators

@hazeljs/data uses three core decorators to define ETL pipelines:

@Pipeline

Marks a class as an ETL pipeline. Optionally provide a name for logging and introspection:

@Pipeline('order-processing')
@Injectable()
export class OrderProcessingPipeline extends PipelineBase {
  // ...
}

The Pipeline decorator source shows the full options.

@Transform

Marks a method as a transformation step. Steps run in order; output of step N feeds into step N+1:

@Transform({ step: 1, name: 'normalize' })
async normalize(data: unknown): Promise<RawOrder> {
  // Trim, lowercase, type coercion
  return normalized;
}

@Transform({ step: 3, name: 'enrich' })
async enrich(data: RawOrder): Promise<ProcessedOrder> {
  // Add computed fields
  return enriched;
}

The Transform decorator defines the metadata used by ETLService.
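Conceptually, sequential execution is just a fold over the ordered steps: sort by step number, then feed each step's output into the next. Here is a minimal plain-TypeScript sketch of that idea (illustrative only, not the actual ETLService internals):

```typescript
// Illustrative only: how ordered @Transform steps conceptually chain.
// The real ETLService reads step metadata from decorators; here the
// steps are plain objects with an explicit `step` number.
type Step = { step: number; name: string; run: (data: unknown) => Promise<unknown> };

async function runPipeline(steps: Step[], input: unknown): Promise<unknown> {
  // Sort by step order, then feed each step's output into the next.
  const ordered = [...steps].sort((a, b) => a.step - b.step);
  let data = input;
  for (const s of ordered) {
    data = await s.run(data);
  }
  return data;
}

// Usage: two toy steps registered out of order still run 1 → 2.
const steps: Step[] = [
  { step: 2, name: 'double', run: async (d) => (d as number) * 2 },
  { step: 1, name: 'inc', run: async (d) => (d as number) + 1 },
];
runPipeline(steps, 4).then((out) => console.log(out)); // logs 10: (4 + 1) * 2
```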

@Validate

Marks a step as validation. The SchemaValidator validates data against the provided schema before the method runs:

@Validate({
  step: 2,
  name: 'validate',
  schema: OrderSchema,
})
async validate(data: unknown): Promise<RawOrder> {
  return data; // Validation happens automatically before this runs
}

The Validate decorator ties validation to the pipeline lifecycle.


OrderProcessingPipeline Deep Dive

The OrderProcessingPipeline demonstrates a typical e-commerce ETL flow:

Step 1: Normalize

Trim strings, lowercase status, ensure numeric types:

@Transform({ step: 1, name: 'normalize' })
async normalize(data: unknown): Promise<RawOrder> {
  const raw = data as RawOrder;
  return {
    id: (raw.id ?? '').toString().trim(),
    customerId: (raw.customerId ?? '').toString().trim(),
    items: (raw.items ?? []).map((item) => ({
      sku: (item.sku ?? '').toString().trim(),
      qty: Math.max(0, Number(item.qty) || 0),
      price: Math.max(0, Number(item.price) || 0),
    })),
    status: (raw.status ?? 'pending').toString().toLowerCase().trim(),
    createdAt: (raw.createdAt ?? '').toString().trim(),
  };
}

Step 2: Validate

Schema ensures required fields and types:

// OrderItemSchema's shape is inferred from the fields produced by normalize
const OrderItemSchema = Schema.object({
  sku: Schema.string().min(1),
  qty: Schema.number().min(0),
  price: Schema.number().min(0),
});

const OrderSchema = Schema.object({
  id: Schema.string().min(1),
  customerId: Schema.string().min(1),
  items: Schema.array(OrderItemSchema),
  status: Schema.string().oneOf(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
  createdAt: Schema.string().min(1),
});

Step 3: Enrich

Add subtotal, tax (10%), total, and processedAt:

@Transform({ step: 3, name: 'enrich' })
async enrich(data: RawOrder): Promise<ProcessedOrder> {
  const items = data.items.map((item) => ({
    ...item,
    subtotal: item.qty * item.price,
  }));
  const subtotal = items.reduce((sum, i) => sum + i.subtotal, 0);
  const tax = Math.round(subtotal * 0.1 * 100) / 100; // 10% tax, rounded to cents
  return { ...data, items, tax, total: subtotal + tax, processedAt: new Date().toISOString() };
}

UserIngestionPipeline

The UserIngestionPipeline shows user profile ingestion:

  1. normalize – Lowercase email, trim name, clamp age 0–150
  2. validate – Email format, name length, role enum
  3. sanitize – Remove internal fields, add ingestedAt

This pattern is reusable for customer data, CRM imports, and identity pipelines.
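The three steps above can be sketched as plain functions. This is illustrative only: the starter's actual pipeline uses the decorators shown earlier, and the field names here (email, name, age, role, plus a hypothetical internalId) mirror the starter's UserSchema:

```typescript
// Sketch of the three UserIngestionPipeline steps as pure functions.
// `internalId` stands in for an internal field stripped by sanitize.
interface RawUser { email: string; name: string; age: number; role: string; internalId?: string }

function normalize(u: RawUser): RawUser {
  return {
    ...u,
    email: u.email.trim().toLowerCase(),
    name: u.name.trim(),
    age: Math.min(150, Math.max(0, Number(u.age) || 0)), // clamp 0-150
  };
}

function validate(u: RawUser): RawUser {
  if (!/^[^@\s]+@[^@\s]+\.[^@\s]+$/.test(u.email)) throw new Error('invalid email');
  if (u.name.length < 1 || u.name.length > 200) throw new Error('invalid name');
  if (!['user', 'admin', 'moderator', 'guest'].includes(u.role)) throw new Error('invalid role');
  return u;
}

function sanitize(u: RawUser): Omit<RawUser, 'internalId'> & { ingestedAt: string } {
  const { internalId, ...publicFields } = u; // drop internal fields
  return { ...publicFields, ingestedAt: new Date().toISOString() };
}

const out = sanitize(validate(normalize({
  email: '  Alice@Example.COM ', name: ' Alice ', age: 28, role: 'user', internalId: 'x',
})));
console.log(out.email); // alice@example.com
```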


REST API Walkthrough

Process Single Order

curl -X POST http://localhost:3001/data/pipeline/orders \
  -H "Content-Type: application/json" \
  -d '{
    "id": "ord-001",
    "customerId": "cust-123",
    "items": [{ "sku": "WIDGET-A", "qty": 2, "price": 29.99 }],
    "status": "paid",
    "createdAt": "2025-02-20T10:00:00Z"
  }'

Response:

{
  "result": {
    "id": "ord-001",
    "customerId": "cust-123",
    "items": [{ "sku": "WIDGET-A", "qty": 2, "price": 29.99, "subtotal": 59.98 }],
    "status": "paid",
    "total": 65.98,
    "tax": 6,
    "createdAt": "2025-02-20T10:00:00Z",
    "processedAt": "2025-02-23T12:00:00.000Z"
  }
}

Batch Process Orders

curl -X POST http://localhost:3001/data/pipeline/orders/batch \
  -H "Content-Type: application/json" \
  -d '{"orders": [ { "id": "ord-1", "customerId": "c1", "items": [{ "sku": "A", "qty": 1, "price": 10 }], "status": "paid", "createdAt": "2025-01-01" } ]}'

The StreamService.processBatch processes each item through the pipeline.
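The core idea of batch processing is that each item runs through the same pipeline, and per-item failures are collected rather than aborting the whole batch. A minimal sketch of that shape (the actual StreamService options and result format may differ):

```typescript
// Illustrative batch runner: each item goes through `process`;
// failures are captured per item instead of failing the whole batch.
interface BatchResult<T> { ok: T[]; errors: { index: number; message: string }[] }

async function processBatch<I, O>(
  items: I[],
  process: (item: I) => Promise<O>,
): Promise<BatchResult<O>> {
  const result: BatchResult<O> = { ok: [], errors: [] };
  for (const [index, item] of items.entries()) {
    try {
      result.ok.push(await process(item));
    } catch (e) {
      result.errors.push({ index, message: (e as Error).message });
    }
  }
  return result;
}

// Usage: one invalid item is reported, the rest succeed.
processBatch([1, -1, 3], async (n) => {
  if (n < 0) throw new Error('negative');
  return n * 10;
}).then((r) => console.log(r.ok, r.errors)); // [10, 30] [{ index: 1, message: 'negative' }]
```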

Process User

curl -X POST http://localhost:3001/data/pipeline/users \
  -H "Content-Type: application/json" \
  -d '{"email": "alice@example.com", "name": "Alice Smith", "age": 28, "role": "user"}'

Quality Checks

GET (sample data):

curl http://localhost:3001/data/quality

POST (custom data):

curl -X POST http://localhost:3001/data/quality \
  -H "Content-Type: application/json" \
  -d '{
    "dataset": "orders",
    "data": [{ "id": "1", "customerId": "c1", "items": [], "status": "paid", "createdAt": "2025-01-01" }]
  }'

The QualityService runs registered checks (completeness, notNull) and returns a DataQualityReport.


Schema API Reference

@hazeljs/data provides a fluent Schema builder:

import { Schema } from '@hazeljs/data';

Schema.string()        // .min(n), .max(n), .email(), .uuid(), .oneOf([...])
Schema.number()        // .min(n), .max(n)
Schema.date()
Schema.object(shape)   // shape: Record<string, BaseSchema>
Schema.array(itemSchema)

Example:

const UserSchema = Schema.object({
  email: Schema.string().email(),
  name: Schema.string().min(1).max(200),
  age: Schema.number().min(0).max(150),
  role: Schema.string().oneOf(['user', 'admin', 'moderator', 'guest']),
});

Validation throws SchemaValidationException with structured errors when data fails.
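To make the fluent pattern concrete, here is a tiny self-contained sketch of how a chainable string validator can work: each rule method pushes a check and returns `this`, and validation runs the accumulated rules in order. This is not the @hazeljs/data implementation, just the underlying idea:

```typescript
// Minimal fluent validator sketch: each rule method returns `this`,
// and validate() runs all accumulated rules in order.
class StringSchema {
  private rules: ((v: string) => string | null)[] = [];

  min(n: number): this {
    this.rules.push((v) => (v.length >= n ? null : `shorter than ${n}`));
    return this;
  }

  oneOf(allowed: string[]): this {
    this.rules.push((v) => (allowed.includes(v) ? null : `not in [${allowed.join(', ')}]`));
    return this;
  }

  validate(value: string): string[] {
    // Returns a list of error messages; empty means valid.
    return this.rules.map((rule) => rule(value)).filter((e): e is string => e !== null);
  }
}

const status = new StringSchema().min(1).oneOf(['pending', 'paid', 'shipped']);
console.log(status.validate('paid'));    // []
console.log(status.validate('unknown')); // ['not in [pending, paid, shipped]']
```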


Programmatic Pipeline Execution

Run pipelines without the HTTP server:

npm run run:sample

This loads sample-orders.json and sample-users.json and processes them via the run-sample-pipelines script. Useful for:

  • CI/CD data validation jobs
  • Batch imports
  • Local testing

Extending to Production

1. Flink Integration

Configure DataModule.forRoot() with a Flink client:

DataModule.forRoot({
  flink: {
    url: process.env.FLINK_REST_URL || 'http://localhost:8081',
    timeout: 30000,
  },
})

Use FlinkService and @Stream for Kafka/Flink-style streaming.

2. @Stream Pipelines

Add @Stream({ name, source, sink, parallelism }) for real-time processing. The Stream decorator and StreamProcessor support async iterables and batch processing.

3. PipelineBuilder

Compose pipelines programmatically with PipelineBuilder and PipelineStepConfig.

4. Built-in Transformers

Use TransformerService and built-in transformers: trimString, toLowerCase, toUpperCase, parseJson, stringifyJson, pick, omit, renameKeys.
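The pure-function shape of such transformers is easy to sketch in plain TypeScript. These are illustrative equivalents of three of the names listed above, not the library's code:

```typescript
// Illustrative pick / omit / renameKeys, mirroring the transformer names above.
function pick<T extends object>(obj: T, keys: (keyof T)[]): Partial<T> {
  return Object.fromEntries(
    Object.entries(obj).filter(([k]) => keys.includes(k as keyof T)),
  ) as Partial<T>;
}

function omit<T extends object>(obj: T, keys: (keyof T)[]): Partial<T> {
  return Object.fromEntries(
    Object.entries(obj).filter(([k]) => !keys.includes(k as keyof T)),
  ) as Partial<T>;
}

function renameKeys(obj: Record<string, unknown>, map: Record<string, string>): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(obj).map(([k, v]) => [map[k] ?? k, v]),
  );
}

const order = { id: '1', secret: 'x', qty: 2 };
console.log(pick(order, ['id', 'qty']));             // { id: '1', qty: 2 }
console.log(omit(order, ['secret']));                // { id: '1', qty: 2 }
console.log(renameKeys(order, { qty: 'quantity' })); // { id: '1', secret: 'x', quantity: 2 }
```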

5. Custom Quality Checks

Register custom checks with QualityService.registerCheck(). Use completeness(requiredFields) and notNull(fields) or pass custom functions.
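A completeness check itself is simple to express: the share of records in which every required field is present and non-null. This sketch shows the rough shape of such a check function (the exact signature QualityService expects is an assumption here):

```typescript
// Illustrative completeness check: fraction of records where every
// required field is present and non-null. An empty dataset counts as complete.
function completeness(records: Record<string, unknown>[], requiredFields: string[]): number {
  if (records.length === 0) return 1;
  const complete = records.filter((r) =>
    requiredFields.every((f) => r[f] !== undefined && r[f] !== null),
  ).length;
  return complete / records.length;
}

const rows = [
  { id: '1', customerId: 'c1' },
  { id: '2', customerId: null },
];
console.log(completeness(rows, ['id', 'customerId'])); // 0.5
```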

6. Integrate with @hazeljs/ml

Combine @hazeljs/data with @hazeljs/ml for ML training pipelines—use data pipelines to prepare training data before passing to TrainerService.


Summary

The HazelJS Data Starter demonstrates how to build production ETL pipelines in Node.js with:

  • Decorator-based pipelines via @hazeljs/data
  • OrderProcessingPipeline and UserIngestionPipeline as real-world examples
  • Schema validation with a fluent Schema API
  • Batch processing via StreamService
  • Data quality checks with QualityService
  • REST API for pipeline execution and quality reports

You can use it as a template for order processing, user ingestion, event streaming, or any ETL workload that fits the same pattern.


Links and Resources


This blog post was created for the HazelJS Data Starter. For questions and contributions, visit the HazelJS GitHub repository or community.
