A comprehensive guide to the HazelJS Data Starter—decorator-based ETL, Schema validation, and data quality in TypeScript
If you like it don't forget to Star HazelJS repository
Introduction
Data pipelines are the backbone of modern applications. Whether you're ingesting e-commerce orders, processing user profiles, or streaming events to analytics, you need reliable ETL (Extract, Transform, Load) with validation, quality checks, and a clean API.
HazelJS is a decorator-first Node.js framework that provides @hazeljs/data—a module for pipeline orchestration, Schema validation, batch and stream processing, and data quality. In this post, we'll walk through the HazelJS Data Starter—a real-world example with order processing, user ingestion, and quality checks.
Why ETL in Node.js?
Node.js excels at I/O-bound workloads. ETL pipelines often involve:
- REST APIs receiving and responding to data
- Database reads/writes for persistence
- Stream processing for real-time events
- Validation before data enters your system
Keeping ETL in the same runtime as your API simplifies deployment, reduces latency, and lets you reuse TypeScript types and validation logic. @hazeljs/data provides decorators (@Pipeline, @Transform, @Validate, @Stream) and services (ETLService, StreamService, QualityService) for exactly this pattern.
What is @hazeljs/data?
@hazeljs/data extends HazelJS with:
| Component | Purpose |
|---|---|
| @Pipeline | Mark classes as ETL pipelines with sequential step execution |
| @Transform | Data transformation steps with ordering |
| @validate | Schema validation at specific steps |
| @Stream | Streaming pipelines for Kafka/Flink-style processing |
| Schema | Fluent validation API (string, number, object, array, email, oneOf) |
| ETLService | Executes pipeline steps sequentially |
| StreamService | Batch and stream processing |
| QualityService | Data quality checks (completeness, notNull, custom) |
| PipelineBuilder | Programmatic pipeline composition |
| FlinkService | Apache Flink job deployment (optional) |
You can explore the Data package source and the HazelJS homepage for the full API.
Architecture of the HazelJS Data Starter
The HazelJS Data Starter implements two production-ready pipelines plus supporting infrastructure:
| Component | Responsibility |
|---|---|
| OrderProcessingPipeline | E-commerce order ETL: normalize → validate → enrich (subtotal, tax, total) |
| UserIngestionPipeline | User profile ETL: normalize → validate → sanitize |
| DataModule | Registers ETLService, StreamService, QualityService, and more |
| DataController | REST endpoints for pipeline execution and quality checks |
| DataBootstrap | Registers quality checks on startup |
| SchemaValidator | Validates data against Schema definitions |
The starter repository includes sample data and a CLI script for programmatic execution.
Getting Started
Prerequisites
- Node.js 18+
- HazelJS and @hazeljs/data installed
Installation
# Clone or navigate to the starter
cd hazeljs-data-starter
# Install dependencies
npm install
# Build
npm run build
# Start the server
npm start
The API is available at http://localhost:3001. For development with hot reload, use npm run dev. Full setup is in the starter README.
The @Pipeline, @Transform, and @validate Decorators
@hazeljs/data uses three core decorators to define ETL pipelines:
@Pipeline
Marks a class as an ETL pipeline. Optionally provide a name for logging and introspection:
@Pipeline('order-processing')
@Injectable()
export class OrderProcessingPipeline extends PipelineBase {
// ...
}
The Pipeline decorator source shows the full options.
@Transform
Marks a method as a transformation step. Steps run in order; output of step N feeds into step N+1:
@Transform({ step: 1, name: 'normalize' })
async normalize(data: unknown): Promise<RawOrder> {
// Trim, lowercase, type coercion
return normalized;
}
@Transform({ step: 3, name: 'enrich' })
async enrich(data: RawOrder): Promise<ProcessedOrder> {
// Add computed fields
return enriched;
}
The Transform decorator defines the metadata used by ETLService.
@validate
Marks a step as validation. The SchemaValidator validates data against the provided schema before the method runs:
@Validate({
step: 2,
name: 'validate',
schema: OrderSchema,
})
async validate(data: unknown): Promise<RawOrder> {
return data; // Validation happens automatically before this runs
}
The Validate decorator ties validation to the pipeline lifecycle.
OrderProcessingPipeline Deep Dive
The OrderProcessingPipeline demonstrates a typical e-commerce ETL flow:
Step 1: Normalize
Trim strings, lowercase status, ensure numeric types:
@Transform({ step: 1, name: 'normalize' })
async normalize(data: unknown): Promise<RawOrder> {
const raw = data as RawOrder;
return {
id: (raw.id ?? '').toString().trim(),
customerId: (raw.customerId ?? '').toString().trim(),
items: (raw.items ?? []).map((item) => ({
sku: (item.sku ?? '').toString().trim(),
qty: Math.max(0, Number(item.qty) || 0),
price: Math.max(0, Number(item.price) || 0),
})),
status: (raw.status ?? 'pending').toString().toLowerCase().trim(),
createdAt: (raw.createdAt ?? '').toString().trim(),
};
}
Step 2: Validate
Schema ensures required fields and types:
const OrderSchema = Schema.object({
id: Schema.string().min(1),
customerId: Schema.string().min(1),
items: Schema.array(OrderItemSchema),
status: Schema.string().oneOf(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
createdAt: Schema.string().min(1),
});
Step 3: Enrich
Add subtotal, tax (10%), total, and processedAt:
@Transform({ step: 3, name: 'enrich' })
async enrich(data: RawOrder): Promise<ProcessedOrder> {
const items = data.items.map((item) => ({
...item,
subtotal: item.qty * item.price,
}));
const subtotal = items.reduce((sum, i) => sum + i.subtotal, 0);
const tax = Math.round(subtotal * 0.1 * 100) / 100;
return { ...data, total: subtotal + tax, tax, processedAt: new Date().toISOString() };
}
UserIngestionPipeline
The UserIngestionPipeline shows user profile ingestion:
- normalize – Lowercase email, trim name, clamp age 0–150
- validate – Email format, name length, role enum
-
sanitize – Remove internal fields, add
ingestedAt
This pattern is reusable for customer data, CRM imports, and identity pipelines.
REST API Walkthrough
Process Single Order
curl -X POST http://localhost:3001/data/pipeline/orders \
-H "Content-Type: application/json" \
-d '{
"id": "ord-001",
"customerId": "cust-123",
"items": [{ "sku": "WIDGET-A", "qty": 2, "price": 29.99 }],
"status": "paid",
"createdAt": "2025-02-20T10:00:00Z"
}'
Response:
{
"result": {
"id": "ord-001",
"customerId": "cust-123",
"items": [{ "sku": "WIDGET-A", "qty": 2, "price": 29.99, "subtotal": 59.98 }],
"status": "paid",
"total": 65.98,
"tax": 6,
"createdAt": "2025-02-20T10:00:00Z",
"processedAt": "2025-02-23T12:00:00.000Z"
}
}
Batch Process Orders
curl -X POST http://localhost:3001/data/pipeline/orders/batch \
-H "Content-Type: application/json" \
-d '{"orders": [ { "id": "ord-1", "customerId": "c1", "items": [{ "sku": "A", "qty": 1, "price": 10 }], "status": "paid", "createdAt": "2025-01-01" } ]}'
The StreamService.processBatch processes each item through the pipeline.
Process User
curl -X POST http://localhost:3001/data/pipeline/users \
-H "Content-Type: application/json" \
-d '{"email": "alice@example.com", "name": "Alice Smith", "age": 28, "role": "user"}'
Quality Checks
GET (sample data):
curl http://localhost:3001/data/quality
POST (custom data):
curl -X POST http://localhost:3001/data/quality \
-H "Content-Type: application/json" \
-d '{
"dataset": "orders",
"data": [{ "id": "1", "customerId": "c1", "items": [], "status": "paid", "createdAt": "2025-01-01" }]
}'
The QualityService runs registered checks (completeness, notNull) and returns a DataQualityReport.
Schema API Reference
@hazeljs/data provides a fluent Schema builder:
import { Schema } from '@hazeljs/data';
Schema.string() // .min(n), .max(n), .email(), .uuid(), .oneOf([...])
Schema.number() // .min(n), .max(n)
Schema.date()
Schema.object(shape) // shape: Record<string, BaseSchema>
Schema.array(itemSchema)
Example:
const UserSchema = Schema.object({
email: Schema.string().email(),
name: Schema.string().min(1).max(200),
age: Schema.number().min(0).max(150),
role: Schema.string().oneOf(['user', 'admin', 'moderator', 'guest']),
});
Validation throws SchemaValidationException with structured errors when data fails.
Programmatic Pipeline Execution
Run pipelines without the HTTP server:
npm run run:sample
This loads sample-orders.json and sample-users.json and processes them via the run-sample-pipelines script. Useful for:
- CI/CD data validation jobs
- Batch imports
- Local testing
Extending to Production
1. Flink Integration
Configure DataModule.forRoot() with a Flink client:
DataModule.forRoot({
flink: {
url: process.env.FLINK_REST_URL || 'http://localhost:8081',
timeout: 30000,
},
})
Use FlinkService and @Stream for Kafka/Flink-style streaming.
2. @Stream Pipelines
Add @Stream({ name, source, sink, parallelism }) for real-time processing. The Stream decorator and StreamProcessor support async iterables and batch processing.
3. PipelineBuilder
Compose pipelines programmatically with PipelineBuilder and PipelineStepConfig.
4. Built-in Transformers
Use TransformerService and built-in transformers: trimString, toLowerCase, toUpperCase, parseJson, stringifyJson, pick, omit, renameKeys.
5. Custom Quality Checks
Register custom checks with QualityService.registerCheck(). Use completeness(requiredFields) and notNull(fields) or pass custom functions.
6. Integrate with @hazeljs/ml
Combine @hazeljs/data with @hazeljs/ml for ML training pipelines—use data pipelines to prepare training data before passing to TrainerService.
Summary
The HazelJS Data Starter demonstrates how to build production ETL pipelines in Node.js with:
- Decorator-based pipelines via @hazeljs/data
- OrderProcessingPipeline and UserIngestionPipeline as real-world examples
- Schema validation with a fluent Schema API
- Batch processing via StreamService
- Data quality checks with QualityService
- REST API for pipeline execution and quality reports
You can use it as a template for order processing, user ingestion, event streaming, or any ETL workload that fits the same pattern.
Links and Resources
| Resource | URL |
|---|---|
| HazelJS | https://hazeljs.ai |
| HazelJS GitHub | https://github.com/hazel-js/hazeljs |
| @hazeljs/data on npm | https://www.npmjs.com/package/@hazeljs/data |
| @hazeljs/core on npm | https://www.npmjs.com/package/@hazeljs/core |
| @hazeljs/ml on npm | https://www.npmjs.com/package/@hazeljs/ml |
| @hazeljs/ai on npm | https://www.npmjs.com/package/@hazeljs/ai |
| Data package source | packages/data |
| ML package source | packages/ml |
| ETLService | pipelines/etl.service.ts |
| PipelineBase | pipelines/pipeline.base.ts |
| Schema | schema/schema.ts |
| Apache Flink | https://flink.apache.org |
| Open Collective | https://opencollective.com/hazeljs |
This blog post was created for the HazelJS Data Starter. For questions and contributions, visit the HazelJS GitHub repository or community.
Top comments (0)