What Is an Atomic Transaction?
Before we begin, let’s define atomic transaction clearly:
“It is a protective wrapper around multiple state updates that guarantees the whole operation either succeeds completely or has no effect at all.”
Inside an atomic transaction, you can perform multiple set() calls, and even cross multiple await boundaries. Only when the entire operation succeeds do we commit everything at once and rerun effects with a single flush. If anything fails halfway through, all touched signals are restored to their pre-transaction values, and the error state is never pushed outward.
This is different from a regular batch / transaction, which only coalesces reruns. An atomic transaction adds rollback semantics:
“Commit everything once on success; undo everything on failure.”
At the same time, it does not change the mental model of our core runtime: computed stays lazy, and the dependency graph remains intact.
Behavior Definition
Now that the concept is clear, let’s define the behavior we want to implement:
Success
All signal.set() calls inside the transaction are committed, and when the outermost level exits, we call flushJobs() once, so our effects rerun only once.
Failure (throw / reject)
All affected signals are restored to the values they had when entering the current transaction level. We do not flush, so invalid intermediate state is never pushed into effects. Downstream computed nodes are marked as stale and will lazily recompute on the next read.
Nested transactions
Each level maintains its own write log.
- If an inner transaction succeeds, its log is merged into the outer transaction, while preserving the outermost original value.
- If an inner transaction fails, only the inner transaction is rolled back. The outer transaction may either catch the error and continue, or let it propagate upward.
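The merge rule for a successful inner transaction can be illustrated in isolation (a standalone sketch: the keys here are plain strings, whereas the real write log keys graph nodes):

```typescript
// Sketch of the "first-seen old value wins" merge: when an inner transaction
// commits, its log only fills gaps in the outer log, so the outermost
// pre-transaction snapshot is preserved.
type WriteLog = Map<string, unknown>;

function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [key, prev] of child) {
    if (!parent.has(key)) parent.set(key, prev); // parent's earlier snapshot wins
  }
}

// Outer already recorded a = 0 before writing a = 1; inner then saw a = 1, b = 0.
const parentLog: WriteLog = new Map([["a", 0]]);
const childLog: WriteLog = new Map([["a", 1], ["b", 0]]);
mergeChildIntoParent(childLog, parentLog);

console.log(parentLog.get("a")); // 0: outermost original value preserved
console.log(parentLog.get("b")); // 0: newly tracked by the outer log
```

If the outer transaction later fails, it now holds enough history to restore both `a` and `b` to their outermost pre-transaction values.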
Equality semantics
Consistent with the core runtime, Object.is(prev, next) means “no change”: no log entry, no scheduling.
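To make the rule concrete, here is a tiny standalone check (plain `Object.is`, exactly the runtime's default comparator):

```typescript
// Sketch of the default equality rule: Object.is(prev, next) === true means
// "no change" -> no write log entry and no scheduling.
function isChange<T>(prev: T, next: T): boolean {
  return !Object.is(prev, next);
}

console.log(isChange(1, 1));     // false: identical primitives are "no change"
console.log(isChange(NaN, NaN)); // false: Object.is treats NaN as equal to itself
console.log(isChange(0, -0));    // true: Object.is distinguishes +0 and -0
console.log(isChange({}, {}));   // true: distinct object identities count as a change
```

Note the two places `Object.is` differs from `===`: `NaN` updates are suppressed, and `+0` → `-0` counts as a change.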
API and File Structure
Add the following to scheduler.ts:
- atomic(fn): atomic transaction, supports both sync and async flows
- inAtomic(): whether execution is currently inside any atomic level
- recordAtomicWrite(node, prev): records the previous value the first time a node is written in the current level
- scheduleJob: reuse the existing batch / transaction gating logic, using batchDepth to decide whether to defer the microtask
And in signal.set(), at the exact moment the equality check confirms the value actually changed and the write is certain to happen, insert:
if (inAtomic()) recordAtomicWrite(node, prev)
Extending scheduler.ts
import { markStale } from "./computed.js";
import type { Node } from "./graph.js";
export interface Schedulable { run(): void; disposed?: boolean }
// Internal node shape used by signal/computed
export type InternalNode<T = unknown> = { value: T };
// Write log for atomic transactions
type WriteLog = Map<(Node & InternalNode<unknown>), unknown>;
const queue = new Set<Schedulable>();
let scheduled = false;
// > 0 means we are inside batch/transaction mode (delay microtask flushing)
let batchDepth = 0;
// Atomic transaction depth and log stack
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];
// Mute scheduling during rollback to prevent scheduleJob from creating new work
let muted = 0;
export function scheduleJob(job: Schedulable) {
  if (job.disposed) return;
  if (muted > 0) return; // rollback in progress: silently drop new work
  queue.add(job);
  if (!scheduled && batchDepth === 0) {
    scheduled = true;
    queueMicrotask(flushJobs);
  }
}
export function batch<T>(fn: () => T): T {
  batchDepth++;
  try {
    return fn();
  } finally {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  }
}
// Promise detection
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
  return v != null && typeof v.then === "function";
}
export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
  batchDepth++;
  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).finally(() => {
        batchDepth--;
        if (batchDepth === 0) flushJobs();
      });
    }
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    return out as T;
  } catch (e) {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    throw e;
  }
}
// Atomic transaction (with rollback)
export function inAtomic() {
  return atomicDepth > 0;
}
// Record the "first write in this level"; called by signal.set() when a write is confirmed
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
  const log = atomicLogs[atomicLogs.length - 1];
  if (!log) return; // safety guard: no active atomic layer
  if (!log.has(node)) log.set(node, prevValue);
}
function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
  if ("value" in node) (node as { value: T }).value = v;
}
function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [node, prev] of child) {
    if (!parent.has(node)) parent.set(node, prev);
  }
}
export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
  // Enter atomic layer: suppress flushing (shared batchDepth), start write logging
  batchDepth++;
  atomicDepth++;
  atomicLogs.push(new Map<Node & InternalNode<unknown>, unknown>());
  const exitCommit = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Inner success -> merge first-seen old values into parent
    if (atomicDepth > 0) {
      mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
    }
    // Only flush when the outermost layer exits
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };
  const exitRollback = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Silent rollback: avoid scheduling while restoring values
    muted++;
    try {
      for (const [node, prev] of log) {
        writeNodeValue(node, prev);
        if ((node as Node).kind === "signal") {
          for (const sub of (node as Node).subs) {
            if (sub.kind === "computed") markStale(sub);
            // sub.kind === "effect" needs no scheduling here:
            // muted blocks it, and we do not flush afterward either
          }
        }
      }
      queue.clear(); // clear jobs created during this level
      scheduled = false;
    } finally {
      muted--;
    }
    // No flush on failure; just exit batch/atomic depth
    batchDepth--;
  };
  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).then(
        (v) => { exitCommit(); return v; },
        (err) => { exitRollback(); throw err; }
      );
    }
    // Synchronous success
    exitCommit();
    return out as T;
  } catch (e) {
    // Synchronous failure -> rollback
    exitRollback();
    throw e;
  }
}
export function flushSync() {
  if (!scheduled && queue.size === 0) return;
  flushJobs();
}
function flushJobs() {
  scheduled = false;
  let guard = 0;
  while (queue.size) {
    const list = Array.from(queue);
    queue.clear();
    for (const job of list) job.run();
    if (++guard > 10000) throw new Error("Infinite update loop");
  }
}
Consistency Guarantees
When an atomic transaction fails:
- all affected computed nodes are marked as stale
- on the next read after rollback, they lazily recompute based on the latest signal values
- the UI never sees the invalid snapshot, and no flush happens during rollback
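The "stale, then lazily recomputed" behavior can be sketched in isolation (a hypothetical mini computed, not the runtime's actual implementation):

```typescript
// Minimal stale-flag cache: markStale() does no eager work; the next get()
// recomputes from whatever the (restored) source values are at that moment.
function lazyComputed<T>(calc: () => T) {
  let cached: T | undefined;
  let stale = true;
  return {
    markStale() { stale = true; },                   // what rollback does downstream
    get(): T {
      if (stale) { cached = calc(); stale = false; } // lazy recompute on read
      return cached as T;
    },
  };
}

let source = 5;
const doubled = lazyComputed(() => source * 2);
console.log(doubled.get()); // 10: first read computes
source = 3;                 // a rollback restores an older source value...
doubled.markStale();        // ...and marks the derivation stale; nothing runs yet
console.log(doubled.get()); // 6: next read recomputes from the restored value
```

The key property is that marking stale is free: no computation happens until someone actually reads, so a rolled-back transaction never pays for recomputing values nobody observes.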
Adjustments in signal.ts
import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";
type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;
export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
  const node: Node & InternalNode<T> & { kind: "signal"; equals: Comparator<T> } = {
    kind: "signal",
    deps: new Set(),
    subs: new Set(),
    value: initial,
    equals,
  };
  const get = () => {
    track(node);
    return node.value;
  };
  const set = (next: T | ((prev: T) => T)) => {
    const prev = node.value;
    const nxtVal = typeof next === "function" ? (next as (p: T) => T)(node.value) : next;
    if (node.equals(node.value, nxtVal)) return;
    // Atomic hook: record the previous value only when the write is confirmed,
    // and only the first time this level touches the node
    if (inAtomic()) recordAtomicWrite(node, prev);
    // Perform the actual write
    node.value = nxtVal;
    // No downstream subscribers -> exit early to avoid unnecessary work
    if (node.subs.size === 0) return;
    // Has downstream subscribers -> follow the original propagation logic
    for (const sub of node.subs) {
      if (sub.kind === "effect") {
        Effects.get(sub)?.schedule();
      } else if (sub.kind === "computed") {
        markStale(sub);
      }
    }
  };
  const subscribe = (observer: Node) => {
    if (observer.kind === "signal") {
      throw new Error("A signal cannot subscribe to another node");
    }
    link(observer, node);
    return () => unlink(observer, node);
  };
  return { get, set, subscribe, peek: () => node.value };
}
Usage Scenarios
Inner transaction fails, outer transaction continues
(only the inner level rolls back)
const a = signal(0);
const b = signal(0);
await atomic(async () => { // outer
  a.set(1); // OK
  try {
    await atomic(async () => { // inner
      b.set(1);
      throw new Error("boom"); // inner fails -> rollback b to 0
    });
  } catch {}
  // At this point: a = 1, b = 0
}); // outer succeeds -> one flush
Outer transaction fails
(everything rolls back)
const a = signal(0);
const b = signal(0);
try {
  await atomic(async () => {
    a.set(1);
    await Promise.resolve();
    b.set(2);
    throw new Error("oops"); // entire transaction fails -> rollback both a and b
  });
} catch {}
// a = 0, b = 0
// and there was no flush in this transaction
React / Vue Examples
Using Atomic Transactions with Rollback in React
Scenario: editing a title. When the user clicks Save, we use atomic for an optimistic write. Only a successful request commits the update; failure rolls everything back.
import { useState, useEffect } from "react";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import {
  useSignalValue,
  useSignalState,
  useComputed,
} from "../hook/react_adapter.js";
// ---- mock API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300)); // simulate latency
  if (shouldFail) throw new Error("server says no");
  return true;
}
// ---- state ----
const titleSig = signal("Hello");
// for unit test
export type EditorTestProps = { __sig?: ReturnType<typeof signal<string>> };
export function Editor({ __sig }: EditorTestProps = {}) {
  const sig = __sig ?? titleSig; // default: module-scope signal; tests can override it
  const committed = useSignalValue(sig); // read snapshot from external signal
  const [draft, setDraft] = useSignalState(committed); // local draft state
  useEffect(() => setDraft(committed), [committed]);
  const len = useComputed(() => sig.get().length); // derive from sig so test overrides work too
  const [saving, setSaving] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [shouldFail, setShouldFail] = useState(false);
  const save = async () => {
    setSaving(true);
    setError(null);
    try {
      await atomic(async () => {
        sig.set(draft); // optimistic write (does not flush immediately)
        await postTitle(draft, shouldFail); // may throw -> rollback + no flush
      });
      // Success: only flush when atomic exits, so committed/len update together
    } catch (e: any) {
      setError(e?.message ?? "save failed");
    } finally {
      setSaving(false);
    }
  };
  return (
    <section>
      <input
        value={draft}
        onChange={(e) => setDraft(e.target.value)}
        disabled={saving}
      />
      <button onClick={save} disabled={saving}>
        {saving ? "Saving..." : "Save"}
      </button>
      <label style={{ marginLeft: 8 }}>
        <input
          type="checkbox"
          checked={shouldFail}
          onChange={(e) => setShouldFail(e.target.checked)}
        />
        simulate failure
      </label>
      <hr />
      <p>
        Committed title: <b>{committed}</b>
      </p>
      <p>
        Derived length (computed): <b>{len}</b>
      </p>
      {error && <p style={{ color: "crimson" }}>Error: {error}</p>}
    </section>
  );
}
Success
Click Save → wait 300ms → committed and len update in the same cycle with one flush.
Failure
Click Save with simulate failure checked → the old value remains visible. The affected computed is marked as stale, and the next read (for example, after the next successful update or any later reactive access) lazily recomputes to a consistent snapshot.
Using Atomic Transactions with Rollback in Vue
Same scenario, but as an SFC. We bridge into Vue using useSignalRef / useComputedRef.
<script setup lang="ts">
import { ref, watch } from "vue";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import { useSignalRef, useComputedRef } from "../hook/vue_adapter.js";
// ---- mock API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300));
  if (shouldFail) throw new Error("server says no");
  return true;
}
// ---- state ----
const titleSig = signal("Hello");
const committed = useSignalRef(titleSig);
const titleLen = useComputedRef(() => titleSig.get().length);
const draft = ref(committed.value);
watch(committed, (v) => (draft.value = v)); // sync draft when external value changes
const saving = ref(false);
const error = ref<string | null>(null);
const shouldFail = ref(false);
async function save() {
  saving.value = true;
  error.value = null;
  try {
    await atomic(async () => {
      // optimistic write, but no immediate flush
      titleSig.set(draft.value);
      await postTitle(draft.value, shouldFail.value); // may throw
    });
    // Success: only flush after atomic exits -> one template update
  } catch (e: any) {
    // Failure: rollback, no flush -> template keeps showing the old value
    error.value = e?.message ?? "save failed";
  } finally {
    saving.value = false;
  }
}
</script>
<template>
  <section>
    <div>
      <label>
        Draft:
        <input v-model="draft" :disabled="saving" />
      </label>
      <button @click="save" :disabled="saving">
        {{ saving ? "Saving..." : "Save" }}
      </button>
      <label style="margin-left: 8px">
        <input type="checkbox" v-model="shouldFail" :disabled="saving" />
        simulate failure
      </label>
    </div>
    <hr />
    <p>
      Committed title: <b>{{ committed }}</b>
    </p>
    <p>
      Derived length (computed): <b>{{ titleLen }}</b>
    </p>
    <p v-if="error" style="color: crimson">Error: {{ error }}</p>
  </section>
</template>
Success
After clicking Save, committed and titleLen update together in the same patch cycle.
Failure
The UI keeps showing the previous value. Because rollback already marked the affected computed nodes as stale, any future read will lazily recompute them back to the correct state.
Execution Timeline (Success vs Failure)
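The two timelines can be replayed with a stripped-down, self-contained model (sync-only for brevity; plain boxes stand in for signals, and flushing is elided):

```typescript
// Success vs failure, end to end: one write log per atomic level,
// first-write-wins old values, full restore on throw.
type Box<T> = { value: T };
const logs: Array<Map<Box<unknown>, unknown>> = [];

function set<T>(box: Box<T>, next: T) {
  if (Object.is(box.value, next)) return; // no change -> no log entry
  const log = logs[logs.length - 1];
  if (log && !log.has(box)) log.set(box as Box<unknown>, box.value); // first-seen old value
  box.value = next;
}

function atomicSketch<T>(fn: () => T): T {
  logs.push(new Map());
  try {
    const out = fn();
    const log = logs.pop()!;
    const parent = logs[logs.length - 1];
    if (parent) for (const [b, prev] of log) if (!parent.has(b)) parent.set(b, prev);
    return out; // success: writes survive (the real runtime flushes once here)
  } catch (e) {
    for (const [b, prev] of logs.pop()!) b.value = prev; // failure: restore old values
    throw e;
  }
}

const a: Box<number> = { value: 0 };
const b: Box<number> = { value: 0 };

// Success timeline: both writes commit together.
atomicSketch(() => { set(a, 1); set(b, 2); });
console.log(a.value, b.value); // 1 2

// Failure timeline: both writes are rolled back to the committed snapshot.
try {
  atomicSketch(() => { set(a, 10); set(b, 20); throw new Error("oops"); });
} catch {}
console.log(a.value, b.value); // 1 2
```

On success, the effect flush would happen exactly once at the outermost exit; on failure, the log restores every touched box and nothing observable happens at all.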
Closing Thoughts
With atomic, our state update model becomes more complete. Even when async work fails, the system can preserve the previous stable state, fully delivering on the original idea:
“It is a protective wrapper around multiple state updates that guarantees the whole operation either succeeds completely or has no effect at all.”
In the next article, we’ll move on to more advanced topics around the Scheduler.