Muhammad Arslan

What's New in HazelJS Data: PipelineBuilder, PII Safety, and Advanced Quality

A deep dive into the latest @hazeljs/data features—programmatic pipelines, conditional steps, PII decorators, profiling, and anomaly detection


Introduction

The HazelJS Data Starter has been updated with a host of new features that make ETL pipelines more expressive, safer, and easier to test. This post walks through each addition with code examples from the starter.


1. PipelineBuilder: Programmatic Pipelines

Not every pipeline needs decorators. Sometimes you want to compose transforms in code—branching on conditions, running steps in parallel, or recovering from errors. PipelineBuilder provides an immutable, fluent API for exactly that.

LogEnrichmentPipeline

The starter includes log-enrichment.pipeline.ts, a programmatic pipeline that demonstrates:

  • branch – Fork based on log level (error vs info)
  • parallel – Run multiple transforms concurrently and merge results
  • catch – Recover from step failures instead of throwing
import { PipelineBuilder } from '@hazeljs/data';

export const logEnrichmentPipeline = new PipelineBuilder('log-enrichment')
  .addTransform('normalize', (d: unknown) => {
    const obj = d as Record<string, unknown>;
    return {
      ...obj,
      level: String(obj.level ?? 'info').toLowerCase(),
      message: String(obj.message ?? '').trim(),
    };
  })
  .branch(
    'classify',
    (d) => (d as { level: string }).level === 'error',
    (b) => b.addTransform('enrichError', (d) => ({ ...d, severity: 'high', enrichedAt: new Date().toISOString() })),
    (b) => b.addTransform('enrichInfo', (d) => ({ ...d, severity: 'normal', enrichedAt: new Date().toISOString() }))
  )
  .parallel('metadata', [
    (d) => ({ host: (d as Record<string, unknown>).host ?? 'unknown' }),
    (d) => ({ timestamp: (d as Record<string, unknown>).timestamp ?? new Date().toISOString() }),
  ])
  .addValidate('validate', (d) => d)
  .catch((data, err) => ({ ...(data as object), error: err.message, recovered: true }));

Try It

curl -X POST http://localhost:3001/data/pipeline/logs \
  -H "Content-Type: application/json" \
  -d '{"level": "error", "message": "Connection timeout", "host": "api-01"}'

Response:

{
  "result": {
    "level": "error",
    "message": "Connection timeout",
    "host": "api-01",
    "severity": "high",
    "enrichedAt": "2025-03-06T12:00:00.000Z",
    "timestamp": "2025-03-06T12:00:00.000Z"
  }
}

2. Conditional Steps with when

Steps can now run only when a predicate returns true. The OrderProcessingPipeline uses this to skip enrichment for cancelled orders:

@Transform({
  step: 3,
  name: 'enrich',
  when: (d) => (d as RawOrder).status !== 'cancelled',
})
async enrich(data: RawOrder): Promise<ProcessedOrder> {
  // Add subtotal, tax, total—only for non-cancelled orders
  // ...
}

A finalize step runs for all orders and adds processedAt (and default totals for cancelled orders). This keeps the pipeline flow clean while avoiding unnecessary computation.
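
The skip semantics are easy to picture in isolation. Below is a minimal self-contained sketch of how a `when` guard behaves (illustrative only — not the actual @hazeljs/data internals; the `Order` type and step bodies are made up for the example):

```typescript
type Order = { status: string; total?: number };
type Step<T> = { name: string; when?: (d: T) => boolean; run: (d: T) => T };

// Run each step in order, skipping any whose `when` predicate returns false.
// Skipped steps pass the data through unchanged.
function runSteps<T>(data: T, steps: Step<T>[]): T {
  return steps.reduce((acc, step) => {
    if (step.when && !step.when(acc)) return acc; // predicate false: skip
    return step.run(acc);
  }, data);
}

const steps: Step<Order>[] = [
  { name: 'enrich', when: (d) => d.status !== 'cancelled', run: (d) => ({ ...d, total: 100 }) },
  { name: 'finalize', run: (d) => ({ ...d, total: d.total ?? 0 }) },
];

runSteps({ status: 'cancelled' }, steps); // enrich skipped; finalize defaults total to 0
runSteps({ status: 'paid' }, steps);      // enrich runs; total is 100
```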


3. PII Decorators: @Redact, @mask, @Encrypt

Sensitive data should never leak into logs or downstream systems. @hazeljs/data provides PII decorators that run before the decorated method executes.

@Redact

The UserIngestionPipeline uses @Redact to strip internalId from output:

@Transform({ step: 3, name: 'sanitize' })
@Redact({ fields: ['internalId'] })
async sanitize(data: RawUser): Promise<ProcessedUser> {
  return {
    email: data.email ?? '',
    name: data.name ?? '',
    age: data.age ?? 0,
    role: data.role ?? 'user',
    ingestedAt: new Date().toISOString(),
  };
}

By the time sanitize runs, internalId has already been removed from the data. The method receives clean data and returns a safe ProcessedUser.
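
The stripping itself amounts to removing the listed fields from the record before the step sees it. A hypothetical plain-function equivalent (the real @Redact decorator wires this into method invocation, so this is a sketch of the effect, not the library's code):

```typescript
// Remove the listed fields from a record, returning a shallow copy.
function redact<T extends Record<string, unknown>>(data: T, fields: string[]): Partial<T> {
  const copy: Record<string, unknown> = { ...data };
  for (const field of fields) delete copy[field];
  return copy as Partial<T>;
}

redact({ email: 'a@b.c', internalId: 'xyz-42' }, ['internalId']);
// internalId is gone; email survives
```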

@mask and @Encrypt

  • @mask – Replaces field values with **** (or a custom replacement). Use showLast: 4 to reveal the last 4 characters (e.g. for card numbers).
  • @Encrypt – AES-256-GCM encrypts specified fields. Use with @Decrypt when reading back.
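
The masking behavior described for `@mask` can be sketched as a plain function. This mirrors the options named above (`replacement`, `showLast`), but the exact formatting — which characters pad the visible tail — is an assumption for illustration:

```typescript
// Replace a value with a mask, optionally revealing the last N characters.
// A sketch of the described behavior, not the decorator's actual code.
function maskValue(value: string, replacement = '****', showLast = 0): string {
  if (showLast <= 0) return replacement; // fully masked
  const keep = value.slice(-showLast);
  return '*'.repeat(Math.max(value.length - showLast, 0)) + keep;
}

maskValue('4242424242424242');            // fully masked: '****'
maskValue('4242424242424242', '****', 4); // last 4 visible: '************4242'
```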

4. Enhanced Quality Checks

The starter now registers more built-in checks in data.bootstrap.ts:

  • completeness – Required fields present
  • notNull – No null/undefined in specified fields
  • uniqueness – No duplicate values in specified fields
  • range – Numeric values within min/max
  • referentialIntegrity – Values in allowed set (enum-like)

Example registration:

qualityService.registerCheck('order-uniqueness', qualityService.uniqueness(['id']));
qualityService.registerCheck('order-status-ref', qualityService.referentialIntegrity('status', [
  'pending', 'paid', 'shipped', 'delivered', 'cancelled',
]));
qualityService.registerCheck('user-age-range', qualityService.range('age', { min: 0, max: 150 }));
qualityService.registerCheck('user-role-ref', qualityService.referentialIntegrity('role', [
  'user', 'admin', 'moderator', 'guest',
]));

Quality reports include a score from 0 to 100 and per-check details.
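
The post doesn't show how the score is derived, so the following is purely an assumption for intuition: a common approach is an unweighted pass ratio over the registered checks. The `CheckResult` shape here is invented for the sketch:

```typescript
// One plausible way to derive a 0–100 score from per-check results.
// Assumed formula (unweighted pass ratio)—not taken from @hazeljs/data.
interface CheckResult { name: string; passed: boolean }

function qualityScore(results: CheckResult[]): number {
  if (results.length === 0) return 100; // no checks registered: nothing failed
  const passed = results.filter((r) => r.passed).length;
  return Math.round((passed / results.length) * 100);
}

qualityScore([
  { name: 'order-uniqueness', passed: true },
  { name: 'order-status-ref', passed: true },
  { name: 'user-age-range', passed: false },
  { name: 'user-role-ref', passed: true },
]); // 3 of 4 checks pass → 75
```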


5. Data Profiling

QualityService.profile() computes field-level statistics: null count, cardinality, min/max, mean, stddev, top values.
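
For the numeric stats, the underlying math is standard. A self-contained sketch (not the library code) that reproduces the kind of per-field numbers `profile()` reports, using the sample standard deviation:

```typescript
// Field-level numeric stats: null count, min/max, mean, sample stddev.
function numericStats(values: Array<number | null | undefined>) {
  const present = values.filter((v): v is number => v != null);
  const nullCount = values.length - present.length;
  const mean = present.reduce((s, v) => s + v, 0) / present.length;
  // Sample variance (divide by n − 1)
  const variance = present.reduce((s, v) => s + (v - mean) ** 2, 0) / (present.length - 1);
  return {
    count: values.length,
    nullCount,
    min: Math.min(...present),
    max: Math.max(...present),
    mean,
    stddev: Math.sqrt(variance),
  };
}

numericStats([28, 35, 42]); // mean 35, stddev 7 — the same ages as the example below
```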

Endpoint

curl -X POST http://localhost:3001/data/quality/profile \
  -H "Content-Type: application/json" \
  -d '{
    "dataset": "users",
    "data": [
      { "name": "Alice", "age": 28 },
      { "name": "Bob", "age": 35 },
      { "name": "Carol", "age": 42 }
    ]
  }'

Response:

{
  "profile": {
    "dataset": "users",
    "totalRows": 3,
    "fields": {
      "name": {
        "count": 3,
        "nullCount": 0,
        "nullPct": 0,
        "uniqueCount": 3,
        "cardinality": 1,
        "topValues": [{"value": "Alice", "count": 1}, ...]
      },
      "age": {
        "count": 3,
        "nullCount": 0,
        "mean": 35,
        "stddev": 7,
        "min": 28,
        "max": 42,
        ...
      }
    },
    "generatedAt": "2025-03-06T12:00:00.000Z"
  }
}

6. Anomaly Detection

QualityService.detectAnomalies() flags rows where numeric fields deviate beyond a z-score threshold from the mean.
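
The z-score technique itself is compact. A minimal sketch using the population standard deviation — note that the library's exact stddev convention isn't stated, so its reported z-scores may differ from this version's:

```typescript
// Flag rows where |value − mean| / stddev exceeds the threshold.
function detectAnomalies(values: number[], threshold: number) {
  const mean = values.reduce((s, v) => s + v, 0) / values.length;
  // Population stddev (divide by n)
  const stddev = Math.sqrt(values.reduce((s, v) => s + (v - mean) ** 2, 0) / values.length);
  return values
    .map((value, rowIndex) => ({ value, rowIndex, zScore: Math.abs(value - mean) / stddev }))
    .filter((r) => r.zScore > threshold);
}

detectAnomalies([10, 11, 12, 1000], 1.5);
// only row 3 (value 1000) exceeds the threshold
```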

Endpoint

curl -X POST http://localhost:3001/data/quality/anomalies \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      { "value": 10 },
      { "value": 11 },
      { "value": 12 },
      { "value": 1000 }
    ],
    "fields": ["value"],
    "threshold": 1.5
  }'

Response:

{
  "anomalies": [
    {
      "field": "value",
      "rowIndex": 3,
      "value": 1000,
      "zScore": 2.89,
      "message": "Value 1000 is 2.89 stddev from mean"
    }
  ]
}

Use this for outlier detection in metrics, prices, or sensor data.


7. SchemaFaker: Generate Test Data

SchemaFaker generates fake data that conforms to a schema—useful for seeding tests and demos.

The run-sample-pipelines script demonstrates it:

import { Schema, SchemaFaker } from '@hazeljs/data';

const SimpleUserSchema = Schema.object({
  name: Schema.string(),
  age: Schema.number().min(0).max(150),
});
const fakeUsers = SchemaFaker.generateMany(SimpleUserSchema, 2);
// [{ name: "x7k2m", age: 42 }, { name: "abc12", age: 89 }]

Run npm run run:sample to see SchemaFaker output alongside pipeline results.
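
If you're curious what "generate data conforming to a schema" means mechanically, here is a toy generator in the same spirit. Everything in it (`FieldSpec`, `generateOne`, `generateMany`) is invented for illustration — it is not SchemaFaker's implementation:

```typescript
// A toy schema-driven generator: random short strings and bounded integers.
type FieldSpec = { kind: 'string' } | { kind: 'number'; min: number; max: number };

function generateOne(spec: Record<string, FieldSpec>): Record<string, unknown> {
  const row: Record<string, unknown> = {};
  for (const [field, s] of Object.entries(spec)) {
    row[field] =
      s.kind === 'string'
        ? Math.random().toString(36).slice(2, 7) // e.g. "x7k2m"
        : Math.floor(Math.random() * (s.max - s.min + 1)) + s.min; // within [min, max]
  }
  return row;
}

function generateMany(spec: Record<string, FieldSpec>, n: number) {
  return Array.from({ length: n }, () => generateOne(spec));
}

generateMany({ name: { kind: 'string' }, age: { kind: 'number', min: 0, max: 150 } }, 2);
```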


8. StreamProcessor: Windowing

StreamProcessor supports tumbling, sliding, and session windows, plus stream join. The sample script shows tumblingWindow:

import { StreamProcessor } from '@hazeljs/data';

async function* timestampedSource() {
  yield { value: 1, timestamp: 100 };
  yield { value: 2, timestamp: 150 };
  yield { value: 3, timestamp: 250 };
}

const processor = new StreamProcessor(etlService);
for await (const batch of processor.tumblingWindow(timestampedSource(), 100)) {
  console.log(batch);
  // { items: [1, 2], windowStart: 100, windowEnd: 200 }
  // { items: [3], windowStart: 200, windowEnd: 300 }
}

Also available: slidingWindow, sessionWindow, joinStreams.
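
Tumbling-window semantics are simple enough to sketch synchronously: each item lands in the fixed, non-overlapping window containing its timestamp. The real StreamProcessor works over async iterables; this batch version only illustrates the grouping:

```typescript
interface Timestamped { value: number; timestamp: number }
interface Window { items: number[]; windowStart: number; windowEnd: number }

// Assign each item to the window [k·sizeMs, (k+1)·sizeMs) containing its timestamp.
function tumble(items: Timestamped[], sizeMs: number): Window[] {
  const windows = new Map<number, Window>();
  for (const { value, timestamp } of items) {
    const windowStart = Math.floor(timestamp / sizeMs) * sizeMs;
    const w = windows.get(windowStart) ?? { items: [], windowStart, windowEnd: windowStart + sizeMs };
    w.items.push(value);
    windows.set(windowStart, w);
  }
  return [...windows.values()];
}

tumble(
  [{ value: 1, timestamp: 100 }, { value: 2, timestamp: 150 }, { value: 3, timestamp: 250 }],
  100
);
// → [{ items: [1, 2], windowStart: 100, windowEnd: 200 },
//    { items: [3], windowStart: 200, windowEnd: 300 }]
```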


9. Pipeline Options: Retry, Timeout, DLQ

Decorator-based pipelines support per-step options:

  • when – Run step only when predicate returns true
  • retry – Retry failed step with fixed or exponential backoff
  • timeoutMs – Abort step after N milliseconds
  • dlq – Route failed records to a handler instead of throwing

Example:

@Transform({
  step: 2,
  name: 'enrich',
  when: (d) => (d as { type: string }).type === 'order',
  retry: { attempts: 3, delay: 500, backoff: 'exponential' },
  timeoutMs: 5000,
  dlq: { handler: (item, err, step) => logger.error('DLQ', { item, err, step }) },
})
async enrich(data: unknown) {
  return { ...data, enriched: true };
}
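
With the options above, the wait before each retry follows a simple schedule. The sketch below assumes exponential backoff doubles the base delay per attempt — a common convention, but not confirmed by the post:

```typescript
// Delay before each retry attempt: fixed repeats `delay`;
// exponential doubles it per attempt (assumed factor of 2).
function retryDelays(attempts: number, delay: number, backoff: 'fixed' | 'exponential'): number[] {
  return Array.from({ length: attempts }, (_, i) =>
    backoff === 'exponential' ? delay * 2 ** i : delay
  );
}

retryDelays(3, 500, 'exponential'); // → [500, 1000, 2000]
retryDelays(3, 500, 'fixed');       // → [500, 500, 500]
```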

API Endpoints Summary

  • POST /data/pipeline/orders – Process single order
  • POST /data/pipeline/orders/batch – Batch process orders
  • POST /data/pipeline/users – Process single user
  • POST /data/pipeline/logs – Process log (PipelineBuilder)
  • POST /data/quality – Run quality checks
  • POST /data/quality/profile – Run data profiling
  • POST /data/quality/anomalies – Detect anomalies
  • GET /data/quality – Quick quality check with sample data

Running the Starter

cd hazeljs-data-starter
npm install
npm run build
npm start

For programmatic execution (pipelines + SchemaFaker + StreamProcessor):

npm run run:sample

Summary

The HazelJS Data Starter now showcases:

  • PipelineBuilder – Branch, parallel, catch without decorators
  • Conditional steps – when for skip logic
  • PII decorators – @Redact, @mask, @Encrypt for sensitive data
  • Enhanced quality – Uniqueness, range, referentialIntegrity
  • Data profiling – Field stats and top values
  • Anomaly detection – Outlier flagging by z-score
  • SchemaFaker – Test data generation
  • StreamProcessor – Tumbling/sliding/session windows, join

These features make @hazeljs/data suitable for production ETL with robust validation, observability, and safety.

