A foto é do Loik Marras no Unsplash
Olá! Se você trabalha com pipelines de dados ou Jobs que precisam de uma resiliência maior do que um simples agendador de tarefas, como o cron
, o Airflow pode ser uma excelente solução! Ele foi desenvolvido pelo time de dados do Airbnb, mas hoje pertence à fundação de software Apache. E claro, é um software de código aberto, mas você ainda vai precisar se preocupar com a infraestrutura pra mantê-lo de pé. Se você tem interesse em saber como subir uma instância de Airflow no Kubernetes, acho que esse post pode ser bem útil para você.
O que vamos fazer aqui?
Vou tentar trazer aqui um passo-a-passo super breve pra você conseguir fazer o deploy do Airflow na sua instância de Kubernetes com a ajuda do Helm. Não sabe do que se trata? Vou riscar a superfície sobre eles no próximo tópico, só pra que você não fique tãão perdido(a). Mas, como em muitas coisas relacionadas à tecnologia, faça seu dever de casa, procure outras postagens, faça cursos e se aprofunde. E claro, se alguma dúvida surgir, pode comentar aqui que eu tento ajudar com o que puder 😄
O Airflow pode funcionar em uma máquina virtual no seu provedor de nuvem ou mesmo na sua máquina local, como mostra esse tutorial do próprio time do Airflow. O que vamos utilizar nesse post é o Kubernetes, por se tratar de uma infraestrutura computacional bastante resiliente, performática e escalável.
Kubernetes
Em um resumo MEGA resumido, o Kubernetes é um serviço de gerenciamento de containers para garantir que sua aplicação sempre esteja de pé. Ele se baseia na sua declaração de como sua aplicação deve funcionar. Para isso, você faz uso de arquivos YAML, que vão dizer exatamente como sua aplicação fica de pé e como ela escala, e o serviço do Kubernetes garante que essas declarações irão se manter ao longo do tempo. Se no meio do caminho um container desliga, o Kubernetes provisiona um novo container. Se existirem muitos usuários, o Kubernetes consegue escalar o número de nós do seu cluster de máquinas, para que sua aplicação se mantenha sempre estável. E aí quando o número de usuários diminuir, ele também desliga as máquinas que não forem necessárias. Doido, né?
E o Helm? Pra que serve?
Bom, trabalhar com Kubernetes tem uma curva de aprendizado bastante inclinada. E você também pode demorar um pouco até que tudo funcione como você deseja, afinal, dependendo da complexidade da sua aplicação, o número de arquivos de configuração pode crescer consideravelmente. Pra tentar tornar esse processo mais amigável e fácil de manter, o projeto do Helm surgiu. Ele tenta templatear a maior parte "estática" das suas declarações, deixando as partes móveis em um único arquivo, comumente chamado values.yaml
. E aí só com ele, você consegue declarar tudo o que você precisa pra subir sua aplicação.
Falar só sobre Airflow dá uma postagem bastante extensa. Mas vamos repassar o básico só pra gente se situar. Se você quiser, recomendo bastante que leia as documentações no site oficial.
Como eu comentei ali em cima, o Airflow é um serviço que vai te ajudar a "programaticamente criar, orquestrar e monitorar fluxos de trabalho". Para isso, você define sua sequência de tarefas com arquivos Python, que o Airflow irá entender e construir DAGs através deles.
É bem importante frisar que o papel do Airflow deve ser majoritariamente chamar outros serviços para realizar a computação pesada, porque é aí que ele brilha. A execução e paralelização de atividades em si vai ser mais bem executada quando feita por um cluster de Spark ou de PrestoSQL, por exemplo.
Web UI
A interface web é um componente que te permite visualizar todas as suas DAGs, o tempo de duração médio delas, a quantidade de falhas, o log de cada atividade. Com ela você também consegue ativar e pausar as suas DAGs, o que irá ser identificado pelo Scheduler.
Scheduler
O Scheduler (orquestrador) é o coração do Airflow, pois ele é quem irá identificar comandos da Web UI, como quando você ativa uma DAG manualmente, e também definições de periodicidade para ativar os seus pipelines. Mas quem irá realmente executar o comando que chama outras aplicações são os Workers.
Worker
O worker é o componente que de fato irá executar a tarefa. Se por exemplo você possui uma tarefa que vai chamar um executor externo, o comando que ativa isso é dado pelo worker.
Banco de dados
O Airflow precisa armazenar informações, como credenciais de usuários, strings de conexão e também logs de DAGs e isso tudo fica armazenado em um banco de dados relacional. Por padrão se utiliza o Postgres, mas é possível conectar a um banco MySQL caso necessário.
Depois dessa introdução talvez longa demais, vamos pro que interessa!
Requisitos necessários
Pra não ficar tão extenso por aqui, vou assumir aqui que você já tenha instalado e configurado na sua máquina:
- kubectl
- helm
Se estiver trabalhando com o MacOS, basta instalar ambas as ferramentas com o homebrew! 🍺
Também vou partir do princípio que você possui acesso a um cluster de Kubernetes no seu provedor de nuvem. Caso você esteja estudando e queira ter a experiência do Kubernetes na máquina local, dê uma olhada no Kind.
Deploy inicial
Para esse deploy, iremos utilizar o Helm chart oficial do Airflow. Para fazer a instalação padrão, basta você adicionar o repositório ao seu helm local com:
$ helm repo add apache-airflow https://airflow.apache.org
$ helm repo update
Então, crie um namespace que seja declarativo. Iremos utilizar aqui airflow-dev
, para representar que essa é a instância de airflow em desenvolvimento. Defina também um nome para sua release. Normalmente se chama de airflow
mesmo. Rode então os seguinte comandos:
$ kubectl create namespace airflow-dev
$ helm install airflow apache-airflow/airflow --namespace airflow-dev
Boa! 🤩 Com isso, você acaba de fazer a instalação padrão da sua primeira instância de Airflow. Mas em cenários reais, quase nunca essa instalação vai atender suas necessidades.
Customizando o chart do Helm
Como comentei, precisamos então acessar o arquivo values.yaml
para que façamos as nossas alterações. Gosto bastante de começar criando um arquivo vazio, com esse mesmo nome, e ir customizando aos poucos, de acordo com o arquivo completo do repositório.
Vamos lá? Crie um arquivo vazio e comece a editá-lo com seu editor de texto preferido. Aqui eu to usando o VS Code, mas não importa muito.
$ mkdir airflow-deploy && touch airflow-deploy/values.yaml
$ code airflow-deploy
Definindo uma Fernet Key
Essa chave é super simples de gerar e, caso você não a defina, o Airflow irá definir uma por você. Ela serve para criptografar as informações que você armazenará no Banco de Dados, como chaves de acesso a outros serviços, então é bem importante utilizá-la. Vale lembrar que é importante ter esse valor no momento de instalação, e não de atualização da sua release. Então caso queira customizá-la, basta rodar o seguinte:
$ python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)"
Aí você copia o valor que aparecer no seu bash e coloca no values.yaml
como:
fernetKey: <sua-chave-hasheada>
Banco de dados externo
Se você verificar, o deploy que você fez com o Helm ali em cima acabou de subir um Postgres no seu Kubernetes. Mas nem sempre é o desejável, principalmente se você já possui um banco de dados de pé para outras aplicações. Pra conectar com esse outro banco de dados, você vai precisar:
- Desabilitar o Postgresql padrão
- Criptografar sua string de conexão
- Criar um secret no Kubernetes com a sua string de conexão
Para criptografar uma string, você vai executar esse comando no seu terminal:
$ echo -n 'sua-string-aqui' | base64
Para desencriptar e conferir, adicione o argumento -d
após base64
.
O secret pode então ser criado se você criar um arquivo com o nome airflow-db-secret.yaml
e preencher com sua string criptografada no arquivo abaixo. Perceba que colocamos ali o namespace que você usou para seu deployment. Isso é importante para que a sua aplicação enxergue esse secret depois.
apiVersion: v1
kind: Secret
metadata:
name: airflow-secret
namespace: airflow-dev
type: Opaque
data:
connection: <sua-connection-string-criptografada>
Aí você sobe esse secret pro seu Kubernetes com o seguinte comando:
$ kubectl apply -f airflow-db-secret.yaml
Existe uma alternativa com o
kubectl
que você consegue criar um segredo e não precisa criptografar essa string na mão. Sabe qual? 🤔
Seu values.yaml
vai ficar assim, por enquanto:
postgresql:
enabled: false
data:
metadataSecretName: airflow-secret
Sincronizando suas DAGs
Agora que você já setou a conexão com o banco, precisamos pensar em como colocar suas DAGs (arquivos Python) pra dentro do Airflow. Uma opção bem fácil seria copiar os arquivos pra dentro da imagem de Docker antes de realizar o deploy. Mas isso não é uma boa prática, pois, a cada modificação ou mesmo nova DAG que seu time fizer, você vai precisar fazer uma atualização do seu deploy. E aí isso pode afetar algum pipeline que esteja rodando ou demorar um pouco mais pra acontecer. Ao invés disso, vamos usar um serviço bastante conveniente, chamado git-sync.
Ele funciona como um container extra, que fica presente em todos os Pods do Airflow. E o que ele faz é sincronizar com uma periodicidade que você definir com o seu repositório remoto. Aí todas as suas DAGs ficam alocadas no Github/Gitlab e você pode fazer o controle do que é sincronizado com um processo de Gitflow, por exemplo.
Pra isso, você precisará colocar os seguintes valores no seu values.yaml
:
dags:
persistence:
enabled: false
gitSync:
enabled: true
repo: git@github.com:<sua-organizacao>/<seu-repo>.git
branch: main
rev: HEAD
depth: 1
subPath: "src/dags"
sshKeySecret: airflow-ssh-key
O primeiro argumento irá indicar que você não vai alocar suas DAGs em um Storage padrão, e na sequência já aparecem os argumentos do git-sync. Veja que você pode definir uma branch e até mesmo uma pasta dentro do seu repositório que você deseja que o Airflow enxergue as DAGs, definida no argumento subPath
. Por último ali eu defini a minha chave SSH como um segredo do Kubernetes.
Você pode fazer isso com o seguinte comando:
$ kubectl create secret generic airflow-ssh-key --from-file=id_rsa=/path/das/chaves-ssh-locais -n airflow-dev
Customizando a imagem base
Se você estiver com as DAGs sendo sincronizadas via git-sync, customizar a imagem base pode não ser estritamente necessário. No entanto, caso você esteja trabalhando com algum provider, você vai precisar editar sua imagem.
Crie uma Dockerfile e estenda com o que você precisar. Eu vou instalar o pacote extra do Databricks aqui, pois é bastante relevante para meus pipelines de dados.
FROM apache/airflow:2.1.0
RUN pip install --no-cache-dir apache-airflow-providers-databricks==1.0.1
Beleza, aí você vai precisar buildar essa imagem e subir no seu repositório de preferência. Caso queria saber exatamente como fazer isso, confira as documentações do Docker Hub. Seu values.yaml
vai ficar assim:
images:
airflow:
repository: <sua-organizacao>/<sua-imagem>
tag: latest
registry:
secretName: airflow-registry
Aqui eu também criei um secret para armazenar as minhas credenciais de login no meu repositório privado. Isso é bem importante caso esteja colocando customizações que não podem ser expostas. Pra isso, basta executar o seguinte comando:
$ kubectl create secret -n airflow-dev docker-registry airflow-registry --docker-server=<seu-servidor-docker> --docker-username=<seu-usuario> --docker-password=<sua-senha> --docker-email=<seu-email>
Exportando logs
Essa é uma das principais customizações, porque pela arquitetura padrão do Airflow no Kubernetes, as tarefas são executadas pelos Pods com os Workers. Esses Pods sobem quando a tarefa é demandada pelo Scheduler, executam e desligam. Só que ao desligar, os logs dessa execução são apagados junto do container.
Existem duas soluções principais pra isso: criar um Persistent Volume no Kubernetes e exportar os logs pra lá ou conectar-se a um Storage de nuvem, como por exemplo o s3. Nesse caso, vou mostrar como eu fiz para me conectar ao Blob Storage da Azure, que foi um pouco mais chato (de achar referência ou documentação, mas não de configurar) do que teria sido com o s3 ou GCS.
Basta você incluir o seguinte no seu values.yaml
:
config:
core:
logging_level: "DEBUG"
fab_logging_level: "DEBUG"
logging:
remote_logging: true
remote_base_log_folder: "wasb-airflow-logs"
remote_log_conn_id: "wasb_default"
Com isso, você está setando o nível de log das tarefas para "DEBUG". Você diz que os logs irão para uma pasta chamada wasb-airflow-logs
. É bem importante que o nome da sua pasta comece com wasb-
, caso contrário os templates não funcionarão (pelo menos não funcionaram pra mim).
Além disso, você precisa dizer o id da conexão que você fará com o Blob. No meu caso, chamei de wasb_default
. Vou colocar um tópico para como setar essa conexão depois de subir as customizações ali em baixo, tá?
Agora quando você executar uma DAG, os logs das tarefas devem cair dentro do Blob, nessa localização. E você também consegue acessar diretamente da UI do Airflow, navegando pelas DAGs.
Exportando métricas para o Datadog
As documentações do Datadog no dia dessa postagem não contemplam exemplos com o Helm chart oficial do Airflow. E foi por essa dificuldade que resolvi escrever. É, como muita coisa na vida, ridiculamente simples. Mas por algum motivo as documentações do Datadog apontam pra um chart mantido pela comunidade e usam annotations ao invés do que o que é necessário. Estranho.
Enfim, temos aqui no nosso cluster a nossa instância de Datadog configurada com DaemonSets, que irão sempre ter Pods atrelados a todos os nós, para pegar métricas dos containers e Pods que subirem e monitorar o nosso cluster como um todo. No entanto, você consegue também exportar métricas específicas do Airflow para esse serviço através do StatsD. Pra não me alongar muito, o que você vai precisar fazer no values.yaml
é o seguinte:
statsd:
enabled: true
config:
metrics:
statsd_on: true
statsd_port: 8125
extraEnv: |
- name: AIRFLOW__METRICS__STATSD_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: DD_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
A definição desses valores irá:
- Criar um pod para onde irão ser direcionadas as métricas do Airflow
- Mudar a porta padrão do StatsD para a 8125 (requerida pelo Datadog por padrão)
- Apontar o host do StatsD do Airflow para o nó do Kubernetes onde ele está instalado
- Apontar para o mesmo host onde está o Agent do Datadog
Parece muita coisa, mas são apenas essas as configurações que eu precisei pra fazer funcionar a conexão na minha instalação. Aí com isso posso criar um belíssimo dashboard para acompanhar a saúde da minha aplicação. Legal demais 😃
Outros
O que eu fiz foi não subir um componente de mensageria (Redis), que aumentaria bastante a complexidade da instalação nesse primeiro momento. Por isso, também desabilitei o Flower UI, que é uma interface gráfica para acompanhar esse serviço. Vamos ver então como ficou o arquivo values.yaml
final?
# Fernet Key
fernetKey: <sua-chave-hasheada>
# Imagem customizada
images:
airflow:
repository: <sua-organizacao>/<sua-imagem>
tag: latest
registry:
secretName: airflow-registry
# Executor de Kubernetes
executor: "KubernetesExecutor"
# StatsD
statsd:
enabled: true
extraEnv: |
- name: AIRFLOW__METRICS__STATSD_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: DD_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
# Redis e Flower
redis:
enabled: false
flower:
enabled: false
# Banco de Dados
postgresql:
enabled: false
data:
metadataSecretName: airflow-secret
# Git Sync
dags:
persistence:
enabled: false
gitSync:
enabled: true
repo: git@github.com:<sua-organizacao>/<seu-repo>.git
branch: main
rev: HEAD
depth: 1
subPath: "src/dags"
sshKeySecret: airflow-ssh-key
# Logs e StatsD
config:
core:
logging_level: "DEBUG"
fab_logging_level: "DEBUG"
logging:
remote_logging: true
remote_base_log_folder: "wasb-airflow-logs"
remote_log_conn_id: "wasb_default"
metrics:
statsd_on: true
statsd_port: 8125
logs:
persistence:
enabled: false
Subindo suas alterações
Para subir todas essas customizações, basta você rodar o seguinte comando:
$ helm upgrade airflow apache-airflow/airflow -n airflow-dev -f airflow-deploy/values.yaml
E é isso! Agora você tem uma aplicação super massa com muita coisa customizada de pé. Comemore! 🚀
Extra: Criando uma conexão com o seu Storage
Como comentei durante o tópico anterior, vamos ver como fazer para autenticar suas credenciais no Storage Account da Azure. A forma mais fácil de fazer isso é pela Web UI. Como você provavelmente ainda não setou um Ingress, para que você possa visualizar a web UI, faça um port-forward
no container da web. Você pode fazer isso com o seguinte comando:
$ kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow-dev
Com isso, vá ao seu localhost:8080
no seu navegador, logue com admin/admin
e faça o seguinte caminho:
Admin > Connections > +
Aí defina o seguinte:
Conn Type: Azure Blob Storage
Blob Storage Login: <seu-storage-account>
Blob Storage Shared Access Key: <sua-chave-de-acesso>
Essa é uma das formas de se autenticar no Blob, a outra pode ser a própria string de conexão. Por padrão os logs irão para o seu Storage em um container chamado "airflow-logs", dentro da pasta que você definiu ali em cima.
Concluindo
Se você conseguiu acompanhar até aqui, ótimo. Acho que esse post pode ser muito útil para muitas pessoas que, como eu, podem ficar por dias travadas em alguns pontos que deveriam ser simples.
Ainda existem algumas coisas bem importantes para você fazer antes de considerar que possui uma instância em produção, como, por exemplo:
- Configurar o Ingress e acessar a web UI através de um DNS
- Configurar o PgBouncer para controlar o número de conexões abertas no seu banco de dados pelo Airflow
- Dimensionar corretamente o seu cluster
- Alocar os componentes em diferentes nós através de
nodeSelectors
- Criar a mesma instância em um ambiente isolado e vincular as DAGs a uma branch diferente no seu repositório
- Construir pipelines de dados que entreguem muito valor para o seu negócio =)
Deletando os recursos
Se você quer fazer o deploy com uma fernetKey diferente, ou mesmo pausar a utilização de recursos, você pode deletar tudo o que foi feito com um simples comando:
$ helm delete airflow -n airflow
Espero que tenha sido uma boa leitura e, caso tenha alguma dúvida, me avisa aqui nos comentários.
Um abraço!
Top comments (8)
Queria te agradecer muito por esse artigo, principalmente a parte da integração com o datadog !
Realmente é algo que a documentação não contempla e que eu estava procurando
fico feliz que tenha ajudado de alguma forma! :D
Murilo, muito bom seu post. Porém, poderia me esclarecer como eu conectaria a um bucket minio do tipo s3 subido localmente no lugar de um azure blob storage para poder ver os logs das DAGs na web UI? Estou com bastante dificuldades em fazer isso, se você tiver algum link que possa me dar uma luz também ajudaria. Obg!
mano que massa que te ajudei com algo! desculpa a demora pra ver tua mensagem, vc conseguiu resolver? qqr coisa diz aí que eu tento dar uma olhada pra ti amanha cedo
Opa, acabei conseguindo sim! Foi uma luta mas consegui fazer as coisas do jeito que precisa.
Bom dia a todos. Hoje ja possuo o airflow com os fluxox de trabalho normalmente, porem preciso acrescentar um disco para workers do Airflow (Mount Path) via helm e nao sei por onde começar. Alguem pode dar uma luz por favor ? Obs: O motivo do disco é porque as dags/scripts estao gerando muitos arquivos e preciso de um disco pra isso.
fala Kayan, desculpe a demora pra te responder, tô com as notificações estranhas no Dev.
O fluxo de trabalho que costumei trabalhar com o Airflow consistia em ler e escrever sempre do lake para o lake (usávamos o Azure Data Lake, mas pode ser o GCS, s3, qualquer um). Acho que costuma ser o mais indicado. Se tiver alguma restrição quanto a isso no teu ambiente, dá uma olhada nos values do chart que você vai achar as opções para fazer o mount com os PV e PVCs github.com/apache/airflow/blob/mai...
Boa tarde a todos.
Estou precisando de ajuda para tentar resolver um problema.
Estou fazendo uso das seguintes tecnologias: Kubernetes, terraform, Azure, Devops, airflow, airbyte, helm-chart e argo-CD.
Estou precisando de ajuda pois ao publicar uma dag no airflow ela apresenta erro (ModuleNotFoundError: No module named 'airflow.providers.airbyte'). Fui pesquisar e identifiquei que o problema era relacionado a falta do provider do airbyte. Já tentei instalar esse provider de todas as formas mas não estou tendo resultado.
Se alguem quiser testar segue o git (github.com/janpaulosalvador00/infr...).
Agradeço muito a quem puder ajudar a resolver.