Over the past six articles we've examined each component of the Informer framework individually. Now it's time to look at the component that wires them all together: the Informer-internal Controller.
⚠️ Naming clarification: The "Controller" in this article is
k8s.io/client-go/tools/cache/controller.go— the Informer-internal orchestrator. It is NOT the same as the business-logic controller you write to reconcile custom resources. We'll call it the Informer Controller throughout this article to avoid confusion.
Source: k8s.io/client-go/tools/cache/controller.go
1. The Two Types of "Controller" in Kubernetes
Before diving in, let's be precise about terminology:
┌──────────────────────────────────────────────────────────────────┐
│ Type A: Informer Controller (this article) │
│ Package: k8s.io/client-go/tools/cache │
│ Role: internal orchestrator — starts Reflector, drives │
│ processLoop, connects DeltaFIFO → Indexer + callbacks │
│ You never instantiate this directly. │
└──────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ Type B: Your Application Controller │
│ Package: your codebase │
│ Role: business logic — reconciles desired vs actual state │
│ using WorkQueue + Lister + Clientset │
│ You write and own this. │
└──────────────────────────────────────────────────────────────────┘
2. The Controller Struct
// k8s.io/client-go/tools/cache/controller.go
type controller struct {
config Config // all dependencies injected here
reflector *Reflector // created from config at startup
reflectorMutex sync.RWMutex
clock clock.Clock
}
The controller itself is intentionally thin — all meaningful configuration lives in Config:
type Config struct {
// DeltaFIFO — the event queue (producer: Reflector, consumer: processLoop)
Queue
// Provides List() and Watch() — injected into Reflector at startup
ListerWatcher
// The callback invoked by processLoop for each item popped from DeltaFIFO
// In practice: sharedIndexInformer.HandleDeltas
Process ProcessFunc
// The resource type being watched (e.g. *v1.Pod)
ObjectType runtime.Object
// How often to force a full resync (0 = never)
FullResyncPeriod time.Duration
// Optional: custom function to decide whether to resync on each tick
ShouldResync ShouldResyncFunc
// If true: when Process() returns an error, re-enqueue the item
RetryOnError bool
// Called when Watch() returns an error
WatchErrorHandler WatchErrorHandler
// Pagination size for Watch requests
WatchListPageSize int64
}
Config field map
| Field | Provided by | Used for |
|---|---|---|
Queue |
DeltaFIFO | Event buffer between Reflector and processLoop |
ListerWatcher |
Pod/Deployment Informer's ListFunc+WatchFunc | Injected into Reflector |
Process |
sharedIndexInformer.HandleDeltas |
Callback for each popped event batch |
ObjectType |
e.g. &v1.Pod{}
|
Tells Reflector what type to deserialize |
FullResyncPeriod |
User config (e.g. 30*time.Minute) |
Periodic forced re-List |
RetryOnError |
Usually false
|
Re-enqueue on Process() failure |
WatchErrorHandler |
Optional custom handler | React to Watch connection errors |
3. How the Informer Controller Starts: Run()
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// ① Create Reflector from Config
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue, // DeltaFIFO is passed as the Store
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// ② Start Reflector in a goroutine — runs List/Watch forever
wg.StartWithChannel(stopCh, r.Run)
// ③ Start processLoop — consumes DeltaFIFO forever
wait.Until(c.processLoop, time.Second, stopCh)
}
Startup sequence:
controller.Run(stopCh)
│
├── ① NewReflector(ListerWatcher, ObjectType, DeltaFIFO, resyncPeriod)
│ → Reflector is wired to DeltaFIFO as its Store
│
├── ② go r.Run(stopCh)
│ → Reflector.ListAndWatch() starts in background goroutine
│ → Phase 1: List → syncWith() → DeltaFIFO seeded
│ → Phase 2: Watch → watchHandler() → events stream into DeltaFIFO
│
└── ③ wait.Until(c.processLoop, 1s, stopCh)
→ processLoop runs in foreground, restarted every 1s if it exits
4. The processLoop: Consuming DeltaFIFO
func (c *controller) processLoop() {
for {
// Pop() blocks until DeltaFIFO has an item
// Passes each item to c.config.Process (= HandleDeltas)
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return // queue was shut down — exit cleanly
}
if c.config.RetryOnError {
// Re-enqueue the item if RetryOnError is set
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
processLoop flow:
processLoop (runs forever)
│
└── DeltaFIFO.Pop(HandleDeltas)
│
├── blocks on cond.Wait() if queue is empty
├── dequeues next key (FIFO order)
└── calls HandleDeltas([]Delta)
│
├── Indexer.Add/Update/Delete ← keep local cache in sync
└── sharedProcessor.distribute() ← fan-out to user callbacks
5. The Complete Informer Framework — All Components Together
Now that we've covered every component, here is the complete picture:
┌──────────────────────────────────────────────────────────────────────────┐
│ Informer Framework (full picture) │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ sharedIndexInformer │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Informer Controller (internal) │ │ │
│ │ │ │ │ │
│ │ │ API Server │ │ │
│ │ │ │ List/Watch (HTTP chunked) │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌──────────┐ events ┌──────────────┐ │ │ │
│ │ │ │ Reflector│ ────────► │ DeltaFIFO │ │ │ │
│ │ │ │(goroutine│ │ queue+items │ │ │ │
│ │ │ └──────────┘ └──────┬───────┘ │ │ │
│ │ │ │ Pop() │ │ │
│ │ │ ▼ │ │ │
│ │ │ processLoop() │ │ │
│ │ │ │ │ │ │
│ │ └────────────────────────────────┼────────────────────────┘ │ │
│ │ │ HandleDeltas │ │
│ │ ┌──────────────┴──────────────┐ │ │
│ │ ▼ ▼ │ │
│ │ ┌────────────┐ ┌──────────────────┐ │ │
│ │ │ Indexer │ │ sharedProcessor │ │ │
│ │ │(ThreadSafe │ │ distribute() │ │ │
│ │ │ Map) │ └────────┬─────────┘ │ │
│ │ └────────────┘ │ │ │
│ │ ▲ ▼ │ │
│ │ │ Lister AddEventHandler │ │
│ └────────────────────┼───────────────────────────┼───────────────┘ │
│ │ │ │
│ ┌────────┴──────────────────────────▼──────────────┐ │
│ │ Your Application Controller │ │
│ │ │ │
│ │ podsLister.Pods(ns).Get(name) ← read from cache │ │
│ │ clientset.CoreV1().Pods().Update() ← write to API│ │
│ │ workqueue.AddRateLimited(key) ← retry on error │ │
│ │ recorder.Eventf(obj, ...) ← emit events │ │
│ └───────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
6. Data Flow: End to End
① factory.Start(stopCh)
└── for each registered informer:
go sharedIndexInformer.Run(stopCh)
└── controller.Run(stopCh)
├── go Reflector.Run(stopCh) [goroutine A]
└── processLoop() [goroutine B, main]
② Goroutine A — Reflector
└── ListAndWatch()
├── Phase 1: List → syncWith() → DeltaFIFO.Replace()
│ all existing objects enqueued as "Added" or "Replaced"
└── Phase 2: Watch → watchHandler() → DeltaFIFO.Add/Update/Delete
incremental events stream in continuously
③ Goroutine B — processLoop
└── DeltaFIFO.Pop(HandleDeltas) [blocks when empty]
└── HandleDeltas([]Delta)
for each Delta:
├── Indexer.Add/Update/Delete → local cache updated
└── sharedProcessor.distribute()
└── for each listener (AddEventHandler):
├── OnAdd(newObj)
├── OnUpdate(oldObj, newObj)
└── OnDelete(oldObj)
└── workqueue.Add(key)
④ Your controller worker goroutine(s)
└── workqueue.Get(key)
└── reconcile(key)
├── lister.Get(key) → read from Indexer (no API call)
├── clientset.Update/Patch/Delete → write to API Server
├── recorder.Eventf(...) → emit Event to etcd
└── on error: workqueue.AddRateLimited(key)
7. WaitForCacheSync: Why It Matters
Before your controller starts processing, the local cache must be fully populated from the initial List. WaitForCacheSync blocks until this is complete:
func (kc *KubeController) Run(stopCh chan struct{}) {
// Start all informers
kc.factory.Start(stopCh)
// Block until initial List is complete and DeltaFIFO is drained
// HasSynced() returns true when initialPopulationCount reaches 0
if !cache.WaitForCacheSync(stopCh,
kc.podsSynced,
kc.deploymentsSynced,
) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// Safe to start workers now — Indexer has a complete snapshot
for i := 0; i < workers; i++ {
go wait.Until(kc.runWorker, time.Second, stopCh)
}
<-stopCh
}
What HasSynced() tracks internally:
DeltaFIFO.initialPopulationCount
│
├── set to N during Replace() (initial List result: N objects)
├── decremented by 1 on each Pop()
└── HasSynced() = true when initialPopulationCount == 0
AND at least one Replace() has been called
Never start reconciliation workers before
WaitForCacheSyncreturns. If you do, your Lister queries will return incomplete results and your controller will make incorrect decisions based on a partial view of cluster state.
8. Series Summary: The Complete Component Map
We've now covered every component in the Informer framework. Here's the complete reference:
| Component | Package | Role | Article |
|---|---|---|---|
| Reflector | tools/cache |
List/Watch against API Server; feeds DeltaFIFO | Part 2 |
| DeltaFIFO | tools/cache |
FIFO event queue; stores {type, object} deltas |
Part 3 |
| Indexer | tools/cache |
Thread-safe in-memory cache with custom indexing | Part 4 |
| WorkQueue | util/workqueue |
Rate-limited, deduplicated task queue for workers | Part 5 |
| EventBroadcaster | tools/record |
Emits structured Events to API Server / logs | Part 6 |
| Controller (internal) | tools/cache |
Orchestrates Reflector + DeltaFIFO + processLoop | Part 7 (this) |
| sharedProcessor | tools/cache |
Fan-out: delivers events to all AddEventHandler listeners | — |
| SharedInformerFactory | informers |
Ensures one Reflector per resource type across all controllers | Part 1 |
9. Summary
Informer Controller = orchestrator, not business logic
controller.Run(stopCh)
│
├── NewReflector(ListerWatcher, ObjectType, DeltaFIFO)
│ → wires API Server connection to event queue
│
├── go Reflector.Run()
│ → List (seed cache) + Watch (stream events) forever
│
└── processLoop()
→ Pop from DeltaFIFO
→ HandleDeltas:
Indexer (storage) + sharedProcessor (callbacks)
| Concept | Detail |
|---|---|
| controller struct | Thin wrapper — holds Config + Reflector reference |
| Config.Queue | DeltaFIFO — the handoff point between Reflector and processLoop |
| Config.Process |
HandleDeltas — routes events to Indexer and sharedProcessor |
| Config.RetryOnError | Re-enqueues items when HandleDeltas returns an error |
| processLoop | Infinite loop: Pop → HandleDeltas → repeat |
| WaitForCacheSync | Safety gate — ensures Indexer is fully populated before workers start |
The Informer Controller is the invisible backbone of every Kubernetes controller. By orchestrating Reflector, DeltaFIFO, Indexer, and sharedProcessor into a single coherent pipeline, it gives you a reliable, consistent, and efficient view of cluster state — so your application controller can focus entirely on reconciliation logic.
Next in this series: CRD: Extending the Kubernetes API with Custom Resources (Part 8)
Follow the series for more deep dives into Kubernetes development.
Top comments (0)