DEV Community

James Lee

client-go Deep Dive: Reflector — How Kubernetes Syncs Resources from the API Server

In the previous article we saw that Informer's core mechanism is List/Watch. The component responsible for executing that mechanism is Reflector. Every time an Informer starts up, it's Reflector that connects to the API Server, fetches the full resource snapshot, and then keeps watching for changes.

Source: k8s.io/client-go/tools/cache/reflector.go


1. Where Reflector Fits

Informer starts
     │
     ▼
┌──────────────────────────────────────────────────┐
│                   Reflector                      │
│                                                  │
│  Phase 1: LIST                                   │
│  ├── call listerWatcher.List()                   │
│  ├── get ResourceVersion                         │
│  ├── extract object list                         │
│  └── syncWith() → store all objects in DeltaFIFO │
│                                                  │
│  Phase 2: WATCH                                  │
│  ├── call listerWatcher.Watch()                  │
│  ├── receive incremental events (HTTP chunked)   │
│  └── watchHandler() → push events to DeltaFIFO  │
└──────────────────────────────────────────────────┘
     │
     ▼
DeltaFIFO (event queue)

2. The Reflector Struct

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
    name             string
    expectedTypeName string
    expectedType     reflect.Type          // the resource type being watched (e.g. *v1.Pod)
    expectedGVK      *schema.GroupVersionKind

    store            Store                 // DeltaFIFO — where events are written
    listerWatcher    ListerWatcher         // the actual List/Watch implementation

    backoffManager         wait.BackoffManager  // backoff between ListAndWatch retries
    initConnBackoffManager wait.BackoffManager  // backoff while the API Server refuses connections

    resyncPeriod     time.Duration         // how often to replay cached objects into the store (0 = never)
    ShouldResync     func() bool           // optional: custom resync decision function

    clock            clock.Clock
    paginatedResult  bool

    // ResourceVersion tracking — critical for Watch correctness
    lastSyncResourceVersion              string
    isLastSyncResourceVersionUnavailable bool
    lastSyncResourceVersionMutex         sync.RWMutex

    WatchListPageSize  int64
    watchErrorHandler  WatchErrorHandler
}

Key fields explained:

| Field | Purpose |
| --- | --- |
| store | The DeltaFIFO queue; Reflector writes all events here |
| listerWatcher | Interface that provides the actual List() and Watch() calls |
| lastSyncResourceVersion | Tracks the latest version seen; Watch resumes from this point after a reconnect |
| resyncPeriod | If > 0, Reflector periodically calls store.Resync(), which re-queues the objects already in the local cache as Sync events. It does not re-List from the API Server |
| backoffManager | Exponential backoff between ListAndWatch retries after a failure |
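To make the backoff row concrete, here is a minimal sketch of exponential backoff with a cap. The `expBackoff` type and its methods are hypothetical, it omits the jitter and automatic reset that `wait.BackoffManager` implementations provide, and the 800ms / 30s / factor 2 values are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// expBackoff is a hypothetical, simplified backoff without jitter.
type expBackoff struct {
	initial, max, current time.Duration
	factor                float64
}

func newExpBackoff(initial, max time.Duration, factor float64) *expBackoff {
	return &expBackoff{initial: initial, max: max, factor: factor}
}

// Next returns the delay before the next retry. Each call multiplies the
// previous delay by factor, capped at max.
func (b *expBackoff) Next() time.Duration {
	if b.current == 0 {
		b.current = b.initial
		return b.current
	}
	next := time.Duration(float64(b.current) * b.factor)
	if next > b.max {
		next = b.max
	}
	b.current = next
	return next
}

// Reset returns the backoff to its initial delay, for example after a
// period without failures.
func (b *expBackoff) Reset() { b.current = 0 }

func main() {
	b := newExpBackoff(800*time.Millisecond, 30*time.Second, 2.0)
	for i := 0; i < 7; i++ {
		fmt.Println(b.Next()) // 800ms, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 30s
	}
}
```

The cap matters as much as the growth: without it, a long outage would push retries so far apart that recovery is noticed late.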

3. The ListerWatcher Interface

listerWatcher is an interface — Reflector doesn't know or care which resource type it's watching. The concrete implementation is injected per resource type.

type ListerWatcher interface {
    Lister
    Watcher
}

type Lister interface {
    List(options metav1.ListOptions) (runtime.Object, error)
}

type Watcher interface {
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

For a Pod Informer, the concrete implementation looks like this:

// k8s.io/client-go/informers/core/v1/pods.go
func NewFilteredPodInformer(
    client kubernetes.Interface,
    namespace string,
    resyncPeriod time.Duration,
    indexers cache.Indexers,
    tweakListOptions internalinterfaces.TweakListOptionsFunc,
) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            // List: calls the real Kubernetes API
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(context.TODO(), options)
            },
            // Watch: opens a long-lived HTTP connection to the API Server
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

This is why Clientset must be created before SharedInformerFactory. The ListFunc and WatchFunc are closures over the Clientset — without it, Reflector has no way to talk to the API Server.

func NewKubeController(...) *KubeController {
    kc := &KubeController{clientset: clientset}

    // Clientset is passed in here — it flows down into every ListFunc/WatchFunc
    kc.factory = informers.NewSharedInformerFactory(clientset, defaultResync)

    kc.podInformer      = kc.factory.Core().V1().Pods()
    kc.podsLister       = kc.podInformer.Lister()
    kc.podsSynced       = kc.podInformer.Informer().HasSynced
    kc.deploymentInformer   = kc.factory.Apps().V1().Deployments()
    kc.deploymentsLister    = kc.deploymentInformer.Lister()
    kc.deploymentsSynced    = kc.deploymentInformer.Informer().HasSynced
    return kc
}
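The closure-injection pattern in this section can be reproduced without a cluster. The sketch below uses simplified, hypothetical stand-ins (`ListOptions`, `Object`, `EventStream`, `fakeClient`) for the real client-go types; only the shape of the interfaces and the `ListWatch` adapter mirrors the code above.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified, hypothetical stand-ins for metav1.ListOptions,
// runtime.Object and watch.Interface.
type ListOptions struct{ ResourceVersion string }
type Object interface{}
type EventStream <-chan string

type Lister interface {
	List(options ListOptions) (Object, error)
}
type Watcher interface {
	Watch(options ListOptions) (EventStream, error)
}
type ListerWatcher interface {
	Lister
	Watcher
}

// ListWatch adapts two closures to the ListerWatcher interface.
type ListWatch struct {
	ListFunc  func(options ListOptions) (Object, error)
	WatchFunc func(options ListOptions) (EventStream, error)
}

func (lw *ListWatch) List(o ListOptions) (Object, error)       { return lw.ListFunc(o) }
func (lw *ListWatch) Watch(o ListOptions) (EventStream, error) { return lw.WatchFunc(o) }

// fakeClient plays the role of the Clientset that the closures capture.
type fakeClient struct{ pods []string }

func newPodListWatch(c *fakeClient) ListerWatcher {
	return &ListWatch{
		ListFunc: func(ListOptions) (Object, error) {
			if c == nil {
				return nil, errors.New("no client: cannot reach the API server")
			}
			return c.pods, nil
		},
		WatchFunc: func(ListOptions) (EventStream, error) {
			ch := make(chan string)
			close(ch) // no events in this sketch
			return ch, nil
		},
	}
}

// listNames sees only the interface, never the client behind it.
func listNames(lw ListerWatcher) ([]string, error) {
	obj, err := lw.List(ListOptions{ResourceVersion: "0"})
	if err != nil {
		return nil, err
	}
	names, ok := obj.([]string)
	if !ok {
		return nil, fmt.Errorf("unexpected list type %T", obj)
	}
	return names, nil
}

func main() {
	names, err := listNames(newPodListWatch(&fakeClient{pods: []string{"nginx", "redis"}}))
	fmt.Println(names, err)

	_, err = listNames(newPodListWatch(nil))
	fmt.Println(err)
}
```

Because `listNames` depends only on `ListerWatcher`, swapping the fake client for a real one would not change the caller, which is the same property that lets one Reflector implementation serve every resource type.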

4. ListAndWatch: Phase 1 — Full List

The ListAndWatch method is the heart of Reflector. Phase 1 fetches the complete current state:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    var resourceVersion string
    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

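    if err := func() error {
        // Shared between the list goroutine and the select below
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)

        // ① Fetch all resources (with pagination support)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r   // hand the panic over to the calling goroutine
                }
            }()
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)   // → calls ListFunc → Clientset.CoreV1().Pods().List()
            }))
            list, paginatedResult, err = pager.List(context.Background(), options)
            // handle expired ResourceVersion: mark it unavailable so that
            // relistResourceVersion() returns "", then retry immediately
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                list, paginatedResult, err = pager.List(context.Background(),
                    metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()

        // wait for list to complete or stop signal
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
        }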

        // ② Extract ResourceVersion from the list response
        listMetaInterface, _ := meta.ListAccessor(list)
        resourceVersion = listMetaInterface.GetResourceVersion()

        // ③ Convert list result into a slice of runtime.Object
        items, _ := meta.ExtractList(list)

        // ④ Store all objects + ResourceVersion into DeltaFIFO
        r.syncWith(items, resourceVersion)

        return nil
    }(); err != nil {
        return err
    }

    // ⑤ Record the latest ResourceVersion — Watch will resume from here
    r.setLastSyncResourceVersion(resourceVersion)
    ...
}
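The goroutine-plus-select structure in Phase 1 is a general pattern for making a blocking call cancellable while still surfacing panics on the caller's goroutine. Below is a minimal standalone sketch; `listWithStop` and `errStopped` are hypothetical names, and unlike Reflector (which returns nil on stop) this version returns a sentinel error so the caller can tell the cases apart.

```go
package main

import (
	"errors"
	"fmt"
)

var errStopped = errors.New("stopped before list completed")

// listWithStop runs list on its own goroutine and returns as soon as the
// list finishes, the stop channel closes, or the list function panics.
func listWithStop(stopCh <-chan struct{}, list func() ([]string, error)) ([]string, error) {
	var (
		items []string
		err   error
	)
	doneCh := make(chan struct{})
	panicCh := make(chan interface{}, 1)

	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicCh <- p // forward the panic instead of crashing here
			}
		}()
		items, err = list()
		close(doneCh) // closing publishes items and err to the receiver
	}()

	select {
	case <-stopCh:
		return nil, errStopped
	case p := <-panicCh:
		panic(p) // re-raise on the caller's goroutine
	case <-doneCh:
		return items, err
	}
}

func main() {
	stopCh := make(chan struct{})
	items, err := listWithStop(stopCh, func() ([]string, error) {
		return []string{"nginx", "redis"}, nil
	})
	fmt.Println(items, err)

	close(stopCh)
	never := make(chan struct{})
	_, err = listWithStop(stopCh, func() ([]string, error) {
		<-never // simulate a List call that hangs
		return nil, nil
	})
	fmt.Println(err)
}
```

Note that a stop signal abandons the goroutine rather than cancelling it; the underlying call keeps running until it returns on its own.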

Phase 1 call chain:

ListAndWatch()
  │
  ├── ① r.listerWatcher.List()        → fetch all objects from API Server
  ├── ② listMetaInterface.GetResourceVersion()  → extract version stamp
  ├── ③ meta.ExtractList()            → convert to []runtime.Object
  ├── ④ r.syncWith()                  → write all objects into DeltaFIFO
  └── ⑤ r.setLastSyncResourceVersion() → save version for Watch resume point
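Step ① mentions pagination support. The Kubernetes list API pages through results with a limit and an opaque continue token, and the pager keeps requesting pages until the token comes back empty. The sketch below models that loop with hypothetical types (`page`, `pageFunc`, `fakeServer`); the token format (a decimal offset) is invented for the example and is not how the API Server encodes it.

```go
package main

import (
	"fmt"
	"strconv"
)

// page is a hypothetical, simplified list response.
type page struct {
	items           []string
	continueToken   string // empty on the last page
	resourceVersion string
}

type pageFunc func(limit int, continueToken string) (page, error)

// listAll requests pages until the server returns an empty continue token,
// then reports all items plus the ResourceVersion of the last page.
func listAll(fetch pageFunc, limit int) ([]string, string, error) {
	var all []string
	token := ""
	for {
		p, err := fetch(limit, token)
		if err != nil {
			return nil, "", err
		}
		all = append(all, p.items...)
		if p.continueToken == "" {
			return all, p.resourceVersion, nil
		}
		token = p.continueToken
	}
}

// fakeServer serves a fixed slice; its token is the next start index.
func fakeServer(data []string, rv string) pageFunc {
	return func(limit int, token string) (page, error) {
		start := 0
		if token != "" {
			n, err := strconv.Atoi(token)
			if err != nil || n < 0 || n > len(data) {
				return page{}, fmt.Errorf("invalid continue token %q", token)
			}
			start = n
		}
		end := start + limit
		if limit <= 0 || end >= len(data) {
			// No limit, or the remaining items fit: this is the last page.
			return page{items: data[start:], resourceVersion: rv}, nil
		}
		return page{
			items:           data[start:end],
			continueToken:   strconv.Itoa(end),
			resourceVersion: rv,
		}, nil
	}
}

func main() {
	fetch := fakeServer([]string{"a", "b", "c", "d", "e"}, "1000")
	items, rv, err := listAll(fetch, 2)
	fmt.Println(items, rv, err) // [a b c d e] 1000 <nil>
}
```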

5. ListAndWatch: Phase 2 — Continuous Watch

After the full List completes, Reflector enters an infinite loop watching for incremental changes:

    // Periodic resync goroutine (if resyncPeriod > 0)
    go func() {
        for {
            select {
            case <-resyncCh:
                if r.ShouldResync == nil || r.ShouldResync() {
                    r.store.Resync()   // re-queue the objects already in the local cache as Sync events
                }
            case <-stopCh:
                return
            }
        }
    }()

    // Main Watch loop
    for {
        select {
        case <-stopCh:
            return nil
        default:
        }

        // Add randomized timeout to prevent thundering herd on reconnect
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion:     resourceVersion,  // resume from last known version
            TimeoutSeconds:      &timeoutSeconds,
            AllowWatchBookmarks: true,
        }

        // ① Open a long-lived Watch connection to the API Server
        start := r.clock.Now()   // lets watchHandler detect watches that close too quickly
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            if utilnet.IsConnectionRefused(err) {
                <-r.initConnBackoffManager.Backoff().C()  // backoff and retry
                continue
            }
            return err
        }

        // ② Process incoming events: write to DeltaFIFO + update ResourceVersion
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if !isExpiredError(err) {
                // log unexpected errors
            }
            // Returning nil ends this ListAndWatch call. Reflector.Run then calls
            // ListAndWatch again, so recovery starts with a fresh List.
            return nil
        }
    }
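The randomized timeout in the loop above is a one-line formula, so it is easy to check in isolation. This sketch wraps it in a hypothetical helper, `jitteredTimeout`, that takes the random source as a parameter; the 5-minute minimum is an assumption for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredTimeout returns a timeout in whole seconds drawn from
// [min, 2*min), given a random source that yields values in [0, 1).
func jitteredTimeout(min time.Duration, rnd func() float64) int64 {
	return int64(min.Seconds() * (rnd() + 1.0))
}

func main() {
	minWatchTimeout := 5 * time.Minute // illustrative value
	for i := 0; i < 3; i++ {
		// Each call lands somewhere between 300 and 599 seconds.
		fmt.Println(jitteredTimeout(minWatchTimeout, rand.Float64))
	}
}
```

Spreading timeouts over a range means that many Reflectors started at the same moment do not all reconnect at the same moment later.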

Phase 2 call chain:

ListAndWatch() — Watch loop
  │
  ├── ① r.listerWatcher.Watch()    → open a long-lived streaming request to API Server
  │                                   (passes ResourceVersion to resume from)
  └── ② r.watchHandler()           → for each incoming event:
                                       write object to DeltaFIFO
                                       update lastSyncResourceVersion
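The event loop that watchHandler runs can be modeled as a switch over event types. The sketch below is a simplified model, not the real function: `Event`, `queue`, and `handleWatch` are hypothetical, objects are reduced to string keys, and skipping unknown event types is a choice made for this example. It also shows why the loop requests bookmarks: a BOOKMARK event changes nothing in the queue but still advances the ResourceVersion.

```go
package main

import "fmt"

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"
)

// Event is a hypothetical, flattened watch event.
type Event struct {
	Type            EventType
	Key             string
	ResourceVersion string
}

// queue is a hypothetical stand-in for the store that receives events.
type queue struct{ log []string }

func (q *queue) Add(key string)    { q.log = append(q.log, "add "+key) }
func (q *queue) Update(key string) { q.log = append(q.log, "update "+key) }
func (q *queue) Delete(key string) { q.log = append(q.log, "delete "+key) }

// handleWatch drains the event channel, forwards changes to the queue and
// advances *resourceVersion after every event it understands.
func handleWatch(events <-chan Event, q *queue, resourceVersion *string) error {
	for ev := range events {
		switch ev.Type {
		case Added:
			q.Add(ev.Key)
		case Modified:
			q.Update(ev.Key)
		case Deleted:
			q.Delete(ev.Key)
		case Bookmark:
			// No object change; only the version moves forward.
		case Error:
			return fmt.Errorf("watch stream reported an error after version %s", *resourceVersion)
		default:
			continue // unknown type: skip without advancing the version
		}
		*resourceVersion = ev.ResourceVersion
	}
	return nil // channel closed: the caller may open a new watch
}

func main() {
	events := make(chan Event, 3)
	events <- Event{Type: Added, Key: "default/nginx", ResourceVersion: "1001"}
	events <- Event{Type: Deleted, Key: "default/redis", ResourceVersion: "1002"}
	events <- Event{Type: Bookmark, ResourceVersion: "1010"}
	close(events)

	rv := "1000"
	q := &queue{}
	err := handleWatch(events, q, &rv)
	fmt.Println(q.log, rv, err)
}
```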

6. ResourceVersion: The Key to Reliable Watch

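ResourceVersion is a version stamp carried by every resource object and every list response. The API Server derives it from etcd's revision counter, but clients are expected to treat it as an opaque string and simply hand it back to the server. It's the mechanism that makes Watch reliable: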

List response:  ResourceVersion = "1000"
                (all objects as of version 1000)
     │
     ▼
Watch request:  ResourceVersion = "1000"
                (give me all events AFTER version 1000)
     │
     ▼
API Server streams:
  - Pod "nginx" Updated  (RV=1001)
  - Pod "redis" Deleted  (RV=1002)
  - ...

If Watch connection drops:
     │
     ▼
Reflector reconnects with:  ResourceVersion = "1002"
(resumes where it left off: no events missed, no duplicates, provided version 1002 has not expired)

| Scenario | ResourceVersion behavior |
| --- | --- |
| Fresh start | "0": the List may be served from the API Server's cache, so the snapshot can be slightly stale. "" requests the most recent data and is used when retrying after an expired version |
| After List | Set to the version returned by the List response |
| After each Watch event | Updated to the event's ResourceVersion |
| Watch reconnect | Resumes from lastSyncResourceVersion |
| Expired version (too old) | Triggers a fresh List from scratch |
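The resume and expiry rows can be exercised with a small simulation. Everything here is hypothetical: `fakeServer` keeps a bounded event history, `watch` returns at most a few events per connection to imitate dropped connections, and versions are integers only so the fake server can compare them (a real client never compares ResourceVersions itself).

```go
package main

import (
	"errors"
	"fmt"
)

type event struct {
	rv   int
	desc string
}

// errExpired models the "version too old" response (HTTP 410 Gone).
var errExpired = errors.New("resource version too old")

type fakeServer struct {
	history   []event // ascending by rv
	compacted int     // events with rv <= compacted are no longer available
}

// latest returns the version a fresh List would report.
func (s *fakeServer) latest() int {
	if len(s.history) == 0 {
		return s.compacted
	}
	return s.history[len(s.history)-1].rv
}

// watch returns up to max events newer than fromRV, then "disconnects".
func (s *fakeServer) watch(fromRV, max int) ([]event, error) {
	if fromRV < s.compacted {
		return nil, errExpired
	}
	var out []event
	for _, e := range s.history {
		if e.rv > fromRV && len(out) < max {
			out = append(out, e)
		}
	}
	return out, nil
}

// catchUp reconnects from the last seen version until no events remain.
// On expiry it falls back to a re-list, modeled as jumping to latest().
func catchUp(s *fakeServer, startRV, perConn int) (seen []int, relisted bool) {
	rv := startRV
	for {
		evs, err := s.watch(rv, perConn)
		if errors.Is(err, errExpired) {
			rv = s.latest()
			relisted = true
			continue
		}
		if len(evs) == 0 {
			return seen, relisted
		}
		for _, e := range evs {
			seen = append(seen, e.rv)
			rv = e.rv // resume point for the next connection
		}
	}
}

func main() {
	s := &fakeServer{compacted: 1000, history: []event{
		{1001, "nginx updated"}, {1002, "redis deleted"},
		{1003, "nginx updated"}, {1004, "web added"},
	}}
	fmt.Println(catchUp(s, 1000, 2)) // [1001 1002 1003 1004] false
	fmt.Println(catchUp(s, 900, 2))  // [] true
}
```

In the first run the watch is cut after every two events, yet each version is seen exactly once. In the second run the starting version predates the retained history, so the only safe recovery is a fresh List.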

7. Under the Hood: How Watch Works (HTTP Chunked Transfer)

client-go's Watch does not use a WebSocket or a gRPC stream. It is an ordinary long-lived HTTP response. Over HTTP/1.1, which this section describes, that response is sent with Chunked Transfer Encoding.

Client (Reflector)                    API Server
     │                                     │
     │  GET /api/v1/pods?watch=true        │
     │  ResourceVersion=1000               │
     │ ──────────────────────────────────► │
     │                                     │
     │  HTTP/1.1 200 OK                    │
     │  Transfer-Encoding: chunked         │
     │ ◄────────────────────────────────── │
     │                                     │
     │  chunk: {"type":"ADDED","object":…} │
     │ ◄────────────────────────────────── │
     │                                     │
     │  chunk: {"type":"MODIFIED","object":…}
     │ ◄────────────────────────────────── │
     │                                     │
     │  (connection stays open…)           │
     │  (new chunks arrive as events occur)│

Why Chunked Transfer Encoding?

In standard HTTP, the Content-Length header tells the client how many bytes to expect. But for a Watch stream, the server doesn't know in advance how many events will occur — the stream is open-ended.

Chunked Transfer Encoding solves this:

  • The response body is split into variable-size chunks
  • Each chunk is prefixed with its size in hex
  • The server sends chunks as events arrive — no pre-declared content length needed
  • A zero-length chunk signals the end of the stream

HTTP/1.1 200 OK
Transfer-Encoding: chunked

1a\r\n                          ← chunk size in hex (sizes here are illustrative)
{"type":"ADDED","object":…}\r\n ← chunk data, terminated by CRLF
2f\r\n
{"type":"MODIFIED","object":…}\r\n
0\r\n                           ← zero-length chunk: end of stream
\r\n
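The framing can be inspected directly with Go's standard library. In a real client or server, net/http adds and removes the chunk framing automatically; the sketch below uses the `net/http/httputil` chunked writer and reader only to make the bytes visible. The `watchEvent` type and both helper functions are hypothetical, and the object is reduced to a string.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http/httputil"
)

// watchEvent is a hypothetical, simplified watch event.
type watchEvent struct {
	Type   string `json:"type"`
	Object string `json:"object"`
}

// encodeChunked writes each event as one chunk and returns the raw body
// bytes as they would appear on the wire.
func encodeChunked(events []watchEvent) ([]byte, error) {
	var wire bytes.Buffer
	cw := httputil.NewChunkedWriter(&wire)
	for _, ev := range events {
		data, err := json.Marshal(ev)
		if err != nil {
			return nil, err
		}
		if _, err := cw.Write(data); err != nil {
			return nil, err
		}
	}
	// Close emits the zero-length chunk; the final CRLF is written separately.
	if err := cw.Close(); err != nil {
		return nil, err
	}
	wire.WriteString("\r\n")
	return wire.Bytes(), nil
}

// decodeChunked strips the chunk framing and decodes one JSON document
// per event until the zero-length chunk ends the stream.
func decodeChunked(wire []byte) ([]watchEvent, error) {
	dec := json.NewDecoder(httputil.NewChunkedReader(bytes.NewReader(wire)))
	var events []watchEvent
	for {
		var ev watchEvent
		err := dec.Decode(&ev)
		if err == io.EOF {
			return events, nil
		}
		if err != nil {
			return events, err
		}
		events = append(events, ev)
	}
}

func main() {
	wire, err := encodeChunked([]watchEvent{
		{Type: "ADDED", Object: "nginx"},
		{Type: "MODIFIED", Object: "nginx"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", wire) // shows the hex sizes and CRLF delimiters

	events, err := decodeChunked(wire)
	fmt.Println(events, err)
}
```

Decoding with a streaming `json.Decoder` rather than reading the whole body is what lets a watcher react to each event as soon as its chunk arrives.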

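Chunked transfer encoding was introduced in HTTP/1.1. HTTP/2 does not use it; when client and server negotiate HTTP/2, the same event stream travels in HTTP/2 DATA frames instead. Either way, a single persistent connection replaces thousands of polling requests, which is what makes Kubernetes Watch efficient.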


8. Summary

Reflector.Run()
     │
     └── ListAndWatch()
           │
           ├── Phase 1: LIST
           │     ├── listerWatcher.List()          → full snapshot from API Server
           │     ├── GetResourceVersion()           → stamp the snapshot version
           │     ├── meta.ExtractList()             → parse into objects
           │     └── syncWith() → DeltaFIFO         → seed the local cache
           │
           └── Phase 2: WATCH (infinite loop)
                 ├── listerWatcher.Watch(RV)        → long-lived streaming HTTP request
                 ├── watchHandler() → DeltaFIFO     → stream events to queue
                 └── on disconnect: backoff + retry
                     on expired RV: re-List from scratch

| Concept | Detail |
| --- | --- |
| ListerWatcher | Interface injected per resource type; backed by Clientset API calls |
| ResourceVersion | Opaque version stamp backed by etcd; enables Watch to resume without missing events |
| DeltaFIFO | The downstream queue; Reflector is its sole producer |
| Chunked Transfer | HTTP/1.1 mechanism that lets the server stream events without declaring a content length up front |
| Backoff | Exponential retry when Watch connection drops or API Server is unavailable |

Reflector is the bridge between the Kubernetes API Server and the local Informer cache. Once you understand its List → syncWith → Watch → watchHandler pipeline, the reliability guarantees of the entire Informer framework become clear.


Next in this series: DeltaFIFO: The Event Queue Behind Informer (Part 3)


Follow the series for more deep dives into Kubernetes development.
