Controller-runtime 是一个用于开发 Kubernetes Controller 的库,包含了各种Controller 常用的模块,兼顾了灵活性和模块化。

一开始做 Kubernetes Controller 开发时,是使用 client-go 进行开发,中间会有很多与业务无关的重复工作。后来社区推出了 kubebuilder 和 operatorSDK 这种脚手架,它可以方便的渲染出 Controller 的整个框架,让开发者只用专注 Controller 本身的业务逻辑,特别是在开发 CRD 时,极为方便,这两个脚手架就是基于 controller-runtime

controller-runtime 就是 client-go 的高级封装。

1、主要模块

  • Client:用于读写 Kubernetes 资源

  • Cache:本地缓存,可供 Client 直接读取资源。

  • Manager:可以管理协调多个 Controller,提供 Controller 共用的依赖。

  • Controller:“组装”多个模块(例如SourceQueueReconciler),实现 Kubernetes Controller 的通用逻辑:

    • 1)监听 k8s 资源,缓存资源,并根据EventHandler入队事件;

    • 2)启动多个 goroutine,每个 goroutine 会从队列中获取 event,并调用Reconciler方法处理。

  • Reconciler:状态同步的逻辑所在,是开发者需要实现的主要接口,供Controller 调用。Reconciler 的重点在于“状态同步”,由于 Reconciler 传入的参数是资源的NamespaceName,而非 event,Reconciler 并非用于“处理事件”,而是根据指定资源的状态,来同步“预期集群状态”与“当前集群状态”。

  • Webhook:用于开发 webhook server,实现 Kubernetes Admission Webhooks 机制。

  • Source:source of event,Controller 从中获取 event。

  • EventHandler:顾名思义,event 的处理方法,决定了一个 event 是否需要入队列、如何入队列。

  • Predicate:相当于 event 的过滤器。

2、Controller 的生成与管理

2.1、manager

controller-runtime 库提供的第一个重要抽象是 Manager,它为在管理器内运行的所有控制器提供共享资源,包括:

  • 读取和写入 Kubernetes 资源的 Kubernetes 客户端

  • 用于从本地缓存中读取 Kubernetes 资源的缓存

  • 用于注册所有 Kubernetes 本地和自定义资源的 scheme

要创建管理器,你需要使用提供的 New 函数,如下所示:

import (
      "flag"
      "sigs.k8s.io/controller-runtime/pkg/client/config"
      "sigs.k8s.io/controller-runtime/pkg/manager"
)
flag.Parse()                        ❶
mgr, err := manager.New(
     config.GetConfigOrDie(),
      manager.Options{},
)

第一个参数是 rest.Config 对象,在这个例子中,选择了 controller-runtime 库提供的 GetConfigOrDie() 实用函数,而不是使用 client-go 库的函数。

GetConfigOrDie() 函数将尝试获得一个配置来连接到集群:

  • 通过获取 --kubeconfig 标志的值,如果定义了的话,并在这个路径上读取 kubeconfig 文件。为此,首先你需要执行 flag.Parse()

  • 通过获取 KUBECONFIG 环境变量的值(如果定义了的话),并读取此路径下的 kubeconfig 文件

  • 通过查看 in-cluster 配置,如果定义了的话

  • 通过读取 $HOME/.kube/config 文件

如果前面的情况都不可行,该函数将使程序退出,代码为1。第二个参数是一个用于选项的结构体。

一个重要的选项是 “Scheme"。默认情况下,如果你没有为这个选项指定任何值,将使用 Client-go 库提供的 Scheme。如果控制器只需要访问本地 Kubernetes 资源,这就足够了。然而,如果你想让控制器访问自定义资源,你将需要提供一个能够解决自定义资源的 Scheme

例如,如果你想让控制器访问自定义资源,你将需要在初始化时运行以下代码:

import (
      "k8s.io/apimachinery/pkg/runtime"
      clientgoscheme "k8s.io/client-go/kubernetes/scheme"
      mygroupv1alpha1 "github.com/myid/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
scheme := runtime.NewScheme()                  ❶
clientgoscheme.AddToScheme(scheme)             ❷
mygroupv1alpha1.AddToScheme(scheme)            ❸
mgr, err := manager.New(
      config.GetConfigOrDie(),
      manager.Options{
            Scheme: scheme,                    ❹
      },
)

❶ 创建一个新的空 scheme。

❷ 使用 Client-go 库添加 Kubernetes 内置资源(k8s 内置资源都在 client-go/kubernetes 下面)。

❷ 将 mygroup/v1alpha1 的资源添加到 scheme 中,就是自定义资源。

❹ 在这个管理器中使用这个 scheme。

2.2、controller

第二个重要的抽象是 Controller。控制器负责实现特定 Kubernetes 资源的实例所给出的规范(Spec)。(在 operator 的情况下,自定义资源是由 operator 处理的)。

为此,控制器观察特定的资源,并接收这些资源的 watch 事件(即,创建、更新、删除)。当事件发生在资源上时,控制器用包含受事件影响的 “主要资源” 实例的名称和命名空间的请求填充一个队列。

请注意,入队的对象只是被 operator 监视的主要资源的实例。如果事件是由另一个资源的实例接收的,则通过跟踪所有者参考(ownerReference)找到主资源。例如,Deployment 控制器观察 Deployment 资源和 ReplicaSet 资源。所有 ReplicaSet 实例都包含一个指向 Deployment 实例的 ownerReference。

  • 当 Deployment 被创建时,控制器会收到一个 Create 事件,刚刚创建的 Deployment 实例被入队

  • 当 ReplicaSet 被修改时(例如,被一些用户修改),这个 ReplicaSet 会收到一个 Update 事件,控制器会使用 ReplicaSet 中包含的 ownerReference 找到被更新的 ReplicaSet 引用的 Deployment。然后,被引用的 Deployment 实例被入队。

控制器实现 Reconcile 方法,每当队列中出现一个 Request 时,它就会被调用。这个 Reconcile 方法接收 Request 作为参数,其中包含要调谐(Reconcile)的主要资源的名称和命名空间

此外,由于多个事件可能在短时间内发生,并与同一主要资源有关,请求可以被批量处理,以限制入队的请求数量。

2.2.1、创建 Controller

要创建控制器,你需要使用提供的 New 函数:

import (
      "sigs.k8s.io/controller-runtime/pkg/controller"
)
controller, err = controller.New(
     "my-operator", mgr,
     controller.Options{
            Reconciler: myReconciler,
})

Reconciler 调谐器选项是必需的,其值是一个实现 Reconciler 接口的对象,定义为:

type Reconciler interface {
      Reconcile(context.Context, Request) (Result, error)
}

2.2.2、监控资源

在控制器创建后,你需要向容器指出哪些资源需要观察,以及这些资源是主要资源还是被拥有的(owned)资源。

控制器上的 Watch 方法被用来添加一个 Watch。该方法定义如下:

Watch(
     src source.Source,
     eventhandler handler.EventHandler,
     predicates ...predicate.Predicate,
) error

第一个参数表示什么是要观察的事件的来源,其类型是 source.Source 接口。controller-runtime 库为 Source 接口提供了两种实现方式:

  • Kind source 用于监视特定种类(kind)的 Kubernetes 对象的事件。Kind 结构体中的 Type 字段是必需的,其值是所需类型的对象。例如,如果我们想监视 Deployment,src 参数的值将是:

    controller.Watch(
          &source.Kind{
                Type: &appsv1.Deployment{},
          },
          ...
    
  • Channel source 是用来观察来自集群外的事件的。Channel 结构体的 Source 字段是必需的,它的值是一个发射 event.GenericEvent 类型对象的通道。

第二个参数是事件处理程序,其类型是 handler.EventHandler 接口。controller-runtime 库为 EventHandler 接口提供了两种实现方式:

  • EnqueueRequestForObject 事件处理程序用于控制器处理的主要资源。在这种情况下,控制器将把连接到事件的对象放入队列中。

    controller.Watch(
          &source.Kind{
                Type: &mygroupv1alpha1.MyResource{},
          },
          &handler.EnqueueRequestForObject{},
    )
  • EnqueueRequestForOwner 事件处理程序用于由主资源拥有(owned)的资源。EnqueueRequestForOwner 的一个字段是必需的:OwnerType。这个字段的值是主资源类型的对象;控制器将跟踪 ownerReferences,直到找到这种类型的对象,并将这个对象放入队列。

    例如,如果控制器处理 MyResource 主资源,并且正在创建 Pod 来实现 MyResource,它将希望使用这个事件处理程序来观察 Pod 资源,并指定一个 MyResource 对象作为 OwnerType。如果字段 IsController 被设置为true,控制器将只考虑 Controller: true 的 ownerReferences。

    controller.Watch(
          &source.Kind{
                Type: &corev1.Pod{},
          },
          &handler.EnqueueRequestForOwner{
                OwnerType: &mygroupv1alpha1.MyResource{},
                IsController: true,
          },
    )

第三个参数是一个可选的谓词列表,其类型为 predicate.Predicate 。controller-runtime 库为 Predicate 接口提供了几种实现:

  • Funcs 是最通用的实现。Funcs 的结构体定义如下:

    type Funcs struct {
          // Create returns true if the Create event
         // should be processed
          CreateFunc func(event.CreateEvent) bool
          // Delete returns true if the Delete event
         // should be processed
          DeleteFunc func(event.DeleteEvent) bool
          // Update returns true if the Update event
         // should be processed
          UpdateFunc func(event.UpdateEvent) bool
          // Generic returns true if the Generic event
         // should be processed
          GenericFunc func(event.GenericEvent) bool
    }

    你可以把这个结构体的一个实例传递给 Watch 方法,作为 Predicate。

    未定义的字段将表示匹配类型的事件应该被处理。

    对于非 nil 字段,匹配事件的函数将被调用(注意,当源是一个通道时,GenericFunc 将被调用;见前文),如果函数返回 true,事件将被处理。

  • 使用 Predicate 的这种实现,你可以为每个事件类型定义一个特定的函数。

    func NewPredicateFuncs(
    	filter func(object client.Object) bool,
    ) Funcs

    该函数接受一个 filter 函数,并返回一个 Funcs 结构体,该过滤器被应用于所有事件。使用这个函数,你可以定义一个适用于所有事件类型的单一过滤器。

  • ResourceVersionChangedPredicate 结构体将定义一个只用于 UpdateEvent 的 filter。

    使用这个 predicate ,所有的Create、Delete 和 Generic 事件将被处理,不需要过滤,而 Update 事件将被过滤,以便只处理有 metadata.resourceVersion 变化的更新。

    每次保存资源的新版本时,metadata.resourceVersion 字段都会被更新,不管资源的变化是什么。

  • GenerationChangedPredicate 结构体定义了一个只用于更新事件过滤器。

    使用这个谓词,所有的 Create、Delete 和 Generic 事件都将被处理,无需过滤,而 Update 事件将被过滤,因此只有具有 metadata.Generation 增量的更新才会被处理。

    每次发生资源的 Spec 部分的更新时,API 服务器都会按顺序递增 metadata.Generation

    请注意,有些资源并不尊重这个假设。例如,当 Annotations 字段被更新时,Deployment 的 Generation 也会被递增。

    对于自定义资源,只有当状态子资源被启用时,生成才会被递增。

  • AnnotationChangedPredicate 结构体定义了一个只用于更新事件过滤器。

    使用这个谓词,所有的创建、删除和通用事件都将被处理,而更新事件将被过滤,因此只有元数据.注释变化的更新才会被处理。

用 controller-runtime,就可以直接用 watch 监听资源变化,自动入队了,不需要像之前 client-go 那样,用 informer 的 eventhandler,还要自己写入队操作了。

2.2.3、例子

在第一个例子中,你将创建管理器和控制器。控制器将管理主要的自定义资源 MyResource,并观察这个资源以及 Pod 资源。

Reconcile 函数将只显示 MyResource 实例的命名空间和名称来进行调节。

package main
import (
      "context"
      "fmt"
      corev1 "k8s.io/api/core/v1"
      "k8s.io/apimachinery/pkg/runtime"
      "sigs.k8s.io/controller-runtime/pkg/client/config"
      "sigs.k8s.io/controller-runtime/pkg/controller"
      "sigs.k8s.io/controller-runtime/pkg/handler"
      "sigs.k8s.io/controller-runtime/pkg/manager"
      "sigs.k8s.io/controller-runtime/pkg/reconcile"
      "sigs.k8s.io/controller-runtime/pkg/source"
      clientgoscheme "k8s.io/client-go/kubernetes/scheme"
      mygroupv1alpha1 "github.com/myid/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
func main() {
      scheme := runtime.NewScheme()                     ❶
      clientgoscheme.AddToScheme(scheme)
      mygroupv1alpha1.AddToScheme(scheme)
      mgr, err := manager.New(                          ❷
            config.GetConfigOrDie(),
            manager.Options{
                  Scheme: scheme,
            },
      )
      panicIf(err)
      controller, err := controller.New(                 ❸
            "my-operator", mgr,
            controller.Options{
                  Reconciler: &MyReconciler{},
            })
      panicIf(err)
      err = controller.Watch(                             ❹
            &source.Kind{
                  Type: &mygroupv1alpha1.MyResource{},
            },
            &handler.EnqueueRequestForObject{},
      )
      panicIf(err)
      err = controller.Watch(                              ❺
            &source.Kind{
                  Type: &corev1.Pod{},
            },
            &handler.EnqueueRequestForOwner{
                  OwnerType:    &corev1.Pod{},
                  IsController: true,
            },
      )
      panicIf(err)
      err = mgr.Start(context.Background())                ❻
      panicIf(err)
}

type MyReconciler struct{}                                 ➐
func (o *MyReconciler) Reconcile(                          ➑
     ctx context.Context,
     r reconcile.Request,
) (reconcile.Result, error) {
      fmt.Printf("reconcile %v\n", r)
      return reconcile.Result{}, nil
}
// panicIf panic if err is not nil
// Please call from main only!
func panicIf(err error) {
      if err != nil {
            panic(err)
      }
}

➊ 用本地资源和自定义资源 MyResource 创建 scheme。

➋ 使用刚刚创建的 scheme 创建管理器。

➌ 创建控制器,连接到管理器,并传递 Reconciler 实现。

➍ 开始观察作为主要资源的 MyResource 实例。

➎ 开始观察 Pod 实例,作为自有(owned)资源。

➏ 启动管理器。这个函数是长期运行的,只有在发生错误时才会返回。

➐ 实现 Reconciler 接口的类型。

➑ 实现 Reconcile 方法。这将显示要 reconcile 的实例的名称空间和名称(reconcile.Request 就是请求资源的命名空间和名称)。

当资源变化时,就会自动从队列里拿出来,以 Request 形式传入,不需要像client-go一样手动从队列里拿 key 了。

2.2.4、Controller Builder

Controller-runtime 库提出了一个 Controller Builder,使控制器的创建更加简洁。

import (
     "sigs.k8s.io/controller-runtime/pkg/builder"
)
func ControllerManagedBy(m manager.Manager) *Builder

ControllerManagedBy 函数被用来启动一个新的 ControllerBuilder。构建的控制器将被添加到 m manager 中。一个流畅的接口帮助配置构建:

  • For(object client.Object, opts ...ForOption) *Builder - 该方法用于指示控制器处理的主要资源。它只能被调用一次,因为一个控制器只能有一个主要资源。这将在内部调用 Watch 函数与事件处理程序 EnqueueRequestForObject

    可以用 WithPredicates 函数为这个 watch 添加谓词(Predicates),其结果实现了 ForOption 接口。

  • Owns(object client.Object, opts ...OwnsOption) *Builder - 这个方法用来表示控制器拥有(owned)的资源。这将在内部调用 Watch 函数与事件处理程序 EnqueueRequestForOwner

    可以用 WithPredicates 函数为这个Watch添加谓词,该函数的结果实现了OwnsOption接口。

  • Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder - 这个方法可以用来添加更多For或Owns方法没有涵盖的 watcher –例如,具有 Channel source 的观察者。

    可以用 WithPredicates 函数为该 watch 添加谓词,该函数的结果实现了 WatchesOption 接口。

  • WithEventFilter(p predicate.Predicate) *Builder - 这个方法可以用来添加所有用 For、Owns 和 Watch 方法创建的观察者共有的谓词。

  • WithOptions(options controller.Options) *Builder - 此处设置将在内部传递给 controller.New 函数的选项。

  • WithLogConstructor(- 这设置了 logConstructor 选项。

    func(*reconcile.Request) logr.Logger,
    
    ) *Builder
    
  • Named(name string) *Builder - 这设置了构造函数的名称。它应该只使用下划线和字母数字字符。默认情况下,它是主要资源的 kind 的小写版本。

  • build() 构建并返回控制器。

    Build( 
    	r reconcile.Reconciler,
    ) (controller.Controller, error)
    
  • Complete(r reconcile.Reconciler) error –这就建立了控制器。你一般不需要直接访问控制器,所以你可以使用这个不返回控制器值的方法,而不是Build。

2.2.5、controller builder 的例子

在这个例子中,你将使用 ControllerBuilder 构建控制器,而不是使用 Controller.New 函数和控制器上的 Watch 方法。

package main
import (
      "context"
      "fmt"
      corev1 "k8s.io/api/core/v1"
      "k8s.io/apimachinery/pkg/runtime"
      clientgoscheme "k8s.io/client-go/kubernetes/scheme"
      "sigs.k8s.io/controller-runtime/pkg/builder"
      "sigs.k8s.io/controller-runtime/pkg/client"
      "sigs.k8s.io/controller-runtime/pkg/client/config"
      "sigs.k8s.io/controller-runtime/pkg/manager"
      "sigs.k8s.io/controller-runtime/pkg/reconcile"
      mygroupv1alpha1 "github.com/feloy/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
func main() {
      scheme := runtime.NewScheme()
      clientgoscheme.AddToScheme(scheme)
      mygroupv1alpha1.AddToScheme(scheme)
      mgr, err := manager.New(
            config.GetConfigOrDie(),
            manager.Options{
                  Scheme: scheme,
},
      )
      panicIf(err)
      err = builder.
            ControllerManagedBy(mgr).
            For(&mygroupv1alpha1.MyResource{}).
            Owns(&corev1.Pod{}).
            Complete(&MyReconciler{})
      panicIf(err)
      err = mgr.Start(context.Background())
      panicIf(err)
}
type MyReconciler struct {}
func (a *MyReconciler) Reconcile(
      ctx context.Context,
      req reconcile.Request,
) (reconcile.Result, error) {
      fmt.Printf("reconcile %v\n", req)
      return reconcile.Result{}, nil
}
func panicIf(err error) {
      if err != nil {
            panic(err)
      }
}

2.3、将 manager 资源注入 Reconciler 中

管理器为控制器提供共享资源,包括读取和写入 Kubernetes 资源的客户端、从本地缓存中读取资源的缓存以及解析资源的方案。Reconcile 函数需要访问这些共享资源。有两种方法来共享它们:

2.3.1、创建 Reconciler 结构体时传递数值

当控制器被创建时,你正在传递一个 Reconcile 结构体的实例,实现 Reconciler 接口:

type MyReconciler struct {}
err = builder.
            ControllerManagedBy(mgr).
            For(&mygroupv1alpha1.MyResource{}).
            Owns(&corev1.Pod{}).
            Complete(&MyReconciler{})

在这之前,管理器已经被创建,你可以使用管理器上的 Getters 来访问共享资源。

作为例子,下面是如何从新创建的管理器中获取客户端、缓存和 schema :

mgr, err := manager.New(
      config.GetConfigOrDie(),
      manager.Options{
            manager.Options{
                  Scheme: scheme,
          },
     },
)
// handle err
mgrClient := mgr.GetClient()
mgrCache := mgr.GetCache()
mgrScheme := mgr.GetScheme()

你可以在 Reconciler 结构体中添加字段来传递这些值:

type MyReconciler struct {
     client client.Client
     cache cache.Cache
     scheme *runtime.Scheme
}

最后,你可以在创建控制器的过程中传递这些值:

err = builder.
            ControllerManagedBy(mgr).
            For(&mygroupv1alpha1.MyResource{}).
            Owns(&corev1.Pod{}).
            Complete(&MyReconciler{
      client: mgr.GetClient(),
      cache: mgr.GetCache(),
      scheme: mgr.GetScheme(),
})

2.3.2、使用 Injector

controller-runtime 库提供了一个注入器(Injector)系统,用于将共享资源注入 Reconcilers,以及其他结构体,如你自己实现的 Sources、EventHandlers 和 Predicates。

Reconciler 的实现需要实现 inject 包中的特定 Injector 接口:inject.Clientinject.Cacheinject.Scheme,等等。

这些方法将在初始化时被调用,当你调用 controller.Newbuilder.Complete。为此,需要为每个接口创建一个方法,例如:

type MyReconciler struct {
     client client.Client
     cache cache.Cache
     scheme *runtime.Scheme
}
func (a *MyReconciler) InjectClient(c client.Client) error {
      a.client = c
      return nil
}
func (a *MyReconciler) InjectCache(c cache.Cache) error {
      a.cache = c
      return nil
}
func (a *MyReconciler) InjectScheme(s *runtime.Scheme) error {
      a.scheme = s
      return nil
}

3、使用客户端

客户端可以用来读取和写入集群上的资源,并更新资源的状态。

Read 方法在内部使用一个基于 Informers 和 Listers 的 Cache 系统,以限制对 API 服务器的读取访问。使用这个缓存,同一个管理器的所有控制器都有对资源的读取权限,同时限制对 API 服务器的请求。

必须注意:

由读操作返回的对象是指向缓存的值的指针。你决不能直接修改这些对象。相反,你必须在修改这些对象之前创建一个返回对象的深度拷贝。

3.1、获取资源信息

Get 方法用来获取资源的信息。

Get(
     ctx context.Context,
     key ObjectKey,
     obj Object,
     opts ...GetOption,
) error

它需要一个 ObjectKey 值作为参数,表示资源的名称空间和名称,以及一个Object,表示要获得的资源的类型,并存储结果。Object 必须是一个指向类型资源的指针-例如,一个 Pod 或 MyResource 结构体。ObjectKey 类型是 type.NamespacedName 的别名,定义在 API Machinery 库中。

NamespacedName 也是嵌入在作为参数传递给 Reconcile 函数的 Request 中的对象的类型。你可以直接将 req.NamespacedName 作为 ObjectKey 传递,以获得要调和(reconcile)的资源。例如,使用下面的方法来获取要调和(reconcile)的资源:

myresource := mygroupv1alpha1.MyResource{}
err := a.client.Get(
      ctx,
      req.NamespacedName,
      &myresource,
)

可以向 Get 请求传递一个特定的 resourceVersion 值,传递一个 client.GetOptions 结构体实例作为最后一个参数。

GetOptions 结构体实现了 GetOption 接口,包含一个具有 metav1.GetOptions 值的单一Raw字段。例如,指定一个值为 “0 " 的 resourceVersion 来获取资源的任何版本:

err := a.client.Get(
      ctx,
      req.NamespacedName,
      &myresource,
      &client.GetOptions{
            Raw: &metav1.GetOptions{
                  ResourceVersion: "0",
            },
      },
)

3.2、列出资源

List 方法用于列出特定种类的资源:

List(
     ctx context.Context,
     list ObjectList,
     opts ...ListOption,
) error

list 参数是一个 ObjectList 值,表示要列出并存储结果的资源的种类(kind)。默认情况下,list 是在所有命名空间中进行的。

List 方法接受实现 ListOption 接口的对象的零个或多个参数。这些类型由以下支持:

  • InNamespace,string 的别名,用于返回特定命名空间的资源。

  • MatchingLabelsmap[string]string 的别名,用来表示标签的列表和它们的精确值,这些标签必须被定义为资源的返回。下面的例子建立了一个MatchingLabels 结构体来过滤带有标签 “app=myapp” 的资源。

    matchLabel := client.MatchingLabels{
          "app": "myapp",
    }
  • HasLabels,别名为 []string,用来表示标签的列表,独立于它们的值,必须为一个资源的返回而定义。下面的例子建立了一个 HasLabels 结构体来过滤带有 “app” 和 “debug” 标签的资源。

    hasLabels := client.HasLabels{"app", “debug”}
  • MatchingLabelsSelector,嵌入了一个 labels.Selector 接口,用来传递更高级的标签选择器。关于如何建立一个选择器的更多信息,请参见第6章的 “过滤列表结果” 部分。下面的例子建立了一个 MatchingLabelsSelector 结构,它可以作为 List 的一个选项来过滤标签 mykey 不同于 ignore 的资源。

    selector := labels.NewSelector()
    require, err := labels.NewRequirement(
        "mykey",
        selection.NotEquals,
        []string{"ignore"},
    )
    // assert err is nil
    selector = selector.Add(*require)
    labSelOption := client.MatchingLabelsSelector{
          Selector: selector,
    }
  • MatchingFields 是 fields.Set 的别名,它本身是 map[string]string 的别名,用来指示要匹配的字段和它们的值。下面的例子建立了一个 MatchingFields 结构体,用来过滤字段 “status.phase” 为 “Running” 的资源:

    matchFields := client.MatchingFields{
          "status.phase": "Running",
    }
  • MatchingFieldsSelector,嵌入了一个 fields.Selector,用来传递更高级的字段选择器。下面的例子建立了一个 MatchingFieldsSelector 结构体来过滤字段 “status.phase” 与 “Running” 不同的资源:

    fieldSel := fields.OneTermNotEqualSelector(
          "status.phase",
          "Running",
    )
    fieldSelector := client.MatchingFieldsSelector{
          Selector: fieldSel,
    }
  • Limit(别名 int64)和Continue(别名 string)用于对结果进行分页。

3.3、创建资源

Create 方法用来在集群中创建一个新的资源。

Create(
     ctx context.Context,
     obj Object,
     opts ...CreateOption,
) error

作为参数传递的 obj 定义了要创建的对象的种类(kind),以及它的定义。下面的例子将在集群中创建一个 Pod:

podToCreate := corev1.Pod{ [...] }
podToCreate.SetName("nginx")
podToCreate.SetNamespace("default")
err = a.client.Create(ctx, &podToCreate)

以下选项可以作为 CreateOption 传递,以使 Create 请求参数化。

  • DryRunAll 值表示所有的操作都应该被执行,除了那些将资源持久化到存储的操作。

  • FieldOwner,别名为字符串,表示创建操作的字段管理器的名称。这个信息对于服务器端应用操作的正常工作很有用。

3.4、删除资源

删除方法是用来从集群中删除资源。

Delete(
     ctx context.Context,
     obj Object, k
     opts ...DeleteOption,
) error

作为参数传递的 obj 定义了要删除的对象的种类(kind),以及它的命名空间(如果资源有命名空间)和它的名字。下面的例子可以用来删除 Pod。

podToDelete := corev1.Pod{}
podToDelete.SetName("nginx")
podToDelete.SetNamespace("prj2")
err = a.client.Delete(ctx, &podToDelete)

以下选项可以作为 DeleteOption 被传递,以便对 Delete 请求进行参数化。

  • DryRunAll - 这个值表示所有的操作都应该被执行,除了那些将资源持久化到存储的操作。

  • GracePeriodSeconds, alias to int64 - 这个值只在删除 pod 时有用。它表示在删除 pod 之前的持续时间(秒)。

  • Preconditions / 前提条件,别名 metav1.Preconditions - 这表明你期望删除的资源。

  • PropagationPolicy,别名 metav1.DeletionPropagation - 这表明是否以及如何进行垃圾回收。

4、编写 reconcile 调谐

Reconcile 函数包含 Operator 的所有业务逻辑。该函数将在一个单一的资源种类上工作 – 或者说 Operator 调和这个资源 – 并且可以在其他类型的对象触发事件时得到通知,通过使用所有者引用(Owner References)将这些其他类型映射到调和的对象上。

Reconcile 函数的作用是确保系统的状态与要 Reconcile 的资源中指定的内容相匹配。为此,它将创建 “低级” 资源来实现要调和的资源。这些资源反过来将由其他控制器或 Operator 进行调和。当这些资源的调和完成后,它们的状态将被更新以反映它们的新状态。此外,Operator 将能够检测到这些变化,并相应地调整被调和资源的状态。

Reconcile 函数从队列中接收一个要调谐的资源。第一个要做的操作是获得关于这个资源的信息来进行调谐。

事实上,Request只有资源的命名空间和名称(它的种类/kind 是由它的设计知道的,是由 operator 调和的种类),但你并没有收到资源的完整定义。

Get 操作被用来获取资源的定义。

该资源可能由于各种原因被入队:它被创建、修改或删除(或者另一个拥有(owned)的资源被创建、修改或删除)。在前两种情况下(创建或修改),Get 操作将成功,Reconcile 函数将知道这时资源的定义。在删除的情况下,获取操作将以 Notfound 错误失败,因为该资源现在已经被删除了。

Operator 的良好做法是,在调和资源时为其创建的资源添加 OwnerReferences。主要目的是当这些创建的资源被修改时,能够调和其所有者,而添加这些 OwnerReferences 的结果是,当所有者资源被删除时,这些拥有(owned)的资源将被 Kubernetes 垃圾收集器删除。

出于这个原因,当一个被调和的对象被删除时,集群中没有什么可做的,因为相关的创建资源的删除将由集群处理。

如果在集群中找到了要调和的资源,operator’s 的下一步就是创建 “低级” 资源来实现这个要调和的资源。

因为 Kubernetes 是一个声明式平台,创建这些资源的好方法是声明低级资源应该是什么,与集群中存在或不存在什么无关,并依靠 Kubernetes 控制器来接管和调和这些低级资源。

由于这个原因,不可能盲目地使用 Create 方法,因为我们不确定资源是否存在,如果资源已经存在,操作就会失败。

你可以检查资源是否存在,如果不存在则创建,如果存在则修改。正如前几章所示,服务器端应用方法非常适合这种情况:在运行 Apply 操作时,如果资源不存在,就会被创建;如果资源存在,就会被修补,在资源被其他参与者修改的情况下解决冲突。

使用服务器端的 apply 方法,Operator 不需要检查资源是否存在,或是否被另一个参与者修改过。Operator 只需要从 Operator 的角度,用资源的定义运行服务器端 Apply 操作。

在应用低级别的资源后,应该考虑两种可能性。

  • 情况1:如果低层资源已经存在,并且没有被 Apply 修改,那么将不会为这些资源触发 MODIFIED 事件,并且 Reconcile 函数将不会被再次调用(至少对于这个 Apply 操作)。

  • 情况2:如果低层资源被创建或修改,这将触发这些资源的 CREATED 或 MODIFIED 事件,并且 Reconcile 函数将因为这些事件而被再次调用。这个函数的新执行将再次应用低层资源,如果在此期间没有更新这些资源,Operator 将落入情况1。

新的底层资源最终将由其各自的 Operator 或控制器处理。反过来,他们将调和这些资源,并更新它们的状态,宣布它们的当前状态。

一旦这些低层资源的状态被更新,这些资源的 MODIFIED 事件将被触发,Reconcile 函数将被再次调用。再一次,Operator 将应用这些资源,案例1和2必须被考虑。

Operator 在某些时候需要读取它所创建的低级资源的状态,以便计算出调和后的资源的状态。在简单的情况下,这可以在执行低级资源的服务器端应用后完成。

为了说明这一点,这里有一个完整的 Reconcile 函数,该函数适用于 Operator,该 Operator 用 MyResource 实例中提供的镜像和内存信息创建 Deployment。

func (a *MyReconciler) Reconcile(
  ctx context.Context,
  req reconcile.Request,
) (reconcile.Result, error) {
  log := log.FromContext(ctx)
  log.Info("getting myresource instance")
  myresource := mygroupv1alpha1.MyResource{}
  err := a.client.Get(                                     ❶
    ctx,
    req.NamespacedName,
    &myresource,
    &client.GetOptions{},
  )
  if err != nil {
    if errors.IsNotFound(err) {                            ❷
      log.Info("resource is not found")
      return reconcile.Result{}, nil
    }
    return reconcile.Result{}, err
  }
  ownerReference := metav1.NewControllerRef(               ❸
    &myresource,
    mygroupv1alpha1.SchemeGroupVersion.
          WithKind("MyResource"),
  )
  err = a.applyDeployment(                                 ❹
     ctx,
     &myresource,
     ownerReference,
  )
  if err != nil {
    return reconcile.Result{}, err
  }
  status, err := a.computeStatus(ctx, &myresource)          ❺
  if err != nil {
    return reconcile.Result{}, err
  }
  myresource.Status = *status
  log.Info("updating status", "state", status.State)
  err = a.client.Status().Update(ctx, &myresource)           ❻
  if err != nil {
    return reconcile.Result{}, err
  }
  return reconcile.Result{}, nil
}

❶ 获取要调和的资源的定义

❷ 如果资源不存在,立即返回

❸ 建立指向要调和的资源的 ownerReference

❹ 使用服务器端应用于 “低层次” deployment

❺ 根据 “低级别” deployment 计算资源的状态

❻ 更新资源的状态以进行调和

下面是一个为 operator 创建的 deployment 实现服务器端 Apply 操作的例子:

func (a *MyReconciler) applyDeployment(
  ctx context.Context,
  myres *mygroupv1alpha1.MyResource,
  ownerref *metav1.OwnerReference,
) error {
  deploy := createDeployment(myres, ownerref)
  err := a.client.Patch(                                   ❼
    ctx,
    deploy,
    client.Apply,
    client.FieldOwner(Name),
    client.ForceOwnership,
  )
  return err
}

func createDeployment(
  myres *mygroupv1alpha1.MyResource,
  ownerref *metav1.OwnerReference,
) *appsv1.Deployment {
  deploy := &appsv1.Deployment{
    ObjectMeta: metav1.ObjectMeta{
      Labels: map[string]string{
        "myresource": myres.GetName(),
      },
    },
    Spec: appsv1.DeploymentSpec{
      Selector: &metav1.LabelSelector{
        MatchLabels: map[string]string{
          "myresource": myres.GetName(),
        },
      },
      Template: corev1.PodTemplateSpec{
        ObjectMeta: metav1.ObjectMeta{
          Labels: map[string]string{
            "myresource": myres.GetName(),
          },
        },
        Spec: corev1.PodSpec{
          Containers: []corev1.Container{
            {
              Name:  "main",
              Image: myres.Spec.Image,                    ❽
              Resources: corev1.ResourceRequirements{
                Requests: corev1.ResourceList{
                  corev1.ResourceMemory:      myres.Spec.Memory,          ❾
                },
              },
            },
          },
        },
      },
    },
  }
  deploy.SetName(myres.GetName() + "-deployment")
  deploy.SetNamespace(myres.GetNamespace())
  deploy.SetGroupVersionKind(
    appsv1.SchemeGroupVersion.WithKind("Deployment"),
  )
  deploy.SetOwnerReferences([]metav1.OwnerReference{     ❿
    *ownerref,
  })
  return deploy
}

❼ 使用补丁方法执行服务器端的应用操作

❾ 使用资源中定义的镜像来进行调和

❾ 使用资源中定义的内存来进行调和

❿ 设置 OwnerReference 指向要调和的资源

然后,下面是一个如何实现状态的计算和更新的例子:

const (
  _buildingState = "Building"
  _readyState    = "Ready"
)
func (a *MyReconciler) computeStatus(
  ctx context.Context,
  myres *mygroupv1alpha1.MyResource,
) (*mygroupv1alpha1.MyResourceStatus, error) {
  logger := log.FromContext(ctx)
  result := mygroupv1alpha1.MyResourceStatus{
    State: _buildingState,
  }
  deployList := appsv1.DeploymentList{}
  err := a.client.List(                                   ⓫
    ctx,
    &deployList,
    client.InNamespace(myres.GetNamespace()),
    client.MatchingLabels{
      "myresource": myres.GetName(),
    },
  )
  if err != nil {
    return nil, err
  }
  if len(deployList.Items) == 0 {
    logger.Info("no deployment found")
    return &result, nil
  }
  if len(deployList.Items) > 1 {
    logger.Info(
            "too many deployments found", "count",
      len(deployList.Items),
)
    return nil, fmt.Errorf(
               "%d deployment found, expected 1",
      len(deployList.Items),
)
  }
  status := deployList.Items[0].Status                     ⓬
  logger.Info(
          "got deployment status",
          "status", status,
)
  if status.ReadyReplicas == 1 {
    result.State = _readyState                              ⓭
  }
  return &result, nil
}

⓫ 获取为该资源创建的 deployment 以进行调谐

⓬ 获取找到的唯一 deployment 的状态

⓭ 当 replicas 为1时,为调和的资源设置就绪状态