
client-go
client-go 是一个使用 go 语言调用 kubernetes 集群资源对象 API 的客户端,即通过 client-go 实现与 k8s apiserver 的交互(包括deployment、service、ingress、replicaSet、pod、namespace、node 等)的增删改查等操作。大部分对 kubernetes 进行前置 API 封装的二次开发都通过 client-go 这个第三方包来实现。
1、client-go架构
github地址:https://github.com/kubernetes/client-go
discovery:用于发现 API Server 都是支持哪些 API。
dynamic:包含了 kubernetes dynamic client 的逻辑,可以操作任意的 k8s 资源 API 对象,包括内置的、自定义的资源对象
informers:包含了所有内置资源的 informer,便于操作k8s的资源对象
kubernetes:包含了访问 Kubernetes API 的 所有 ClientSet
listers:包含了所有内置资源的 lister,用于读取缓存中 k8s 资源对象的信息
plugin/pkg/client/auth:包含所有可选的认证插件,用于从外部获取 credential(凭证)
tools:包含一系列工具,编写控制器时会用到很多里面的工具方法
transport:包含了创建连接、认证的逻辑,会被上层的 ClientSet 使用
下面是一个官方图形表示,展示了 client-go 库中的各种组件如何工作,以及它们与将要编写的自定义控制器代码的交互点:
整体流程简介:
Reflector 会持续监听 k8s 集群中指定资源类型的 API,当发现变动和更新时,就会创建一个发生变动的对象副本,并将其添加到队列 DeltaFIFO 中
Informer 监听 DeltaFIFO 队列,取出对象,做两件事:
(1)将对象加入 Indexer,Indexer 会将 [对象key, 对象] 存入一个线程安全的 Cache 中
(2)根据对象的资源类型和操作,找到对应 Controller 预先提供的 Resource Event Handler,调用 Handler,将对象的 Key 加入该 Controller 的 Workqueue
Controller 的循环函数 ProcessItem,监听到 Workqueue 有数据了,就会取出一个 key,交给处理函数 Worker,Worker 会根据 Key,使用 Indexer reference 从 Cache 中 获取该 key 对应的真实对象。然后就可以进行调谐了。
注意点:
DeltaFIFO 中存的是对象副本
Cache 中存的是 [对象key, 对象] 的映射
Workqueue 中存的是对象 Key
CRDController 中,使用 Informer 对象,是为了向其中添加一些 Resource Event Handlers
CRDController 中,使用 Indexer 对象,是为了根据对象 Key,获取对象实例
client-go组件:
Reflector
reflector 会一直监听 kubernetes 中指定资源类型的 API,实现监听的函数就是 ListAndWatch。这种监听机制既适用于 k8s 的内建资源,也适用于自定义资源。
当 reflector 通过监听 API 发现资源对象实例存在新的 notification 时,它就会使用 listing API 获取这个新的实例对象,并将其放入 watchHandler 函数内的 DeltaFIFO 中;
Informer
Informer 会从 Delta FIFO 中取出对象。实现这个功能的方法对应源码中的 processLoop;
Informer 取出对象后,根据 Resource 类型,调用对应的 Resource Event Handler 回调函数,该函数实际上由某个具体的 Controller 提供,函数中会获取对象的 key,并将 key 放入到该Controller 内部的 Workqueue 中,等候处理。
Indexer 和 Thread Safe Store
Indexer 会提供对象的索引功能,通常是基于对象 Key 来创建索引。
Indexer 维护着一个线程安全的 Cache,即 Thread Safe Store。存储的是[对象key, 对象],用对象 Key 可以进行获取对象实例。
Resource Event Handlers reference
这实际上是所有 Controller 的 Resource Event Handlers 的引用。
这些 handlers 由具体的Controller提供,就是 Informer 的回调函数。Informer 会根据资源的类型,调用对应 Controller 的 handler 方法
handler 通常都是用于将资源对象的 key 放入到该Controller 内部的 Workqueue 中,等候处理。
自定义控制器组件:
Informer reference
Informer reference 是 Informer 实例对象的引用,用于操作和处理自定义资源对象
我们编写自定义控制器时,需要引用自己需要的 Informer,向其中加入一系列 Resource Event Handlers
Indexer reference
Indexer reference 是 Indexer 实例对象的引用,用于根据对象 Key 索引资源对象
我们编写自定义控制器时,应该创建 Indexer 的引用,将对象 Key 传给它,就可以获取想要处理的对象
Resource Event Handlers
由具体的 Controller 给 Client-go 的 Informer 提供的回调函数,获取待处理对象的 key,并将 key 放入到 Workqueue 中。
Workqueue
此队列是 具体的Controller 内部创建的队列,用于暂时存储从Resource event handler 中 传递过来的,待处理对象的Key。
Resource event handler 函数通常会获取待处理对象的key,并将key放入到这个workqueue中。
Process Item
这个函数为循环函数,它不断从 Work queue 中取出对象的 key,并使用 Indexer Reference 获取这个 key 对应的具体资源对象,然后根据资源的变化,做具体的调谐 Reconcile 动作。
2、使用 client-go 编写 Controller 的步骤
先从 client-go 中获取对应资源的 Informer
提供一系列的 Resource event handlers,并加入对应的 Informer,供该 informer 回调
提供一个 Workqueue 队列,存储待处理的对象的 Key
提供一个循环函数 ProcessItem,不断从 Workqueue 中取出对象的 key,交给处理函数 Worker
提供一个处理函数 Worker,根据对象 Key,使用对应资源的 Indexer,获取到该对象的实例,根据对象的属性变化,做真正的调谐过程。
3、client
client-go 主要提供了4种 client 组件:
RESTClient:最基础的客户端,提供最基本的封装,可以通过它组装与 API Server 即时通讯的 url,然后发起 http 请求。
Clientset:是一个 Client 的集合,在 Clientset 中包含了所有 K8S 内置资源的 Client,通过 Clientset 便可以很方便的操作如 Pod、Service 这些资源
dynamicClient:动态客户端,可以操作任意 K8S 的资源,包括 CRD 定义的资源
DiscoveryClient:用于发现 K8S 提供的资源组、资源版本和资源信息,比如:
kubectl api-resources
tools/clientcmd 工具:
源码位于 client-go/tools/clientcmd 包下
clientcmd 是 Kubernetes Go 客户端库(client-go)中的一个包,用于加载和解析 Kubernetes 配置文件,并辅助创建与 Kubernetes API 服务器进行通信的客户端。
clientcmd 提供了一些功能,使得在客户端应用程序中处理 Kubernetes 配置变得更加方便。主要包含以下几个方面的功能:
加载配置文件:clientcmd 可以根据指定的路径加载 Kubernetes 配置文件,例如 kubeconfig 文件。
解析配置文件:一旦加载了配置文件,clientcmd 提供了解析配置文件的功能,可以获取各种配置信息,如集群信息、认证信息、上下文信息等。
辅助创建客户端:clientcmd 可以使用配置文件中的信息,辅助创建与 Kubernetes API 服务器进行通信的客户端对象。这些客户端对象可以用来执行对 Kubernetes 资源的增删改查操作。
切换上下文:clientcmd 还支持在多个上下文之间进行切换。上下文表示一组命名空间、集群和用户的组合,用于确定客户端与哪个Kubernetes 环境进行通信。
之后的几种客户端中,都会使用 clientcmd 作为构建 config 的工具。
3.1、RestClient
RESTClient 是最基础的客户端,提供与 APIServer 通信的最基本封装,可以向 APIServer 发送 Restful 风格请求。其他三种 Client,其实都是 RESTClient 的再封装,内部都使用了 RESTClient。
type RESTClient struct {
base *url.URL
versionedAPIPath string
content ClientContentConfig
createBackoffMgr func() BackoffManager
rateLimiter flowcontrol.RateLimiter
warningHandler WarningHandler
Client *http.Client
}
/rest/client.go 中有一个接口 Interface
// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
RESTClient 实现了这个接口,因此具有所有的方法,用于发送各种 restful 类型的请求。
另外,Interface 每个方法的返回值都是 Request 类型,Request 类型的各种方法,很多的返回值也是 Request,这样就可以实现 链式编程 了
Request 位于 /rest/request.go
func (r *Request) Namespace(namespace string) *Request:设置当前Resquest访问的 namespace
func (r *Request) Resource(resource string) *Request:设置当前Resquest 想要访问的资源类型
func (r *Request) Name(resourceName string) *Request:设置当前Resquest 想要访问的资源的名称
func (r *Request) Do(ctx context.Context) Result:格式化并执行请求。返回一个 Result 对象,以便于处理响应。
rest.Config 位于 rest/config.go 中,用于描述 kubernetes 客户端的通用属性:
type Config struct {
// API 服务器的主机地址,格式为 https://<hostname>:<port>。默认情况下,它为空字符串,表示使用当前上下文中的集群配置。
Host string
// 指定 API 服务器的路径,目前只有两种取值:/api、/apis
// - /api:访问core API 组资源时,其实group值为空
// - /apis:访问其他 API 组资源时,都是apis,他们都有group值
APIPath string
// 对请求内容的配置,会影响对象在发送到服务器时的转换方式
// - ContentConfig中有两个重要属性:
// - NegotiatedSerializer:用于序列化和反序列化请求和响应的接口
// - GroupVersion:请求资源的 API 组和版本
ContentConfig
// 用于进行基本身份验证的用户名的字符串
Username string
// 用于进行基本身份验证的密码的字符串
Password string `datapolicy:"password"`
// 用于进行身份验证的令牌的字符串
BearerToken string `datapolicy:"token"`
// 包含身份验证令牌的文件的路径
BearerTokenFile string
// TLS 客户端配置,包括证书和密钥
TLSClientConfig
// 每秒允许的请求数(Queries Per Second)。默认为 5.0。
QPS float32
// 突发请求数。默认为 10
Burst int
// 速率限制器,用于控制向 API 服务器发送请求的速率
RateLimiter flowcontrol.RateLimiter
// 与 API 服务器建立连接的超时时间
Timeout time.Duration
// 用于创建网络连接的 Dial 函数
Dial func(ctx context.Context, network, address string) (net.Conn, error)
// ......
}
在有了config后,就可以使用func RESTClientFor(config Config) (RESTClient, error)
,根据 config 返回一个 RESTClient。
使用实例:
package main
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
config.GroupVersion = &v1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
config.APIPath = "/api"
// 返回对应 GV 的 restclient
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
// GET request
pods := v1.PodList{}
err = restClient.Get().Namespace("kube-system").Resource("pods").Do(context.TODO()).Into(&pods)
if err != nil {
panic(err)
}
for _, pod := range pods.Items {
fmt.Println("GET Pod Name:", pod.Name)
}
// POST request
newPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "new-pod",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
result := &v1.Pod{}
err = restClient.Post().Namespace("default").Resource("pods").Body(newPod).Do(context.TODO()).Into(result)
if err != nil {
panic(err)
}
fmt.Println("POST Pod Name:", result.Name)
// PUT request
updatedPod := result.DeepCopy()
updatedPod.Labels = map[string]string{"updated": "true"}
err = restClient.Put().Namespace("default").Resource("pods").Name(updatedPod.Name).Body(updatedPod).Do(context.TODO()).Into(result)
if err != nil {
panic(err)
}
fmt.Println("PUT Pod Name:", result.Name)
// DELETE request
err = restClient.Delete().Namespace("default").Resource("pods").Name(result.Name).Do(context.TODO()).Error()
if err != nil {
panic(err)
}
fmt.Println("DELETE Pod Name:", result.Name)
}
从上面的示例中发现,使用
clientcmd.BuildConfigFromFlags
从 kubeconfig 文件中获取到了 apiserver 的 host,然后用想要访问的资源的 apipath 进行填充,构造完整的 config 后,用rest.RESTClientFor
构造 restclient,之后就可以利用 rest 客户端进行各种 restful 请求了。(API Server 本质就是一个 http 服务端,我们的程序就是客户端,向对应的路径发送请求,就是请求路径对应的资源)
3.2、Clientset
从上面 restclient 的使用中发现,在操作资源时需要提前准备很多东西:
要操作 pods,需要指定config,给config设置 APIPath 为 “/api”、设置序列化器、设置 GroupVersion,最后还要调用 rest.RESTClientFor(config) 得到一个用于操作 pods 的 restclient。
而如果要操作 deployment,这个过程又需要写一遍,然后又得到一个用于操作 deployment 的 restclient。
因此,可以事先创建各种资源的 RESTClient,存起来备用。Clientset 就是这样封装起来的一个 RESTClient 集合。
Clientset 是一系列 RESTClient 的集合。
clientset 结构体位于 /kubernetes/clientset.go
中
type Clientset struct {
......
appsV1 *appsv1.AppsV1Client
appsV1beta1 *appsv1beta1.AppsV1beta1Client
appsV1beta2 *appsv1beta2.AppsV1beta2Client
authenticationV1 *authenticationv1.AuthenticationV1Client
authenticationV1alpha1 *authenticationv1alpha1.AuthenticationV1alpha1Client
authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client
authorizationV1 *authorizationv1.AuthorizationV1Client
authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client
autoscalingV1 *autoscalingv1.AutoscalingV1Client
autoscalingV2 *autoscalingv2.AutoscalingV2Client
autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client
autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client
batchV1 *batchv1.BatchV1Client
batchV1beta1 *batchv1beta1.BatchV1beta1Client
certificatesV1 *certificatesv1.CertificatesV1Client
certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client
certificatesV1alpha1 *certificatesv1alpha1.CertificatesV1alpha1Client
coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client
coordinationV1 *coordinationv1.CoordinationV1Client
coreV1 *corev1.CoreV1Client
......
}
以 appsv1 的类型 *appsv1.AppsV1Client 举例:可以看到,内部包含了一个 restClient。这也进一步认证,Clientset 就是一系列 RESTClient 的集合。
type AppsV1Client struct {
restClient rest.Interface
}
Clientset 实现了 /kubernetes/clientset.go 下的 Interface接口
type Interface interface {
......
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
AuthenticationV1() authenticationv1.AuthenticationV1Interface
AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
AutoscalingV1() autoscalingv1.AutoscalingV1Interface
AutoscalingV2() autoscalingv2.AutoscalingV2Interface
AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
BatchV1() batchv1.BatchV1Interface
BatchV1beta1() batchv1beta1.BatchV1beta1Interface
CertificatesV1() certificatesv1.CertificatesV1Interface
CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
CoordinationV1() coordinationv1.CoordinationV1Interface
CoreV1() corev1.CoreV1Interface
......
}
以 AppsV1()
方法为例,返回值是接口 appsv1.AppsV1Interface
的实现类 appsv1.AppsV1Client
的对象,用这些方法就可以找到对应 GV 的 restclient。
// 接口
type AppsV1Interface interface {
RESTClient() rest.Interface
ControllerRevisionsGetter
DaemonSetsGetter
DeploymentsGetter
ReplicaSetsGetter
StatefulSetsGetter
}
// 实现类
type AppsV1Client struct {
restClient rest.Interface
}
// AppsV1Client 实现 AppsV1Interface 接口
func (c *AppsV1Client) RESTClient() rest.Interface {
if c == nil {
return nil
}
return c.restClient
}
有了这个 GV 的 restclient 后,再用类似下面的这种方法,就可以找到这个 restclient 中包含的资源:
一个 GV 对应了一个 restclient,而一个 GV 里面,又会有很多的资源。GV的分布可以看 apimachinery、api、client-go。
// 返回值是DeploymentInterface
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
// 实际上,返回值是 DeploymentInterface 的实现类 deployments 的对象
return newDeployments(c, namespace)
}
// 构造一个 deployments 的对象
func newDeployments(c *AppsV1Client, namespace string) *deployments {
return &deployments{
client: c.RESTClient(),
ns: namespace,
}
}
而 deployment 可以有以下这些方法:
type DeploymentInterface interface {
Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)
DeploymentExpansion
}
观察其中的 create 方法:
func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
result = &v1.Deployment{}
err = c.client.Post().
Namespace(c.ns).
Resource("deployments").
VersionedParams(&opts, scheme.ParameterCodec).
Body(deployment).
Do(ctx).
Into(result)
return
}
可以发现,和使用 restclient 是一样的。
所以,一个 GroupVersion 对应一个 restclient,而 clientset 就是把一堆 restclient 集合在一起。
以上这些实现的方法也不是人写的,而是生成器生成的,code-generator 提供了很多工具用于为 k8s 中的资源生成相关代码,其中包括一个 client-gen,可以为资源生成标准的操作方法(get;list;watch;create;update;patch;delete)。具体可以看 生成器 。
在获取到 config 后,可以使用 kubernetes.NewForConfig()
,根据 config 生成 clientset。
使用示例:
package main
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
pods, err := clientSet.CoreV1().Pods(v1.NamespaceDefault).List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err)
}
for _, pod := range pods.Items {
println(pod.Name)
}
deploys, err := clientSet.AppsV1().Deployments("kube-system").List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err)
}
for _, deploy := range deploys.Items {
println(deploy.Name)
}
}
从上面的示例中发现:在获取到 clientset 后,使用对应的类似于CoreV1()
的 GV 方法,可以获取到对应 CoreV1 的 restclient,然后再用对应的资源方法Pods()
就可以找到这个 restclient 下对应资源,然后再用这个资源的方法List()
。
3.3、DynamicClient
Clientset 是一系列 RESTClient 的集合,创建一个 Clientset 实例,其中已经包含了所有 kubernetes 内置资源的 RESTClient,可是对于 CRD 自定义资源定义,Clientset 就没有操作它们的 RESTClient 了。
因此,client-go 特意提供了一种客户端,即 DynamicClient,可以操作任意的 kubernetes 资源,包括内置资源 + CR 资源。DynamicClient 结构位于/client-go/dynamic/simple.go
。
type DynamicClient struct {
client rest.Interface
}
可以看到,DynamicClient 中只包含一个RESTClient的字段 client
DynamicClient 只有一个实例方法: Resource(resource schema.GroupVersionResource)
,用于指定当前 DynamicClient 要操作的究竟是什么类型。
func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}
dynamicResourceClient 包含了 DynamicClient 对象、要操作的类型的 GVR、还有资源的 ns。
type dynamicResourceClient struct {
client *DynamicClient
namespace string
resource schema.GroupVersionResource
}
可以调用它的Namespace方法,为namespace字段赋值
func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
ret := *c
ret.namespace = ns
return &ret
}
Unstructured
前面使用 RESTClient 或 Clientset,调用的是某个资源的 Get、List 等方法,返回值都是确定的。比如 调用 Pod 资源的 List,我们能够确定返回的一定是 PodList,所以 Pod 的 List 方法,返回值就是 PodList,固定的。
可当使用 DynamicClient 时,DynamicClient 没有确定的资源,资源类型是我们使用的时候才会去指定 GVR 的,所以开发DynamicClient 的时候,Get 方法自然无法确定返回值类型。
那么kubernetes人员应该如何开发这个 Get 方法?
我们自然而然的能够想到,如果有一个通用的数据结构,可以表示所有类型的资源,问题就解决了。Unstructured 应运而生。它就是一个可以表示所有资源的类型。
Unstructured 结构位于
staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unsructured/unsructured.go
中type Unstructured struct { // Object is a JSON compatible map with string, float, int, bool, []interface{}, or // map[string]interface{} // children. Object map[string]interface{} }
我们最终操作资源,其实使用的是 dynamicResourceClient
dynamicResourceClient 实现了 ResourceInterface 接口的所有方法,这些方法就是我们最终操作资源所调用的。
type ResourceInterface interface { Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error) Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) }
可以看到:
Create 方法因为不知道要创建什么资源,所以参数接收的是 obj *unstructured.Unstructured
Get方法因为不知道返回的是什么资源,所以返回的是 *unstructured.Unstructured
List方法因为不知道返回的是什么资源列表,所以返回的是 *unstructured.UnstructuredList。这个结构里面包含一个 []unstructured.Unstructured
既然 dynamicResourceClient 的方法接收和返回的,很多都是 Unstructured 类型,那么我们就需要实现真正的资源对象 与 Unstructured 的相互转换
runtime 包下,给我们提供了一个 UnstructuredConverter 接口,接口中提供了两个方法,分别用于资源对象-->Unstructured 和 Unstructured-->资源对象。位于
staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
type UnstructuredConverter interface { ToUnstructured(obj interface{}) (map[string]interface{}, error) FromUnstructured(u map[string]interface{}, obj interface{}) error }
UnstructuredConverter 接口只有一个实现类 unstructuredConverter。不过,unstructuredConverter 类型开头小写,我们无法直接使用,kubernetes 创建了一个全局变量 DefaultUnstructuredConverter,类型就是 unstructuredConverter,用以供外界使用。
我们直接使用
runtime.DefaultUnstructuredConverter
,调用它的 ToUnstructured 或 FromUnstructured 方法,就可以实现 Unstructured 与资源对象的相互转换了。
示例:
package main
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
// 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client
dynamicResourceClient := dynamicClient.Resource(schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
})
// 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象
unstructured, err := dynamicResourceClient.
Namespace("kube-system").
Get(context.TODO(), "coredns", metav1.GetOptions{})
if err != nil {
panic(err)
}
// 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象
deploy := &appsv1.Deployment{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), deploy)
if err != nil {
panic(err)
}
// 打印 deploy 名称和命名空间
fmt.Printf("deploy.Name: %s\ndeploy.namespace: %s\n", deploy.Name, deploy.Namespace)
}
还是先根据 kubeconfig 文件获取到 config,然后创建 dynamicclient,再根据 GVR 找到 dynamicResourceClient,然后调用方法获取到资源,再用
runtime.DefaultUnstructuredConverter.FromUnstructured()
进行通用结构到具体结构的转换。
3.4、DiscoveryClient
前面的3种Client:RESTClient、Clientset、DynamicClient,都是用来操作 kubernetes 资源的,目前还缺少一个用于检索 kuberentes资源的 Client。DiscoveryClient 就是这样一个 Client。用于检索 kuberentes集群中支持的 API 资源的相关信息,例如版本、组、资源类型等。
DiscoveryClient 提供了一组方法来查询和获取这些信息,以便在编写 Controller 或 Operator 时,能够动态地了解集群中可用的资源。
DiscoveryClient 结构体很简单,位于 staging/src/k8s.io/client-go/discovery/discovery_client.go
type DiscoveryClient struct {
restClient restclient.Interface
LegacyPrefix string
// Forces the client to request only "unaggregated" (legacy) discovery.
UseLegacyDiscovery bool
}
LegacyPrefix 字段表示旧版本资源的访问前缀,一般值都是/api。Kubernetes 1.16 以前,资源的访问前缀都是 /api,1.16及之后,全面改成 /apis,为了兼容旧资源,这里特意保存了一个常量字符串 /api
DiscoveryClient 实现了接口的所有方法,用于获取 API 资源的各种信息
type DiscoveryInterface interface {
RESTClient() restclient.Interface
ServerGroupsInterface
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
OpenAPIV3SchemaInterface
// Returns copy of current discovery client that will only
// receive the legacy discovery format, or pointer to current
// discovery client if it does not support legacy-only discovery.
WithLegacy() DiscoveryInterface
}
除了 DiscoveryClient
,还有一个实现了接口 DiscoveryInterface 的类 CachedDiscoveryClient
:
CachedDiscoveryClient 就是实现了缓存机制的 DiscoveryClient 的封装
CachedDiscoveryClient 在 DiscoveryClient 的基础上增加了一层缓存,用于缓存获取的资源信息,以减少对 API Server 的频繁请求。
在首次调用时,CachedDiscoveryClient 会从 API Server 获取资源信息,并将其缓存在本地。之后的调用会直接从缓存中获取资源信息,而不需要再次向 API Server 发送请求。
因为集群部署完成后,API 资源基本很少变化,所以缓存下来可以很好的提高请求效率。
kubectl 工具内部,
kubectl api-versions
命令其实就是使用了这个 CachedDiscoveryClient,所以多次执行kubectl api-versions
命令,其实只有第一次请求了 API Server,后续都是直接使用的本地缓存。
可以通过/client-go/discovery/discovery_client.go
中的func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error)
创建发现客户端:
func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) {
config := *c
if err := setDiscoveryDefaults(&config); err != nil {
return nil, err
}
httpClient, err := restclient.HTTPClientFor(&config)
if err != nil {
return nil, err
}
return NewDiscoveryClientForConfigAndClient(&config, httpClient)
}
示例:
func main() {
// 1、先创建一个客户端配置config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err.Error())
}
// 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err.Error())
}
// 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表
_, resourceLists, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
// 4、遍历资源列表,打印出资源组和资源名称
for _, resource := range resourceLists {
fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion)
for _, resource := range resource.APIResources {
fmt.Printf("resource name: %s\n", resource.Name)
}
fmt.Println("--------------------------")
}
}
4、informer
从上面架构图中可以看出,client-go 中包含的组件主要有:Reflector、DeltaFIFO、Controller、Indexer、Processer。
这个架构图会迷惑我们,让我们觉得 client-go 只维护了这么一套组件,实现功能。实际上 client-go 并不是只有一套这种组件,而是以 informer 作为基本单位,每个 informer 有自己的一套组件。
每种GVR资源,都有自己对应的 informer,我们可以选择性的创建informer 并启动,进而仅从 apiserver ListAndWatch 自己需要的资源就可以了
每创建一种资源的 informer 对象,该对象就携带了自己需要的 Reflector、DeltaFIFO、Controller、Indexer、Processer,启动之后,该 informer 作为一个单独的协程启动,负责自己资源的 ListAndWatch、缓存、事件处理等。
使用示例:
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
// 初始化与apiserver通信的clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// 初始化shared informer factory以及pod informer
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()
// 注册informer的自定义ResourceEventHandler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: xxx,
UpdateFunc: xxx,
DeleteFunc: xxx,
})
// 启动shared informer factory,开始informer的list & watch操作
stopper := make(chan struct{})
go factory.Start(stopper)
// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中
// 或者调用factory.WaitForCacheSync(stopper)
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 创建lister
podLister := podInformer.Lister()
// 从informer中的indexer本地缓存中获取对象
podList, err := podLister.List(labels.Everything())
if err != nil {
fmt.Println(err)
}
}
SharedInformerFactory是什么,等会再说。只需要知道他是一个工厂,可以创建 informer
factory.Core().V1().Pods() 和 podInformer.Informer(),最终就是创建了一个 pod 资源的 informer,该 informer 会自动保存在 factory 中
factory.Start 就是把 factory 中已经创建的所有 informer,都启动起来,每个 informer 就是一个单独的协程,互不影响,各自进行 ListAndWatch
podInformer.Lister() 和 podLister.List() 就是从 pod 的这个 informer 中,获取缓存数据。可以看到获取缓存的时候,确定到了某一个具体的 informer
SharedInformerFactory
SharedInformerFactory 主要有两部分功能:
作为创建 informer 的工厂,用于创建所有类型的 informer
SharedInformerFactory 内部还会缓存所有已经创建过的 informer,下次再创建,就会直接使用缓存,不会再创建新的 informer,节省了资源
type sharedInformerFactory struct { // 这个client,是clientset类型的客户端,用于与apiserver交互 client kubernetes.Interface // 限制 当前SharedInformerFactory 所创建的 Informer 只关注指定命名空间中的资源变化 namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration // 缓存已经创建的全部informer informers map[reflect.Type]cache.SharedIndexInformer // 缓存已经启动的 informer,只存 类型:是否启动 startedInformers map[reflect.Type]bool }
factory.Core().V1().Pods()
返回的是 PodInformer
接口类型,位于 informers/core/v1/pod.go
中
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
informers包下,给每一种GVR,都提供了一个接口类型,用于表示这种 GVR 资源的informer。比如还有 NodeInformer、NamespaceInformer、SecretInformer等
不过这个 PodInformer 并不是我们直接使用的,我们最终使用的还是 cache.SharedIndexInformer 类型,所以 这些资源 xxxInformer 接口,都提供了一个 Informer() cache.SharedIndexInformer 方法,用于获取一个为该资源工作的 cache.SharedIndexInformer
因此,这些资源 xxxInformer 接口,其实就是对 cache.SharedIndexInformer 类型的封装。
client-go源码中,/client-go/informers 包下,就是 factory 结构+所有GVR资源的 informer 结构
5、client-go 开发自定义控制器
我们希望实现这个效果:
创建或更新 Service 的时候,如果这个 Service 的 Annotations 中,包含了"ingress/http:true",那么在创建或更新这个 Service 的时候,会自动为它创建一个 Ingress。
删除Service的时候,如果这个 Service 的 Annotations 中,包含了"ingress/http:true",那么同时也要删除它的 Ingress。
需求分析:
这个效果需要编写三个事件处理方法,addService、updateService、deleteIngress
addService/updateService:用户创建或更新 service 的时候,kubernetes的 ServiceController 已经完成 service 的创建或更新了。我们要做的是拿到已存在的 service 对象,看是否包含 “ingress/http:true”。如果包含,则保证有一个对应的 ingress;如果不包含,则保证不能有对应的ingress
deleteIngress:用于删除ingress的时候,也触发addService/updateService一样的逻辑,保证service和ingress的对应关系是正确的。
你可能会疑问,为什么没有 deleteService?
因为我们会使用 OwnerReferences 将 service+ingress 关联起来。因此删除service,会由 kubernetes 的 ControllerManager 中的特殊 Controller,自动完成 ingress 的 gc,所以删除service时我们无需特殊处理。
main函数如下:
package main
import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"log"
"share-code-operator-study/addingress/pkg"
)
func main() {
// 创建一个 集群客户端配置
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
inClusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatalln("can't get config")
}
config = inClusterConfig
}
// 创建一个 clientset 客户端,用于创建 informerFactory
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 创建一个 informerFactory
factory := informers.NewSharedInformerFactory(clientset, 0)
// 使用 informerFactory 创建Services资源的 informer对象
serviceInformer := factory.Core().V1().Services()
// 使用 informerFactory 创建Ingresses资源的 informer对象
ingressInformer := factory.Networking().V1().Ingresses()
// 创建一个自定义控制器
controller := pkg.NewController(clientset, serviceInformer, ingressInformer)
// 创建 停止channel信号
stopCh := make(chan struct{})
// 启动 informerFactory,会启动已经创建的 serviceInformer、ingressInformer
factory.Start(stopCh)
// 等待 所有informer 从 etcd 实现全量同步
factory.WaitForCacheSync(stopCh)
// 启动自定义控制器
controller.Run(stopCh)
}
controller如下:
package pkg
import (
"context"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informercorev1 "k8s.io/client-go/informers/core/v1"
informernetv1 "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
listercorev1 "k8s.io/client-go/listers/core/v1"
listernetv1 "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"reflect"
"time"
)
const (
// worker 数量
workNum = 5
// service 指定 ingress 的 annotation key
annoKey = "ingress/http"
// 调谐失败的最大重试次数
maxRetry = 10
)
// 自定义控制器
type controller struct {
client kubernetes.Interface
serviceLister listercorev1.ServiceLister
ingressLister listernetv1.IngressLister
queue workqueue.RateLimitingInterface
}
// NewController 创建一个自定义控制器
func NewController(clientset *kubernetes.Clientset, serviceInformer informercorev1.ServiceInformer, ingressInformer informernetv1.IngressInformer) *controller {
// 控制器中,包含一个clientset、service和ingress的缓存监听器、一个workqueue
c := controller{
client: clientset,
serviceLister: serviceInformer.Lister(),
ingressLister: ingressInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
}
// 为 serviceInformer 添加 ResourceEventHandler
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 添加service时触发
AddFunc: c.addService,
// 修改service时触发
UpdateFunc: c.updateService,
// 这里没有删除service的逻辑,因为我们会使用 OwnerReferences 将service+ingress关联起来。
// 因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc
})
// 为 ingressInformer 添加 ResourceEventHandler
ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 删除ingress时触发
DeleteFunc: c.deleteIngress,
})
return &c
}
// 添加service时触发
func (c *controller) addService(obj interface{}) {
// 将 添加service 的 key 加入 workqueue
c.enqueue(obj)
}
// 修改service时触发
func (c *controller) updateService(oldObj interface{}, newObj interface{}) {
// 如果两个对象一致,就无需触发修改逻辑
if reflect.DeepEqual(oldObj, newObj) {
return
}
// todo 比较annotation
// 将 修改service 的 key 加入 workqueue
c.enqueue(newObj)
}
// 删除ingress时触发
func (c *controller) deleteIngress(obj interface{}) {
// 将对象转成ingress,并获取到它的 ownerReference
ingress := obj.(*netv1.Ingress)
ownerReference := metav1.GetControllerOf(ingress)
// 如果ingress的 ownerReference 没有绑定到service,则无需处理
if ownerReference == nil || ownerReference.Kind != "Service" {
return
}
// 如果ingress的 ownerReference 已经绑定到service,则需要处理
c.enqueue(obj)
}
// enqueue 将 待添加service 的 key 加入 workqueue
func (c *controller) enqueue(obj interface{}) {
// 调用工具方法,获取 kubernetes资源对象的 key(默认是 ns/name,或 name)
key, err := cache.MetaNamespaceKeyFunc(obj)
// 获取失败,不加入队列,即本次事件不予处理
if err != nil {
runtime.HandleError(err)
return
}
// 将 key 加入 workqueue
c.queue.Add(key)
}
// dequeue 将处理完成的 key 出队
func (c *controller) dequeue(item interface{}) {
c.queue.Done(item)
}
// Run 启动controller
func (c *controller) Run(stopCh chan struct{}) {
// 启动多个worker,同时对workqueue中的事件进行处理,效率提升5倍
for i := 0; i < workNum; i++ {
// 每个worker都是一个协程,使用同一个停止信号
go wait.Until(c.worker, time.Minute, stopCh)
}
// 启动完成后,Run函数就停止在这里,等待停止信号
<-stopCh
}
// worker方法
func (c *controller) worker() {
// 死循环,worker处理完一个,再去处理下一个
for c.processNextItem() {
}
}
// processNextItem 处理下一个
func (c *controller) processNextItem() bool {
// 从 workerqueue 取出一个key
item, shutdown := c.queue.Get()
// 如果已经收到停止信号了,则返回false,worker就会停止处理
if shutdown {
return false
}
// 处理完成后,将这个key出队
defer c.dequeue(item)
// 转成string类型的key
key := item.(string)
// 处理service逻辑的核心方法
err := c.syncService(key)
// 处理过程出错,进入错误统一处理逻辑
if err != nil {
c.handleError(key, err)
}
// 处理结束,返回true
return true
}
// handleError 错误统一处理逻辑
func (c *controller) handleError(key string, err error) {
// 如果当前key的处理次数,还不到最大重试次数,则再次加入队列
if c.queue.NumRequeues(key) < maxRetry {
c.queue.AddRateLimited(key)
return
}
// 运行时统一处理错误
runtime.HandleError(err)
// 不再处理这个key
c.queue.Forget(key)
}
// syncService 处理service逻辑的核心方法
func (c *controller) syncService(key string) error {
// 将 key 切割为 ns 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 从indexer中,获取service
service, err := c.serviceLister.Services(namespace).Get(name)
// 没有service,直接返回
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// 检查service的annotation,是否包含 key: "ingress/http"
_, ok := service.Annotations[annoKey]
// 从indexer缓存中,获取ingress
ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
if ok && errors.IsNotFound(err) {
// ingress不存在,但是service有"ingress/http",需要创建ingress
// 创建ingress
ig := c.createIngress(service)
// 调用controller中的client,完成ingress的创建
_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metav1.CreateOptions{})
if err != nil {
return err
}
} else if !ok && ingress != nil {
// ingress存在,但是service没有"ingress/http",需要删除ingress
// 调用controller中的client,完成ingress的删除
err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
return err
}
}
return nil
}
// createIngress 创建ingress
func (c *controller) createIngress(service *corev1.Service) *netv1.Ingress {
icn := "ingress"
pathType := netv1.PathTypePrefix
return &netv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Namespace: service.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(service, corev1.SchemeGroupVersion.WithKind("Service")),
},
},
Spec: netv1.IngressSpec{
IngressClassName: &icn,
Rules: []netv1.IngressRule{
{
Host: "example.com",
IngressRuleValue: netv1.IngressRuleValue{
HTTP: &netv1.HTTPIngressRuleValue{
Paths: []netv1.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: netv1.IngressBackend{
Service: &netv1.IngressServiceBackend{
Name: service.Name,
Port: netv1.ServiceBackendPort{
Number: 80,
},
},
},
},
},
},
},
},
},
},
}
}
整体的流程就是,informer 观察到有对应资源发生变化后,就会触发之前注册好的 eventhandler,把这个资源对象放入队列 DeltaFIFO 中。在 worker 进程中会一直循环执行 processNextItem,也就是从 DeltaFIFO 中拿 key 出来,然后再去查 indexer 找到对应的具体资源,进入 sync 调谐逻辑,然后再出队。
一个 GVR 对应一个 informer,所以 informer 可以监听对应资源的事件,而不是所有资源的事件。
注意其中的 NewController()、Run()、worker()、processNextItem()、handleError()、syncService() 这些方法的命名和处理逻辑。
你会发现 kubernetes 源码中,内置的控制器和他们是一样的, 即 kubernetes内置控制器也是使用 client-go 的这种方式,实现控制器开发的。