In the previous article we saw how Reflector calls List/Watch and feeds events downstream. The immediate destination of those events is DeltaFIFO — the event queue that sits at the heart of the Informer framework.
Source: k8s.io/client-go/tools/cache/delta_fifo.go
1. What Is DeltaFIFO?
The name has two parts:
| Part | Meaning |
|---|---|
| FIFO | First-In-First-Out queue — supports Add, Update, Delete, List, Pop operations |
| Delta | Each entry stores not just the object, but also the type of change (Added / Updated / Deleted / Sync / Replaced) |
Together, DeltaFIFO is a queue where each item carries both what changed and how it changed — giving downstream consumers the full context they need to act correctly.
2. The Data Structure
// k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond // used to block/unblock consumers
items map[string]Deltas // key → list of deltas for that object
queue []string // ordered list of keys (FIFO ordering)
populated bool
initialPopulationCount int // tracks how many items came from the initial List
keyFunc KeyFunc // computes the key for a given object
knownObjects KeyListerGetter // reference to Indexer (for deletion detection)
closed bool
emitDeltaTypeReplaced bool
}
type Deltas []Delta
type Delta struct {
Type DeltaType // Added | Updated | Deleted | Replaced | Sync
Object interface{} // the actual Kubernetes resource object
}
The Three Core Fields
queue: ["pod/default/nginx", "pod/default/redis", "deploy/default/app"]
↑ ordered slice of object keys (FIFO order)
items: {
"pod/default/nginx": [ {Added, PodObj_v1}, {Updated, PodObj_v2} ],
"pod/default/redis": [ {Deleted, PodObj_v1} ],
"deploy/default/app": [ {Added, DeployObj_v1} ],
}
↑ map: key → []Delta (all pending changes for that object)
Delta: { Type: "Updated", Object: <Pod nginx v2> }
↑ a single change event: what happened + the object snapshot
Visualizing the Structure
┌─────────────────────────────────────────────────────────────┐
│ DeltaFIFO │
│ │
│ queue (FIFO order): │
│ ┌──────────────────┬──────────────────┬─────────────────┐ │
│ │ "pod/default/ │ "pod/default/ │ "deploy/default │ │
│ │ nginx" │ redis" │ /app" │ │
│ └──────────────────┴──────────────────┴─────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ items (map): │
│ ┌──────────────────┐ ┌────────────────┐ ┌──────────────┐ │
│ │ [{Added, nginx} │ │[{Deleted, │ │[{Added, │ │
│ │ {Updated,nginx}]│ │ redis}] │ │ deploy}] │ │
│ └──────────────────┘ └────────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Key design insight:
queuemaintains ordering (which object to process next), whileitemsaccumulates all pending deltas per object. Multiple changes to the same object are batched — but never reordered relative to other objects.
3. Producing Messages: queueActionLocked
Every time Reflector calls syncWith() (from List) or watchHandler() (from Watch), it ultimately calls queueActionLocked to enqueue an event:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// ① Compute the object's key (e.g. "default/nginx")
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// ② Append new Delta to the existing list for this key
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
// ③ Deduplicate consecutive identical events
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
// ④ If this key is new, add it to the queue (maintain FIFO order)
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
// ⑤ Update items map with the new delta list
f.items[id] = newDeltas
// ⑥ Wake up all blocked consumers
f.cond.Broadcast()
}
return nil
}
Production flow:
Reflector (r.syncWith / r.watchHandler)
│
▼
queueActionLocked(actionType, obj)
│
├── ① KeyOf(obj) → compute key (e.g. "default/nginx")
├── ② append Delta → add {type, obj} to existing delta list
├── ③ dedupDeltas() → remove redundant consecutive events
├── ④ queue = append(key) → add key to FIFO queue (if new)
├── ⑤ items[key] = newDeltas → store updated delta list
└── ⑥ cond.Broadcast() → wake up Pop() consumers
Delta Types
const (
Added DeltaType = "Added" // object newly appeared (from Watch or initial List)
Updated DeltaType = "Updated" // object was modified
Deleted DeltaType = "Deleted" // object was removed
Replaced DeltaType = "Replaced" // full re-list replaced the object (resync)
Sync DeltaType = "Sync" // periodic resync — object unchanged but re-delivered
)
Deduplication: dedupDeltas
dedupDeltas prevents redundant back-to-back Deleted events for the same object:
Before dedup: [{Updated, obj}, {Deleted, obj}, {Deleted, obj}]
After dedup: [{Updated, obj}, {Deleted, obj}]
Only the last two deltas are considered for deduplication — older history is always preserved.
4. Consuming Messages: Pop
The consumer side is driven by the Informer's internal Controller, which calls Pop() in its processLoop:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// ① Block if queue is empty — wait for producer to signal
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait() // releases lock and sleeps until cond.Broadcast()
}
// ② Dequeue the first key (FIFO order)
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// ③ Retrieve and remove the delta list for this key
item, ok := f.items[id]
if !ok {
continue // key was already processed (race condition guard)
}
delete(f.items, id)
// ④ Pass the delta list to the callback function (HandleDeltas)
err := process(item)
// ⑤ If processing failed, re-enqueue for retry
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
Consumption flow:
Controller.processLoop()
│
└── DeltaFIFO.Pop(HandleDeltas)
│
├── ① cond.Wait() → sleep if queue empty
├── ② dequeue key → take first key from FIFO queue
├── ③ delete(items,id) → remove delta list from map
├── ④ process(item) → call HandleDeltas callback
└── ⑤ on error:
addIfNotPresent() → re-enqueue for retry
5. HandleDeltas: Routing Events Downstream
Once Pop() delivers a delta list to the callback, HandleDeltas routes each event to two destinations:
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// Object already in cache → Update
s.indexer.Update(d.Object)
isSync := (d.Type == Sync) ||
(d.Type == Replaced && sameResourceVersion(d.Object, old))
// Route to ShareInformer listeners
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// Object not in cache → Add
s.indexer.Add(d.Object)
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
s.indexer.Delete(d.Object)
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
The Two Destinations
HandleDeltas(Deltas)
│
├── ① Indexer (local cache)
│ ├── Added/Updated/Replaced/Sync → indexer.Add() or indexer.Update()
│ └── Deleted → indexer.Delete()
│
└── ② processor.distribute() (ShareInformer fan-out)
├── addNotification → triggers OnAdd callbacks
├── updateNotification → triggers OnUpdate callbacks
└── deleteNotification → triggers OnDelete callbacks
│
▼
informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(obj) { workqueue.Add(key) },
UpdateFunc: func(old, new) { workqueue.Add(key) },
DeleteFunc: func(obj) { workqueue.Add(key) },
})
6. The Complete DeltaFIFO Lifecycle
┌─────────────────────────────────────────────────────────────────┐
│ DeltaFIFO Lifecycle │
│ │
│ PRODUCER (Reflector) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ r.syncWith() → queueActionLocked(Added, obj) │ │
│ │ r.watchHandler() → queueActionLocked(Updated/Deleted…) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ cond.Broadcast() │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ DeltaFIFO │ │
│ │ queue: [key1, key2, key3, ...] (FIFO order) │ │
│ │ items: {key1: [Δ,Δ], key2: [Δ], key3: [Δ,Δ,Δ]} │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ cond.Wait → unblock │
│ ▼ │
│ CONSUMER (Controller.processLoop) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Pop(HandleDeltas) │ │
│ │ → HandleDeltas │ │
│ │ ├── Indexer.Add/Update/Delete (local cache) │ │
│ │ └── processor.distribute() (event callbacks) │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
7. Summary
| Concept | Detail |
|---|---|
| queue | Ordered slice of object keys — guarantees FIFO processing order |
| items | Map of key → []Delta — accumulates all pending changes per object |
| Delta |
{Type, Object} — carries both the change type and the full object snapshot |
| queueActionLocked | Producer entry point — appends delta, deduplicates, signals consumers |
| dedupDeltas | Removes redundant back-to-back Deleted events |
| Pop | Consumer entry point — blocks when empty, dequeues FIFO, calls HandleDeltas |
| HandleDeltas | Routes each delta to Indexer (storage) AND processor.distribute (callbacks) |
| cond.Broadcast/Wait | Efficient producer-consumer synchronization — no busy polling |
DeltaFIFO is the reliable handoff point between Reflector (producer) and the Informer's internal Controller (consumer). Its dual structure — an ordered queue for sequencing plus a map for delta accumulation — ensures that no event is lost, no object is processed out of order, and downstream consumers always receive the full change context.
Next in this series: Indexer: Fast In-Memory Resource Storage with Thread-Safe Indexing (Part 4)
Follow the series for more deep dives into Kubernetes development.
Top comments (0)