DEV Community

James Lee
James Lee

Posted on

client-go Deep Dive: DeltaFIFO — The Event Queue Behind Informer

In the previous article we saw how Reflector calls List/Watch and feeds events downstream. The immediate destination of those events is DeltaFIFO — the event queue that sits at the heart of the Informer framework.

Source: k8s.io/client-go/tools/cache/delta_fifo.go


1. What Is DeltaFIFO?

The name has two parts:

Part Meaning
FIFO First-In-First-Out queue — supports Add, Update, Delete, List, Pop operations
Delta Each entry stores not just the object, but also the type of change (Added / Updated / Deleted / Sync / Replaced)

Together, DeltaFIFO is a queue where each item carries both what changed and how it changed — giving downstream consumers the full context they need to act correctly.


2. The Data Structure

// k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond             // used to block/unblock consumers

    items map[string]Deltas    // key → list of deltas for that object
    queue []string             // ordered list of keys (FIFO ordering)

    populated              bool
    initialPopulationCount int  // tracks how many items came from the initial List

    keyFunc      KeyFunc         // computes the key for a given object
    knownObjects KeyListerGetter // reference to Indexer (for deletion detection)

    closed                bool
    emitDeltaTypeReplaced bool
}

type Deltas []Delta

type Delta struct {
    Type   DeltaType    // Added | Updated | Deleted | Replaced | Sync
    Object interface{}  // the actual Kubernetes resource object
}
Enter fullscreen mode Exit fullscreen mode

The Three Core Fields

queue: ["pod/default/nginx", "pod/default/redis", "deploy/default/app"]
          ordered slice of object keys (FIFO order)

items: {
  "pod/default/nginx":  [ {Added, PodObj_v1}, {Updated, PodObj_v2} ],
  "pod/default/redis":  [ {Deleted, PodObj_v1} ],
  "deploy/default/app": [ {Added, DeployObj_v1} ],
}
          map: key  []Delta (all pending changes for that object)

Delta: { Type: "Updated", Object: <Pod nginx v2> }
          a single change event: what happened + the object snapshot
Enter fullscreen mode Exit fullscreen mode

Visualizing the Structure

┌─────────────────────────────────────────────────────────────┐
│                        DeltaFIFO                            │
│                                                             │
│  queue (FIFO order):                                        │
│  ┌──────────────────┬──────────────────┬─────────────────┐  │
│  │ "pod/default/    │ "pod/default/    │ "deploy/default │  │
│  │  nginx"          │  redis"          │  /app"          │  │
│  └──────────────────┴──────────────────┴─────────────────┘  │
│          │                   │                  │            │
│          ▼                   ▼                  ▼            │
│  items (map):                                               │
│  ┌──────────────────┐ ┌────────────────┐ ┌──────────────┐  │
│  │ [{Added, nginx}  │ │[{Deleted,      │ │[{Added,      │  │
│  │  {Updated,nginx}]│ │  redis}]       │ │  deploy}]    │  │
│  └──────────────────┘ └────────────────┘ └──────────────┘  │
└─────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Key design insight: queue maintains ordering (which object to process next), while items accumulates all pending deltas per object. Multiple changes to the same object are batched — but never reordered relative to other objects.


3. Producing Messages: queueActionLocked

Every time Reflector calls syncWith() (from List) or watchHandler() (from Watch), it ultimately calls queueActionLocked to enqueue an event:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // ① Compute the object's key (e.g. "default/nginx")
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // ② Append new Delta to the existing list for this key
    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})

    // ③ Deduplicate consecutive identical events
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        // ④ If this key is new, add it to the queue (maintain FIFO order)
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        // ⑤ Update items map with the new delta list
        f.items[id] = newDeltas

        // ⑥ Wake up all blocked consumers
        f.cond.Broadcast()
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Production flow:

Reflector (r.syncWith / r.watchHandler)
     │
     ▼
queueActionLocked(actionType, obj)
     │
     ├── ① KeyOf(obj)              → compute key (e.g. "default/nginx")
     ├── ② append Delta            → add {type, obj} to existing delta list
     ├── ③ dedupDeltas()           → remove redundant consecutive events
     ├── ④ queue = append(key)     → add key to FIFO queue (if new)
     ├── ⑤ items[key] = newDeltas  → store updated delta list
     └── ⑥ cond.Broadcast()        → wake up Pop() consumers
Enter fullscreen mode Exit fullscreen mode

Delta Types

const (
    Added    DeltaType = "Added"     // object newly appeared (from Watch or initial List)
    Updated  DeltaType = "Updated"   // object was modified
    Deleted  DeltaType = "Deleted"   // object was removed
    Replaced DeltaType = "Replaced"  // full re-list replaced the object (resync)
    Sync     DeltaType = "Sync"      // periodic resync — object unchanged but re-delivered
)
Enter fullscreen mode Exit fullscreen mode

Deduplication: dedupDeltas

dedupDeltas prevents redundant back-to-back Deleted events for the same object:

Before dedup:  [{Updated, obj}, {Deleted, obj}, {Deleted, obj}]
After dedup:   [{Updated, obj}, {Deleted, obj}]
Enter fullscreen mode Exit fullscreen mode

Only the last two deltas are considered for deduplication — older history is always preserved.


4. Consuming Messages: Pop

The consumer side is driven by the Informer's internal Controller, which calls Pop() in its processLoop:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for {
        // ① Block if queue is empty — wait for producer to signal
        for len(f.queue) == 0 {
            if f.closed {
                return nil, ErrFIFOClosed
            }
            f.cond.Wait()   // releases lock and sleeps until cond.Broadcast()
        }

        // ② Dequeue the first key (FIFO order)
        id := f.queue[0]
        f.queue = f.queue[1:]

        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }

        // ③ Retrieve and remove the delta list for this key
        item, ok := f.items[id]
        if !ok {
            continue  // key was already processed (race condition guard)
        }
        delete(f.items, id)

        // ④ Pass the delta list to the callback function (HandleDeltas)
        err := process(item)

        // ⑤ If processing failed, re-enqueue for retry
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }

        return item, err
    }
}
Enter fullscreen mode Exit fullscreen mode

Consumption flow:

Controller.processLoop()
     │
     └── DeltaFIFO.Pop(HandleDeltas)
           │
           ├── ① cond.Wait()       → sleep if queue empty
           ├── ② dequeue key       → take first key from FIFO queue
           ├── ③ delete(items,id)  → remove delta list from map
           ├── ④ process(item)     → call HandleDeltas callback
           └── ⑤ on error:
                 addIfNotPresent() → re-enqueue for retry
Enter fullscreen mode Exit fullscreen mode

5. HandleDeltas: Routing Events Downstream

Once Pop() delivers a delta list to the callback, HandleDeltas routes each event to two destinations:

// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    for _, d := range obj.(Deltas) {
        switch d.Type {

        case Sync, Replaced, Added, Updated:
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                // Object already in cache → Update
                s.indexer.Update(d.Object)

                isSync := (d.Type == Sync) ||
                    (d.Type == Replaced && sameResourceVersion(d.Object, old))

                // Route to ShareInformer listeners
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                // Object not in cache → Add
                s.indexer.Add(d.Object)
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }

        case Deleted:
            s.indexer.Delete(d.Object)
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
Enter fullscreen mode Exit fullscreen mode

The Two Destinations

HandleDeltas(Deltas)
     
     ├──  Indexer (local cache)
          ├── Added/Updated/Replaced/Sync  indexer.Add() or indexer.Update()
          └── Deleted                      indexer.Delete()
     
     └──  processor.distribute() (ShareInformer fan-out)
           ├── addNotification     triggers OnAdd  callbacks
           ├── updateNotification  triggers OnUpdate callbacks
           └── deleteNotification  triggers OnDelete callbacks
                      
                      
             informer.AddEventHandler(ResourceEventHandlerFuncs{
                 AddFunc:    func(obj) { workqueue.Add(key) },
                 UpdateFunc: func(old, new) { workqueue.Add(key) },
                 DeleteFunc: func(obj) { workqueue.Add(key) },
             })
Enter fullscreen mode Exit fullscreen mode

6. The Complete DeltaFIFO Lifecycle

┌─────────────────────────────────────────────────────────────────┐
│                     DeltaFIFO Lifecycle                         │
│                                                                 │
│  PRODUCER (Reflector)                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  r.syncWith()      → queueActionLocked(Added, obj)       │   │
│  │  r.watchHandler()  → queueActionLocked(Updated/Deleted…) │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │ cond.Broadcast()                     │
│                          ▼                                      │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                    DeltaFIFO                             │   │
│  │  queue: [key1, key2, key3, ...]   (FIFO order)           │   │
│  │  items: {key1: [Δ,Δ], key2: [Δ], key3: [Δ,Δ,Δ]}        │   │
│  └──────────────────────────────────────────────────────────┘   │
│                          │ cond.Wait → unblock                  │
│                          ▼                                      │
│  CONSUMER (Controller.processLoop)                              │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  Pop(HandleDeltas)                                       │   │
│  │    → HandleDeltas                                        │   │
│  │        ├── Indexer.Add/Update/Delete  (local cache)      │   │
│  │        └── processor.distribute()    (event callbacks)   │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

7. Summary

Concept Detail
queue Ordered slice of object keys — guarantees FIFO processing order
items Map of key → []Delta — accumulates all pending changes per object
Delta {Type, Object} — carries both the change type and the full object snapshot
queueActionLocked Producer entry point — appends delta, deduplicates, signals consumers
dedupDeltas Removes redundant back-to-back Deleted events
Pop Consumer entry point — blocks when empty, dequeues FIFO, calls HandleDeltas
HandleDeltas Routes each delta to Indexer (storage) AND processor.distribute (callbacks)
cond.Broadcast/Wait Efficient producer-consumer synchronization — no busy polling

DeltaFIFO is the reliable handoff point between Reflector (producer) and the Informer's internal Controller (consumer). Its dual structure — an ordered queue for sequencing plus a map for delta accumulation — ensures that no event is lost, no object is processed out of order, and downstream consumers always receive the full change context.


Next in this series: Indexer: Fast In-Memory Resource Storage with Thread-Safe Indexing (Part 4)


Follow the series for more deep dives into Kubernetes development.

Top comments (0)