DEV Community

Cover image for Encore.ts + Effect.ts : Building a Production-Ready Task Management System with Functional TypeScript
Leonardo Marciano
Leonardo Marciano

Posted on

Encore.ts + Effect.ts : Building a Production-Ready Task Management System with Functional TypeScript

Introduction

What if your backend could have typed errors that compile down to HTTP status codes, automatic request batching, fiber-based concurrency with semaphores and queues, and auto-provisioned databases, PubSub, and object storage... all in TypeScript?

That's what happens when you combine Effect.ts and Encore.ts.

Effect.ts gives you a functional programming toolkit for TypeScript: composable effects, typed errors, dependency injection via layers, streams, concurrency primitives (fibers, semaphores, queues), and a schema system that makes Zod look like a toy.

Encore.ts gives you infrastructure-as-code: declare a SQLDatabase, Topic, Bucket, or CronJob in your TypeScript code, and Encore provisions it automatically. Locally, in preview environments, and in production. No Terraform(Must read: The Last Year of Terraform), no Docker Compose, no YAML.

Together, they're remarkably complementary. Effect handles the how (composition, error handling, concurrency), Encore handles the what (databases, messaging, auth, deployment).

In this article, I'll walk you through a Task Management System (think mini Jira/Linear) we built with 7 microservices, covering virtually every major feature of both frameworks. This isn't a toy example. It's a reference architecture you can study, fork, and build on.


System Architecture

Here's the bird's-eye view of our system:

Client Request
    │
    ▼
┌─────────┐     ┌──────┐
│ Gateway │────▶│ auth │ validates JWT token
└────┬────┘     └──────┘
     │
     ▼
┌────────────┐
│ Middleware │ enriches request (timing/tracing)
└─────┬──────┘
      │
      ├──▶ [user]          ◀── service-to-service ── [project], [auth]
      │      ├── users DB
      │      └── Effect Cache (permissions)
      │
      ├──▶ [project]
      │      └── projects DB + project_members
      │
      ├──▶ [task]          ──▶ publishes to PubSub topics
      │      ├── tasks DB + comments
      │      ├── CronJob (stale task check)
      │      └── WebSocket streams (activity feed)
      │
      ├──▶ [notification]  ◀── subscribes to PubSub topics
      │      ├── notifications DB
      │      ├── Queue + Deferred (async dispatch)
      │      └── WebSocket stream (real-time push)
      │
      ├──▶ [analytics]     ◀── subscribes to PubSub topics
      │      ├── analytics DB (events + daily_stats)
      │      ├── Encore Metrics (Counter, Gauge)
      │      └── Effect Stream/Sink (aggregation pipelines)
      │
      └──▶ [storage]
             ├── Encore Bucket (object storage)
             └── Effect Semaphore (rate limiting)
Enter fullscreen mode Exit fullscreen mode

Each service has its own encore.service.ts, its own database (auto-provisioned by Encore), and communicates via PubSub topics for async events or direct service-to-service calls for synchronous operations.

Encore Service Catalog showing 7 services


The Bridge: Connecting Effect.ts to Encore.ts

This is the single most important pattern in the entire codebase. Encore expects your API handlers to be async functions that return Promises. Effect programs return Effect<A, E> values. We need a bridge.

Here's runEffect. Every Encore endpoint calls this:

import { Cause, Chunk, Effect, Exit, Match, pipe } from "effect"
import { APIError } from "encore.dev/api"
import type { AppError } from "./errors.js"

export const runEffect = async <A, E extends AppError>(
  effect: Effect.Effect<A, E>,
): Promise<A> => {
  const exit = await Effect.runPromiseExit(
    effect.pipe(
      Effect.withSpan("encore-handler"),
      Effect.annotateLogs("runtime", "encore"),
    ),
  )

  if (Exit.isSuccess(exit)) {
    return exit.value
  }

  const cause = exit.cause

  // Expected errors → translate to APIError
  if (Cause.isFailType(cause)) {
    throw effectErrorToAPIError(cause.error as AppError)
  }

  // Fiber interruption → aborted
  if (Cause.isInterruptType(cause)) {
    throw APIError.aborted("Request was interrupted")
  }

  // Defects (bugs) → internal error
  if (Cause.isDieType(cause)) {
    console.error("Unexpected defect:", cause.defect)
    throw APIError.internal("Internal server error")
  }

  // Composite causes → extract first failure
  const failures = Chunk.toArray(Cause.failures(cause))
  if (failures.length > 0) {
    throw effectErrorToAPIError(failures[0] as AppError)
  }

  throw APIError.internal("Unknown error")
}
Enter fullscreen mode Exit fullscreen mode

The key insight: Effect's Exit type distinguishes between expected failures (your domain errors), defects (bugs/panics), and interruptions (cancelled fibers). We handle each differently.

Service Catalog

The error translation uses Effect's Match module for exhaustive mapping:

export const effectErrorToAPIError = (error: AppError): APIError =>
  pipe(
    error,
    Match.value,
    Match.tag("NotFoundError", (e) =>
      APIError.notFound(`${e.entity} '${e.id}' not found`)),
    Match.tag("ValidationError", (e) =>
      APIError.invalidArgument(`${e.field}: ${e.message}`)),
    Match.tag("AuthenticationError", (e) =>
      APIError.unauthenticated(e.reason)),
    Match.tag("AuthorizationError", (e) =>
      APIError.permissionDenied(
        `User '${e.userId}' cannot '${e.action}' on '${e.resource}'`)),
    Match.tag("RateLimitError", (e) =>
      APIError.resourceExhausted(`Rate limited. Retry after ${e.retryAfter}s`)),
    Match.tag("DatabaseError", (e) =>
      APIError.internal(`Database error in '${e.operation}'`)),
    Match.tag("InvalidStateTransitionError", (e) =>
      APIError.failedPrecondition(
        `Cannot transition ${e.entity} from '${e.from}' to '${e.to}'`)),
    // ... all 10 error types mapped
    Match.exhaustive, // Compiler error if you miss one!
  )
Enter fullscreen mode Exit fullscreen mode

Match.exhaustive is the magic. If you add a new error type to AppError but forget to handle it here, TypeScript won't compile. No more silent 500s from unhandled error cases.

And this is how every Encore endpoint looks:

import { api } from "encore.dev/api"
import { Effect } from "effect"
import { TaskService, TaskServiceLive } from "./task-service.js"
import { runEffect } from "../shared/effect-encore-bridge.js"

export const getTask = api(
  { expose: true, method: "GET", path: "/tasks/:id", auth: true },
  async ({ id }: { id: string }): Promise<TaskResponse> =>
    runEffect(
      Effect.gen(function* () {
        const service = yield* TaskService
        return yield* service.getById(TaskId(id))
      }).pipe(Effect.provide(TaskServiceLive))
    )
)
Enter fullscreen mode Exit fullscreen mode

Clean, consistent, and every error is automatically translated to the correct HTTP status code.


Type Safety with Branded Types

A UserId is a string. A ProjectId is also a string. But they are not the same thing. Effect's Brand module prevents you from accidentally passing one where the other is expected:

import { Brand, Order, Equivalence, Equal, Hash } from "effect"

export type UserId = string & Brand.Brand<"UserId">
export const UserId = Brand.nominal<UserId>()

export type ProjectId = string & Brand.Brand<"ProjectId">
export const ProjectId = Brand.nominal<ProjectId>()

export type TaskId = string & Brand.Brand<"TaskId">
export const TaskId = Brand.nominal<TaskId>()
Enter fullscreen mode Exit fullscreen mode

Now service.getById(TaskId("abc")) compiles, but service.getById(UserId("abc")) is a type error. This catches entire classes of bugs at compile time. Imagine an accidental WHERE user_id = $projectId query that silently returns nothing.

We also get custom ordering for sorting:

export type TaskPriority = "critical" | "high" | "medium" | "low"

const priorityRank: Record<TaskPriority, number> = {
  critical: 0, high: 1, medium: 2, low: 3,
}

export const TaskPriorityOrder: Order.Order<TaskPriority> =
  Order.mapInput(Order.number, (p: TaskPriority) => priorityRank[p])
Enter fullscreen mode Exit fullscreen mode

And custom Equal + Hash implementations for use in HashMap/HashSet:

export class ProjectMemberKey implements Equal.Equal {
  constructor(readonly projectId: ProjectId, readonly userId: UserId) {}

  [Equal.symbol](that: Equal.Equal): boolean {
    if (that instanceof ProjectMemberKey) {
      return this.projectId === that.projectId && this.userId === that.userId
    }
    return false
  }

  [Hash.symbol](): number {
    return Hash.combine(Hash.hash(this.projectId))(Hash.hash(this.userId))
  }
}
Enter fullscreen mode Exit fullscreen mode

Typed Error Hierarchy

Every error in the system extends Data.TaggedError, making them yieldable inside Effect.gen generators:

import { Data } from "effect"

export class NotFoundError extends Data.TaggedError("NotFoundError")<{
  readonly entity: string
  readonly id: string
}> {}

export class ValidationError extends Data.TaggedError("ValidationError")<{
  readonly field: string
  readonly message: string
}> {}

export class InvalidStateTransitionError extends Data.TaggedError(
  "InvalidStateTransitionError"
)<{
  readonly entity: string
  readonly from: string
  readonly to: string
}> {}

// ... 10 total error types

export type AppError =
  | DatabaseError | NotFoundError | ValidationError
  | AuthenticationError | AuthorizationError | ConflictError
  | RateLimitError | ExternalServiceError | TimeoutError
  | InvalidStateTransitionError
Enter fullscreen mode Exit fullscreen mode

The discriminated union AppError is what makes Match.exhaustive work in the bridge. Each error carries structured context (not just a message string), so the API response can be precise:

TaggedError HTTP Status Example Message
NotFoundError 404 Task 'abc-123' not found
ValidationError 400 title: Must be at least 1 character
AuthenticationError 401 Invalid or expired token
AuthorizationError 403 User 'u1' cannot 'delete' on 'Project'
ConflictError 409 User: Email already exists
RateLimitError 429 Rate limited. Retry after 30s
InvalidStateTransitionError 400 Cannot transition Task from 'done' to 'in_progress'
DatabaseError 500 Database error in 'queryRow'
ExternalServiceError 503 Service 'object-storage' unavailable
TimeoutError 504 'queryRow' timed out after 5000ms

Schema: Bidirectional Validation

Effect Schema handles two things that Encore's built-in validation doesn't: domain object validation and database row transformation.

Encore validates at the HTTP boundary (email format, string length). Effect Schema validates at the domain boundary and transforms between database rows (snake_case) and domain objects (camelCase):

import { Schema } from "effect"

export class TaskSchema extends Schema.Class<TaskSchema>("TaskSchema")({
  id: Schema.String.pipe(Schema.brand("TaskId")),
  title: Schema.String.pipe(
    Schema.minLength(1), Schema.maxLength(200),
    Schema.annotations({ description: "Task title" })
  ),
  status: Schema.optionalWith(TaskStatusSchema, {
    default: () => "backlog" as const
  }),
  priority: Schema.optionalWith(TaskPrioritySchema, {
    default: () => "medium" as const
  }),
  storyPoints: Schema.OptionFromNullOr(
    Schema.Number.pipe(Schema.int(), Schema.between(1, 21))
  ),
  // ...
}) {}
Enter fullscreen mode Exit fullscreen mode

The bidirectional transform between DB rows and domain objects:

export const TaskFromRow = Schema.transform(
  // From: database row (snake_case)
  Schema.Struct({
    id: Schema.String,
    title: Schema.String,
    assignee_id: Schema.NullOr(Schema.String),
    project_id: Schema.String,
    created_at: Schema.String,
    // ...
  }),
  // To: domain object (camelCase + branded)
  Schema.Struct({
    id: Schema.String.pipe(Schema.brand("TaskId")),
    title: Schema.String,
    assigneeId: Schema.NullOr(Schema.String.pipe(Schema.brand("UserId"))),
    projectId: Schema.String.pipe(Schema.brand("ProjectId")),
    createdAt: Schema.String,
    // ...
  }),
  {
    strict: true,
    decode: (row) => ({
      id: row.id as any,
      title: row.title,
      assigneeId: row.assignee_id as any,
      projectId: row.project_id as any,
      createdAt: row.created_at,
    }),
    encode: (task) => ({
      id: task.id,
      title: task.title,
      assignee_id: task.assigneeId,
      project_id: task.projectId,
      created_at: task.createdAt,
    }),
  }
)
Enter fullscreen mode Exit fullscreen mode

Passwords use Schema.Redacted so they never appear in logs:

export const PasswordHashSchema = Schema.Redacted(Schema.String)
Enter fullscreen mode Exit fullscreen mode

Resilience: Retry, Timeout, Schedule

Every database query in the system goes through a retry + timeout wrapper:

import { Effect, Schedule, Duration, pipe, Option, Chunk } from "effect"

// Exponential backoff (100ms, 200ms, 400ms) + jitter + max 3 retries
export const dbRetrySchedule = Schedule.exponential("100 millis").pipe(
  Schedule.intersect(Schedule.recurs(3)),
  Schedule.jittered
)

export const queryRowEffect = <T>(
  db: SQLDatabase,
  queryFn: () => Promise<T | null>
): Effect.Effect<Option.Option<T>, DatabaseError> =>
  pipe(
    Effect.tryPromise({
      try: () => queryFn(),
      catch: (e) => new DatabaseError({ operation: "queryRow", cause: e }),
    }),
    Effect.map(Option.fromNullable),
    Effect.retry(dbRetrySchedule),
    Effect.timeout(Duration.seconds(5)),
    Effect.catchTag("TimeoutException", () =>
      Effect.fail(new DatabaseError({
        operation: "queryRow",
        cause: "Query timed out after 5s"
      }))
    )
  )
Enter fullscreen mode Exit fullscreen mode

Schedule.intersect combines two schedules: "exponential backoff" AND "max 3 attempts". Schedule.jittered adds randomness to prevent thundering herd problems when multiple requests retry simultaneously.

The task scheduler demonstrates more advanced Schedule combinators:

import { CronJob } from "encore.dev/cron"
import { Schedule } from "effect"

// Encore CronJob triggers the endpoint every hour
const _ = new CronJob("stale-task-check", {
  title: "Check for stale tasks",
  every: "1h",
  endpoint: checkStaleTasks,
})

// Effect Schedule for retry within the job
const staleCheckSchedule = Schedule.exponential("1 second").pipe(
  Schedule.jittered,
  Schedule.intersect(Schedule.recurs(3)),
  Schedule.union(Schedule.spaced(Duration.seconds(30))),
)
Enter fullscreen mode Exit fullscreen mode

State Machine with Match

Task status transitions follow strict rules. You can't go from backlog directly to done. The state machine is implemented with Effect's Match module:

import { Data, Match } from "effect"

class StateTransition extends Data.Class<{
  readonly from: string
  readonly to: string
}> {}

const isValidTransition = (from: string, to: string): boolean =>
  Match.value(new StateTransition({ from, to })).pipe(
    Match.when({ from: "backlog", to: "todo" }, () => true),
    Match.when({ from: "todo", to: "in_progress" }, () => true),
    Match.when({ from: "todo", to: "cancelled" }, () => true),
    Match.when({ from: "in_progress", to: "in_review" }, () => true),
    Match.when({ from: "in_progress", to: "todo" }, () => true),
    Match.when({ from: "in_review", to: "done" }, () => true),
    Match.when({ from: "in_review", to: "in_progress" }, () => true),
    Match.when({ from: "done", to: "todo" }, () => true), // reopen
    Match.orElse(() => false),
  )
Enter fullscreen mode Exit fullscreen mode

Used in the transition method:

transition: (id, newStatus) =>
  Effect.gen(function* () {
    const task = yield* repo.findById(id)
    const fromStatus = task.status

    if (!isValidTransition(fromStatus, newStatus)) {
      return yield* Effect.fail(
        new InvalidStateTransitionError({
          entity: "Task", from: fromStatus, to: newStatus,
        })
      )
    }

    const updated = yield* repo.update(id, { status: newStatus })
    return { task: updated, fromStatus }
  }).pipe(Effect.withSpan("TaskService.transition")),
Enter fullscreen mode Exit fullscreen mode

Bulk updates use error accumulation. Each update is independent, and failures don't block successes:

bulkUpdateStatus: (updates) =>
  Effect.forEach(
    updates,
    (u) =>
      pipe(
        Effect.gen(function* () {
          const task = yield* repo.findById(u.id)
          if (!isValidTransition(task.status, u.status)) {
            return yield* Effect.fail(
              new InvalidStateTransitionError({
                entity: "Task", from: task.status, to: u.status,
              })
            )
          }
          return yield* repo.update(u.id, { status: u.status })
        }),
        Effect.either, // Convert to Either: Right(task) | Left(error)
      ),
    { concurrency: 5 }, // Process 5 at a time
  ),
Enter fullscreen mode Exit fullscreen mode

The API returns { succeeded: Task[], failed: { id, error }[] } so the caller sees exactly what worked and what didn't.


Concurrency and Rate Limiting

Semaphore: Controlling Upload Concurrency

The storage service limits concurrent uploads to 10 using an Effect Semaphore:

import { Effect, Duration, pipe } from "effect"

export const makeUploadLimiter = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(10)

  return {
    withLimit: <A, E>(
      effect: Effect.Effect<A, E>
    ): Effect.Effect<A, E | RateLimitError> =>
      pipe(
        semaphore.withPermits(1)(effect),
        Effect.timeout(Duration.seconds(30)),
        Effect.catchTag("TimeoutException", () =>
          Effect.fail(new RateLimitError({ retryAfter: 30 }))
        )
      ),
  }
})
Enter fullscreen mode Exit fullscreen mode

If all 10 permits are taken, the 11th upload waits. If it waits more than 30 seconds, it gets a 429 Too Many Requests response.

Queue + Deferred: Async Notification Dispatch

The notification service uses a bounded queue with completion signaling:

import { Deferred, Effect, Fiber, Queue } from "effect"

export const makeNotificationDispatcher = Effect.gen(function* () {
  const queue = yield* Queue.bounded<PendingNotification>(1000)
  const startGate = yield* Deferred.make<void>()

  // Background consumer fiber
  const consumer = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Deferred.await(startGate) // Wait until ready

      yield* Effect.forever(
        Effect.gen(function* () {
          const notification = yield* Queue.take(queue)
          const result = yield* pipe(
            Effect.logInfo(`Dispatching to ${notification.userId}`),
            Effect.either,
          )
          // Signal completion back to the producer
          if (result._tag === "Right") {
            yield* Deferred.succeed(notification.completion, undefined)
          } else {
            yield* Deferred.fail(notification.completion, /* error */)
          }
        }),
      )
    }),
  )

  return {
    enqueue: (input) =>
      Effect.gen(function* () {
        const deferred = yield* Deferred.make<void, ExternalServiceError>()
        yield* Queue.offer(queue, { ...input, completion: deferred })
        return yield* Deferred.await(deferred) // Wait for processing
      }),
    start: Deferred.succeed(startGate, undefined),
    shutdown: pipe(Queue.shutdown(queue), Effect.flatMap(() => Fiber.interrupt(consumer))),
  }
})
Enter fullscreen mode Exit fullscreen mode

Three primitives working together:

  • Queue.bounded(1000): Backpressure. If 1000 notifications are pending, producers wait.
  • Deferred: One-shot promise. Each notification gets a completion signal.
  • Fiber: Background consumer runs independently, interrupted on shutdown.

Automatic Batching with RequestResolver

When you need to load 50 tasks in a list view, you don't want 50 separate SQL queries. Effect's RequestResolver automatically batches them:

import { Effect, Request, RequestResolver } from "effect"

interface GetTaskById extends Request.Request<Task, NotFoundError> {
  readonly _tag: "GetTaskById"
  readonly id: TaskId
}

const GetTaskById = Request.tagged<GetTaskById>("GetTaskById")

const TaskBatchResolver = RequestResolver.makeBatched(
  (requests: ReadonlyArray<GetTaskById>) =>
    Effect.gen(function* () {
      const repo = yield* TaskRepo
      const ids = requests.map((r) => r.id)
      // ONE query instead of N
      const tasks = yield* repo.findByIds(ids)
      const taskMap = new Map(tasks.map((t) => [t.id, t]))

      for (const req of requests) {
        const task = taskMap.get(req.id)
        if (task) yield* Request.succeed(req, task)
        else yield* Request.fail(req,
          new NotFoundError({ entity: "Task", id: req.id }))
      }
    }),
)

// Usage: these calls are automatically batched!
export const getTasksBatched = (ids: TaskId[]) =>
  Effect.forEach(ids, (id) =>
    Effect.request(GetTaskById({ id }), TaskBatchResolver),
    { concurrency: "unbounded", batching: true }
  )
Enter fullscreen mode Exit fullscreen mode

Call getTasksBatched(["a", "b", "c", ..., "z"]) and Effect will execute one SELECT ... WHERE id IN (...) query instead of 26 individual queries. The batching is transparent to the caller.


Reactive State: Ref, SynchronizedRef, SubscriptionRef

The task board maintains real-time state using three flavors of mutable references:

import {
  Data, Effect, HashMap, HashSet,
  Ref, SubscriptionRef, SynchronizedRef
} from "effect"

export class BoardState extends Data.Class<{
  readonly tasksByStatus: HashMap.HashMap<string, HashSet.HashSet<string>>
  readonly activeViewers: HashSet.HashSet<string>
  readonly lastUpdated: number
}> {}

export const makeBoardState = Effect.gen(function* () {
  // SynchronizedRef: atomic effectful updates
  const boardRef = yield* SynchronizedRef.make(
    new BoardState({
      tasksByStatus: HashMap.empty(),
      activeViewers: HashSet.empty(),
      lastUpdated: Date.now(),
    }),
  )

  // SubscriptionRef: observable — clients subscribe to changes
  const activityRef = yield* SubscriptionRef.make<ActivityEvent>(/* initial */)

  // Ref: simple counter
  const opsCounterRef = yield* Ref.make(0)

  return {
    moveTask: (taskId, fromStatus, toStatus) =>
      pipe(
        SynchronizedRef.updateEffect(boardRef, (state) =>
          Effect.gen(function* () {
            const fromSet = HashMap.get(state.tasksByStatus, fromStatus)
              .pipe(opt => opt._tag === "Some" ? opt.value : HashSet.empty())
            const toSet = HashMap.get(state.tasksByStatus, toStatus)
              .pipe(opt => opt._tag === "Some" ? opt.value : HashSet.empty())
            return new BoardState({
              tasksByStatus: pipe(
                state.tasksByStatus,
                HashMap.set(fromStatus, HashSet.remove(fromSet, taskId)),
                HashMap.set(toStatus, HashSet.add(toSet, taskId)),
              ),
              activeViewers: state.activeViewers,
              lastUpdated: Date.now(),
            })
          }),
        ),
        Effect.tap(() => Ref.update(opsCounterRef, (n) => n + 1)),
      ),

    // Stream of changes for WebSocket push
    activityStream: activityRef.changes,
  }
})
Enter fullscreen mode Exit fullscreen mode
  • Ref: Simple mutable value (like useState but for Effect).
  • SynchronizedRef: Effectful atomic updates. The update function can do async work.
  • SubscriptionRef: Like SynchronizedRef, but consumers can subscribe to .changes, which exposes a Stream of every update.

Streams and Sinks for Data Processing

The analytics service processes events through Stream pipelines and collects results with Sinks:

import { Chunk, Stream, Schedule, Duration, Effect, pipe } from "effect"

export const aggregateEventsStream = (
  events: Stream.Stream<AnalyticsEvent>,
) =>
  pipe(
    events,
    Stream.grouped(100),          // Batch into chunks of 100
    Stream.map((chunk) => {       // Aggregate each batch
      const byProject = new Map<string, { created: number; completed: number }>()
      for (const event of chunk) {
        const projectId = event.metadata.projectId ?? "unknown"
        const current = byProject.get(projectId) ?? { created: 0, completed: 0 }
        if (event.eventType === "task_created") current.created++
        byProject.set(projectId, current)
      }
      return Array.from(byProject.entries()).map(([projectId, stats]) => ({
        projectId, ...stats,
      }))
    }),
    Stream.mapConcat((arr) => arr), // Flatten arrays
  )

// Running totals with stateful transformation
export const runningTotalsStream = (events: Stream.Stream<AnalyticsEvent>) =>
  Stream.mapAccum(events, 0, (total, event) => {
    const newTotal = total + 1
    return [newTotal, { total: newTotal, eventType: event.eventType }]
  })
Enter fullscreen mode Exit fullscreen mode

Sinks consume streams and produce final results:

import { Sink, Effect, pipe } from "effect"

// Count all events
export const countEventsSink = Sink.foldLeft(
  0, (count, _event: AnalyticsEvent) => count + 1
)

// Aggregate by type
export const aggregateByTypeSink = Sink.foldLeft(
  new Map<string, number>(),
  (acc, event: AnalyticsEvent) => {
    acc.set(event.eventType, (acc.get(event.eventType) ?? 0) + 1)
    return acc
  }
)

// Compose: count then log
export const countAndLogSink = pipe(
  countEventsSink,
  Sink.zipRight(Sink.fromEffect(Effect.logInfo("Finished counting events"))),
)
Enter fullscreen mode Exit fullscreen mode

Metrics at Two Levels

We track metrics at two levels. Encore metrics get exported to your cloud provider's monitoring. Effect metrics handle internal runtime observability:

import { Effect, Metric, MetricBoundaries, pipe } from "effect"
import { Counter, Gauge } from "encore.dev/metrics"

// Encore: infrastructure-level metrics
export const tasksCreatedTotal = new Counter("tasks_created_total")
export const activeUsersGauge = new Gauge("active_users")

// Effect: runtime-level metrics
export const taskProcessingDuration = Metric.histogram(
  "effect_task_processing_duration_ms",
  MetricBoundaries.linear({ start: 0, width: 100, count: 10 }),
)

// Helper: measure any effect's duration
export const trackDuration = <A, E>(
  effect: Effect.Effect<A, E>
): Effect.Effect<A, E> =>
  pipe(
    Effect.timed(effect),
    Effect.tap(([duration]) =>
      Metric.update(taskProcessingDuration,
        Number(duration) / 1_000_000)),
    Effect.map(([, result]) => result),
  )
Enter fullscreen mode Exit fullscreen mode

Encore metrics appear in your cloud dashboard automatically. Effect metrics power internal observability: histogram boundaries, percentile tracking, fiber-level instrumentation.


Authentication: Gateway + Effect

Encore's authHandler + Gateway integrate well with Effect's Option and Either:

import { Effect, Either, Option, pipe } from "effect"
import { Gateway, type Header } from "encore.dev/api"
import { authHandler } from "encore.dev/auth"

const validateToken = (rawHeader: string) =>
  Effect.gen(function* () {
    // Extract Bearer token using Option
    const token = pipe(
      rawHeader,
      Option.liftPredicate((h) => h.startsWith("Bearer ")),
      Option.map((h) => h.slice(7)),
      Option.getOrElse(() => rawHeader),
    )

    if (!token) {
      return yield* new AuthenticationError({ reason: "Missing token" })
    }

    const tokenService = yield* TokenService
    const payload = yield* tokenService.verify(token)
    return { userID: payload.userId, role: payload.role, email: payload.email }
  })

export const auth = authHandler<AuthParams, AuthData>(async (params) => {
  const program = pipe(
    validateToken(params.authorization),
    Effect.provide(TokenServiceLive),
  )

  const result = await Effect.runPromise(Effect.either(program))

  return Either.match(result, {
    onLeft: (error) => { throw new Error(error.reason) },
    onRight: (data) => data,
  })
})

export const gateway = new Gateway({ authHandler: auth })
Enter fullscreen mode Exit fullscreen mode

TokenService is a Context.Tag + Layer that wraps Encore's secret() for JWT signing:

import { secret } from "encore.dev/config"
const jwtSecret = secret("JWTSecret")

export const TokenServiceLive = Layer.succeed(TokenService, {
  sign: (payload) => Effect.try(() => { /* HMAC-SHA256 signing */ }),
  verify: (token) => Effect.try(() => { /* Validate + decode */ }),
})
Enter fullscreen mode Exit fullscreen mode

PubSub: Cross-Service Events

Encore Topics define cross-service event channels. Subscribers in other services react to them:

// In task/task-api.ts — Topics MUST be defined in services
import { Topic } from "encore.dev/pubsub"

export const taskCreatedTopic = new Topic<TaskCreatedEvent>("task-created", {
  deliveryGuarantee: "at-least-once",
})

// Publishing after creating a task:
yield* Effect.tryPromise({
  try: () => taskCreatedTopic.publish({
    taskId: task.id,
    projectId: task.projectId,
    userId: authData.userID,
    title: task.title,
    priority: task.priority,
  }),
  catch: () => new Error("publish failed"),
}).pipe(Effect.catchAll(() =>
  Effect.logWarning("Failed to publish task created event")))
Enter fullscreen mode Exit fullscreen mode
// In notification/notification-subscribers.ts
import { Subscription } from "encore.dev/pubsub"
import { taskCreatedTopic } from "../task/task-api.js"

const _ = new Subscription(taskCreatedTopic, "notify-task-created", {
  handler: async (event) => {
    // Create notification for assignees, watchers, etc.
    await runEffect(
      Effect.gen(function* () {
        const service = yield* NotificationService
        yield* service.createFromEvent({ /* ... */ })
      }).pipe(Effect.provide(NotificationServiceLive))
    )
  },
  retryPolicy: { minBackoff: "5s", maxBackoff: "1m", maxRetries: 5 },
})
Enter fullscreen mode Exit fullscreen mode

Conclusion

Effect.ts and Encore.ts solve different problems that, together, cover the full stack of backend concerns:

Concern Effect.ts Encore.ts
Error handling Typed errors, Cause analysis APIError + HTTP codes
Infrastructure Auto-provisioned DBs, PubSub, Buckets
Concurrency Fibers, Semaphores, Queues
Validation Schema transforms, branded types HTTP-level validation
Observability Effect Metrics, withSpan Counter, Gauge, traces
Streaming Stream, Sink api.streamOut, api.streamInOut
Scheduling Schedule combinators CronJob
Auth authHandler, Gateway
State Ref, SynchronizedRef
Batching RequestResolver

The bridge pattern (runEffect) is the key that unlocks the combination. Once you have that, every Encore endpoint becomes a thin shell around an Effect program, and you get the best of both worlds.

The full source code is available at GitHub repo link. Clone it, run encore run, and you'll have 7 services with auto-provisioned PostgreSQL databases, PubSub topics, and object storage running locally in seconds.

If you're building TypeScript backends and haven't tried either of these frameworks, start with one. If you're already using one, adding the other is surprisingly smooth.


*Have questions or want to discuss the architecture? Drop a comment below or find me on https://x.com/leomarciano *

Built with Effect.ts v3.19.19 and Encore.ts v1.55.0

Top comments (3)

Collapse
 
marcuskohlberg profile image
Marcus Kohlberg

🙌

Collapse
 
stefan_ekerfeldt_ddfe39d0 profile image
stefan ekerfeldt

Beautiful

Collapse
 
ivandotcodes profile image
Ivan

this changes everything