雲計算

圖解 K8S 源碼 – Informer 篇

前言

眾所周知,在 Kubernetes 中各組件是通過 HTTP 協議進行通信的,而組件間的通信也並沒有依賴任何中間件,那麼如何保證消息的實時性、可靠性、順序性呢?Informer 機制很好的解決了這個問題。Kubernetes 中各組件與 API Server 的通信都是通過 client-go 的 informer 機制來保證和完成的。

控制器模式

控制器模式最核心的就是控制循環的概念。而 Informer 機制,也就是控制循環中負責觀察系統的傳感器(Sensor)主要由 Reflector、Informer、Indexer 三個組件構成。其與各種資源的 Controller 相配合,就可以完成完整的控制循環,不斷的使系統向終態趨近 status -> spec

informer 機制

Informer

所謂 informer,其實就是一個帶有本地緩存和索引機制的,可以註冊 EventHandler 的 client,目的是為了減輕頻繁通信 API Server 的壓力而抽取出來的一層 cache,客戶端對 API Server 數據的讀取監測操作都通過本地的 informer 來進行。

每一個 Kubernetes 資源上都實現了 informer 機制,每一個 informer 上都會實現 Informer()Lister() 方法:

// client-go/informers/core/v1/pod.go
type PodInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.PodLister
}

定義不同資源的 Informer,允許監控不同資源事件。同時為了避免同一資源的 Informer 被實例化多次,而每個 Informer 都會使用一個 Reflector,這樣會運行過多相同的 ListAndWatch,從而加重 API Server 的壓力,Informer 還提供了共享機制,多個 Informer 可以共享一個 Reflector,從而達到節約資源的目的。

// client-go/informers/factory.go
type sharedInformerFactory struct {
  client           kubernetes.Interface
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
  lock             sync.Mutex
  defaultResync    time.Duration
  customResync     map[reflect.Type]time.Duration

  informers map[reflect.Type]cache.SharedIndexInformer
  // startedInformers is used for tracking which informers have been started.
  // This allows Start() to be called multiple times safely.
  startedInformers map[reflect.Type]bool
}
...
// InternalInformerFor returns the SharedIndexInformer for obj using an internal client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
  f.lock.Lock()
  defer f.lock.Unlock()

  informerType := reflect.TypeOf(obj)
  informer, exists := f.informers[informerType]
  if exists {
    return informer
  }

  resyncPeriod, exists := f.customResync[informerType]
  if !exists {
    resyncPeriod = f.defaultResync
  }

  informer = newFunc(f.client, resyncPeriod)
  f.informers[informerType] = informer

  return informer
}

使用 map 數據結構實現共享 Informer 機制,在 InformerFor() 函數添加了不同資源的 Informer,在添加過程中如果已經存在同類型的 Informer,則返回當前 Informer,不再繼續添加。如下就是 deploymentInformer() 方法,其中就調用了 InformerFor() 函數。

// client-go/informers/apps/v1beta1/deployment.go
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&appsv1beta1.Deployment{}, f.defaultInformer)
}

Reflector

Reflector 用於監測制定 Kubernetes 資源,當資源發生變化時,觸發相應的事件,如:Added(資源添加)事件、Update(資源更新)事件、Delete(資源刪除)事件,並將事件及資源名稱添加到 DeltaFIFO 中。

ListAndWatch

在實例化 Reflector 時,必須傳入 ListerWatcher 接口對象,其擁有 List()Watch() 方法。Reflector 通過 Run() 方法啟動監控並處理事件。在程序第一次運行時,會執行 List() 方法將所有的對象數據存入 DeltaFIFO 中,每次 Controller 重啟,都會執行 List() 方法;同時,Reflector 實例中還有 resyncPeriod 參數,如果該參數不為 0,則會根據該參數值週期性的執行 List() 操作,此時這些資源對象會被設置為 Sync 操作類型(不同於 Add、Update 等)。

Watch() 則會根據 Reflector 實例 period 參數,週期性的監控資源對象是否有變更。如果發生變更,則通過 r.watchHandler 處理變更事件。

Reflector

DeltaFIFO

DeltaFIFO 顧名思義,Delta 是一個資源對象存儲,可以保持操作類型(Add、Update、Delete、Sync等);而 FIFO 則是一個先進先出的隊列。其是一個生產者與消費者的隊列,其中 Reflector 是生產者,消費者則調用 Pop() 方法取出最早進入隊列的對象數據。

Indexer

Indexer 是 client-go 用來存儲資源對象並自帶索引功能的本地存儲,Reflector 從 DeltaFIFO 中將消費出來的資源對象存儲至 Indexer。同時 Indexer 中的數據與 Etcd 中的數據保持完全一致。client-go 可以很方便的從本次存儲中讀取相應的資源對象數據,而無需每次都從遠程 Etcd 集群中讀取,從而降低了 API Server 和 Etcd 集群的壓力。

結語

要了解 Kubernetes,Informer 是繞不過的內容,其在 Kubernetes 中非常重要。本文主要圖解了 Informer 機制以及 Reflector,由於篇幅有限,DeltaFIFO,Indexer 等概念只做了簡單介紹,這些內容會在後續的文章中進行詳解,敬請期待。

歡迎掃描二維碼關注公眾號,瞭解更多雲原生知識

Leave a Reply

Your email address will not be published. Required fields are marked *