DEV Community

Cover image for Como construir um operador HPA baseado na quantidade de mensagens na fila do RabbitMQ no Kubernetes
Ezequiel Lopes
Ezequiel Lopes

Posted on

Como construir um operador HPA baseado na quantidade de mensagens na fila do RabbitMQ no Kubernetes

Os operadores Kubernetes são extensões projetadas para gerenciar aplicações complexas e seus componentes no Kubernetes. Utilizando recursos personalizados, eles automatizam tarefas que normalmente seriam realizadas por operadores humanos, como implantação, configuração, escalonamento e recuperação de aplicações. Ao encapsular conhecimento específico de domínio, os operadores permitem que o Kubernetes gerencie aplicações de forma eficaz, algo que seria desafiador usando apenas as capacidades nativas do Kubernetes​ (Red Hat Developer)​​ (Redhat)​.

Custom Resources:

Custom Resources (CRs) no Kubernetes são extensões da API do Kubernetes que permitem aos usuários criar e gerenciar seus próprios tipos de recursos personalizados, além dos recursos padrão fornecidos pelo Kubernetes. Eles são utilizados para adicionar funcionalidades específicas que não estão disponíveis na instalação padrão do Kubernetes, permitindo a customização do cluster conforme as necessidades dos aplicativos. (CNCF)​ (sokube)

Custom Resource Definitions:

Custom Resource Definition (CRD) é um recurso que permite aos usuários definir recursos personalizados além dos tipos de recursos nativos fornecidos pelo Kubernetes, como Pods, Services e Deployments. Com CRDs, os usuários podem estender a funcionalidade do Kubernetes de acordo com suas necessidades específicas.

Role-Based Access Control:

Role-Based Access Control (RBAC) é um sistema de controle de acesso no Kubernetes que gerencia permissões de acesso a recursos do cluster. Ele permite atribuir permissões específicas a usuários e grupos, garantindo um controle granular sobre quem pode executar ações como criar, listar e atualizar Pods, Deployments e Services.​ (k8s)​.

Problema:

O Golang consome poucos recursos e quando você tem uma aplicação feita nele que consome uma fila do RabbitMQ, pode ser um pouco difícil escalar as réplicas com base no consumo de recursos como memória e CPU. Uma opção interessante seria manter a quantidade de réplicas pelo número de mensagens na fila.

Neste artigo, vamos construir um operador que monitora uma fila do RabbitMQ e, de acordo com a quantidade de mensagens na fila, altera a quantidade de réplicas no deployment. O objetivo deste artigo é criar um operador experimental apenas para explorar os conceitos desse tipo de recurso no Kubernetes.

Configuração Inicial

Para começar, precisamos definir um recurso personalizado que descreva a configuração desejada para o RabbitMQ e o comportamento de escalonamento. Isso envolve a criação de um CRD e um controlador que monitora o estado atual da fila e ajusta o número de réplicas conforme necessário.

Monitoramento da Fila

O operador deve ser capaz de consultar a fila do RabbitMQ para obter a quantidade de mensagens prontas para processamento. Utilizando uma chamada HTTP, podemos obter essas informações e usá-las para decidir se é necessário escalar o número de réplicas da aplicação.

Auto Scaling

Com base nas informações obtidas da fila, o operador ajustará automaticamente o número de réplicas do deployment do Kubernetes. Isso garante que a aplicação possa lidar com variações na carga de trabalho sem intervenção manual​​.

Pré-requisitos

Antes de começar, certifique-se que atende os requisitos:

  1. Instalar o Kubebuilder:

  2. Instalar o kubectl:

    • O kubectl é a ferramenta de linha de comando para interagir com o Kubernetes. Siga as instruções de instalação específicas para seu sistema operacional no site oficial do Kubernetes.
  3. Instalar e Configurar o RabbitMQ:

    • Instale o RabbitMQ e configure um usuário. Para este experimento, utilizaremos admin como usuário e admin como senha.
  4. Ter um Cluster Kubernetes:

    • Certifique-se de ter um cluster Kubernetes funcionando. Pode ser um cluster local (como Minikube) ou um cluster na nuvem.

Para iniciar o projeto utilizaremos o kubebuilder.

kubebuilder init --domain my.domain --repo github.com/infezek/rabbitmq-operator --owner "Seu nome"
kubebuilder create api --group apps --version v1 --kind RabbitHPA

make manifests
make generate # aceite com y para criar os modulos

make docker-build # para testar o build
Enter fullscreen mode Exit fullscreen mode

Para começar, vamos definir o Spec, que é a estrutura principal do manifesto do nosso operador. Aqui, podemos definir os requisitos de configuração, nome e tipo. Os arquivos de manifesto são tipicamente criados no formato .yaml devido à sua legibilidade facilitada pela indentação, embora sigam o mesmo princípio do json.

No Spec, vamos separar em dois tipos:

  1. Rabbit: Onde ficarão as propriedades da fila que será monitorada.
  2. HPA: Onde ficarão os requisitos de escalabilidade e o deployment.
// api/v1/rabbithpa_types.go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// 
type RabbitHPASpec struct {
    Rabbit RabbitSpec `json:"rabbit"`
    HPA    HPASpec    `json:"hpa"`
}
// Estrutura das Configurações do RabbitMQ
type RabbitSpec struct {
    Namespace string `json:"namespace"`
    Host      string `json:"host"`
    Username  string `json:"username"`
    Password  string `json:"password"`
    Queue     string `json:"queue"`
}
// Estrutura das Configurações do HPA
type HPASpec struct {
    Label                string `json:"label"`
    Namespace            string `json:"namespace"`
    ReplicasetPerMessage int32  `json:"replicasetPerMessage"`
    MinReplicas          int32  `json:"minReplicas"`
    MaxReplicas          int32  `json:"maxReplicas"`
}

type RabbitHPAStatus struct {
    CurrentReplicas int32 `json:"currentReplicas"`
}

type RabbitHPA struct {
    metav1.TypeMeta        `json:",inline"`
    metav1.ObjectMeta      `json:"metadata,omitempty"`
    Spec   RabbitHPASpec   `json:"spec,omitempty"`
    Status RabbitHPAStatus `json:"status,omitempty"`
}

type RabbitHPAList struct {
    metav1.TypeMeta             `json:",inline"`
    metav1.ListMeta             `json:"metadata,omitempty"`
    Items           []RabbitHPA `json:"items"`
}

func init() {
    SchemeBuilder.Register(&RabbitHPA{}, &RabbitHPAList{})
}
Enter fullscreen mode Exit fullscreen mode

O método CalculateReplicas será responsável pela regra de réplicas que deverá ser aplicada no deployment. Sendo um método do RabbitHPA, podemos chamar essa função passando a quantidade de mensagens, e ele devolverá a quantidade de réplicas que deve ter.

// api/v1/rabbit_hpa.go
package v1

func (r *RabbitHPA) CalculateReplicas(currentMessage int32) (target int32) {
    target = currentMessage / r.Spec.HPA.ReplicasetPerMessage
    if target > r.Spec.HPA.MaxReplicas {
        return r.Spec.HPA.MaxReplicas
    }
    if target < r.Spec.HPA.MinReplicas {
        return r.Spec.HPA.MinReplicas
    }
    return target
}
Enter fullscreen mode Exit fullscreen mode

Implementando o Reconcile no Controlador

O método Reconcile é responsável pela lógica do operador, onde podemos verificar a nossa estratégia e atualizar os componentes conforme necessário. Para o nosso projeto, podemos definir a lógica em quatro partes principais:

  1. Verificar os requisitos do Spec: Obter e validar as especificações definidas pelo usuário.
  2. Consultar o RabbitMQ: Obter o número de mensagens na fila do RabbitMQ.
  3. Calcular o número de réplicas que deve ter: Determinar a quantidade ideal de réplicas com base nas mensagens na fila.
  4. Atualizar o Deployment: Se o número atual de réplicas for diferente do esperado, atualizar o deployment para refletir a quantidade correta de réplicas.
// internal/controller/rabbithpa_controller.go
package controller

import (
    "context"
    "time"
    apiv1 "github.com/infezek/rabbitmq-operator/api/v1"
    "github.com/sirupsen/logrus"
    apps "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

type RabbitHPAReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

func (r *RabbitHPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := logrus.WithField("Type", "[RECONCILE]")
    log.Infof("Starting")
    defer log.Infof("Finished")
    // Buscar as especificações do RabbitHPA
    request := &apiv1.RabbitHPA{}
    if err := r.Get(ctx, req.NamespacedName, request); err != nil {
        if errors.IsNotFound(err) {
            log.Info("RabbitHPA resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    // Realizar a busca do deployment
    deployment := &apps.Deployment{}
    err := r.Get(ctx, types.NamespacedName{
        Namespace: request.Spec.HPA.Namespace,
        Name:      request.Spec.HPA.Label,
    }, deployment)
    if err != nil {
        log.Error(err.Error())
        return ctrl.Result{}, err
    }
    rabbitProps := request.Spec.Rabbit
    // Buscar as propriedades da fila
    gateway := NewRabbitMQGateway(rabbitProps.Host, rabbitProps.Namespace, rabbitProps.Username, rabbitProps.Password)
    queueProps, err := gateway.RequestQuery(rabbitProps.Queue)
    if err != nil {
        log.Errorf(err.Error())
        return ctrl.Result{}, err
    }
    // Calcular a quantidade de réplicas que precisa ter
    target := request.CalculateReplicas(queueProps.MessagesReady)
    log.WithFields(logrus.Fields{
        "current": *deployment.Spec.Replicas,
        "target":  target,
    }).Infof("Current ReplicaSet Deployment")
    if *deployment.Spec.Replicas != target {
        log.Info("Updating deployment replicas")
        // Atualizar o deployment
        deployment.Spec.Replicas = &target
        err = r.Update(ctx, deployment)
        if err != nil {
            log.Errorf(err.Error())
            return ctrl.Result{}, err
        }
    }
    // Tempo para uma nova reconciliação
    return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

func (r *RabbitHPAReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1.RabbitHPA{}).
        Complete(r)
}
Enter fullscreen mode Exit fullscreen mode

Consultar da fila

É possível consultar detalhes da fila do RabbitMQ por meio de uma chamada HTTP. Basta realizar a chamada conforme o exemplo abaixo. Apenas um detalhe: se estiver dentro do Kubernetes e em outro namespace, é necessário especificar o namespace na URL.

# my é o nome da nossa fila
curl -u admin:admin -s -w "\n%{http_code}\n"  GET http://localhost:15672/api/queues/%2F/my
Enter fullscreen mode Exit fullscreen mode

Esta chamada retornará várias informações relevantes sobre a fila, sendo de especial interesse para o nosso operador o campo messages_ready, que indica a quantidade de mensagens prontas para leitura na fila.

{
...
    "messages": "20"
    "messages_ready": "15"
...
}
Enter fullscreen mode Exit fullscreen mode

A função RequestQuery será responsável por consultar os detalhes da fila atual no RabbitMQ com Golang. Vamos criar a URL para a chamada, passar as credenciais de autenticação no cabeçalho e decodificar a resposta. A função também inclui vários logs para acompanhar e entender o que está acontecendo.

// internal/controller/gateway.go
package controller

import (
    "encoding/base64"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "github.com/sirupsen/logrus"
)

type RabbitMQGateway struct {
    host      string
    namespace string
    username  string
    password  string
}

type RabbitOutput struct {
    Messages      int32 `json:"messages"`
    MessagesReady int32 `json:"messages_ready"`
}

func NewRabbitMQGateway(host, namespace, user, pass string) *RabbitMQGateway {
    return &RabbitMQGateway{host, namespace, user, pass}
}

func (r *RabbitMQGateway) RequestQuery(queue string) (*RabbitOutput, error) {
    log := logrus.WithField("Type", "[RABBITMQ_GATEWAY]")
    log.Info("Requesting RabbitMQ")
    defer log.Info("Finished")
    url := fmt.Sprintf("http://%s.%s:15672/api/queues/%s/%s", r.host, r.namespace, "%2f", queue)
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        log.WithFields(logrus.Fields{"error": err}).Error("Failed to create request")
        return nil, err
    }
    auth := fmt.Sprintf("%s:%s", r.username, r.password)
    authHeader := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
    req.Header.Add("Authorization", authHeader)
    client := http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        log.WithFields(logrus.Fields{"error": err}).Error("Failed to request RabbitMQ")
        return nil, err
    }
    defer resp.Body.Close()
    log.Infof("Status code: %d", resp.StatusCode)
    if resp.StatusCode == http.StatusUnauthorized {
        return nil, fmt.Errorf("unauthorized")
    }
    if resp.StatusCode == http.StatusNotFound {
        return nil, fmt.Errorf("not found queue")
    }
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }
    responseBty, err := io.ReadAll(resp.Body)
    if err != nil {
        log.WithFields(logrus.Fields{"error": err}).Error("Failed to read response")
        return nil, err
    }
    output := &RabbitOutput{}
    if err := json.Unmarshal(responseBty, output); err != nil {
        return nil, err
    }
    return output, nil
}
Enter fullscreen mode Exit fullscreen mode

Agora vamos realizar o deploy do operador no Kubernetes. Para isso, o próprio Kubebuilder já fornece uma maneira muito simples de buildar e realizar o deploy com o Makefile. Basta rodar o comando:

# IMG=usuario do docker hub ou do Register Container / nome da imagem
make docker-build docker-push IMG=infectiionz/rabbitmq-operator:beta1
make deploy IMG=infectiionz/rabbitmq-operator:beta1
Enter fullscreen mode Exit fullscreen mode

Aplicando o Manifesto do Operador

Com o manifesto do operador definido, podemos aplicá-lo ao cluster Kubernetes. Em seguida, vamos definir o deployment que será modificado pelo nosso operador.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ubuntu-deployment
  namespace: ubuntu
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ubuntu
  template:
    metadata:
      labels:
        app: ubuntu
    spec:
      containers:
        - name: ubuntu
          image: ubuntu:latest
          command: ["/bin/bash", "-c", "--"]
          args: ["while true; do sleep 30; done;"]
Enter fullscreen mode Exit fullscreen mode
kubectl apply -f deployment.yaml
Enter fullscreen mode Exit fullscreen mode

Este é o manifesto do nosso operador. Conforme explicado anteriormente, ele contém as especificações sobre nosso RabbitMQ e o HPA.

apiVersion: apps.my.domain/v1
kind: RabbitHPA
metadata:
  name: rabbitmq-hpa
  namespace: default
spec:
  rabbit:
    namespace: "rabbitmq"
    host: "rabbitmq"
    username: "admin"
    password: "admin"
    queue: "my"
  hpa:
    label: "ubuntu-deployment"
    namespace: "ubuntu"
    replicasetPerMessage: 2
    minReplicas: 1
    maxReplicas: 5
Enter fullscreen mode Exit fullscreen mode
kubectl apply -f rabbit-operator.yaml
Enter fullscreen mode Exit fullscreen mode

Podemos verificar os logs do operador.

# Verifique o nome correto do seu operador e o seu namespace e altere no comando abaixo
kubectl logs rabbitmq-operator-controller-manager-767f8947-xptj6 -n rabbitmq-operator-system --tail=30
Enter fullscreen mode Exit fullscreen mode

Teremos um erro parecido com este aqui.

E0609 1 reflector.go:150] pkg/mod/k8s.io/client-go@v0.30.0/tools/cache/reflector.go:232: Failed to watch *v1.Deployment: failed to list *v1.Deployment: deployments.apps is forbidden: User "system:serviceaccount:rabbitmq-operator-system:rabbitmq-operator-controller-manager" cannot list resource "deployments" in API group "apps" at the cluster scope

Enter fullscreen mode Exit fullscreen mode

Esse erro está relacionado à falta de permissão para que o operador execute determinadas tarefas no cluster, especialmente no deployment. Para solucionar este problema, é necessário criar um ClusterRole com as permissões necessárias e um ClusterRoleBinding para atribuir essa role ao nosso operador.

Criando um ClusterRole e ClusterRoleBinding

Um ClusterRole define permissões em nível de cluster. Aqui está um exemplo de um ClusterRole que concede permissões para listar, obter e atualizar deployments:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: rabbit-hpa-manager
rules:
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["list", "watch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: rabbit-hpa-manager-binding
subjects:
  - kind: ServiceAccount
    name: rabbitmq-operator-controller-manager
    namespace: rabbitmq-operator-system
roleRef:
  kind: ClusterRole
  name: rabbit-hpa-manager
  apiGroup: rbac.authorization.k8s.io
Enter fullscreen mode Exit fullscreen mode
kubectl apply -f role.yaml
Enter fullscreen mode Exit fullscreen mode

Agora que nosso operador tem as permissões para executar a tarefa, basta apagar o operador para recriá-lo com as permissões corretas.

kubectl delete pod rabbitmq-operator-controller-manager-767f8947-xptj6 -n rabbitmq-operator-system 
Enter fullscreen mode Exit fullscreen mode

Agora que o operador está configurado, podemos testar e verificar seu funcionamento, observando a criação e terminação dos pods de acordo com o número de mensagens prontas na fila do RabbitMQ. Lembre-se das condições definidas: mínimo de um pod, máximo de cinco pods, e a cada duas mensagens, um novo pod será criado.

watch -n 2 "kubectl get pods -A | grep ubuntu"
Enter fullscreen mode Exit fullscreen mode

Com 8 mensagens na fila

Fila do rabbitmq

Na próxima vez que nosso operador rodar e verificar que a quantidade de réplicas desejada não está correta, ele atualizará o deployment para um novo número de réplicas. Dessa forma, o Kubernetes criará ou removerá os pods conforme necessário.

operador sendo executado

A aplicação possui vários logs para facilitar o entendimento do que está acontecendo. Portanto, basta verificar os logs do operador para acompanhar seu funcionamento e diagnosticar problemas.

Conclusão

Os operadores Kubernetes simplificam a gestão de aplicações complexas, automatizando tarefas críticas e permitindo um gerenciamento eficiente. Ao integrar o RabbitMQ com um operador personalizado, podemos explorar novas maneiras de escalar aplicações de forma inteligente e responsiva, aumentando a eficiência e a confiabilidade do sistema.

Este artigo tem como objetivo demonstrar um pouco do poder dos operadores, abordando conceitos teóricos e criando um projeto prático para um problema específico que podemos ter. Com essa base, será possível criar um operador específico para o seu problema.

Linkedin
GitHub

Top comments (0)