In the previous article we saw that Informer's core mechanism is List/Watch. The component responsible for executing that mechanism is Reflector. Every time an Informer starts up, it's Reflector that connects to the API Server, fetches the full resource snapshot, and then keeps watching for changes.
Source: k8s.io/client-go/tools/cache/reflector.go
1. Where Reflector Fits
Informer starts
│
▼
┌──────────────────────────────────────────────────┐
│ Reflector │
│ │
│ Phase 1: LIST │
│ ├── call listerWatcher.List() │
│ ├── get ResourceVersion │
│ ├── extract object list │
│ └── syncWith() → store all objects in DeltaFIFO │
│ │
│ Phase 2: WATCH │
│ ├── call listerWatcher.Watch() │
│ ├── receive incremental events (HTTP chunked) │
│ └── watchHandler() → push events to DeltaFIFO │
└──────────────────────────────────────────────────┘
│
▼
DeltaFIFO (event queue)
2. The Reflector Struct
// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
name string
expectedTypeName string
expectedType reflect.Type // the resource type being watched (e.g. *v1.Pod)
expectedGVK *schema.GroupVersionKind
store Store // DeltaFIFO — where events are written
listerWatcher ListerWatcher // the actual List/Watch implementation
backoffManager wait.BackoffManager // retry backoff for Watch reconnects
initConnBackoffManager wait.BackoffManager // backoff for initial connection
resyncPeriod time.Duration // how often to force a full resync (0 = never)
ShouldResync func() bool // optional: custom resync decision function
clock clock.Clock
paginatedResult bool
// ResourceVersion tracking — critical for Watch correctness
lastSyncResourceVersion string
isLastSyncResourceVersionUnavailable bool
lastSyncResourceVersionMutex sync.RWMutex
WatchListPageSize int64
watchErrorHandler WatchErrorHandler
}
Key fields explained:
| Field | Purpose |
|---|---|
store |
The DeltaFIFO queue — Reflector writes all events here |
listerWatcher |
Interface that provides the actual List() and Watch() calls |
lastSyncResourceVersion |
Tracks the latest version seen — Watch resumes from this point after reconnect |
resyncPeriod |
If > 0, Reflector periodically forces a full re-List to catch any missed events |
backoffManager |
Exponential backoff when Watch connection drops |
3. The ListerWatcher Interface
listerWatcher is an interface — Reflector doesn't know or care which resource type it's watching. The concrete implementation is injected per resource type.
type ListerWatcher interface {
Lister
Watcher
}
type Lister interface {
List(options metav1.ListOptions) (runtime.Object, error)
}
type Watcher interface {
Watch(options metav1.ListOptions) (watch.Interface, error)
}
For a Pod Informer, the concrete implementation looks like this:
// k8s.io/client-go/informers/core/v1/pods.go
func NewFilteredPodInformer(
client kubernetes.Interface,
namespace string,
resyncPeriod time.Duration,
indexers cache.Indexers,
tweakListOptions internalinterfaces.TweakListOptionsFunc,
) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
// List: calls the real Kubernetes API
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
// Watch: opens a long-lived HTTP connection to the API Server
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
This is why Clientset must be created before SharedInformerFactory. The
ListFuncandWatchFuncare closures over the Clientset — without it, Reflector has no way to talk to the API Server.
func NewKubeController(...) *KubeController {
kc := &KubeController{clientset: clientset}
// Clientset is passed in here — it flows down into every ListFunc/WatchFunc
kc.factory = informers.NewSharedInformerFactory(clientset, defaultResync)
kc.podInformer = kc.factory.Core().V1().Pods()
kc.podsLister = kc.podInformer.Lister()
kc.podsSynced = kc.podInformer.Informer().HasSynced
kc.deploymentInformer = kc.factory.Apps().V1().Deployments()
kc.deploymentsLister = kc.deploymentInformer.Lister()
kc.deploymentsSynced = kc.deploymentInformer.Informer().HasSynced
return kc
}
4. ListAndWatch: Phase 1 — Full List
The ListAndWatch method is the heart of Reflector. Phase 1 fetches the complete current state:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
// ① Fetch all resources (with pagination support)
go func() {
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts) // → calls ListFunc → Clientset.CoreV1().Pods().List()
}))
list, paginatedResult, err = pager.List(context.Background(), options)
// handle expired ResourceVersion: retry with fresh version
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
list, paginatedResult, err = pager.List(context.Background(),
metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
// wait for list to complete or stop signal
select {
case <-stopCh: return nil
case r := <-panicCh: panic(r)
case <-listCh:
}
// ② Extract ResourceVersion from the list response
listMetaInterface, _ := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
// ③ Convert list result into a slice of runtime.Object
items, _ := meta.ExtractList(list)
// ④ Store all objects + ResourceVersion into DeltaFIFO
r.syncWith(items, resourceVersion)
return nil
}(); err != nil {
return err
}
// ⑤ Record the latest ResourceVersion — Watch will resume from here
r.setLastSyncResourceVersion(resourceVersion)
...
}
Phase 1 call chain:
ListAndWatch()
│
├── ① r.listerWatcher.List() → fetch all objects from API Server
├── ② listMetaInterface.GetResourceVersion() → extract version stamp
├── ③ meta.ExtractList() → convert to []runtime.Object
├── ④ r.syncWith() → write all objects into DeltaFIFO
└── ⑤ r.setLastSyncResourceVersion() → save version for Watch resume point
5. ListAndWatch: Phase 2 — Continuous Watch
After the full List completes, Reflector enters an infinite loop watching for incremental changes:
// Periodic resync goroutine (if resyncPeriod > 0)
go func() {
for {
select {
case <-resyncCh:
if r.ShouldResync == nil || r.ShouldResync() {
r.store.Resync() // force re-sync of Indexer from DeltaFIFO
}
case <-stopCh:
return
}
}
}()
// Main Watch loop
for {
select {
case <-stopCh:
return nil
default:
}
// Add randomized timeout to prevent thundering herd on reconnect
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion, // resume from last known version
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
// ① Open a long-lived Watch connection to the API Server
w, err := r.listerWatcher.Watch(options)
if err != nil {
if utilnet.IsConnectionRefused(err) {
<-r.initConnBackoffManager.Backoff().C() // backoff and retry
continue
}
return err
}
// ② Process incoming events: write to DeltaFIFO + update ResourceVersion
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if !isExpiredError(err) {
// log unexpected errors
}
return nil
}
}
Phase 2 call chain:
ListAndWatch() — Watch loop
│
├── ① r.listerWatcher.Watch() → open HTTP long-poll to API Server
│ (passes ResourceVersion to resume from)
└── ② r.watchHandler() → for each incoming event:
write object to DeltaFIFO
update lastSyncResourceVersion
6. ResourceVersion: The Key to Reliable Watch
ResourceVersion is a monotonically increasing version stamp that etcd assigns to every resource object. It's the mechanism that makes Watch reliable:
List response: ResourceVersion = "1000"
(all objects as of version 1000)
│
▼
Watch request: ResourceVersion = "1000"
(give me all events AFTER version 1000)
│
▼
API Server streams:
- Pod "nginx" Updated (RV=1001)
- Pod "redis" Deleted (RV=1002)
- ...
If Watch connection drops:
│
▼
Reflector reconnects with: ResourceVersion = "1002"
(resumes exactly where it left off — no events missed, no duplicates)
| Scenario | ResourceVersion behavior |
|---|---|
| Fresh start |
"" or "0" — get latest snapshot |
| After List | Set to the version returned by List response |
| After each Watch event | Updated to the event's ResourceVersion |
| Watch reconnect | Resumes from lastSyncResourceVersion
|
| Expired version (too old) | Triggers a fresh List from scratch |
7. Under the Hood: How Watch Works (HTTP Chunked Transfer)
The Watch connection is not a WebSocket or gRPC stream — it's a plain HTTP/1.1 long-lived connection using Chunked Transfer Encoding.
Client (Reflector) API Server
│ │
│ GET /api/v1/pods?watch=true │
│ ResourceVersion=1000 │
│ ──────────────────────────────────► │
│ │
│ HTTP/1.1 200 OK │
│ Transfer-Encoding: chunked │
│ ◄────────────────────────────────── │
│ │
│ chunk: {"type":"ADDED","object":…} │
│ ◄────────────────────────────────── │
│ │
│ chunk: {"type":"MODIFIED","object":…}
│ ◄────────────────────────────────── │
│ │
│ (connection stays open…) │
│ (new chunks arrive as events occur)│
Why Chunked Transfer Encoding?
In standard HTTP, the Content-Length header tells the client how many bytes to expect. But for a Watch stream, the server doesn't know in advance how many events will occur — the stream is open-ended.
Chunked Transfer Encoding solves this:
- The response body is split into variable-size chunks
- Each chunk is prefixed with its size in hex
- The server sends chunks as events arrive — no pre-declared content length needed
- A zero-length chunk signals the end of the stream
HTTP/1.1 200 OK
Transfer-Encoding: chunked
1a\r\n ← chunk size (hex)
{"type":"ADDED","object":…}\r\n ← chunk data
\r\n
2f\r\n
{"type":"MODIFIED","object":…}\r\n
\r\n
0\r\n ← end of stream
\r\n
This is only available in HTTP/1.1 and later. It's what makes Kubernetes Watch efficient — a single persistent connection replaces thousands of polling requests.
8. Summary
Reflector.Run()
│
└── ListAndWatch()
│
├── Phase 1: LIST
│ ├── listerWatcher.List() → full snapshot from API Server
│ ├── GetResourceVersion() → stamp the snapshot version
│ ├── meta.ExtractList() → parse into objects
│ └── syncWith() → DeltaFIFO → seed the local cache
│
└── Phase 2: WATCH (infinite loop)
├── listerWatcher.Watch(RV) → HTTP chunked long-poll
├── watchHandler() → DeltaFIFO → stream events to queue
└── on disconnect: backoff + retry
on expired RV: re-List from scratch
| Concept | Detail |
|---|---|
| ListerWatcher | Interface injected per resource type; backed by Clientset API calls |
| ResourceVersion | etcd version stamp; enables Watch to resume without missing events |
| DeltaFIFO | The downstream queue; Reflector is its sole producer |
| Chunked Transfer | HTTP/1.1 mechanism that keeps the Watch connection open indefinitely |
| Backoff | Exponential retry when Watch connection drops or API Server is unavailable |
Reflector is the bridge between the Kubernetes API Server and the local Informer cache. Once you understand its List → syncWith → Watch → watchHandler pipeline, the reliability guarantees of the entire Informer framework become clear.
Next in this series: DeltaFIFO: The Event Queue Behind Informer (Part 3)
Follow the series for more deep dives into Kubernetes development.
Top comments (0)