DEV Community

Cover image for Watch and react to Kubernetes objects changes
Luca Sepe
Luca Sepe

Posted on

Watch and react to Kubernetes objects changes

The Kubernetes API server exposes a REST interface consumable by any client.

client-go is the official client library for the Go programming language. It is used both internally by Kubernetes itself (for example, inside kubectl) as well as by numerous external consumers: operators, higher level frameworks and many more.

Using this library you can write Go applications to access kubernetes' API Server and you can programmatically add, delete, modify, and check kubernetes resources.

client-go introduces different types of clients such as: RESTClient, Clientset and dynamic.Interface. All these clients make available the Watch verb, which offers an event interface that reacts to objects changes: add, update, delete, etc.

In all cases, the returned object is an implementation of watch.Interface that looks like this:

watch.Interface

Let's implement a watcher for namespaces changes. The application will do these things:

  • attempts to begin watching the namespaces resource getting a watch.Interface on success
  • iterates all the events produced by the watcher
  • when an event is of type "namespace added" adds a custom label patching the namespace
  • when en event is of type "namespace deleted" greet the namespace gone

In this article I'll show you how to use RESTClient to watch and then react to namespaces changes.

Here the source code - you can grab all the code @ https://github.com/lucasepe/using-client-go.

package main


import (
  "context"
  "fmt"
  "time"


  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/types"
  "k8s.io/apimachinery/pkg/watch"
  "k8s.io/client-go/kubernetes/scheme"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
  // Using the default configuration rules get the info 
  // to connect to the Kubernetes cluster
  configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
    clientcmd.NewDefaultClientConfigLoadingRules(),
    &clientcmd.ConfigOverrides{},
  )

  // create the Config object
  cfg, err := configLoader.ClientConfig()
  if err != nil {
    panic(err)
  }

  // we want to use the core API (namespaces lives here)
  cfg.APIPath = "/api"
  cfg.GroupVersion = &corev1.SchemeGroupVersion
  cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

  // create a RESTClient
  rc, err := rest.RESTClientFor(cfg)
  if err != nil {
    panic(err.Error())
  }

  // utility function to create a int64 pointer
  i64Ptr := func(i int64) *int64 { return &i }

  opts := metav1.ListOptions{
    TimeoutSeconds: i64Ptr(120),
    Watch:          true,
  }

  // attempts to begin watching the namespaces
  // returns a `watch.Interface`, or an error
  watcher, err := rc.Get().Resource("namespaces").
    VersionedParams(&opts, scheme.ParameterCodec).
    Timeout(time.Duration(*opts.TimeoutSeconds)).
    Watch(context.TODO())
  if err != nil {
    panic(err)
  }

  // the patch data, just add a custom label
  pd := []byte(`{"metadata":{"labels":{"modified-by":"lucasepe"}}}`)

  // the patch type
  pt := types.MergePatchType

  // who did this patch?
  po := metav1.PatchOptions{
    FieldManager: "my-cool-app",
  }

  // here we iterate all the events streamed by the watch.Interface
  for event := range watcher.ResultChan() {
    // retrieve the Namespace
    item := event.Object.(*corev1.Namespace)

    switch event.Type {
    // when a namespace is deleted...
    case watch.Deleted:
      // let's say hello!
      fmt.Printf("- '%s' %v ...bye bye\n", item.GetName(), event.Type)

    // when a namespace is added...
    case watch.Added:
      fmt.Printf("+ '%s' %v  ", item.GetName(), event.Type)

      // try to patch it!
      err = rc.Patch(pt).Resource("namespaces").
        Name(item.Name).
        VersionedParams(&po, scheme.ParameterCodec).
        Body(pd).
        Do(context.TODO()).
        Error()
      if err != nil {
        panic(err)
      }

      fmt.Println(" ...patched!")
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

To see what the program does, open two terminal windows.

In the first terminal run the code as usual:

$ go run main.go
Enter fullscreen mode Exit fullscreen mode

In the second terminal create a namespace using kubectl:

$ kubectl create namespace demo-system
Enter fullscreen mode Exit fullscreen mode

In the first terminal you should see a new line like this:

+ 'demo-system' ADDED   ...patched!
Enter fullscreen mode Exit fullscreen mode

Going back to the second terminal and typing:

$ kubectl describe namespace demo-system

Name:         demo-syste
Labels:       kubernetes.io/metadata.name=demo-system
              modified-by=lucasepe
Annotations:  <none>
Status:       Active


No resource quota.


No LimitRange resource
Enter fullscreen mode Exit fullscreen mode

You can see that the program added a new label to to the newly created namespace.

Now deleting the namespace with:

$ kubectl delete namespace demo-system
Enter fullscreen mode Exit fullscreen mode

Going to the program terminal, you should see a new line:

- 'demo-system' DELETED ...bye bye
Enter fullscreen mode Exit fullscreen mode

Using the watch.Interface interface directly is actually discouraged.

For instance, since the server will close watch connections regularly, the ResultChan channel can be closed at any time (due to an io.EOF error).

There are helpers to re-establishing a watch at the last-received resourceVersion. RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) it will get restarted from the last point without the consumer even knowing about it.

But what if API server lost events because etcd erased all resource versions?

In order to be resilient to etcd cache not having the resource version anymore - you would need to use Informers.

If you want to discover much more about how to use the client-go library to do all kind of interaction with Kubernetes up to the step-by-step instruction about how to create a custom controller (operator) to manage your custom resource, you could eventually buy my notebook "Using client-go" here: https://leanpub.com/using-client-go - but take a peek first - download a free notebook excerpt with the TOC and some random page.

Thank you for your time!

All the best,

Luca

Top comments (0)