Introduction
What if your backend could have typed errors that compile down to HTTP status codes, automatic request batching, fiber-based concurrency with semaphores and queues, and auto-provisioned databases, PubSub, and object storage... all in TypeScript?
That's what happens when you combine Effect.ts and Encore.ts.
Effect.ts gives you a functional programming toolkit for TypeScript: composable effects, typed errors, dependency injection via layers, streams, concurrency primitives (fibers, semaphores, queues), and a schema system that makes Zod look like a toy.
Encore.ts gives you infrastructure-as-code: declare a SQLDatabase, Topic, Bucket, or CronJob in your TypeScript code, and Encore provisions it automatically: locally, in preview environments, and in production. No Terraform, no Docker Compose, no YAML.
Together, they're remarkably complementary. Effect handles the how (composition, error handling, concurrency), Encore handles the what (databases, messaging, auth, deployment).
In this article, I'll walk you through a Task Management System (think mini Jira/Linear) we built with 7 microservices, covering virtually every major feature of both frameworks. This isn't a toy example. It's a reference architecture you can study, fork, and build on.
System Architecture
Here's the bird's-eye view of our system:
Client Request
│
▼
┌─────────┐ ┌──────┐
│ Gateway │────▶│ auth │ validates JWT token
└────┬────┘ └──────┘
│
▼
┌────────────┐
│ Middleware │ enriches request (timing/tracing)
└─────┬──────┘
│
├──▶ [user] ◀── service-to-service ── [project], [auth]
│ ├── users DB
│ └── Effect Cache (permissions)
│
├──▶ [project]
│ └── projects DB + project_members
│
├──▶ [task] ──▶ publishes to PubSub topics
│ ├── tasks DB + comments
│ ├── CronJob (stale task check)
│ └── WebSocket streams (activity feed)
│
├──▶ [notification] ◀── subscribes to PubSub topics
│ ├── notifications DB
│ ├── Queue + Deferred (async dispatch)
│ └── WebSocket stream (real-time push)
│
├──▶ [analytics] ◀── subscribes to PubSub topics
│ ├── analytics DB (events + daily_stats)
│ ├── Encore Metrics (Counter, Gauge)
│ └── Effect Stream/Sink (aggregation pipelines)
│
└──▶ [storage]
├── Encore Bucket (object storage)
└── Effect Semaphore (rate limiting)
Each service has its own encore.service.ts, its own database (auto-provisioned by Encore), and communicates via PubSub topics for async events or direct service-to-service calls for synchronous operations.
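For readers who haven't used Encore: declaring a service boundary and its database is just TypeScript. A minimal sketch of what one of those per-service files looks like (file layout follows Encore's conventions; the specific names here are illustrative, not copied from the repo):

```typescript
// task/encore.service.ts — marks this directory as the "task" service
import { Service } from "encore.dev/service"

export default new Service("task")
```

```typescript
// task/db.ts — Encore provisions this PostgreSQL database automatically
import { SQLDatabase } from "encore.dev/storage/sqldb"

export const db = new SQLDatabase("tasks", {
  migrations: "./migrations",
})
```

That's the entire infrastructure declaration: no provisioning scripts, just these exports.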
The Bridge: Connecting Effect.ts to Encore.ts
This is the single most important pattern in the entire codebase. Encore expects your API handlers to be async functions that return Promises. Effect programs return Effect<A, E> values. We need a bridge.
Here's runEffect. Every Encore endpoint calls this:
import { Cause, Chunk, Effect, Exit, Match, pipe } from "effect"
import { APIError } from "encore.dev/api"
import type { AppError } from "./errors.js"
export const runEffect = async <A, E extends AppError>(
effect: Effect.Effect<A, E>,
): Promise<A> => {
const exit = await Effect.runPromiseExit(
effect.pipe(
Effect.withSpan("encore-handler"),
Effect.annotateLogs("runtime", "encore"),
),
)
if (Exit.isSuccess(exit)) {
return exit.value
}
const cause = exit.cause
// Expected errors → translate to APIError
if (Cause.isFailType(cause)) {
throw effectErrorToAPIError(cause.error as AppError)
}
// Fiber interruption → aborted
if (Cause.isInterruptType(cause)) {
throw APIError.aborted("Request was interrupted")
}
// Defects (bugs) → internal error
if (Cause.isDieType(cause)) {
console.error("Unexpected defect:", cause.defect)
throw APIError.internal("Internal server error")
}
// Composite causes → extract first failure
const failures = Chunk.toArray(Cause.failures(cause))
if (failures.length > 0) {
throw effectErrorToAPIError(failures[0] as AppError)
}
throw APIError.internal("Unknown error")
}
The key insight: Effect's Exit type distinguishes between expected failures (your domain errors), defects (bugs/panics), and interruptions (cancelled fibers). We handle each differently.
The error translation uses Effect's Match module for exhaustive mapping:
export const effectErrorToAPIError = (error: AppError): APIError =>
pipe(
error,
Match.value,
Match.tag("NotFoundError", (e) =>
APIError.notFound(`${e.entity} '${e.id}' not found`)),
Match.tag("ValidationError", (e) =>
APIError.invalidArgument(`${e.field}: ${e.message}`)),
Match.tag("AuthenticationError", (e) =>
APIError.unauthenticated(e.reason)),
Match.tag("AuthorizationError", (e) =>
APIError.permissionDenied(
`User '${e.userId}' cannot '${e.action}' on '${e.resource}'`)),
Match.tag("RateLimitError", (e) =>
APIError.resourceExhausted(`Rate limited. Retry after ${e.retryAfter}s`)),
Match.tag("DatabaseError", (e) =>
APIError.internal(`Database error in '${e.operation}'`)),
Match.tag("InvalidStateTransitionError", (e) =>
APIError.failedPrecondition(
`Cannot transition ${e.entity} from '${e.from}' to '${e.to}'`)),
// ... all 10 error types mapped
Match.exhaustive, // Compiler error if you miss one!
)
Match.exhaustive is the magic. If you add a new error type to AppError but forget to handle it here, TypeScript won't compile. No more silent 500s from unhandled error cases.
And this is how every Encore endpoint looks:
import { api } from "encore.dev/api"
import { Effect } from "effect"
import { TaskService, TaskServiceLive } from "./task-service.js"
import { runEffect } from "../shared/effect-encore-bridge.js"
export const getTask = api(
{ expose: true, method: "GET", path: "/tasks/:id", auth: true },
async ({ id }: { id: string }): Promise<TaskResponse> =>
runEffect(
Effect.gen(function* () {
const service = yield* TaskService
return yield* service.getById(TaskId(id))
}).pipe(Effect.provide(TaskServiceLive))
)
)
Clean, consistent, and every error is automatically translated to the correct HTTP status code.
Type Safety with Branded Types
A UserId is a string. A ProjectId is also a string. But they are not the same thing. Effect's Brand module prevents you from accidentally passing one where the other is expected:
import { Brand, Order, Equivalence, Equal, Hash } from "effect"
export type UserId = string & Brand.Brand<"UserId">
export const UserId = Brand.nominal<UserId>()
export type ProjectId = string & Brand.Brand<"ProjectId">
export const ProjectId = Brand.nominal<ProjectId>()
export type TaskId = string & Brand.Brand<"TaskId">
export const TaskId = Brand.nominal<TaskId>()
Now service.getById(TaskId("abc")) compiles, but service.getById(UserId("abc")) is a type error. This catches entire classes of bugs at compile time. Imagine an accidental WHERE user_id = $projectId query that silently returns nothing.
We also get custom ordering for sorting:
export type TaskPriority = "critical" | "high" | "medium" | "low"
const priorityRank: Record<TaskPriority, number> = {
critical: 0, high: 1, medium: 2, low: 3,
}
export const TaskPriorityOrder: Order.Order<TaskPriority> =
Order.mapInput(Order.number, (p: TaskPriority) => priorityRank[p])
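To make the rank table concrete, here is what it buys you when sorting, shown as plain TypeScript with no Effect imports (in the codebase you'd sort with TaskPriorityOrder directly; this dependency-free version is just for intuition):

```typescript
// Same rank table as above: lower rank sorts first
type TaskPriority = "critical" | "high" | "medium" | "low"

const priorityRank: Record<TaskPriority, number> = {
  critical: 0, high: 1, medium: 2, low: 3,
}

// Sort tasks so critical work floats to the top
const byPriority = <T extends { priority: TaskPriority }>(tasks: T[]): T[] =>
  [...tasks].sort((a, b) => priorityRank[a.priority] - priorityRank[b.priority])

const sorted = byPriority([
  { id: "t1", priority: "low" as TaskPriority },
  { id: "t2", priority: "critical" as TaskPriority },
  { id: "t3", priority: "high" as TaskPriority },
])
// sorted ids: ["t2", "t3", "t1"]
```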
And custom Equal + Hash implementations for use in HashMap/HashSet:
export class ProjectMemberKey implements Equal.Equal {
constructor(readonly projectId: ProjectId, readonly userId: UserId) {}
[Equal.symbol](that: Equal.Equal): boolean {
if (that instanceof ProjectMemberKey) {
return this.projectId === that.projectId && this.userId === that.userId
}
return false
}
[Hash.symbol](): number {
return Hash.combine(Hash.hash(this.projectId))(Hash.hash(this.userId))
}
}
Typed Error Hierarchy
Every error in the system extends Data.TaggedError, making them yieldable inside Effect.gen generators:
import { Data } from "effect"
export class NotFoundError extends Data.TaggedError("NotFoundError")<{
readonly entity: string
readonly id: string
}> {}
export class ValidationError extends Data.TaggedError("ValidationError")<{
readonly field: string
readonly message: string
}> {}
export class InvalidStateTransitionError extends Data.TaggedError(
"InvalidStateTransitionError"
)<{
readonly entity: string
readonly from: string
readonly to: string
}> {}
// ... 10 total error types
export type AppError =
| DatabaseError | NotFoundError | ValidationError
| AuthenticationError | AuthorizationError | ConflictError
| RateLimitError | ExternalServiceError | TimeoutError
| InvalidStateTransitionError
The discriminated union AppError is what makes Match.exhaustive work in the bridge. Each error carries structured context (not just a message string), so the API response can be precise:
| TaggedError | HTTP Status | Example Message |
|---|---|---|
| NotFoundError | 404 | Task 'abc-123' not found |
| ValidationError | 400 | title: Must be at least 1 character |
| AuthenticationError | 401 | Invalid or expired token |
| AuthorizationError | 403 | User 'u1' cannot 'delete' on 'Project' |
| ConflictError | 409 | User: Email already exists |
| RateLimitError | 429 | Rate limited. Retry after 30s |
| InvalidStateTransitionError | 400 | Cannot transition Task from 'done' to 'in_progress' |
| DatabaseError | 500 | Database error in 'queryRow' |
| ExternalServiceError | 503 | Service 'object-storage' unavailable |
| TimeoutError | 504 | 'queryRow' timed out after 5000ms |
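The table above boils down to a plain lookup from error tag to status code. A hedged sketch (in the codebase the real mapping lives in effectErrorToAPIError via APIError constructors; this summary table is just for reference):

```typescript
// HTTP status for each error tag, mirroring the table above
const statusByTag: Record<string, number> = {
  NotFoundError: 404,
  ValidationError: 400,
  AuthenticationError: 401,
  AuthorizationError: 403,
  ConflictError: 409,
  RateLimitError: 429,
  InvalidStateTransitionError: 400,
  DatabaseError: 500,
  ExternalServiceError: 503,
  TimeoutError: 504,
}

// Anything unmapped falls through to a 500, like an unhandled defect
const statusFor = (tag: string): number => statusByTag[tag] ?? 500
```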
Schema: Bidirectional Validation
Effect Schema handles two things that Encore's built-in validation doesn't: domain object validation and database row transformation.
Encore validates at the HTTP boundary (email format, string length). Effect Schema validates at the domain boundary and transforms between database rows (snake_case) and domain objects (camelCase):
import { Schema } from "effect"
export class TaskSchema extends Schema.Class<TaskSchema>("TaskSchema")({
id: Schema.String.pipe(Schema.brand("TaskId")),
title: Schema.String.pipe(
Schema.minLength(1), Schema.maxLength(200),
Schema.annotations({ description: "Task title" })
),
status: Schema.optionalWith(TaskStatusSchema, {
default: () => "backlog" as const
}),
priority: Schema.optionalWith(TaskPrioritySchema, {
default: () => "medium" as const
}),
storyPoints: Schema.OptionFromNullOr(
Schema.Number.pipe(Schema.int(), Schema.between(1, 21))
),
// ...
}) {}
The bidirectional transform between DB rows and domain objects:
export const TaskFromRow = Schema.transform(
// From: database row (snake_case)
Schema.Struct({
id: Schema.String,
title: Schema.String,
assignee_id: Schema.NullOr(Schema.String),
project_id: Schema.String,
created_at: Schema.String,
// ...
}),
// To: domain object (camelCase + branded)
Schema.Struct({
id: Schema.String.pipe(Schema.brand("TaskId")),
title: Schema.String,
assigneeId: Schema.NullOr(Schema.String.pipe(Schema.brand("UserId"))),
projectId: Schema.String.pipe(Schema.brand("ProjectId")),
createdAt: Schema.String,
// ...
}),
{
strict: true,
decode: (row) => ({
id: row.id as any,
title: row.title,
assigneeId: row.assignee_id as any,
projectId: row.project_id as any,
createdAt: row.created_at,
}),
encode: (task) => ({
id: task.id,
title: task.title,
assignee_id: task.assigneeId,
project_id: task.projectId,
created_at: task.createdAt,
}),
}
)
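If you end up writing many such transforms, the key-renaming half can be generated. A dependency-free sketch of a snake_case to camelCase key converter (illustrative only; the Schema.transform above remains the source of truth for branding and typing):

```typescript
// Convert one snake_case key to camelCase: "assignee_id" → "assigneeId"
const toCamel = (key: string): string =>
  key.replace(/_([a-z])/g, (_, c: string) => c.toUpperCase())

// Shallow-convert all keys of a database row object
const camelizeRow = (row: Record<string, unknown>): Record<string, unknown> =>
  Object.fromEntries(Object.entries(row).map(([k, v]) => [toCamel(k), v]))

const domain = camelizeRow({
  id: "t1",
  assignee_id: null,
  project_id: "p1",
  created_at: "2024-01-01",
})
// → { id: "t1", assigneeId: null, projectId: "p1", createdAt: "2024-01-01" }
```

Note what this sketch does not do: apply brands, validate values, or handle nested objects. That's exactly why the Schema version is worth the extra ceremony.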
Passwords use Schema.Redacted so they never appear in logs:
export const PasswordHashSchema = Schema.Redacted(Schema.String)
Resilience: Retry, Timeout, Schedule
Every database query in the system goes through a retry + timeout wrapper:
import { Effect, Schedule, Duration, pipe, Option, Chunk } from "effect"
// Exponential backoff (100ms, 200ms, 400ms) + jitter + max 3 retries
export const dbRetrySchedule = Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(3)),
Schedule.jittered
)
export const queryRowEffect = <T>(
db: SQLDatabase,
queryFn: () => Promise<T | null>
): Effect.Effect<Option.Option<T>, DatabaseError> =>
pipe(
Effect.tryPromise({
try: () => queryFn(),
catch: (e) => new DatabaseError({ operation: "queryRow", cause: e }),
}),
Effect.map(Option.fromNullable),
Effect.retry(dbRetrySchedule),
Effect.timeout(Duration.seconds(5)),
Effect.catchTag("TimeoutException", () =>
Effect.fail(new DatabaseError({
operation: "queryRow",
cause: "Query timed out after 5s"
}))
)
)
Schedule.intersect combines two schedules: "exponential backoff" AND "max 3 attempts". Schedule.jittered adds randomness to prevent thundering herd problems when multiple requests retry simultaneously.
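To make the schedule concrete, here is the arithmetic it encodes as a standalone sketch (the jitter range shown is illustrative, not necessarily Effect's default; treat the 0.8–1.2 factor as an assumption):

```typescript
// Base delays produced by Schedule.exponential("100 millis") for attempts 0..2
const baseDelayMs = (attempt: number): number => 100 * 2 ** attempt

// Jitter: scale each delay by a random factor so concurrent retries spread out
// (the exact range is an assumption here, not Effect's documented default)
const jitteredDelayMs = (attempt: number, rand: () => number = Math.random): number =>
  baseDelayMs(attempt) * (0.8 + 0.4 * rand())

// Three retries: 100ms, 200ms, 400ms before jitter
const delays = [0, 1, 2].map(baseDelayMs) // [100, 200, 400]
```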
The task scheduler demonstrates more advanced Schedule combinators:
import { CronJob } from "encore.dev/cron"
import { Duration, Schedule } from "effect"
// Encore CronJob triggers the endpoint every hour
const _ = new CronJob("stale-task-check", {
title: "Check for stale tasks",
every: "1h",
endpoint: checkStaleTasks,
})
// Effect Schedule for retry within the job
const staleCheckSchedule = Schedule.exponential("1 second").pipe(
Schedule.jittered,
Schedule.intersect(Schedule.recurs(3)),
Schedule.union(Schedule.spaced(Duration.seconds(30))),
)
State Machine with Match
Task status transitions follow strict rules. You can't go from backlog directly to done. The state machine is implemented with Effect's Match module:
import { Data, Match } from "effect"
class StateTransition extends Data.Class<{
readonly from: string
readonly to: string
}> {}
const isValidTransition = (from: string, to: string): boolean =>
Match.value(new StateTransition({ from, to })).pipe(
Match.when({ from: "backlog", to: "todo" }, () => true),
Match.when({ from: "todo", to: "in_progress" }, () => true),
Match.when({ from: "todo", to: "cancelled" }, () => true),
Match.when({ from: "in_progress", to: "in_review" }, () => true),
Match.when({ from: "in_progress", to: "todo" }, () => true),
Match.when({ from: "in_review", to: "done" }, () => true),
Match.when({ from: "in_review", to: "in_progress" }, () => true),
Match.when({ from: "done", to: "todo" }, () => true), // reopen
Match.orElse(() => false),
)
Used in the transition method:
transition: (id, newStatus) =>
Effect.gen(function* () {
const task = yield* repo.findById(id)
const fromStatus = task.status
if (!isValidTransition(fromStatus, newStatus)) {
return yield* Effect.fail(
new InvalidStateTransitionError({
entity: "Task", from: fromStatus, to: newStatus,
})
)
}
const updated = yield* repo.update(id, { status: newStatus })
return { task: updated, fromStatus }
}).pipe(Effect.withSpan("TaskService.transition")),
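As an aside, the same transition rules can be read off a plain adjacency map, which is handy for documentation or table-driven tests. A dependency-free sketch equivalent to the Match version above:

```typescript
// Allowed status transitions, one entry per source state
const allowedTransitions: Record<string, readonly string[]> = {
  backlog: ["todo"],
  todo: ["in_progress", "cancelled"],
  in_progress: ["in_review", "todo"],
  in_review: ["done", "in_progress"],
  done: ["todo"], // reopen
}

// A transition is valid iff the target appears in the source's entry
const isValidTransition = (from: string, to: string): boolean =>
  allowedTransitions[from]?.includes(to) ?? false
```

The Match version trades this compactness for pattern-matching ergonomics; both encode the same graph.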
Bulk updates use error accumulation. Each update is independent, and failures don't block successes:
bulkUpdateStatus: (updates) =>
Effect.forEach(
updates,
(u) =>
pipe(
Effect.gen(function* () {
const task = yield* repo.findById(u.id)
if (!isValidTransition(task.status, u.status)) {
return yield* Effect.fail(
new InvalidStateTransitionError({
entity: "Task", from: task.status, to: u.status,
})
)
}
return yield* repo.update(u.id, { status: u.status })
}),
Effect.either, // Convert to Either: Right(task) | Left(error)
),
{ concurrency: 5 }, // Process 5 at a time
),
The API returns { succeeded: Task[], failed: { id, error }[] } so the caller sees exactly what worked and what didn't.
Concurrency and Rate Limiting
Semaphore: Controlling Upload Concurrency
The storage service limits concurrent uploads to 10 using an Effect Semaphore:
import { Effect, Duration, pipe } from "effect"
export const makeUploadLimiter = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(10)
return {
withLimit: <A, E>(
effect: Effect.Effect<A, E>
): Effect.Effect<A, E | RateLimitError> =>
pipe(
semaphore.withPermits(1)(effect),
Effect.timeout(Duration.seconds(30)),
Effect.catchTag("TimeoutException", () =>
Effect.fail(new RateLimitError({ retryAfter: 30 }))
)
),
}
})
If all 10 permits are taken, the 11th upload waits. If it waits more than 30 seconds, it gets a 429 Too Many Requests response.
Queue + Deferred: Async Notification Dispatch
The notification service uses a bounded queue with completion signaling:
import { Deferred, Effect, Fiber, pipe, Queue } from "effect"
export const makeNotificationDispatcher = Effect.gen(function* () {
const queue = yield* Queue.bounded<PendingNotification>(1000)
const startGate = yield* Deferred.make<void>()
// Background consumer fiber
const consumer = yield* Effect.fork(
Effect.gen(function* () {
yield* Deferred.await(startGate) // Wait until ready
yield* Effect.forever(
Effect.gen(function* () {
const notification = yield* Queue.take(queue)
const result = yield* pipe(
Effect.logInfo(`Dispatching to ${notification.userId}`),
Effect.either,
)
// Signal completion back to the producer
if (result._tag === "Right") {
yield* Deferred.succeed(notification.completion, undefined)
} else {
yield* Deferred.fail(notification.completion, /* error */)
}
}),
)
}),
)
return {
enqueue: (input) =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<void, ExternalServiceError>()
yield* Queue.offer(queue, { ...input, completion: deferred })
return yield* Deferred.await(deferred) // Wait for processing
}),
start: Deferred.succeed(startGate, undefined),
shutdown: pipe(Queue.shutdown(queue), Effect.flatMap(() => Fiber.interrupt(consumer))),
}
})
Three primitives working together:
- Queue.bounded(1000): Backpressure. If 1000 notifications are pending, producers wait.
- Deferred: One-shot promise. Each notification gets a completion signal.
- Fiber: Background consumer runs independently, interrupted on shutdown.
Automatic Batching with RequestResolver
When you need to load 50 tasks in a list view, you don't want 50 separate SQL queries. Effect's RequestResolver automatically batches them:
import { Effect, Request, RequestResolver } from "effect"
interface GetTaskById extends Request.Request<Task, NotFoundError> {
readonly _tag: "GetTaskById"
readonly id: TaskId
}
const GetTaskById = Request.tagged<GetTaskById>("GetTaskById")
const TaskBatchResolver = RequestResolver.makeBatched(
(requests: ReadonlyArray<GetTaskById>) =>
Effect.gen(function* () {
const repo = yield* TaskRepo
const ids = requests.map((r) => r.id)
// ONE query instead of N
const tasks = yield* repo.findByIds(ids)
const taskMap = new Map(tasks.map((t) => [t.id, t]))
for (const req of requests) {
const task = taskMap.get(req.id)
if (task) yield* Request.succeed(req, task)
else yield* Request.fail(req,
new NotFoundError({ entity: "Task", id: req.id }))
}
}),
)
// Usage: these calls are automatically batched!
export const getTasksBatched = (ids: TaskId[]) =>
Effect.forEach(ids, (id) =>
Effect.request(GetTaskById({ id }), TaskBatchResolver),
{ concurrency: "unbounded", batching: true }
)
Call getTasksBatched(["a", "b", "c", ..., "z"]) and Effect will execute one SELECT ... WHERE id IN (...) query instead of 26 individual queries. The batching is transparent to the caller.
Reactive State: Ref, SynchronizedRef, SubscriptionRef
The task board maintains real-time state using three flavors of mutable references:
import {
Data, Effect, HashMap, HashSet, pipe,
Ref, SubscriptionRef, SynchronizedRef
} from "effect"
export class BoardState extends Data.Class<{
readonly tasksByStatus: HashMap.HashMap<string, HashSet.HashSet<string>>
readonly activeViewers: HashSet.HashSet<string>
readonly lastUpdated: number
}> {}
export const makeBoardState = Effect.gen(function* () {
// SynchronizedRef: atomic effectful updates
const boardRef = yield* SynchronizedRef.make(
new BoardState({
tasksByStatus: HashMap.empty(),
activeViewers: HashSet.empty(),
lastUpdated: Date.now(),
}),
)
// SubscriptionRef: observable — clients subscribe to changes
const activityRef = yield* SubscriptionRef.make<ActivityEvent>(/* initial */)
// Ref: simple counter
const opsCounterRef = yield* Ref.make(0)
return {
moveTask: (taskId, fromStatus, toStatus) =>
pipe(
SynchronizedRef.updateEffect(boardRef, (state) =>
Effect.gen(function* () {
const fromSet = HashMap.get(state.tasksByStatus, fromStatus)
.pipe(opt => opt._tag === "Some" ? opt.value : HashSet.empty())
const toSet = HashMap.get(state.tasksByStatus, toStatus)
.pipe(opt => opt._tag === "Some" ? opt.value : HashSet.empty())
return new BoardState({
tasksByStatus: pipe(
state.tasksByStatus,
HashMap.set(fromStatus, HashSet.remove(fromSet, taskId)),
HashMap.set(toStatus, HashSet.add(toSet, taskId)),
),
activeViewers: state.activeViewers,
lastUpdated: Date.now(),
})
}),
),
Effect.tap(() => Ref.update(opsCounterRef, (n) => n + 1)),
),
// Stream of changes for WebSocket push
activityStream: activityRef.changes,
}
})
- Ref: a simple mutable value (like useState, but for Effect).
- SynchronizedRef: effectful atomic updates. The update function can do async work.
- SubscriptionRef: like SynchronizedRef, but consumers can subscribe to .changes, which exposes a Stream of every update.
Streams and Sinks for Data Processing
The analytics service processes events through Stream pipelines and collects results with Sinks:
import { Chunk, Stream, Schedule, Duration, Effect, pipe } from "effect"
export const aggregateEventsStream = (
events: Stream.Stream<AnalyticsEvent>,
) =>
pipe(
events,
Stream.grouped(100), // Batch into chunks of 100
Stream.map((chunk) => { // Aggregate each batch
const byProject = new Map<string, { created: number; completed: number }>()
for (const event of chunk) {
const projectId = event.metadata.projectId ?? "unknown"
const current = byProject.get(projectId) ?? { created: 0, completed: 0 }
if (event.eventType === "task_created") current.created++
byProject.set(projectId, current)
}
return Array.from(byProject.entries()).map(([projectId, stats]) => ({
projectId, ...stats,
}))
}),
Stream.mapConcat((arr) => arr), // Flatten arrays
)
// Running totals with stateful transformation
export const runningTotalsStream = (events: Stream.Stream<AnalyticsEvent>) =>
Stream.mapAccum(events, 0, (total, event) => {
const newTotal = total + 1
return [newTotal, { total: newTotal, eventType: event.eventType }]
})
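Stream.mapAccum is essentially a fold that also emits a value per element. Its semantics in plain TypeScript, for intuition (a sketch, not the Effect implementation):

```typescript
// mapAccum over an array: thread state left-to-right, emit one output per input
const mapAccum = <S, A, B>(
  items: readonly A[],
  initial: S,
  f: (state: S, item: A) => readonly [S, B],
): B[] => {
  let state = initial
  const out: B[] = []
  for (const item of items) {
    const [next, b] = f(state, item)
    state = next
    out.push(b)
  }
  return out
}

// Running totals, mirroring runningTotalsStream above
const totals = mapAccum(["task_created", "task_completed"], 0, (total, eventType) => {
  const newTotal = total + 1
  return [newTotal, { total: newTotal, eventType }] as const
})
// → [{ total: 1, eventType: "task_created" }, { total: 2, eventType: "task_completed" }]
```

The Stream version does the same thing, but lazily and with backpressure, over a potentially unbounded event source.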
Sinks consume streams and produce final results:
import { Sink, Effect, pipe } from "effect"
// Count all events
export const countEventsSink = Sink.foldLeft(
0, (count, _event: AnalyticsEvent) => count + 1
)
// Aggregate by type
export const aggregateByTypeSink = Sink.foldLeft(
new Map<string, number>(),
(acc, event: AnalyticsEvent) => {
acc.set(event.eventType, (acc.get(event.eventType) ?? 0) + 1)
return acc
}
)
// Compose: count then log
export const countAndLogSink = pipe(
countEventsSink,
Sink.zipRight(Sink.fromEffect(Effect.logInfo("Finished counting events"))),
)
Metrics at Two Levels
We track metrics at two levels. Encore metrics get exported to your cloud provider's monitoring. Effect metrics handle internal runtime observability:
import { Duration, Effect, Metric, MetricBoundaries, pipe } from "effect"
import { Counter, Gauge } from "encore.dev/metrics"
// Encore: infrastructure-level metrics
export const tasksCreatedTotal = new Counter("tasks_created_total")
export const activeUsersGauge = new Gauge("active_users")
// Effect: runtime-level metrics
export const taskProcessingDuration = Metric.histogram(
"effect_task_processing_duration_ms",
MetricBoundaries.linear({ start: 0, width: 100, count: 10 }),
)
// Helper: measure any effect's duration
export const trackDuration = <A, E>(
effect: Effect.Effect<A, E>
): Effect.Effect<A, E> =>
pipe(
Effect.timed(effect),
Effect.tap(([duration]) =>
Metric.update(taskProcessingDuration, Duration.toMillis(duration))),
Effect.map(([, result]) => result),
)
Encore metrics appear in your cloud dashboard automatically. Effect metrics power internal observability: histogram boundaries, percentile tracking, fiber-level instrumentation.
Authentication: Gateway + Effect
Encore's authHandler + Gateway integrate well with Effect's Option and Either:
import { Effect, Either, Option, pipe } from "effect"
import { Gateway, type Header } from "encore.dev/api"
import { authHandler } from "encore.dev/auth"
const validateToken = (rawHeader: string) =>
Effect.gen(function* () {
// Extract Bearer token using Option
const token = pipe(
rawHeader,
Option.liftPredicate((h) => h.startsWith("Bearer ")),
Option.map((h) => h.slice(7)),
Option.getOrElse(() => rawHeader),
)
if (!token) {
return yield* new AuthenticationError({ reason: "Missing token" })
}
const tokenService = yield* TokenService
const payload = yield* tokenService.verify(token)
return { userID: payload.userId, role: payload.role, email: payload.email }
})
export const auth = authHandler<AuthParams, AuthData>(async (params) => {
const program = pipe(
validateToken(params.authorization),
Effect.provide(TokenServiceLive),
)
const result = await Effect.runPromise(Effect.either(program))
return Either.match(result, {
onLeft: (error) => { throw new Error(error.reason) },
onRight: (data) => data,
})
})
export const gateway = new Gateway({ authHandler: auth })
TokenService is a Context.Tag + Layer that wraps Encore's secret() for JWT signing:
import { secret } from "encore.dev/config"
const jwtSecret = secret("JWTSecret")
export const TokenServiceLive = Layer.succeed(TokenService, {
sign: (payload) => Effect.try(() => { /* HMAC-SHA256 signing */ }),
verify: (token) => Effect.try(() => { /* Validate + decode */ }),
})
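The elided sign/verify bodies can be sketched with Node's built-in crypto module. This is an illustrative HS256 implementation, not the repo's code: the header/claims shapes are assumptions, expiry checking is omitted, and in production you'd reach for a vetted JWT library:

```typescript
import { createHmac, timingSafeEqual } from "node:crypto"

const b64url = (data: string | Buffer): string =>
  Buffer.from(data).toString("base64url")

// Sign: base64url(header) + "." + base64url(payload) + "." + HMAC-SHA256 signature
export const signJwt = (payload: object, secret: string): string => {
  const header = b64url(JSON.stringify({ alg: "HS256", typ: "JWT" }))
  const body = b64url(JSON.stringify(payload))
  const sig = createHmac("sha256", secret)
    .update(`${header}.${body}`)
    .digest("base64url")
  return `${header}.${body}.${sig}`
}

// Verify: recompute the signature and compare in constant time
export const verifyJwt = (token: string, secret: string): object | null => {
  const [header, body, sig] = token.split(".")
  if (!header || !body || !sig) return null
  const expected = createHmac("sha256", secret)
    .update(`${header}.${body}`)
    .digest("base64url")
  const a = Buffer.from(sig)
  const b = Buffer.from(expected)
  if (a.length !== b.length || !timingSafeEqual(a, b)) return null
  return JSON.parse(Buffer.from(body, "base64url").toString("utf8"))
}
```

In the actual service, secret("JWTSecret") supplies the key, so the signing secret never lives in source code.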
PubSub: Cross-Service Events
Encore Topics define cross-service event channels. Subscribers in other services react to them:
// In task/task-api.ts — Topics MUST be defined in services
import { Topic } from "encore.dev/pubsub"
export const taskCreatedTopic = new Topic<TaskCreatedEvent>("task-created", {
deliveryGuarantee: "at-least-once",
})
// Publishing after creating a task:
yield* Effect.tryPromise({
try: () => taskCreatedTopic.publish({
taskId: task.id,
projectId: task.projectId,
userId: authData.userID,
title: task.title,
priority: task.priority,
}),
catch: () => new Error("publish failed"),
}).pipe(Effect.catchAll(() =>
Effect.logWarning("Failed to publish task created event")))
// In notification/notification-subscribers.ts
import { Subscription } from "encore.dev/pubsub"
import { taskCreatedTopic } from "../task/task-api.js"
const _ = new Subscription(taskCreatedTopic, "notify-task-created", {
handler: async (event) => {
// Create notification for assignees, watchers, etc.
await runEffect(
Effect.gen(function* () {
const service = yield* NotificationService
yield* service.createFromEvent({ /* ... */ })
}).pipe(Effect.provide(NotificationServiceLive))
)
},
retryPolicy: { minBackoff: "5s", maxBackoff: "1m", maxRetries: 5 },
})
Conclusion
Effect.ts and Encore.ts solve different problems that, together, cover the full stack of backend concerns:
| Concern | Effect.ts | Encore.ts |
|---|---|---|
| Error handling | Typed errors, Cause analysis | APIError + HTTP codes |
| Infrastructure | — | Auto-provisioned DBs, PubSub, Buckets |
| Concurrency | Fibers, Semaphores, Queues | — |
| Validation | Schema transforms, branded types | HTTP-level validation |
| Observability | Effect Metrics, withSpan | Counter, Gauge, traces |
| Streaming | Stream, Sink | api.streamOut, api.streamInOut |
| Scheduling | Schedule combinators | CronJob |
| Auth | — | authHandler, Gateway |
| State | Ref, SynchronizedRef | — |
| Batching | RequestResolver | — |
The bridge pattern (runEffect) is the key that unlocks the combination. Once you have that, every Encore endpoint becomes a thin shell around an Effect program, and you get the best of both worlds.
The full source code is available at GitHub repo link. Clone it, run encore run, and you'll have 7 services with auto-provisioned PostgreSQL databases, PubSub topics, and object storage running locally in seconds.
If you're building TypeScript backends and haven't tried either of these frameworks, start with one. If you're already using one, adding the other is surprisingly smooth.
*Have questions or want to discuss the architecture? Drop a comment below or find me on https://x.com/leomarciano*
Built with Effect.ts v3.19.19 and Encore.ts v1.55.0


