DEV Community

James Lee
James Lee

Posted on

client-go Deep Dive: Controller — The Central Hub That Wires the Informer Framework Together

Over the past six articles we've examined each component of the Informer framework individually. Now it's time to look at the component that wires them all together: the Informer-internal Controller.

⚠️ Naming clarification: The "Controller" in this article is k8s.io/client-go/tools/cache/controller.go — the Informer-internal orchestrator. It is NOT the same as the business-logic controller you write to reconcile custom resources. We'll call it the Informer Controller throughout this article to avoid confusion.

Source: k8s.io/client-go/tools/cache/controller.go


1. The Two Types of "Controller" in Kubernetes

Before diving in, let's be precise about terminology:

┌──────────────────────────────────────────────────────────────────┐
│  Type A: Informer Controller (this article)                      │
│  Package: k8s.io/client-go/tools/cache                          │
│  Role: internal orchestrator — starts Reflector, drives          │
│        processLoop, connects DeltaFIFO → Indexer + callbacks     │
│  You never instantiate this directly.                            │
└──────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────┐
│  Type B: Your Application Controller                             │
│  Package: your codebase                                          │
│  Role: business logic — reconciles desired vs actual state       │
│        using WorkQueue + Lister + Clientset                      │
│  You write and own this.                                         │
└──────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

2. The Controller Struct

// k8s.io/client-go/tools/cache/controller.go
type controller struct {
    config         Config          // all dependencies injected here
    reflector      *Reflector      // created from config at startup
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}
Enter fullscreen mode Exit fullscreen mode

The controller itself is intentionally thin — all meaningful configuration lives in Config:

type Config struct {
    // DeltaFIFO — the event queue (producer: Reflector, consumer: processLoop)
    Queue

    // Provides List() and Watch() — injected into Reflector at startup
    ListerWatcher

    // The callback invoked by processLoop for each item popped from DeltaFIFO
    // In practice: sharedIndexInformer.HandleDeltas
    Process ProcessFunc

    // The resource type being watched (e.g. *v1.Pod)
    ObjectType runtime.Object

    // How often to force a full resync (0 = never)
    FullResyncPeriod time.Duration

    // Optional: custom function to decide whether to resync on each tick
    ShouldResync ShouldResyncFunc

    // If true: when Process() returns an error, re-enqueue the item
    RetryOnError bool

    // Called when Watch() returns an error
    WatchErrorHandler WatchErrorHandler

    // Pagination size for Watch requests
    WatchListPageSize int64
}
Enter fullscreen mode Exit fullscreen mode

Config field map

Field Provided by Used for
Queue DeltaFIFO Event buffer between Reflector and processLoop
ListerWatcher Pod/Deployment Informer's ListFunc+WatchFunc Injected into Reflector
Process sharedIndexInformer.HandleDeltas Callback for each popped event batch
ObjectType e.g. &v1.Pod{} Tells Reflector what type to deserialize
FullResyncPeriod User config (e.g. 30*time.Minute) Periodic forced re-List
RetryOnError Usually false Re-enqueue on Process() failure
WatchErrorHandler Optional custom handler React to Watch connection errors

3. How the Informer Controller Starts: Run()

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()

    // ① Create Reflector from Config
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,       // DeltaFIFO is passed as the Store
        c.config.FullResyncPeriod,
    )
    r.ShouldResync   = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock          = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    // ② Start Reflector in a goroutine — runs List/Watch forever
    wg.StartWithChannel(stopCh, r.Run)

    // ③ Start processLoop — consumes DeltaFIFO forever
    wait.Until(c.processLoop, time.Second, stopCh)
}
Enter fullscreen mode Exit fullscreen mode

Startup sequence:

controller.Run(stopCh)
     
     ├──  NewReflector(ListerWatcher, ObjectType, DeltaFIFO, resyncPeriod)
               Reflector is wired to DeltaFIFO as its Store
     
     ├──  go r.Run(stopCh)
               Reflector.ListAndWatch() starts in background goroutine
               Phase 1: List  syncWith()  DeltaFIFO seeded
               Phase 2: Watch  watchHandler()  events stream into DeltaFIFO
     
     └──  wait.Until(c.processLoop, 1s, stopCh)
                processLoop runs in foreground, restarted every 1s if it exits
Enter fullscreen mode Exit fullscreen mode

4. The processLoop: Consuming DeltaFIFO

func (c *controller) processLoop() {
    for {
        // Pop() blocks until DeltaFIFO has an item
        // Passes each item to c.config.Process (= HandleDeltas)
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return  // queue was shut down — exit cleanly
            }
            if c.config.RetryOnError {
                // Re-enqueue the item if RetryOnError is set
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

processLoop flow:

processLoop (runs forever)
     
     └── DeltaFIFO.Pop(HandleDeltas)
           
           ├── blocks on cond.Wait() if queue is empty
           ├── dequeues next key (FIFO order)
           └── calls HandleDeltas([]Delta)
                      
                      ├── Indexer.Add/Update/Delete    keep local cache in sync
                      └── sharedProcessor.distribute()  fan-out to user callbacks
Enter fullscreen mode Exit fullscreen mode

5. The Complete Informer Framework — All Components Together

Now that we've covered every component, here is the complete picture:

┌──────────────────────────────────────────────────────────────────────────┐
│                    Informer Framework (full picture)                     │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐    │
│  │                    sharedIndexInformer                           │    │
│  │                                                                  │    │
│  │  ┌─────────────────────────────────────────────────────────┐    │    │
│  │  │              Informer Controller (internal)             │    │    │
│  │  │                                                         │    │    │
│  │  │   API Server                                            │    │    │
│  │  │       │  List/Watch (HTTP chunked)                      │    │    │
│  │  │       ▼                                                 │    │    │
│  │  │  ┌──────────┐  events   ┌──────────────┐               │    │    │
│  │  │  │ Reflector│ ────────► │  DeltaFIFO   │               │    │    │
│  │  │  │(goroutine│           │  queue+items │               │    │    │
│  │  │  └──────────┘           └──────┬───────┘               │    │    │
│  │  │                                │ Pop()                  │    │    │
│  │  │                                ▼                        │    │    │
│  │  │                        processLoop()                    │    │    │
│  │  │                                │                        │    │    │
│  │  └────────────────────────────────┼────────────────────────┘    │    │
│  │                                   │ HandleDeltas                 │    │
│  │                    ┌──────────────┴──────────────┐              │    │
│  │                    ▼                             ▼              │    │
│  │             ┌────────────┐            ┌──────────────────┐      │    │
│  │             │  Indexer   │            │ sharedProcessor  │      │    │
│  │             │(ThreadSafe │            │  distribute()    │      │    │
│  │             │   Map)     │            └────────┬─────────┘      │    │
│  │             └────────────┘                     │                │    │
│  │                    ▲                           ▼                │    │
│  │                    │ Lister             AddEventHandler         │    │
│  └────────────────────┼───────────────────────────┼───────────────┘    │
│                       │                           │                     │
│              ┌────────┴──────────────────────────▼──────────────┐      │
│              │              Your Application Controller          │      │
│              │                                                   │      │
│              │  podsLister.Pods(ns).Get(name)  ← read from cache │      │
│              │  clientset.CoreV1().Pods().Update() ← write to API│      │
│              │  workqueue.AddRateLimited(key)   ← retry on error │      │
│              │  recorder.Eventf(obj, ...)       ← emit events    │      │
│              └───────────────────────────────────────────────────┘      │
└──────────────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

6. Data Flow: End to End

 factory.Start(stopCh)
   └── for each registered informer:
         go sharedIndexInformer.Run(stopCh)
              └── controller.Run(stopCh)
                    ├── go Reflector.Run(stopCh)   [goroutine A]
                    └── processLoop()              [goroutine B, main]

 Goroutine A  Reflector
   └── ListAndWatch()
         ├── Phase 1: List  syncWith()  DeltaFIFO.Replace()
              all existing objects enqueued as "Added" or "Replaced"
         └── Phase 2: Watch  watchHandler()  DeltaFIFO.Add/Update/Delete
               incremental events stream in continuously

 Goroutine B  processLoop
   └── DeltaFIFO.Pop(HandleDeltas)  [blocks when empty]
         └── HandleDeltas([]Delta)
               for each Delta:
               ├── Indexer.Add/Update/Delete    local cache updated
               └── sharedProcessor.distribute()
                     └── for each listener (AddEventHandler):
                           ├── OnAdd(newObj)
                           ├── OnUpdate(oldObj, newObj)
                           └── OnDelete(oldObj)
                                 └── workqueue.Add(key)

 Your controller worker goroutine(s)
   └── workqueue.Get(key)
         └── reconcile(key)
               ├── lister.Get(key)               read from Indexer (no API call)
               ├── clientset.Update/Patch/Delete  write to API Server
               ├── recorder.Eventf(...)           emit Event to etcd
               └── on error: workqueue.AddRateLimited(key)
Enter fullscreen mode Exit fullscreen mode

7. WaitForCacheSync: Why It Matters

Before your controller starts processing, the local cache must be fully populated from the initial List. WaitForCacheSync blocks until this is complete:

func (kc *KubeController) Run(stopCh chan struct{}) {
    // Start all informers
    kc.factory.Start(stopCh)

    // Block until initial List is complete and DeltaFIFO is drained
    // HasSynced() returns true when initialPopulationCount reaches 0
    if !cache.WaitForCacheSync(stopCh,
        kc.podsSynced,
        kc.deploymentsSynced,
    ) {
        utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
        return
    }

    // Safe to start workers now — Indexer has a complete snapshot
    for i := 0; i < workers; i++ {
        go wait.Until(kc.runWorker, time.Second, stopCh)
    }

    <-stopCh
}
Enter fullscreen mode Exit fullscreen mode

What HasSynced() tracks internally:

DeltaFIFO.initialPopulationCount
     
     ├── set to N during Replace() (initial List result: N objects)
     ├── decremented by 1 on each Pop()
     └── HasSynced() = true when initialPopulationCount == 0
                        AND at least one Replace() has been called
Enter fullscreen mode Exit fullscreen mode

Never start reconciliation workers before WaitForCacheSync returns. If you do, your Lister queries will return incomplete results and your controller will make incorrect decisions based on a partial view of cluster state.


8. Series Summary: The Complete Component Map

We've now covered every component in the Informer framework. Here's the complete reference:

Component Package Role Article
Reflector tools/cache List/Watch against API Server; feeds DeltaFIFO Part 2
DeltaFIFO tools/cache FIFO event queue; stores {type, object} deltas Part 3
Indexer tools/cache Thread-safe in-memory cache with custom indexing Part 4
WorkQueue util/workqueue Rate-limited, deduplicated task queue for workers Part 5
EventBroadcaster tools/record Emits structured Events to API Server / logs Part 6
Controller (internal) tools/cache Orchestrates Reflector + DeltaFIFO + processLoop Part 7 (this)
sharedProcessor tools/cache Fan-out: delivers events to all AddEventHandler listeners
SharedInformerFactory informers Ensures one Reflector per resource type across all controllers Part 1

9. Summary

Informer Controller = orchestrator, not business logic

controller.Run(stopCh)
     
     ├── NewReflector(ListerWatcher, ObjectType, DeltaFIFO)
               wires API Server connection to event queue
     
     ├── go Reflector.Run()
               List (seed cache) + Watch (stream events) forever
     
     └── processLoop()
                Pop from DeltaFIFO
                HandleDeltas:
                     Indexer (storage) + sharedProcessor (callbacks)
Enter fullscreen mode Exit fullscreen mode
Concept Detail
controller struct Thin wrapper — holds Config + Reflector reference
Config.Queue DeltaFIFO — the handoff point between Reflector and processLoop
Config.Process HandleDeltas — routes events to Indexer and sharedProcessor
Config.RetryOnError Re-enqueues items when HandleDeltas returns an error
processLoop Infinite loop: Pop → HandleDeltas → repeat
WaitForCacheSync Safety gate — ensures Indexer is fully populated before workers start

The Informer Controller is the invisible backbone of every Kubernetes controller. By orchestrating Reflector, DeltaFIFO, Indexer, and sharedProcessor into a single coherent pipeline, it gives you a reliable, consistent, and efficient view of cluster state — so your application controller can focus entirely on reconciliation logic.


Next in this series: CRD: Extending the Kubernetes API with Custom Resources (Part 8)


Follow the series for more deep dives into Kubernetes development.

Top comments (0)