client-go 是一个使用 go 语言调用 kubernetes 集群资源对象 API 的客户端,即通过 client-go 实现与 k8s apiserver 的交互(包括deployment、service、ingress、replicaSet、pod、namespace、node 等)的增删改查等操作。大部分对 kubernetes 进行前置 API 封装的二次开发都通过 client-go 这个第三方包来实现。

1、client-go架构

github地址:https://github.com/kubernetes/client-go

  • discovery:用于发现 API Server 都是支持哪些 API。

  • dynamic:包含了 kubernetes dynamic client 的逻辑,可以操作任意的 k8s 资源 API 对象,包括内置的、自定义的资源对象

  • informers:包含了所有内置资源的 informer,便于操作k8s的资源对象

  • kubernetes:包含了访问 Kubernetes API 的 所有 ClientSet

  • listers:包含了所有内置资源的 lister,用于读取缓存中 k8s 资源对象的信息

  • plugin/pkg/client/auth:包含所有可选的认证插件,用于从外部获取 credential(凭证)

  • tools:包含一系列工具,编写控制器时会用到很多里面的工具方法

  • transport:包含了创建连接、认证的逻辑,会被上层的 ClientSet 使用

下面是一个官方图形表示,展示了 client-go 库中的各种组件如何工作,以及它们与将要编写的自定义控制器代码的交互点:

整体流程简介:

  • Reflector 会持续监听 k8s 集群中指定资源类型的 API,当发现变动和更新时,就会创建一个发生变动的对象副本,并将其添加到队列 DeltaFIFO 中

  • Informer 监听 DeltaFIFO 队列,取出对象,做两件事:

  • (1)将对象加入 Indexer,Indexer 会将 [对象key, 对象] 存入一个线程安全的 Cache 中

  • (2)根据对象的资源类型和操作,找到对应 Controller 预先提供的 Resource Event Handler,调用 Handler,将对象的 Key 加入该 Controller 的 Workqueue

  • Controller 的循环函数 ProcessItem,监听到 Workqueue 有数据了,就会取出一个 key,交给处理函数 Worker,Worker 会根据 Key,使用 Indexer reference 从 Cache 中 获取该 key 对应的真实对象。然后就可以进行调谐了。

注意点:

  • DeltaFIFO 中存的是对象副本

  • Cache 中存的是 [对象key, 对象] 的映射

  • Workqueue 中存的是对象 Key

  • CRDController 中,使用 Informer 对象,是为了向其中添加一些 Resource Event Handlers

  • CRDController 中,使用 Indexer 对象,是为了根据对象 Key,获取对象实例

client-go组件:

  • Reflector

reflector 会一直监听 kubernetes 中指定资源类型的 API,实现监听的函数就是 ListAndWatch。这种监听机制既适用于 k8s 的内建资源,也适用于自定义资源。

当 reflector 通过监听 API 发现资源对象实例存在新的 notification 时,它就会使用 listing API 获取这个新的实例对象,并将其放入 watchHandler 函数内的 DeltaFIFO 中;

  • Informer

Informer 会从 Delta FIFO 中取出对象。实现这个功能的方法对应源码中的 processLoop;

Informer 取出对象后,根据 Resource 类型,调用对应的 Resource Event Handler 回调函数,该函数实际上由某个具体的 Controller 提供,函数中会获取对象的 key,并将 key 放入到该Controller 内部的 Workqueue 中,等候处理。

  • Indexer 和 Thread Safe Store

Indexer 会提供对象的索引功能,通常是基于对象 Key 来创建索引。

Indexer 维护着一个线程安全的 Cache,即 Thread Safe Store。存储的是[对象key, 对象],用对象 Key 可以进行获取对象实例。

  • Resource Event Handlers reference

这实际上是所有 Controller 的 Resource Event Handlers 的引用。

这些 handlers 由具体的Controller提供,就是 Informer 的回调函数。Informer 会根据资源的类型,调用对应 Controller 的 handler 方法

handler 通常都是用于将资源对象的 key 放入到该Controller 内部的 Workqueue 中,等候处理。

自定义控制器组件:

  • Informer reference

Informer reference 是 Informer 实例对象的引用,用于操作和处理自定义资源对象

我们编写自定义控制器时,需要引用自己需要的 Informer,向其中加入一系列 Resource Event Handlers

  • Indexer reference

Indexer reference 是 Indexer 实例对象的引用,用于根据对象 Key 索引资源对象

我们编写自定义控制器时,应该创建 Indexer 的引用,将对象 Key 传给它,就可以获取想要处理的对象

  • Resource Event Handlers

由具体的 Controller 给 Client-go 的 Informer 提供的回调函数,获取待处理对象的 key,并将 key 放入到 Workqueue 中。

  • Workqueue

此队列是 具体的Controller 内部创建的队列,用于暂时存储从Resource event handler 中 传递过来的,待处理对象的Key。

Resource event handler 函数通常会获取待处理对象的key,并将key放入到这个workqueue中。

  • Process Item

这个函数为循环函数,它不断从 Work queue 中取出对象的 key,并使用 Indexer Reference 获取这个 key 对应的具体资源对象,然后根据资源的变化,做具体的调谐 Reconcile 动作。

2、使用 client-go 编写 Controller 的步骤

  • 先从 client-go 中获取对应资源的 Informer

  • 提供一系列的 Resource event handlers,并加入对应的 Informer,供该 informer 回调

  • 提供一个 Workqueue 队列,存储待处理的对象的 Key

  • 提供一个循环函数 ProcessItem,不断从 Workqueue 中取出对象的 key,交给处理函数 Worker

  • 提供一个处理函数 Worker,根据对象 Key,使用对应资源的 Indexer,获取到该对象的实例,根据对象的属性变化,做真正的调谐过程。

3、client

client-go 主要提供了4种 client 组件:

  • RESTClient:最基础的客户端,提供最基本的封装,可以通过它组装与 API Server 即时通讯的 url,然后发起 http 请求。

  • Clientset:是一个 Client 的集合,在 Clientset 中包含了所有 K8S 内置资源的 Client,通过 Clientset 便可以很方便的操作如 Pod、Service 这些资源

  • dynamicClient:动态客户端,可以操作任意 K8S 的资源,包括 CRD 定义的资源

  • DiscoveryClient:用于发现 K8S 提供的资源组、资源版本和资源信息,比如:kubectl api-resources

tools/clientcmd 工具:

  • 源码位于 client-go/tools/clientcmd 包下

  • clientcmd 是 Kubernetes Go 客户端库(client-go)中的一个包,用于加载和解析 Kubernetes 配置文件,并辅助创建与 Kubernetes API 服务器进行通信的客户端。

clientcmd 提供了一些功能,使得在客户端应用程序中处理 Kubernetes 配置变得更加方便。主要包含以下几个方面的功能:

  • 加载配置文件:clientcmd 可以根据指定的路径加载 Kubernetes 配置文件,例如 kubeconfig 文件。

  • 解析配置文件:一旦加载了配置文件,clientcmd 提供了解析配置文件的功能,可以获取各种配置信息,如集群信息、认证信息、上下文信息等。

  • 辅助创建客户端:clientcmd 可以使用配置文件中的信息,辅助创建与 Kubernetes API 服务器进行通信的客户端对象。这些客户端对象可以用来执行对 Kubernetes 资源的增删改查操作。

  • 切换上下文:clientcmd 还支持在多个上下文之间进行切换。上下文表示一组命名空间、集群和用户的组合,用于确定客户端与哪个Kubernetes 环境进行通信。

之后的几种客户端中,都会使用 clientcmd 作为构建 config 的工具。

3.1、RestClient

RESTClient 是最基础的客户端,提供与 APIServer 通信的最基本封装,可以向 APIServer 发送 Restful 风格请求。其他三种 Client,其实都是 RESTClient 的再封装,内部都使用了 RESTClient

type RESTClient struct {
	base *url.URL
	versionedAPIPath string
	content ClientContentConfig
	createBackoffMgr func() BackoffManager
	rateLimiter flowcontrol.RateLimiter
	warningHandler WarningHandler
	Client *http.Client
}

/rest/client.go 中有一个接口 Interface

// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

RESTClient 实现了这个接口,因此具有所有的方法,用于发送各种 restful 类型的请求。

另外,Interface 每个方法的返回值都是 Request 类型,Request 类型的各种方法,很多的返回值也是 Request,这样就可以实现 链式编程

Request 位于 /rest/request.go

func (r *Request) Namespace(namespace string) *Request:设置当前Resquest访问的 namespace
func (r *Request) Resource(resource string) *Request:设置当前Resquest 想要访问的资源类型
func (r *Request) Name(resourceName string) *Request:设置当前Resquest 想要访问的资源的名称
func (r *Request) Do(ctx context.Context) Result:格式化并执行请求。返回一个 Result 对象,以便于处理响应。

rest.Config 位于 rest/config.go 中,用于描述 kubernetes 客户端的通用属性:

type Config struct {
	// API 服务器的主机地址,格式为 https://<hostname>:<port>。默认情况下,它为空字符串,表示使用当前上下文中的集群配置。
	Host string
	
	// 指定 API 服务器的路径,目前只有两种取值:/api、/apis
	// - /api:访问core API 组资源时,其实group值为空
	// - /apis:访问其他 API 组资源时,都是apis,他们都有group值
	APIPath string

	// 对请求内容的配置,会影响对象在发送到服务器时的转换方式
	// - ContentConfig中有两个重要属性:
	//   - NegotiatedSerializer:用于序列化和反序列化请求和响应的接口
	//   - GroupVersion:请求资源的 API 组和版本
	ContentConfig

	// 用于进行基本身份验证的用户名的字符串
	Username string
	
	// 用于进行基本身份验证的密码的字符串
	Password string `datapolicy:"password"`

	// 用于进行身份验证的令牌的字符串
	BearerToken string `datapolicy:"token"`

	// 包含身份验证令牌的文件的路径
	BearerTokenFile string

	// TLS 客户端配置,包括证书和密钥
	TLSClientConfig

	// 每秒允许的请求数(Queries Per Second)。默认为 5.0。
	QPS float32

	// 突发请求数。默认为 10
	Burst int

	// 速率限制器,用于控制向 API 服务器发送请求的速率
	RateLimiter flowcontrol.RateLimiter

	// 与 API 服务器建立连接的超时时间
	Timeout time.Duration

	// 用于创建网络连接的 Dial 函数
	Dial func(ctx context.Context, network, address string) (net.Conn, error)

	// ......
}

在有了config后,就可以使用func RESTClientFor(config Config) (RESTClient, error) ,根据 config 返回一个 RESTClient。

使用实例:

package main

import (
	"context"
	"fmt"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	config.GroupVersion = &v1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	config.APIPath = "/api"

  // 返回对应 GV 的 restclient
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	// GET request
	pods := v1.PodList{}
	err = restClient.Get().Namespace("kube-system").Resource("pods").Do(context.TODO()).Into(&pods)
	if err != nil {
		panic(err)
	}
	for _, pod := range pods.Items {
		fmt.Println("GET Pod Name:", pod.Name)
	}

	// POST request
	newPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "new-pod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "nginx",
					Image: "nginx",
				},
			},
		},
	}
	result := &v1.Pod{}
	err = restClient.Post().Namespace("default").Resource("pods").Body(newPod).Do(context.TODO()).Into(result)
	if err != nil {
		panic(err)
	}
	fmt.Println("POST Pod Name:", result.Name)

	// PUT request
	updatedPod := result.DeepCopy()
	updatedPod.Labels = map[string]string{"updated": "true"}
	err = restClient.Put().Namespace("default").Resource("pods").Name(updatedPod.Name).Body(updatedPod).Do(context.TODO()).Into(result)
	if err != nil {
		panic(err)
	}
	fmt.Println("PUT Pod Name:", result.Name)

	// DELETE request
	err = restClient.Delete().Namespace("default").Resource("pods").Name(result.Name).Do(context.TODO()).Error()
	if err != nil {
		panic(err)
	}
	fmt.Println("DELETE Pod Name:", result.Name)
}

从上面的示例中发现,使用clientcmd.BuildConfigFromFlags从 kubeconfig 文件中获取到了 apiserver 的 host,然后用想要访问的资源的 apipath 进行填充,构造完整的 config 后,用rest.RESTClientFor 构造 restclient,之后就可以利用 rest 客户端进行各种 restful 请求了。

(API Server 本质就是一个 http 服务端,我们的程序就是客户端,向对应的路径发送请求,就是请求路径对应的资源)

3.2、Clientset

从上面 restclient 的使用中发现,在操作资源时需要提前准备很多东西:

  • 要操作 pods,需要指定config,给config设置 APIPath 为 “/api”、设置序列化器、设置 GroupVersion,最后还要调用 rest.RESTClientFor(config) 得到一个用于操作 pods 的 restclient。

  • 而如果要操作 deployment,这个过程又需要写一遍,然后又得到一个用于操作 deployment 的 restclient。

因此,可以事先创建各种资源的 RESTClient,存起来备用。Clientset 就是这样封装起来的一个 RESTClient 集合

Clientset 是一系列 RESTClient 的集合。

clientset 结构体位于 /kubernetes/clientset.go

type Clientset struct {
	......
	appsV1                        *appsv1.AppsV1Client
	appsV1beta1                   *appsv1beta1.AppsV1beta1Client
	appsV1beta2                   *appsv1beta2.AppsV1beta2Client
	authenticationV1              *authenticationv1.AuthenticationV1Client
	authenticationV1alpha1        *authenticationv1alpha1.AuthenticationV1alpha1Client
	authenticationV1beta1         *authenticationv1beta1.AuthenticationV1beta1Client
	authorizationV1               *authorizationv1.AuthorizationV1Client
	authorizationV1beta1          *authorizationv1beta1.AuthorizationV1beta1Client
	autoscalingV1                 *autoscalingv1.AutoscalingV1Client
	autoscalingV2                 *autoscalingv2.AutoscalingV2Client
	autoscalingV2beta1            *autoscalingv2beta1.AutoscalingV2beta1Client
	autoscalingV2beta2            *autoscalingv2beta2.AutoscalingV2beta2Client
	batchV1                       *batchv1.BatchV1Client
	batchV1beta1                  *batchv1beta1.BatchV1beta1Client
	certificatesV1                *certificatesv1.CertificatesV1Client
	certificatesV1beta1           *certificatesv1beta1.CertificatesV1beta1Client
	certificatesV1alpha1          *certificatesv1alpha1.CertificatesV1alpha1Client
	coordinationV1beta1           *coordinationv1beta1.CoordinationV1beta1Client
	coordinationV1                *coordinationv1.CoordinationV1Client
	coreV1                        *corev1.CoreV1Client
	......
}

以 appsv1 的类型 *appsv1.AppsV1Client 举例:可以看到,内部包含了一个 restClient。这也进一步认证,Clientset 就是一系列 RESTClient 的集合。

type AppsV1Client struct {
	restClient rest.Interface
}

Clientset 实现了 /kubernetes/clientset.go 下的 Interface接口

type Interface interface {
	......
	AppsV1() appsv1.AppsV1Interface
	AppsV1beta1() appsv1beta1.AppsV1beta1Interface
	AppsV1beta2() appsv1beta2.AppsV1beta2Interface
	AuthenticationV1() authenticationv1.AuthenticationV1Interface
	AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
	AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
	AuthorizationV1() authorizationv1.AuthorizationV1Interface
	AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
	AutoscalingV1() autoscalingv1.AutoscalingV1Interface
	AutoscalingV2() autoscalingv2.AutoscalingV2Interface
	AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
	AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
	BatchV1() batchv1.BatchV1Interface
	BatchV1beta1() batchv1beta1.BatchV1beta1Interface
	CertificatesV1() certificatesv1.CertificatesV1Interface
	CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
	CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
	CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
	CoordinationV1() coordinationv1.CoordinationV1Interface
	CoreV1() corev1.CoreV1Interface
	......
}

AppsV1() 方法为例,返回值是接口 appsv1.AppsV1Interface 的实现类 appsv1.AppsV1Client 的对象,用这些方法就可以找到对应 GV 的 restclient。

// 接口
type AppsV1Interface interface {
	RESTClient() rest.Interface
	ControllerRevisionsGetter
	DaemonSetsGetter
	DeploymentsGetter
	ReplicaSetsGetter
	StatefulSetsGetter
}

// 实现类
type AppsV1Client struct {
	restClient rest.Interface
}

// AppsV1Client 实现 AppsV1Interface 接口
func (c *AppsV1Client) RESTClient() rest.Interface {
	if c == nil {
		return nil
	}
	return c.restClient
}

有了这个 GV 的 restclient 后,再用类似下面的这种方法,就可以找到这个 restclient 中包含的资源:

一个 GV 对应了一个 restclient,而一个 GV 里面,又会有很多的资源。GV的分布可以看 apimachinery、api、client-go

// 返回值是DeploymentInterface 
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
	// 实际上,返回值是 DeploymentInterface 的实现类 deployments 的对象
	return newDeployments(c, namespace)
}

// 构造一个 deployments 的对象
func newDeployments(c *AppsV1Client, namespace string) *deployments {
	return &deployments{
		client: c.RESTClient(),
		ns:     namespace,
	}
}

而 deployment 可以有以下这些方法:

type DeploymentInterface interface {
	Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
	Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
	UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
	List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
	Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
	ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
	GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
	UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
	ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)

	DeploymentExpansion
}

观察其中的 create 方法:

func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
	result = &v1.Deployment{}
	err = c.client.Post().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Body(deployment).
		Do(ctx).
		Into(result)
	return
}

可以发现,和使用 restclient 是一样的。

所以,一个 GroupVersion 对应一个 restclient,而 clientset 就是把一堆 restclient 集合在一起。

以上这些实现的方法也不是人写的,而是生成器生成的,code-generator 提供了很多工具用于为 k8s 中的资源生成相关代码,其中包括一个 client-gen,可以为资源生成标准的操作方法(get;list;watch;create;update;patch;delete)。具体可以看 生成器

在获取到 config 后,可以使用 kubernetes.NewForConfig() ,根据 config 生成 clientset。

使用示例:

package main

import (
	"context"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	
	pods, err := clientSet.CoreV1().Pods(v1.NamespaceDefault).List(context.TODO(), v1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, pod := range pods.Items {
		println(pod.Name)
	}
	
	deploys, err := clientSet.AppsV1().Deployments("kube-system").List(context.TODO(), v1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, deploy := range deploys.Items {
		println(deploy.Name)
	}
}

从上面的示例中发现:在获取到 clientset 后,使用对应的类似于CoreV1()的 GV 方法,可以获取到对应 CoreV1 的 restclient,然后再用对应的资源方法Pods()就可以找到这个 restclient 下对应资源,然后再用这个资源的方法List()

3.3、DynamicClient

Clientset 是一系列 RESTClient 的集合,创建一个 Clientset 实例,其中已经包含了所有 kubernetes 内置资源的 RESTClient,可是对于 CRD 自定义资源定义,Clientset 就没有操作它们的 RESTClient 了。

因此,client-go 特意提供了一种客户端,即 DynamicClient,可以操作任意的 kubernetes 资源,包括内置资源 + CR 资源。DynamicClient 结构位于/client-go/dynamic/simple.go

type DynamicClient struct {
   client rest.Interface
}

可以看到,DynamicClient 中只包含一个RESTClient的字段 client

DynamicClient 只有一个实例方法: Resource(resource schema.GroupVersionResource),用于指定当前 DynamicClient 要操作的究竟是什么类型。

func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
	return &dynamicResourceClient{client: c, resource: resource}
}

dynamicResourceClient 包含了 DynamicClient 对象、要操作的类型的 GVR、还有资源的 ns。

type dynamicResourceClient struct {
	client    *DynamicClient
	namespace string
	resource  schema.GroupVersionResource
}

可以调用它的Namespace方法,为namespace字段赋值

func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
	ret := *c
	ret.namespace = ns
	return &ret
}

Unstructured

前面使用 RESTClient 或 Clientset,调用的是某个资源的 Get、List 等方法,返回值都是确定的。比如 调用 Pod 资源的 List,我们能够确定返回的一定是 PodList,所以 Pod 的 List 方法,返回值就是 PodList,固定的。

可当使用 DynamicClient 时,DynamicClient 没有确定的资源,资源类型是我们使用的时候才会去指定 GVR 的,所以开发DynamicClient 的时候,Get 方法自然无法确定返回值类型。

那么kubernetes人员应该如何开发这个 Get 方法?

我们自然而然的能够想到,如果有一个通用的数据结构,可以表示所有类型的资源,问题就解决了。Unstructured 应运而生。它就是一个可以表示所有资源的类型。

Unstructured 结构位于 staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unsructured/unsructured.go

type Unstructured struct {
	// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
	// map[string]interface{}
	// children.
	Object map[string]interface{}
}

我们最终操作资源,其实使用的是 dynamicResourceClient

dynamicResourceClient 实现了 ResourceInterface 接口的所有方法,这些方法就是我们最终操作资源所调用的。

type ResourceInterface interface {
	Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
	Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
	UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
	Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
	DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
	Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
	List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
	Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
	ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
}

可以看到:

  • Create 方法因为不知道要创建什么资源,所以参数接收的是 obj *unstructured.Unstructured

  • Get方法因为不知道返回的是什么资源,所以返回的是 *unstructured.Unstructured

  • List方法因为不知道返回的是什么资源列表,所以返回的是 *unstructured.UnstructuredList。这个结构里面包含一个 []unstructured.Unstructured

既然 dynamicResourceClient 的方法接收和返回的,很多都是 Unstructured 类型,那么我们就需要实现真正的资源对象 与 Unstructured 的相互转换

runtime 包下,给我们提供了一个 UnstructuredConverter 接口,接口中提供了两个方法,分别用于资源对象-->Unstructured 和 Unstructured-->资源对象。位于 staging/src/k8s.io/apimachinery/pkg/runtime/converter.go

type UnstructuredConverter interface {
   ToUnstructured(obj interface{}) (map[string]interface{}, error)
   FromUnstructured(u map[string]interface{}, obj interface{}) error
}

UnstructuredConverter 接口只有一个实现类 unstructuredConverter。不过,unstructuredConverter 类型开头小写,我们无法直接使用,kubernetes 创建了一个全局变量 DefaultUnstructuredConverter,类型就是 unstructuredConverter,用以供外界使用。

我们直接使用 runtime.DefaultUnstructuredConverter,调用它的 ToUnstructured 或 FromUnstructured 方法,就可以实现 Unstructured 与资源对象的相互转换了。

示例:

package main

import (
	"context"
	"fmt"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client
	dynamicResourceClient := dynamicClient.Resource(schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "deployments",
	})

	// 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象
	unstructured, err := dynamicResourceClient.
		Namespace("kube-system").
		Get(context.TODO(), "coredns", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象
	deploy := &appsv1.Deployment{}
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), deploy)
	if err != nil {
		panic(err)
	}

	// 打印 deploy 名称和命名空间
	fmt.Printf("deploy.Name: %s\ndeploy.namespace: %s\n", deploy.Name, deploy.Namespace)
}

还是先根据 kubeconfig 文件获取到 config,然后创建 dynamicclient,再根据 GVR 找到 dynamicResourceClient,然后调用方法获取到资源,再用runtime.DefaultUnstructuredConverter.FromUnstructured()进行通用结构到具体结构的转换。

3.4、DiscoveryClient

前面的3种Client:RESTClient、Clientset、DynamicClient,都是用来操作 kubernetes 资源的,目前还缺少一个用于检索 kuberentes资源的 Client。DiscoveryClient 就是这样一个 Client。用于检索 kuberentes集群中支持的 API 资源的相关信息,例如版本、组、资源类型等。

DiscoveryClient 提供了一组方法来查询和获取这些信息,以便在编写 Controller 或 Operator 时,能够动态地了解集群中可用的资源。

DiscoveryClient 结构体很简单,位于 staging/src/k8s.io/client-go/discovery/discovery_client.go

type DiscoveryClient struct {
	restClient restclient.Interface

	LegacyPrefix string
	// Forces the client to request only "unaggregated" (legacy) discovery.
	UseLegacyDiscovery bool
}

LegacyPrefix 字段表示旧版本资源的访问前缀,一般值都是/api。Kubernetes 1.16 以前,资源的访问前缀都是 /api,1.16及之后,全面改成 /apis,为了兼容旧资源,这里特意保存了一个常量字符串 /api

DiscoveryClient 实现了接口的所有方法,用于获取 API 资源的各种信息

type DiscoveryInterface interface {
	RESTClient() restclient.Interface
	ServerGroupsInterface
	ServerResourcesInterface
	ServerVersionInterface
	OpenAPISchemaInterface
	OpenAPIV3SchemaInterface
	// Returns copy of current discovery client that will only
	// receive the legacy discovery format, or pointer to current
	// discovery client if it does not support legacy-only discovery.
	WithLegacy() DiscoveryInterface
}

除了 DiscoveryClient,还有一个实现了接口 DiscoveryInterface 的CachedDiscoveryClient

  • CachedDiscoveryClient 就是实现了缓存机制的 DiscoveryClient 的封装

  • CachedDiscoveryClient 在 DiscoveryClient 的基础上增加了一层缓存,用于缓存获取的资源信息,以减少对 API Server 的频繁请求。

  • 在首次调用时,CachedDiscoveryClient 会从 API Server 获取资源信息,并将其缓存在本地。之后的调用会直接从缓存中获取资源信息,而不需要再次向 API Server 发送请求。

  • 因为集群部署完成后,API 资源基本很少变化,所以缓存下来可以很好的提高请求效率。

  • kubectl 工具内部,kubectl api-versions命令其实就是使用了这个 CachedDiscoveryClient,所以多次执行kubectl api-versions命令,其实只有第一次请求了 API Server,后续都是直接使用的本地缓存。

可以通过/client-go/discovery/discovery_client.go 中的func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) 创建发现客户端:

func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) {
	config := *c
	if err := setDiscoveryDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := restclient.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewDiscoveryClientForConfigAndClient(&config, httpClient)
}

示例:

func main() {
	// 1、先创建一个客户端配置config
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err.Error())
	}

	// 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表
	_, resourceLists, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		panic(err.Error())
	}

	// 4、遍历资源列表,打印出资源组和资源名称
	for _, resource := range resourceLists {
		fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion)
		for _, resource := range resource.APIResources {
			fmt.Printf("resource name: %s\n", resource.Name)
		}
		fmt.Println("--------------------------")
	}
}

4、informer

  • 从上面架构图中可以看出,client-go 中包含的组件主要有:Reflector、DeltaFIFO、Controller、Indexer、Processer。

  • 这个架构图会迷惑我们,让我们觉得 client-go 只维护了这么一套组件,实现功能。实际上 client-go 并不是只有一套这种组件,而是以 informer 作为基本单位,每个 informer 有自己的一套组件。

  • 每种GVR资源,都有自己对应的 informer,我们可以选择性的创建informer 并启动,进而仅从 apiserver ListAndWatch 自己需要的资源就可以了

  • 每创建一种资源的 informer 对象,该对象就携带了自己需要的 Reflector、DeltaFIFO、Controller、Indexer、Processer,启动之后,该 informer 作为一个单独的协程启动,负责自己资源的 ListAndWatch、缓存、事件处理等。

使用示例:

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}
	
    // 初始化与apiserver通信的clientset
    clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}
	
	// 初始化shared informer factory以及pod informer
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()
	
	// 注册informer的自定义ResourceEventHandler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    xxx,
		UpdateFunc: xxx,
		DeleteFunc: xxx,
	})
	
	// 启动shared informer factory,开始informer的list & watch操作
	stopper := make(chan struct{})
	go factory.Start(stopper)
	
	// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中 
	// 或者调用factory.WaitForCacheSync(stopper)
	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	
	// 创建lister
	podLister := podInformer.Lister()
	// 从informer中的indexer本地缓存中获取对象
	podList, err := podLister.List(labels.Everything())
	if err != nil {
		fmt.Println(err)
	}
}
  • SharedInformerFactory是什么,等会再说。只需要知道他是一个工厂,可以创建 informer

  • factory.Core().V1().Pods() 和 podInformer.Informer(),最终就是创建了一个 pod 资源的 informer,该 informer 会自动保存在 factory 中

  • factory.Start 就是把 factory 中已经创建的所有 informer,都启动起来,每个 informer 就是一个单独的协程,互不影响,各自进行 ListAndWatch

  • podInformer.Lister() 和 podLister.List() 就是从 pod 的这个 informer 中,获取缓存数据。可以看到获取缓存的时候,确定到了某一个具体的 informer

SharedInformerFactory

SharedInformerFactory 主要有两部分功能:

  • 作为创建 informer 的工厂,用于创建所有类型的 informer

  • SharedInformerFactory 内部还会缓存所有已经创建过的 informer,下次再创建,就会直接使用缓存,不会再创建新的 informer,节省了资源

type sharedInformerFactory struct {
	// 这个client,是clientset类型的客户端,用于与apiserver交互
	client           kubernetes.Interface
	// 限制 当前SharedInformerFactory 所创建的 Informer 只关注指定命名空间中的资源变化
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	// 缓存已经创建的全部informer
	informers map[reflect.Type]cache.SharedIndexInformer
	// 缓存已经启动的 informer,只存 类型:是否启动
	startedInformers map[reflect.Type]bool
}

factory.Core().V1().Pods() 返回的是 PodInformer 接口类型,位于 informers/core/v1/pod.go

type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
  • informers包下,给每一种GVR,都提供了一个接口类型,用于表示这种 GVR 资源的informer。比如还有 NodeInformer、NamespaceInformer、SecretInformer等

  • 不过这个 PodInformer 并不是我们直接使用的,我们最终使用的还是 cache.SharedIndexInformer 类型,所以 这些资源 xxxInformer 接口,都提供了一个 Informer() cache.SharedIndexInformer 方法,用于获取一个为该资源工作的 cache.SharedIndexInformer

  • 因此,这些资源 xxxInformer 接口,其实就是对 cache.SharedIndexInformer 类型的封装。

  • client-go源码中,/client-go/informers 包下,就是 factory 结构+所有GVR资源的 informer 结构

5、client-go 开发自定义控制器

我们希望实现这个效果:

  • 创建或更新 Service 的时候,如果这个 Service 的 Annotations 中,包含了"ingress/http:true",那么在创建或更新这个 Service 的时候,会自动为它创建一个 Ingress。

  • 删除Service的时候,如果这个 Service 的 Annotations 中,包含了"ingress/http:true",那么同时也要删除它的 Ingress。

需求分析:

这个效果需要编写三个事件处理方法,addService、updateService、deleteIngress

  • addService/updateService:用户创建或更新 service 的时候,kubernetes的 ServiceController 已经完成 service 的创建或更新了。我们要做的是拿到已存在的 service 对象,看是否包含 “ingress/http:true”。如果包含,则保证有一个对应的 ingress;如果不包含,则保证不能有对应的ingress

  • deleteIngress:用于删除ingress的时候,也触发addService/updateService一样的逻辑,保证service和ingress的对应关系是正确的。

你可能会疑问,为什么没有 deleteService?

因为我们会使用 OwnerReferences 将 service+ingress 关联起来。因此删除service,会由 kubernetes 的 ControllerManager 中的特殊 Controller,自动完成 ingress 的 gc,所以删除service时我们无需特殊处理。

main函数如下:

package main

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"share-code-operator-study/addingress/pkg"
)

func main() {
	// 创建一个 集群客户端配置
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		inClusterConfig, err := rest.InClusterConfig()
		if err != nil {
			log.Fatalln("can't get config")
		}
		config = inClusterConfig
	}

	// 创建一个 clientset 客户端,用于创建 informerFactory
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// 创建一个 informerFactory
	factory := informers.NewSharedInformerFactory(clientset, 0)
	// 使用 informerFactory 创建Services资源的 informer对象
	serviceInformer := factory.Core().V1().Services()
	// 使用 informerFactory 创建Ingresses资源的 informer对象
	ingressInformer := factory.Networking().V1().Ingresses()

	// 创建一个自定义控制器
	controller := pkg.NewController(clientset, serviceInformer, ingressInformer)

	// 创建 停止channel信号
	stopCh := make(chan struct{})
	// 启动 informerFactory,会启动已经创建的 serviceInformer、ingressInformer
	factory.Start(stopCh)
	// 等待 所有informer 从 etcd 实现全量同步
	factory.WaitForCacheSync(stopCh)

	// 启动自定义控制器
	controller.Run(stopCh)
}

controller如下:

package pkg

import (
	"context"
	corev1 "k8s.io/api/core/v1"
	netv1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informercorev1 "k8s.io/client-go/informers/core/v1"
	informernetv1 "k8s.io/client-go/informers/networking/v1"
	"k8s.io/client-go/kubernetes"
	listercorev1 "k8s.io/client-go/listers/core/v1"
	listernetv1 "k8s.io/client-go/listers/networking/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"reflect"
	"time"
)

const (
	// worker 数量
	workNum = 5
	// service 指定 ingress 的 annotation key
	annoKey = "ingress/http"
	// 调谐失败的最大重试次数
	maxRetry = 10
)

// 自定义控制器
type controller struct {
	client        kubernetes.Interface
	serviceLister listercorev1.ServiceLister
	ingressLister listernetv1.IngressLister
	queue         workqueue.RateLimitingInterface
}

// NewController 创建一个自定义控制器
func NewController(clientset *kubernetes.Clientset, serviceInformer informercorev1.ServiceInformer, ingressInformer informernetv1.IngressInformer) *controller {
	// 控制器中,包含一个clientset、service和ingress的缓存监听器、一个workqueue
	c := controller{
		client:        clientset,
		serviceLister: serviceInformer.Lister(),
		ingressLister: ingressInformer.Lister(),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
	}

	// 为 serviceInformer 添加 ResourceEventHandler
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// 添加service时触发
		AddFunc: c.addService,
		// 修改service时触发
		UpdateFunc: c.updateService,
		// 这里没有删除service的逻辑,因为我们会使用 OwnerReferences 将service+ingress关联起来。
		// 因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc
	})

	// 为 ingressInformer 添加 ResourceEventHandler
	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// 删除ingress时触发
		DeleteFunc: c.deleteIngress,
	})

	return &c
}

// 添加service时触发
func (c *controller) addService(obj interface{}) {
	// 将 添加service 的 key 加入 workqueue
	c.enqueue(obj)
}

// 修改service时触发
func (c *controller) updateService(oldObj interface{}, newObj interface{}) {
	// 如果两个对象一致,就无需触发修改逻辑
	if reflect.DeepEqual(oldObj, newObj) {
		return
	}
	// todo 比较annotation
	// 将 修改service 的 key 加入 workqueue
	c.enqueue(newObj)
}

// 删除ingress时触发
func (c *controller) deleteIngress(obj interface{}) {
	// 将对象转成ingress,并获取到它的 ownerReference
	ingress := obj.(*netv1.Ingress)
	ownerReference := metav1.GetControllerOf(ingress)
	// 如果ingress的 ownerReference 没有绑定到service,则无需处理
	if ownerReference == nil || ownerReference.Kind != "Service" {
		return
	}
	// 如果ingress的 ownerReference 已经绑定到service,则需要处理
	c.enqueue(obj)
}

// enqueue 将 待添加service 的 key 加入 workqueue
func (c *controller) enqueue(obj interface{}) {
	// 调用工具方法,获取 kubernetes资源对象的 key(默认是 ns/name,或 name)
	key, err := cache.MetaNamespaceKeyFunc(obj)
	// 获取失败,不加入队列,即本次事件不予处理
	if err != nil {
		runtime.HandleError(err)
		return
	}
	// 将 key 加入 workqueue
	c.queue.Add(key)
}

// dequeue 将处理完成的 key 出队
func (c *controller) dequeue(item interface{}) {
	c.queue.Done(item)
}

// Run 启动controller
func (c *controller) Run(stopCh chan struct{}) {
	// 启动多个worker,同时对workqueue中的事件进行处理,效率提升5倍
	for i := 0; i < workNum; i++ {
		// 每个worker都是一个协程,使用同一个停止信号
		go wait.Until(c.worker, time.Minute, stopCh)
	}
	// 启动完成后,Run函数就停止在这里,等待停止信号
	<-stopCh
}

// worker方法
func (c *controller) worker() {
	// 死循环,worker处理完一个,再去处理下一个
	for c.processNextItem() {

	}
}

// processNextItem 处理下一个
func (c *controller) processNextItem() bool {
	// 从 workerqueue 取出一个key
	item, shutdown := c.queue.Get()
	// 如果已经收到停止信号了,则返回false,worker就会停止处理
	if shutdown {
		return false
	}
	// 处理完成后,将这个key出队
	defer c.dequeue(item)

	// 转成string类型的key
	key := item.(string)

	// 处理service逻辑的核心方法
	err := c.syncService(key)
	// 处理过程出错,进入错误统一处理逻辑
	if err != nil {
		c.handleError(key, err)
	}
	// 处理结束,返回true
	return true
}

// handleError 错误统一处理逻辑
func (c *controller) handleError(key string, err error) {
	// 如果当前key的处理次数,还不到最大重试次数,则再次加入队列
	if c.queue.NumRequeues(key) < maxRetry {
		c.queue.AddRateLimited(key)
		return
	}

	// 运行时统一处理错误
	runtime.HandleError(err)
	// 不再处理这个key
	c.queue.Forget(key)
}

// syncService 处理service逻辑的核心方法
func (c *controller) syncService(key string) error {
	// 将 key 切割为 ns 和 name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// 从indexer中,获取service
	service, err := c.serviceLister.Services(namespace).Get(name)
	// 没有service,直接返回
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// 检查service的annotation,是否包含 key: "ingress/http"
	_, ok := service.Annotations[annoKey]
	// 从indexer缓存中,获取ingress
	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)

	if ok && errors.IsNotFound(err) {
		// ingress不存在,但是service有"ingress/http",需要创建ingress
		// 创建ingress
		ig := c.createIngress(service)
		// 调用controller中的client,完成ingress的创建
		_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metav1.CreateOptions{})
		if err != nil {
			return err
		}
	} else if !ok && ingress != nil {
		// ingress存在,但是service没有"ingress/http",需要删除ingress
		// 调用controller中的client,完成ingress的删除
		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
		if err != nil {
			return err
		}
	}

	return nil
}

// createIngress 创建ingress
func (c *controller) createIngress(service *corev1.Service) *netv1.Ingress {
	icn := "ingress"
	pathType := netv1.PathTypePrefix
	return &netv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:      service.Name,
			Namespace: service.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(service, corev1.SchemeGroupVersion.WithKind("Service")),
			},
		},
		Spec: netv1.IngressSpec{
			IngressClassName: &icn,
			Rules: []netv1.IngressRule{
				{
					Host: "example.com",
					IngressRuleValue: netv1.IngressRuleValue{
						HTTP: &netv1.HTTPIngressRuleValue{
							Paths: []netv1.HTTPIngressPath{
								{
									Path:     "/",
									PathType: &pathType,
									Backend: netv1.IngressBackend{
										Service: &netv1.IngressServiceBackend{
											Name: service.Name,
											Port: netv1.ServiceBackendPort{
												Number: 80,
											},
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}
}

整体的流程就是,informer 观察到有对应资源发生变化后,就会触发之前注册好的 eventhandler,把这个资源对象放入队列 DeltaFIFO 中。在 worker 进程中会一直循环执行 processNextItem,也就是从 DeltaFIFO 中拿 key 出来,然后再去查 indexer 找到对应的具体资源,进入 sync 调谐逻辑,然后再出队。

一个 GVR 对应一个 informer,所以 informer 可以监听对应资源的事件,而不是所有资源的事件。

注意其中的 NewController()、Run()、worker()、processNextItem()、handleError()、syncService() 这些方法的命名和处理逻辑。

你会发现 kubernetes 源码中,内置的控制器和他们是一样的, 即 kubernetes内置控制器也是使用 client-go 的这种方式,实现控制器开发的。