kubernetes源码剖析之client-go(二) Informer机制
2021-01-09 00:30
标签:基本操作 pts for wan 处理 incr 变更 now() current ? Kubernetes通过informer机制,实现在不依赖任何中间件的情况下保证消息的实时性、可靠性、顺序性。其他Kubernetes组件通过client-go的informer机制与Api Server进行通信。Informer的核心组件包括: Informer对Kubernetes的Api Server资源进行监控(Watch)操作。其中最核心的功能是Reflector,Reflector用于监控指定的Kubernetes资源,当监控的资源发生变化时,触发相应的变更事件。并将其资源对象存放到本地缓冲DeltaFIFO中。 通过NewReflector方法实例化Reflector对象,方法必须传入ListerWatcher数据接口对象。 ListerWatcher拥有List和Watch方法,用于获取和监控资源列表,只要实现了List和Watch方法的对象都可以成为ListerWatcher。 Reflector通过Run函数启动监控进程,并处理监控的事件。其中最主要的是ListAndWatch函数,它负责List和Watch指定的Kubernetes Api Server资源。 ListAndWatch第一次运行时,通过List获取资源下的所有对象和版本信息,后续通过版本进行watch DeltaFIFO可以分开理解为FIFO和Delta。 FIFO是一个先进先出队列,拥有队列的基本操作方法。Delta是资源对象存储,可以报错资源对象的操作类型。DeltaFIFO队列中,Reflector是生长泽,controller是消费者。DeltaFIFO结构如下: DeltaFIFO队列中的资源对象在Added、Updated、Delete等事件被调用时都调用了queueActionLocked方法,它是DeltaFIFO实现的关键。 kubernetes源码剖析之client-go(二) Informer机制 标签:基本操作 pts for wan 处理 incr 变更 now() current 原文地址:https://blog.51cto.com/cubix/2523029
?用于监控(Watch)指定Kubernetes资源
? Delta的先进先出队列,Reflector为生产者,Controller为消费者
?自带索引功能的本地存储,用于存储资源对象Infermers
运行原理
代码示例
package main
import (
"log"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "D:\\coding\\config")
if err != nil {
panic(err)
}
// Imformer通过clientset与Api Server通信
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 创建stopCH对象,用于进程退出前通知Imformer提前退出
stopCh := make(chan struct{})
defer close(stopCh)
// 实例化SharedInformer对象,参数clientset用于与Api Server交互, time.Minute设定resync周期,0为禁用resync
// 通过map共享Informer( informers map[reflect.Type]cache.SharedIndexInformer ),避免同一资源的Informer被重复实例化
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
// 获取Pod资源的informer对象
// 每一个K8S资源都会实现Informer机制,每个Informer实现都会提供Informer和Lister方法
informer := sharedInformers.Core().V1().Pods().Informer()
// 添加资源的回调方法
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// 创建资源时的回调方法
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Stroe: %s", mObj.GetName())
},
// 更新资源时的回调方法
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
},
// 删除资源时的回调方法
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Stroe : %s", mObj.GetName())
},
})
informer.Run(stopCh)
}
Reflector
#源码路径 vender\k8s.io\client-go\tools\cache\reflector.go
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server‘s contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
#源码路径 vender\k8s.io\client-go\tools\cache\reflector.go
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh
ListAndWatch函数
#源码路径 vender\k8s.io\client-go\tools\cache\reflector.go
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn‘t even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh
DeltaFIFO
#源码路径 vendor\k8s.io\client-go\tools\cache\delta_fifo.go
type DeltaFIFO struct {
// lock/cond protects access to ‘items‘ and ‘queue‘.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
...
}
生产者方法
#源码路径 vendor\k8s.io\client-go\tools\cache\delta_fifo.go
## Add、Update方法
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn‘t already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
// 执行前先进行加锁
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
## queueActionLockedf方法
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 计算出资源对象的key
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 将actionType和资源对象构造成dELTA,添加到items中,并通过dedupDeltas中去重。
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
// 通过cond.Broadcast通知所有消费者接触阻塞。
f.cond.Broadcast()
} else {
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
消费者
#源码路径 vendor\k8s.io\client-go\tools\cache\delta_fifo.go
// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don‘t successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a ‘Deltas‘, which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
// 当队列中没有数据时,通过f.cond.wait阻塞等待数据。只有接收到cond.Broadcast时才说明有数据被添加,接触当前阻塞状态。
f.cond.Wait()
}
// 如果队列不为空,去除队列头部的数据。
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
// 将数据传入process回调函数,由上层消费者进行处理
err := process(item)
if e, ok := err.(ErrRequeue); ok {
// 如果回调出错,则将数据重新添加回队列中
f.addIfNotPresent(id, item)
err = e.Err
}
// Don‘t need to copyDeltas here, because we‘re transferring
// ownership to the caller.
return item, err
}
}
文章标题:kubernetes源码剖析之client-go(二) Informer机制
文章链接:http://soscw.com/essay/41049.html