---
date: 2025-10-04T13:50:47+08:00
title: Informer原理详解
tags: [k8s,源码,informer]
categories: [源码阅读,client-go]
draft: false
---
> [!note]
>
> 基于 client-go@v0.31.13
## informer 介绍
informer 是 k8s 客户端库提供的一个组件,用于 **资源变更监听+资源缓存**,用于高效感知 k8s 集群中的资源变化。
实际上它就是构建用户控制器 Controller 的基础,Controller 一般是用来监听 k8s 中资源的状态更新然后我们去写业务逻辑代码对资源进行调谐,而所谓“监听”的功能就是 informer 实现的,即:
- Informer:负责监听和缓存资源变化
- Controller:负责消费这些变化,比如执行 **Reconcile**(调谐逻辑)
当然,informer 可以单独拿来用,不与 Controller 强绑定,监听资源不一定就是做资源的调谐,还可以去做一些比如 k8s 资源实时可视化的功能,我这里只是拿 informer+Controller 来举例子,因为它比较常见。
## informer 架构

图来自 [client-go under the hood](https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md)。
这张图清晰地表达了 informer+controller 所涉及到的组件以及它们的各自的组件边界,上半部分是 client-go 库内部实现 informer 的相关组件,下半部分是用户自定义 Controller。我们下面都会围绕这张架构图来展开讨论。
informer 核心组件包括:
- **Reflector**:负责监听 API Server 中的资源变化,将这些变化包装成事件发送到 DeltaFIFO
- **DeltaFIFO**:先进先出的队列,队列元素是资源对象一段时间内的历史变更事件
- **Indexer**:存放全量的资源对象,并提供索引用于快速访问对象
- **Informer**:消费 DeltaFIFO 中的事件,并分发给 ResourceEventHandler 进行处理
以及用户代码部分的组件:
- ResourceEventHandler:用户注册的事件处理器,由 informer 进行调用。该回调函数的实现一般是拿到对象的 key 然后放进 workqueue 让 worker 协程去处理
- WorkQueue:用于解耦事件的接收与处理,用来做一些比如限流、出队策略等定制化操作
- ProcessItem:用于处理事件的函数,一般会持有 Indexer 的引用,通过 key 快速查询事件对应的资源对象
## 示例代码
talk is cheap,理解任何东西的原理之前首先得会用这个东西,因此先看看 informer 怎么在代码中使用,直接参考 [这篇文章](https://www.zhaohuabing.com/post/2023-03-09-how-to-create-a-k8s-controller/),里面的示例代码涵盖了 informer 的前生今世,从原始 http 请求、到 clientset、到 informer 再到 sharedIndexInformer 一步步是怎么演进的。
## Reflector
Reflector 负责监听 API Server 中的资源变化,具体来说,Reflector 首先通过 list 获取 apiserver 中的全量数据并通过 watch 持续监听增量变化。
Reflector 构造函数如下,创建一个 Reflector 必须指定一个 ListerWatcher 以及要监听的资源类型,由此可以看出,一个 reflector 只负责一种资源类型(比如只监听 pod 相关事件)
``` go
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector
```
ListerWatcher 是将 Lister 和 Watcher 两个接口合并成同一个接口(类似 ReaderWriter 那样),提供 List 和 Watch 的能力,具体实现是 cache.ListWatch:
``` go
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
}
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
return lw.ListFunc(options)
}
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
```
cache.ListWatch 具体的实现交给了 ListFunc 和 WatchFunc,这两个函数由具体的资源类型来实现,下面看看 pod 的 ListFunc 和 WatchFunc:
``` go
// client就是ClientSet
func NewFilteredPodInformer(client kubernetes.Interface, /* ... */) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
// List
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
// Watch
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
// ...
)
}
```
由此可以发现,Reflector 的本质是对 client 提供的 List 和 Watch 的进一步封装:
Reflector 会开一个协程不断地去监听资源保持缓存与 apiserver 同步。为了兼容不同 apiserver 版本,可能会细分为两种不同的同步方式。
第一种是传统的 ListWatch,通过分块/分页一次性 list 完成后关闭连接,再开新的 watch 长连接监听后续资源:
```
List(获取快照) → Watch(监听变化)
```
第二种是使用同一个 watch 连接,先发全量数据,继续使用同一条连接获取增量数据,即全程都使用流式传输的方式进行同步:
```
Watch(带 SendInitialEvents) → 收到所有 Added → Bookmark → 继续 Watch
```
相比第一种方式,流式 list 的好处在于这样可以降低 apiserver 的压力,详情可见 [proposal](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal)。下面是开始 ListWatch 的实现代码:
``` go
// ListWatch方法会获取全量资源对象以及持续监听后续变更
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
// stream方式list
if useWatchList {
w, err = r.watchList(stopCh)
if err != nil {
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
fallbackToList = true
w = nil
}
}
// chunking方式list(fallback行为)
if fallbackToList {
err = r.list(stopCh)
if err != nil {
return err
}
}
// watch
return r.watchWithResync(w, stopCh)
}
```
下面把两种方式都介绍下,做个对比(代码省略了一些 err 处理)
### 流式 list
``` go
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// ...
for {
// ...
// 最后一次观察到的最大RV,该RV作为List和Watch的分界点,即小于该RV的对象属于全量数据,后续的对象都属于增量数据。初始时还没有观察到任何数据,因此RV="",表示使用当前集群最大RV
lastKnownRV := r.rewatchResourceVersion()
// 临时存储接收到的全量
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
// 为了以watch的方式进行list,使用该参数指定watch在发送增量数据之前还要发送当前的全量数据,并且这些全量数据以Added事件表示
SendInitialEvents: pointer.Bool(true),
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: &timeoutSeconds,
}
// 开始流式list
w, err = r.listerWatcher.Watch(options)
// ...
// 处理接收的到的数据,并存入temporaryStore中
watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
r.clock, make(chan error), stopCh)
// ...
// 收到了Bookmark事件,意味着list已完成,否则进行下一次循环重试
if watchListBookmarkReceived {
break
}
}
// 使用Replace整体更新现有的缓存数据
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
}
// 更新观察到的最大的RV
r.setLastSyncResourceVersion(resourceVersion)
// 返回w,后续将使用同一个w进行watch
return w, nil
}
```
流式 list 的处理核心点在于对 bookmark 的处理,bookmark 是 list 与 watch 之间的分割点。
### 分块 list
不同于 watch 的持续监听事件流,list 的行为是发一次请求,就返回一次数据
```go
func (r *Reflector) list(stopCh <-chan struct{}) error {
// ...
go func() {
// 使用 pager 进行分页读取数据
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// 决定是否分页、分页的大小
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
case options.ResourceVersion != "" && options.ResourceVersion != "0":
pager.PageSize = 0
}
// 开始分块 list
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
// ...
close(listCh)
}()
select {
// ...
case <-listCh:
}
// list 已结束
// 此后的 list 调用一直启用分页(详解见下面第 3 点)
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
//...
// 使用 Replace 整体更新现有的缓存数据
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
// ...
}
```
关于分页这块可以注意一下:
1. 目前没有开放的 API 让用户设置分页大小,因此分页大小使用默认的大小 500(每次最多返回 500 个对象)
2. 不一定会进行分页,即可能会一次性返回所有对象。原因与 [设置的参数/API 协议](https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#resource-versions) 有关,不过多深究,想了解的话可以看 3
3. (TL; DR)首次 list 时将会使用 RV = "0",在数据一致性方面,这意味这返回的数据是非强一致性的,可能是旧一点的数据,但读取效率比较高;在数据源方面,将会优先从 apiserver 的 watch cache 中读取,**watch cache 会忽略分页**,一次性返回所有数据,但如果 watch cache 没有启用,将会从 etcd 的 follower 或者 leader 读取。首次 list 结束之后,如果发现请求的 RV = "0" 并且数据是以分页的方式返回的,说明 watch cache 没有启用,并且返回的对象比较多,已经超过了一页,那么将 r.paginatedResult 设置为 true,以后的每次 list 都会使用分页的方式去拉数据。当 RV 等于某个精确值时,将始终从 watch cache 中拉数据,此时禁止分页。当 RV = "" 时,将从 etcd 拉数据,此时需要使用分页。
分块 list 的处理核心点在于是否分页以及分页大小。
### watch
watchList 或 list 完了之后,就开始持续不断地 watch 了。在 watch 的同时,还会周期性地执行 resync。resync 用于将不在 DeltaFIFO 中的本地缓存对象,以 Sync 事件放到 DeltaFIFO 中通知上层去处理一下它们。至于为什么要 resync,可以参考 [这个问答](https://github.com/cloudnativeto/sig-kubernetes/issues/11),以及 [这篇博客](https://gobomb.github.io/post/whats-resync-in-informer/)。
``` go
// watchWithResync runs watch with startResync in the background.
func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error {
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 定时resync
go r.startResync(stopCh, cancelCh, resyncerrc)
// watch
return r.watch(w, stopCh, resyncerrc)
}
```
resync 不是我们的重点,来看下 watch:
``` go
// watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
// ...
for {
// ...
// w==nil说明之前可能用的是分块list,主动发起Watch即可
if w == nil {
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
w, err = r.listerWatcher.Watch(options)
// ...
}
// 持续watch,处理接收到的事件并直接存入store中
err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc, stopCh)
// ...
// 错误处理
// 可能会进行下一轮循环重新建立Watch,或者退出
}
}
```
## DeltaFIFO
DeltaFIFO 顾名思义是一个先进先出队列,队列元素是 Deltas,一个 Deltas 包含多个 Delta,Delta 是对象事件:
``` go
type Deltas []Delta
type Delta struct {
Type DeltaType // 在对象上发生的事件,比如新增、删除等
Object interface{} // 对象
}
```
因此每次从队列 pop 元素时,是 pop 对象的在这段时间内发生的所有事件,用张图举个例子:
按照时间序,在两个对象上发生了四个事件,最终 pop 的时候是以对象为粒度进行 pop,即先将对象 1 的两个事件作为整体 pop,再到对象 2。
DeltaFIFO 的代码部分就没必要细看了,知道它在 Pop 元素的时候会处理被 pop 的元素即可,Pop 简化一下大概如下:
``` go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 弹出队首元素(Deltas)
id := f.queue[0]
f.queue = f.queue[1:]
item, ok := f.items[id]
delete(f.items, id)
// 处理元素
err := process(item, isInInitialList)
// 将元素的所有权移交给外界
return item, err
}
}
```
## Indexer
Indexer 顾名思义是用来做索引的,索引什么呢?索引对象的 key 的。比如要获取 namespace = "default" 的所有的 pod,用 Indexer 就能快速得到这些对象的 key 集合(比如 "pod1"、"pod2" 这样的字符串集合),然后用这些 key 访问 map 获取对象。索引过程有点像倒排索引,即搜索属性为给定值的所有资源,比如 namespace 就是 pod 的属性。因此,索引的结构大概是这样的:
```json
{
// 属性名称
"namespace": {
// 属性值 对应的资源
"default": ["pod1", "pod2"],
"kube-system": ["pod3"],
},
"nodeName": {
"node-1": ["pod1", "pod3"],
"node-2": ["pod2"],
},
}
```
实际上 Indexer 的本质还是一个存储了所有对象的本地缓存,只不过在这之基础上提供了索引功能。
## 存储相关接口
上面说到的 Indexer 只是一个接口,可以看到 Indexer 是在 Store 接口基础上提供了索引的相关方法:
```go
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
AddIndexers(newIndexers Indexers) error
}
```
Indexer 的实现类是 cache,cache 又依赖于 threadSafeMap 提供索引功能。另外,DeltaFIFO 所实现的 Queue 接口其实也是 Store。
说到这里我觉得可以捋一下这些本地存储相关的各个接口的关系,因为看起来还挺乱的,类图如下:
Store 提供了最基础的 key-value 存储能力,但是要注意这里 key 是通过 value 计算出来的,所以可以看到 Store 的许多方法都只传 value 不需要传 key,比如插入一个对象时,key 是通过 keyFunc(obj)计算得到的:
``` go
Add(obj interface{}) error
```
需要注意的是,key 对应的 value 实际上叫 accumulator。accumulator 可以被实现为简单的一个 obj,即简单 kv 存储,比如图中的 cache;accumulator 也可以被实现为对象的集合,比如 DeltaFIFO 的实现中,accumulator 就是一个 Deltas。
Indexer 和 Queue 则是在这个 Store 的基础上扩展了其它能力:Indexer 提供了对 key 进行索引查找的能力,Queue 提供了对象先进先出的能力。
最后再来看看结构体,我们关注的是 cache 和 DeltaFIFO。cache 缓存了所有的对象,缓存的是 apiserver 中的对象,并且 cache 实现了 Indexer 提供索引查找的能力。cache 的实现很简单,因为具体的存储与索引实现放在了 threadSafeMap 中。DeltaFIFO 虽然是 Store,但目的不是存下所有对象,只是复用 Store 提供的方法,比如 Add 方法实际上类似 Push 的能力。
## Controller(Informer)
我们在代码中使用 `cache.NewInformer` 或者 `cache.NewIndexInformer` 时,会发现返回的是一个 `Controller` 接口:
``` go
type Controller interface {
// Run does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue
HasSynced() bool
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion() string
}
```
实际返回对象是 `controller` 对象:
``` go
// `*controller` implements Controller
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
```
**但是这里的 Controller 并不是指用户层面的那个控制器,对照上方的架构图来说,这里的 Controller 对应于架构图中的 Informer 组件,属于图中上方 client-go 的那层。因此,我觉得代码里的 Controller 应该命名为 Informer 猜对,同样地 controller 应该命名为 informer。**
controller 的运行逻辑很简单,就是启动 reflector 然后运行 processLoop 不断地消费 DeltaFIFO 中的资源事件。
``` go
func (c *controller) Run(stopCh <-chan struct{}) {
// 创建reflector
r := NewReflectorWithOptions(/*...*/)
r.WatchListPageSize = c.config.WatchListPageSize
// ...
// 启动reflector
wg.StartWithChannel(stopCh, r.Run)
// 启动processLoop
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
```
``` go
func (c *controller) processLoop() {
for {
// 不断从DeltaFIFO中pop事件并处理
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// ...
}
}
```
处理事件的函数是 `c.config.Process`:
``` go
func newInformer(clientState Store, options InformerOptions) Controller {
// ...
cfg := &Config{
// ...
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
// 直接交给processDeltas处理
return processDeltas(options.Handler, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
```
具体由 processDeltas 去处理:
``` go
func processDeltas(
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
// update
// 更新Indexer
if err := clientState.Update(obj); err != nil {
return err
}
// 回调ResourceEventHandler
handler.OnUpdate(old, obj)
} else {
// add
// ...
}
case Deleted:
// delete
// ...
}
}
return nil
}
```
结合之前给出的示例代码,当我们去用 `cache.NewXXXInformer` 创建一个 informer 时,它处理事件的逻辑就是简单的两个步骤:
1. 更新 Indexer
2. 回调用户注册的 ResourceEventHandler
## SharedIndexInformer
当我们一个程序里有多个地方需求监听同一种资源,如果每次都是 `cache.NewXXXInformer` 去创建新的 Informer 的话,这个动作的背后实际上在创建多个 Reflector,即创建了多条与 apiserver 的连接,但监听的实际上是同一样东西,并且每个 Informer 中都缓存了一样的东西。如此一来既增加了 apiserver 的压力,又浪费了本地的内存。
因此 client-go 在 Informer(即 Controller 接口)的基础上进一步封装,提供了 SharedInformer 接口,使得对同一种资源只需一次 ListWatch、一个缓存,就能在程序任意地方去消费事件,从每个消费者的视角来看就好像自己独占一个 Informer 一样,也就是实现了所谓的 fan-out,一次发送,多处消费。
其实要实现这个很简单,用伪代码表示大概是这样的:
``` go
// 从DeltaFIFO获取事件
item := DeltaFIFO.pop()
// 通知所有ResourceEventHandler
for _, handler := range handlers {
handler.handle(item)
}
```
在真正的代码实现上,sharedIndexInformer 运用了设计模式中非常经典的 **代理模式**:在不改动 controller 代码的前提下,sharedIndexInformer 自身实现了 ResourceEventHandler 接口,将自己提供给 controller,如此一来,之前 controller 处理事件第 2 步中的“回调用户注册的 ResourceEventHandler”就变成了“回调 sharedIndexInformer”。同时,sharedIndexInformer 对外提供 `AddEventHandler(handler ResourceEventHandler)` 方法,在内部维护这些用户添加进来的 ResourceEventHandler,将收到的事件逐一分发给这些 ResourceHandler。
代码就没必要细看了,明白这个道理就行。
另外,SharedInformer 其实就是 SharedIndexInformer,因为索引功能几乎是一定会用得上的,所以 client-go 官方并不提供有 "shared" 但没有 "index" 的实现类。
## WorkQueue
光看架构图的话,我一直不理解为什么要有 WorkQueue 这个东西,监听到事件之后直接去 processItem 不就可以了吗?但我们要知道,在 k8s 的设计理念中,控制器不是简单的事件回调,而是一个状态收敛循环,控制器会去访问 apiserver 对资源状态进行调谐,那么处理事件流时,自然就需要对事件进行去重、限流等操作,这就是 WorkQueue 所能提供的能力。
以示例代码使用到的 workqueue 为例:
``` go
workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
```
首先从名字可以看出,创建了一个具有限流功能的 workqueue,并且传入了默认的限流器。这里默认限流器其实是两个限流器的组合,限流的力度取他俩中的最大值,即只要任意一个限流器觉得该 key 应该被延迟,就延迟:
- 指数退避限流器:对于单个元素,每重复 Add 一次,下次 Get 的时长就是上次 Get 的两倍
- 令牌桶限流器:对于所有元素,采用令牌桶限流(关于令牌桶限流算法自己 chatgpt 一下)
其中,指数退避限流器的意义是控制单个 key 的重试频率,因为某个 key 重试次数越多,说明失败率有点高,让它等久一点再同时。而令牌桶限流器的意义在于避免全局事件的突发流量,比如某时刻突然有很多不同的资源事件到来,肯定也得限流一下。因此他们的侧重点不同,组合起来能更有效地去做好限流。
以上我们介绍了 WorkQueue 限流的作用,还有解耦、缓冲、去重等,其实只是这些 workqueue 顺手的事。
workqueue 看起来接口好像挺多挺乱的,可以像上面的存储相关接口一样,自己画个类图,就很容易理清了,在日后使用起来也更加得心应手。
## 总结
用 [operator](https://kubernetes.io/zh-cn/docs/concepts/extend-kubernetes/operator/) 那一套来举例:平时基于 opeartor 那一套去进行开发用户控制器进行集群资源调谐的时候,一般用 kubebuilder 去生成一些资源对象的深拷贝代码、Reconciler 的脚手架代码等,虽然这些代码看起来并没涉及 informer、indexer 那些东西,但运行起来,当集群资源对象发生变更时,确实会及时回调我们的 Reconcile 调谐方法。在了解了 informer 这套机制后,不用想就知道是 informer 在底层起作用。
实际上,kubebuilder 生成的代码里是用了 **controller-runtime** 这个库,它是对 client-go 进一步封装。这个库将 informer、workqueue、processitem 等封装起来,对外提供各种丰富的功能,让开发者更方便地去开发用户控制器。比如开发者只需要按照 controller-runtime 的规范,定义一个 Reconciler 并注册进去,把项目运行起来就能轻松对资源进行调谐了,十分省事。
## 参考