DEV Community

Luciano0322


Building Atomic Transactions with Rollback for Signals

What Is an Atomic Transaction?

Before we begin, let’s define what an atomic transaction is:

“It is a protective wrapper around multiple state updates that guarantees the whole operation either succeeds completely or has no effect at all.”

Inside an atomic transaction, you can perform multiple set() calls, and even cross multiple await boundaries. Only when the entire operation succeeds do we commit everything at once and rerun effects with a single flush. If anything fails halfway through, all touched signals are restored to their pre-transaction values, and the invalid intermediate state is never pushed to effects.

This is different from a regular batch / transaction, which only coalesces reruns. An atomic transaction adds rollback semantics:

“Commit everything once on success; undo everything on failure.”

At the same time, it does not change the mental model of our core runtime: computed stays lazy, and the dependency graph remains intact.
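To make the "all or nothing" idea concrete before touching the real scheduler, here is a minimal, self-contained sketch. It is a toy model, not the runtime built in this series: Cell, writeCell, and atomicSketch are hypothetical names, and the sketch ignores nesting, async flows, and effect scheduling.

```typescript
// Toy model with hypothetical names; not the article's scheduler.
type Cell<T> = { value: T };

// Write log of the transaction currently running (null when none is active)
let activeLog: Map<Cell<any>, unknown> | null = null;

function writeCell<T>(cell: Cell<T>, next: T): void {
  if (Object.is(cell.value, next)) return; // no change, nothing to log
  // Remember only the first previous value seen in this transaction
  if (activeLog && !activeLog.has(cell)) activeLog.set(cell, cell.value);
  cell.value = next;
}

function atomicSketch<T>(fn: () => T): T {
  const log = new Map<Cell<any>, unknown>();
  activeLog = log;
  try {
    return fn(); // success: every write stays in place
  } catch (e) {
    for (const [cell, prev] of log) cell.value = prev; // failure: undo every write
    throw e;
  } finally {
    activeLog = null;
  }
}

const price = { value: 10 };
const qty = { value: 1 };

try {
  atomicSketch(() => {
    writeCell(price, 20);
    writeCell(qty, 5);
    throw new Error("validation failed");
  });
} catch {
  // the error still reaches the caller; only the state is undone
}
// price.value and qty.value are back to 10 and 1
```

The log stores the first previous value per cell, so writing the same cell several times inside one transaction still restores the value it had on entry.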

Behavior Definition

Now that the concept is clear, let’s define the behavior we want to implement:

Success

All signal.set() calls inside the transaction are committed, and when the outermost level exits, we call flushJobs() once, so our effects rerun only once.
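The "flush once at the outermost exit" rule can be sketched in isolation. This is a simplified illustration under stated assumptions: pending, schedule, flush, and batchSketch are hypothetical names, and the sketch flushes synchronously rather than through a microtask as the real scheduler does.

```typescript
// Toy scheduler with hypothetical names; flushes synchronously for simplicity.
const pending = new Set<() => void>();
let depth = 0;

function flush(): void {
  const jobs = Array.from(pending);
  pending.clear();
  for (const job of jobs) job();
}

function schedule(job: () => void): void {
  pending.add(job); // a Set keeps each job at most once
  if (depth === 0) flush(); // outside any batch: run right away
}

function batchSketch(fn: () => void): void {
  depth++;
  try {
    fn();
  } finally {
    depth--;
    if (depth === 0) flush(); // only the outermost exit flushes
  }
}

let renders = 0;
const render = () => { renders++; };

batchSketch(() => {
  schedule(render);
  schedule(render);
  batchSketch(() => schedule(render)); // the nested level does not flush on exit
});
const rendersAfterBatch = renders; // three schedule calls, one run
```

Two things combine here: the depth counter defers the flush, and the Set deduplicates the job, which is why three writes produce a single rerun.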

Failure (throw / reject)

All affected signals are restored to the values they had when entering the current transaction level. We do not flush, so invalid intermediate state is never pushed into effects. Downstream computed nodes are marked as stale and will lazily recompute on the next read.

Nested transactions

Each level maintains its own write log.

  • If an inner transaction succeeds, its log is merged into the outer transaction, while preserving the outermost original value.
  • If an inner transaction fails, only the inner transaction is rolled back. The outer transaction may either catch the error and continue, or let it propagate upward.
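The two nesting rules above can be sketched with a stack of logs. This is a self-contained toy under stated assumptions (Cell, writeLogged, and nested are hypothetical names; it is synchronous and has no scheduling), meant only to show why the merge keeps the outermost original value.

```typescript
// Toy model with hypothetical names; synchronous, no effects.
type Cell = { value: number };
type Log = Map<Cell, number>;
const logs: Log[] = [];

function writeLogged(cell: Cell, next: number): void {
  const log = logs[logs.length - 1];
  if (log && !log.has(cell)) log.set(cell, cell.value); // first write in this level
  cell.value = next;
}

function nested(fn: () => void): void {
  logs.push(new Map());
  try {
    fn();
  } catch (e) {
    const failed = logs.pop()!;
    for (const [cell, prev] of failed) cell.value = prev; // roll back this level only
    throw e;
  }
  const child = logs.pop()!;
  const parent = logs[logs.length - 1];
  if (parent) {
    for (const [cell, prev] of child) {
      if (!parent.has(cell)) parent.set(cell, prev); // parent keeps its older value
    }
  }
}

const a: Cell = { value: 0 };
const b: Cell = { value: 0 };
let seenAfterInner: number[] = [];

try {
  nested(() => {                      // outer
    writeLogged(a, 1);
    nested(() => writeLogged(a, 2));  // inner succeeds; outer still remembers a = 0
    try {
      nested(() => {                  // inner fails; only b is restored
        writeLogged(b, 9);
        throw new Error("inner");
      });
    } catch {}
    seenAfterInner = [a.value, b.value];
    throw new Error("outer");         // outer fails; a goes back to 0, not 1
  });
} catch {}
```

After the failed inner level, a is still 2 and b is back to 0. When the outer level fails, a is restored to 0 because the merge never overwrote the value the outer log had recorded first.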

Equality semantics

Consistent with the core runtime, a write for which the signal’s comparator (Object.is by default) reports prev and next as equal counts as “no change”: no log entry, no scheduling.
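A few Object.is edge cases are worth keeping in mind, since they decide whether a set() is logged at all. The snippet below only uses the standard Object.is; the variable names are illustrative.

```typescript
// Object.is differs from === in exactly two cases: NaN and signed zero.
const sameNaN = Object.is(NaN, NaN);             // true: setting NaN over NaN is a no-op
const signedZero = Object.is(0, -0);             // false: 0 -> -0 counts as a change
const freshObject = Object.is({ n: 1 }, { n: 1 }); // false: a new object is always a change

const ref = { n: 1 };
ref.n = 2;                                       // mutate in place
const sameRef = Object.is(ref, ref);             // true: set(ref) after mutation is ignored
```

The last case is the practical one: mutating an object in place and passing the same reference to set() produces no log entry and no rerun, so atomic cannot roll such a mutation back.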

API and File Structure

Add the following to scheduler.ts:

  • atomic(fn): atomic transaction, supports both sync and async flows
  • inAtomic(): whether execution is currently inside any atomic level
  • recordAtomicWrite(node, prev): records the previous value the first time a node is written in the current level
  • scheduleJob: reuse the existing batch / transaction gating logic, using batchDepth to decide whether to defer the microtask

And in signal.set(), right after the equality check has confirmed that the value actually changes (so the write is certain to happen) and before node.value is overwritten, insert:

if (inAtomic()) recordAtomicWrite(node, prev)

Extending scheduler.ts

import { markStale } from "./computed.js";
import type { Node } from "./graph.js";

export interface Schedulable { run(): void; disposed?: boolean }

// Internal node shape used by signal/computed
export type InternalNode<T = unknown> = { value: T };

// Write log for atomic transactions
type WriteLog = Map<(Node & InternalNode<unknown>), unknown>;

const queue = new Set<Schedulable>();
let scheduled = false;

// > 0 means we are inside batch/transaction mode (delay microtask flushing)
let batchDepth = 0;

// Atomic transaction depth and log stack
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];

// Mute scheduling during rollback to prevent scheduleJob from creating new work
let muted = 0;

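export function scheduleJob(job: Schedulable) {
  // Skip disposed jobs, and ignore scheduling requests while a rollback is restoring values
  if (job.disposed || muted > 0) return;
  queue.add(job);
  if (!scheduled && batchDepth === 0) {
    scheduled = true;
    queueMicrotask(flushJobs);
  }
}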

export function batch<T>(fn: () => T): T {
  batchDepth++;
  try {
    return fn();
  } finally {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  }
}

// Promise detection
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
  return v != null && typeof v.then === "function";
}

export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
  batchDepth++;
  // Leave the batch level exactly once, flushing only at the outermost exit
  const exit = () => {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };
  let out: T | Promise<T>;
  try {
    out = fn();
  } catch (e) {
    // Only errors thrown by fn() land here; an error thrown by flushJobs()
    // must not decrement batchDepth a second time
    exit();
    throw e;
  }
  if (isPromiseLike<T>(out)) return Promise.resolve(out).finally(exit);
  exit();
  return out as T;
}

// Atomic transaction (with rollback)
export function inAtomic() {
  return atomicDepth > 0;
}

// Record the "first write in this level"; called by signal.set() when a write is confirmed
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
  const log = atomicLogs[atomicLogs.length - 1];
  if (!log) return; // safety guard: no active atomic layer
  if (!log.has(node)) log.set(node, prevValue);
}

function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
  if ("value" in node) (node as { value: T }).value = v;
}

function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [node, prev] of child) {
    if (!parent.has(node)) parent.set(node, prev);
  }
}

export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
  // Enter atomic layer: suppress flushing (shared batchDepth), start write logging
  batchDepth++;
  atomicDepth++;
  atomicLogs.push(new Map<(Node & InternalNode<unknown>), unknown>());

  const exitCommit = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Inner success -> merge first-seen old values into parent
    if (atomicDepth > 0) {
      mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
    }
    // Only flush when the outermost layer exits
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };

  const exitRollback = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Silent rollback: avoid scheduling while restoring values
    muted++;
    try {
      for (const [node, prev] of log) {
        writeNodeValue(node, prev);
        if ((node as Node).kind === "signal") {
          for (const sub of (node as Node).subs) {
            if (sub.kind === "computed") markStale(sub);
            // sub.kind === "effect" does not need scheduling
            // muted blocks it, and we also do not flush afterward
          }
        }
      }
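      // Drop queued jobs only when the outermost atomic level fails; an inner
      // rollback must not discard jobs queued by an enclosing level
      if (atomicDepth === 0) {
        queue.clear();
        scheduled = false;
      }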
    } finally {
      muted--;
    }
    // No flush on failure; just exit batch/atomic depth
    batchDepth--;
  };

  let out: T | Promise<T>;
  try {
    out = fn();
  } catch (e) {
    // Synchronous failure -> rollback
    // (only fn() is guarded, so an error thrown while flushing in exitCommit
    // cannot trigger a second exit of the same level)
    exitRollback();
    throw e;
  }
  if (isPromiseLike<T>(out)) {
    return Promise.resolve(out).then(
      (v) => { exitCommit(); return v; },
      (err) => { exitRollback(); throw err; }
    );
  }
  // Synchronous success
  exitCommit();
  return out as T;
}

export function flushSync() {
  if (!scheduled && queue.size === 0) return;
  flushJobs();
}

function flushJobs() {
  scheduled = false;
  let guard = 0;
  while (queue.size) {
    const list = Array.from(queue);
    queue.clear();
    for (const job of list) job.run();
    if (++guard > 10000) throw new Error("Infinite update loop");
  }
}

Consistency Guarantees

When an atomic transaction fails:

  • all affected computed nodes are marked as stale
  • on the next read after rollback, they lazily recompute based on the latest signal values
  • the UI never sees the invalid snapshot, and no flush happens during rollback
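Why rollback must mark computed nodes as stale is easiest to see when a computed is read inside the failing transaction: its cache then holds a value derived from the intermediate state. The sketch below is a self-contained toy under stated assumptions (lazyComputed and Source are hypothetical names, and the stale flag is flipped by hand instead of through the dependency graph).

```typescript
// Toy lazy computed with hypothetical names; staleness is set manually.
type Source = { value: number };

function lazyComputed<T>(compute: () => T) {
  let stale = true;
  let cached: T | undefined;
  let runs = 0;
  return {
    get(): T {
      if (stale) {
        cached = compute(); // recompute only when someone reads
        stale = false;
        runs++;
      }
      return cached as T;
    },
    markStale(): void { stale = true; },
    runs: (): number => runs,
  };
}

const count: Source = { value: 2 };
const doubled = lazyComputed(() => count.value * 2);

const initial = doubled.get();          // first computation from count = 2

count.value = 50;                       // write inside a transaction that will fail
doubled.markStale();
const during = doubled.get();           // a read inside the transaction caches the intermediate value

count.value = 2;                        // rollback restores the source...
doubled.markStale();                    // ...and marks the computed stale without recomputing
const runsBeforeRead = doubled.runs();  // rollback itself did no computation

const afterRollback = doubled.get();    // next read recomputes from the restored value
```

Without the second markStale() call, the final read would return the cached intermediate result even though count is back to 2. Marking stale instead of recomputing eagerly is what keeps computed lazy during rollback.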

Adjustments in signal.ts

import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";

type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;

export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
  const node: Node & InternalNode<T> & { kind: "signal"; equals: Comparator<T> } = {
    kind: "signal",
    deps: new Set(),
    subs: new Set(),
    value: initial,
    equals,
  };

  const get = () => {
    track(node);
    return node.value;
  };

  const set = (next: T | ((prev: T) => T)) => {
    const prev = node.value;
    const nxtVal = typeof next === "function" ? (next as (p: T) => T)(prev) : next;
    if (node.equals(prev, nxtVal)) return;

    // Atomic hook: record the previous value only when the write is confirmed,
    // and only the first time this level touches the node
    if (inAtomic()) recordAtomicWrite(node, prev);

    // Perform the actual write
    node.value = nxtVal;

    // No downstream subscribers -> exit early to avoid unnecessary work
    if (node.subs.size === 0) return;

    // Has downstream subscribers -> follow the original propagation logic
    for (const sub of node.subs) {
      if (sub.kind === "effect") {
        Effects.get(sub)?.schedule();
      } else if (sub.kind === "computed") {
        markStale(sub);
      }
    }
  };

  const subscribe = (observer: Node) => {
    if (observer.kind === "signal") {
      throw new Error("A signal cannot subscribe to another node");
    }
    link(observer, node);
    return () => unlink(observer, node);
  };

  return { get, set, subscribe, peek: () => node.value };
}

Usage Scenarios

Inner transaction fails, outer transaction continues

(only the inner level rolls back)

const a = signal(0);
const b = signal(0);

await atomic(async () => { // outer
  a.set(1); // OK
  try {
    await atomic(async () => { // inner
      b.set(1);
      throw new Error("boom"); // inner fails -> rollback b to 0
    });
  } catch {}
  // At this point: a = 1, b = 0
}); // outer succeeds -> one flush

Outer transaction fails

(everything rolls back)

const a = signal(0);
const b = signal(0);

try {
  await atomic(async () => {
    a.set(1);
    await Promise.resolve();
    b.set(2);
    throw new Error("oops"); // entire transaction fails -> rollback both a and b
  });
} catch {}

// a = 0, b = 0
// and there was no flush in this transaction

React / Vue Examples

Using Atomic Transactions with Rollback in React

Scenario: editing a title. When the user clicks Save, we use atomic for an optimistic write. Only a successful request commits the update; failure rolls everything back.

import { useState, useEffect } from "react";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import {
  useSignalValue,
  useSignalState,
  useComputed,
} from "../hook/react_adapter.js";

// ---- mock API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300)); // simulate latency
  if (shouldFail) throw new Error("server says no");
  return true;
}

// ---- state ----
const titleSig = signal("Hello");

// for unit test
export type EditorTestProps = { __sig?: ReturnType<typeof signal<string>> };

export function Editor({ __sig }: EditorTestProps = {}) {
  const sig = __sig ?? titleSig; // default: module-scope signal; tests can override it

  const committed = useSignalValue(sig); // read snapshot from external signal
  const [draft, setDraft] = useState(committed); // local draft state (plain React state)
  useEffect(() => {
    setDraft(committed); // resync the draft when the committed value changes
  }, [committed]);

  const len = useComputed(() => sig.get().length); // ✅ hook returns a value; derive from the same signal the component reads

  const [saving, setSaving] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [shouldFail, setShouldFail] = useState(false);

  const save = async () => {
    setSaving(true);
    setError(null);
    try {
      await atomic(async () => {
        sig.set(draft); // optimistic write (does not flush immediately)
        await postTitle(draft, shouldFail); // may throw -> rollback + no flush
      });
      // Success: only flush when atomic exits, so committed/len update together
    } catch (e: any) {
      setError(e?.message ?? "save failed");
    } finally {
      setSaving(false);
    }
  };

  return (
    <section>
      <input
        value={draft}
        onChange={(e) => setDraft(e.target.value)}
        disabled={saving}
      />
      <button onClick={save} disabled={saving}>
        {saving ? "Saving..." : "Save"}
      </button>
      <label style={{ marginLeft: 8 }}>
        <input
          type="checkbox"
          checked={shouldFail}
          onChange={(e) => setShouldFail(e.target.checked)}
        />
        simulate failure
      </label>

      <hr />
      <p>
        Committed title: <b>{committed}</b>
      </p>
      <p>
        Derived length (computed): <b>{len}</b>
      </p>
      {error && <p style={{ color: "crimson" }}>Error: {error}</p>}
    </section>
  );
}

Success
Click Save → wait 300ms → committed and len update in the same cycle with one flush.

Failure
Click Save with simulate failure checked → the old value remains visible. The affected computed is marked as stale, and the next read (for example, after the next successful update or any later reactive access) lazily recomputes to a consistent snapshot.


Using Atomic Transactions with Rollback in Vue

Same scenario, but as an SFC. We bridge into Vue using useSignalRef / useComputedRef.

<script setup lang="ts">
import { ref, watch } from "vue";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import { useSignalRef, useComputedRef } from "../hook/vue_adapter.js";

// ---- mock API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300));
  if (shouldFail) throw new Error("server says no");
  return true;
}

// ---- state ----
const titleSig = signal("Hello");
const committed = useSignalRef(titleSig);
const titleLen = useComputedRef(() => titleSig.get().length);

const draft = ref(committed.value);
watch(committed, (v) => (draft.value = v)); // sync draft when external value changes

const saving = ref(false);
const error = ref<string | null>(null);
const shouldFail = ref(false);

async function save() {
  saving.value = true;
  error.value = null;
  try {
    await atomic(async () => {
      // optimistic write, but no immediate flush
      titleSig.set(draft.value);
      await postTitle(draft.value, shouldFail.value); // may throw
    });
    // Success: only flush after atomic exits -> one template update
  } catch (e: any) {
    // Failure: rollback, no flush -> template keeps showing the old value
    error.value = e?.message ?? "save failed";
  } finally {
    saving.value = false;
  }
}
</script>

<template>
  <section>
    <div>
      <label>
        Draft:
        <input v-model="draft" :disabled="saving" />
      </label>
      <button @click="save" :disabled="saving">
        {{ saving ? "Saving..." : "Save" }}
      </button>
      <label style="margin-left: 8px">
        <input type="checkbox" v-model="shouldFail" :disabled="saving" />
        simulate failure
      </label>
    </div>

    <hr />

    <p>
      Committed title: <b>{{ committed }}</b>
    </p>
    <p>
      Derived length (computed): <b>{{ titleLen }}</b>
    </p>
    <p v-if="error" style="color: crimson">Error: {{ error }}</p>
  </section>
</template>

Success
After clicking Save, committed and titleLen update together in the same patch cycle.

Failure
The UI keeps showing the previous value. Because rollback already marked the affected computed nodes as stale, any future read will lazily recompute them back to the correct state.

Execution Timeline (Success vs Failure)

[Figure: atomic timeline, success vs. failure]

Closing Thoughts

With atomic, our state update model becomes more complete. Even when async work fails, the system can preserve the previous stable state, fully delivering on the original idea:

“It is a protective wrapper around multiple state updates that guarantees the whole operation either succeeds completely or has no effect at all.”

In the next article, we’ll move on to more advanced topics around the Scheduler.
