O Apache Airflow é uma plataforma desenvolvida pela comunidade para criar, agendar e monitorar fluxos de trabalho, tudo feito programaticamente. Com ela, os pipelines do Airflow são definidos em Python, permitindo a geração dinâmica de pipeline, sem sair da sintaxe que já conhecemos 🐍.
Saber desenvolver pipeline de dados com o Apache Airflow é um requisito mais do que essencial caso você almege uma carreira em engenharia de dados. Portanto, caso queira saber mais sobre essa poderosa ferramenta, continue lendo 😉.
Neste artigo irei ensinar como desenvolver uma pipeline de dados para extrair os famosos monstrinhos de bolso, os pokemons, da API PokeAPI. Depois da extração será aplicado algumas transformações bem simples nos dados para que, por fim, possam ser salvos localmente, simulando o carregamento dos dados em um data warehouse.
O código completo desenvolvido neste artigo pode ser conferido em meu GitHub 🤖.
Desenvolvimento baseado em conteinerização via Docker Compose
Existe muitas maneiras de instalar o Airflow em sua máquina. Todas elas podem ser conferidas na própria documentação oficial.
Para este artigo, no entanto, iremos desenvolver nossa pipeline baseada em conteinerização via Docker Compose.
Caso não tenha o Docker instalado, confira o guia de instalação na documentação.
O arquivo YML que irá subir nosso cluster pode ser baixado aqui, diretamente da página oficial do Airflow. Nenhuma mudança no arquivo é necessária para este projeto 💚.
O download deste arquivo é necessário para as próximas etapas.
Inicializando o cluster
Primeiro, vamos criar um diretório chamado pokemonsflow e adicionar o arquivo docker-compose.yml.
Depois disso, abra o terminal no diretório e digite os seguintes comandos para inicializar o cluster do Airflow:
$ docker-compose up airflow-init
$ docker-compose up -d
$ docker-compose ps
Com isso, seu cluster do Airflow já está ativo e o ambiente está pronto para o desenvolvimento 🚀.
Note que agora seu diretório pokemonsflow possue três novos subdiretórios:
dags/
logs/
plugins/
docker-compose.yml
Para finalizar, crie um subdiretório chamado data no diretório dags. Este diretório é onde ficará salvo os dados extraídos pela pipeline:
$ mkdir dags/data
Desenvolvimento da pipeline de dados
Agora que nosso ambiente de desenvolvimento via Docker já está inicializado, podemos começar a dar os primeiros passos na estruturação de nosso DAG 🎆.
Antes de tudo, crie um arquivo chamado pokemonsflow_dag.py no subdiretório dags. Note que o sufixo _dag no arquivo é necessário para o Airflow reconhecer automaticamente nosso DAG 😉:
$ cd dags
$ touch pokemonsflow_dag.py
Depois disso, adicione o seguinte código no arquivo:
import pandas as pd
import pendulum
import requests
from airflow.decorators import dag, task
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
catchup=False
)
def pokemonsflow_dag():
@task
def extract() -> list:
pass
@task
def transform(pokemons: list) -> list:
pass
@task
def load(pokemons: list):
pass
dag = pokemonsflow_dag()
Com isso já temos a estrutura inicial de nosso DAG. A biblioteca requests e pandas são necessárias para as tarefas de extração e transformação dos dados.
No DAG temos três tarefas principais, onde:
- Extract: irá fazer a extração de vinte pokemons da PokeAPI;
- Transform: irá selecionar apenas cinco campos dos pokemons extraídos, e ordená-los de forma decrescente pelo campo base_experience;
- Load: por fim, esta task irá salvar os dados transformados no subdiretório /dags/data/, no formato CSV;
Como pôde perceber, tanto a task transform quanto a task load dependem dos dados extraídos ou transformados pela task anterior. Para que a transição dos dados seja feita entre as tasks, o Airflow usa um mecanismo interno chamado XComs, uma abreviação para Comunicação Cruzada.
Antes do Airflow 2.0, o compartilhamento de dados entre tasks usando XComs era um tanto... verbosa. Contudo, com a chegada do Airflow 2.0, podemos compartilhar os dados entre tasks apenas passando eles como se fossem parâmetros de funções. Simples, não? ☺
Agora vamos desenvolver individualmente cada task de nosso DAG.
Extração de dados
A extração de pokemons da API é feita através das seguintes etapas:
- Faz uma chamada GET no endpoint /api/v2/pokemon, com o parâmetro limit=20 para restringirmos os resultados. O resultado será um json com o campo result, semelhante a este:
[
{
'name': 'bulbasaur',
'url': 'https://pokeapi.co/api/v2/pokemon/1/'
},
{
'name': 'ivysaur',
'url': 'https://pokeapi.co/api/v2/pokemon/2/'
},
{
'name': 'venusaur',
'url': 'https://pokeapi.co/api/v2/pokemon/3/'
},
{
'name': 'charmander',
'url': 'https://pokeapi.co/api/v2/pokemon/4/'
},
...
]
- Depois, acessamos o campo results e fazemos uma chamada GET para cada uma das URLS. O resultado será uma lista com os vinte pokemons extraídos da API;
Mãos à obra! Adicione o seguinte código dentro da função extract dentro de nosso DAG, não esquecendo, é claro, de remover a palavra reservada pass:
@task
def extract() -> list:
url = 'http://pokeapi.co/api/v2/pokemon'
params = {
'limit': 20
}
response = requests.get(url=url, params=params)
json_response = response.json()
results = json_response['results']
pokemons = [requests.get(url=result['url']).json()
for result in results]
return pokemons
Parabéns! A task para extração dos vinte pokemons da API está concluída 🎉.
Transformação dos dados
Caso faça a requisição do endpoint via Postman, por exemplo, você vai notar que cada pokemon contém inúmeros campos, tais como forms, stats, ETC. Contudo, iremos pegar apenas cinco campos desse json.
As etapas da task transform são as seguintes:
- Pega os dados extraídos pela task anterior, isto é, a task extract;
- Cria um DataFrame do Pandas com os dados e seleciona cinco colunas, descarrtando as demais;
- Ordena os dados do DataFrame pela coluna base_experience, de forma decrescente;
- Converte o DataFrame para uma lista de dicionários do Python, para que os dados possam ser transferidos para a task posterior;
Agora, mãos à obra, de novo! Adicione o seguinte código dentro da função transform dentro de nosso DAG, não esquecendo, é claro, de remover a palavra reservada pass:
@task
def transform(pokemons: list) -> list:
columns = [
'name',
'order',
'base_experience',
'height',
'weight'
]
df = pd.DataFrame(data=pokemons, columns=columns)
df = df.sort_values(['base_experience'], ascending=False)
pokemons = df.to_dict('records')
return pokemons
Feito! A task para a transformação dos dados está concluída 🎉.
Carregamento dos dados
Por fim, só o que nos resta é desenvolver a task load. As etapas desta task são:
- Pega os dados transformados pela task anterior;
- Cria um DataFrame do Pandas com os dados transformados;
- Salva os dados do DataFrame no diretório /dags/data/, no formato CSV;
Adicione o seguinte código dentro da função load dentro de nosso DAG, não esquecendo, é claro, de remover a palavra reservada pass:
@task
def load(pokemons: list):
df = pd.DataFrame(data=pokemons)
df.to_csv('./dags/data/pokemons_dataset.csv', index=False)
Parabéns! Todas as tasks de nosso DAG foram desenvolvidas 🎉.
Orquestração das tarefas e transferência de dados
Calma, ainda não acabou! Antes de finalizar, precisamos dizer ao Airflow a ordem para executar essas tarefas, e transferir os dados de uma task para a outra. Abaixo das funções, adicione o seguinte código:
# ETL pipeline
# extract
extracted_pokemons = extract()
# transform
transformed_pokemons = transform(pokemons=extracted_pokemons)
# load
load(pokemons=transformed_pokemons)
Agora sim acabamos 👏🏼. Note que a transferência dos dados entre as tasks é feito passando-os como parâmetros das funções. Muito, muito simples!
O código final de nosso DAG ficou assim:
import pandas as pd
import pendulum
import requests
from airflow.decorators import dag, task
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
catchup=False
)
def pokemonsflow_dag():
@task
def extract() -> list:
url = 'http://pokeapi.co/api/v2/pokemon'
params = {
'limit': 20
}
response = requests.get(url=url, params=params)
json_response = response.json()
results = json_response['results']
pokemons = [requests.get(url=result['url']).json()
for result in results]
return pokemons
@task
def transform(pokemons: list) -> list:
columns = [
'name',
'order',
'base_experience',
'height',
'weight'
]
df = pd.DataFrame(data=pokemons, columns=columns)
df = df.sort_values(['base_experience'], ascending=False)
pokemons = df.to_dict('records')
return pokemons
@task
def load(pokemons: list):
df = pd.DataFrame(data=pokemons)
df.to_csv('./dags/data/pokemons_dataset.csv', index=False)
# ETL pipeline
# extract
extracted_pokemons = extract()
# transform
transformed_pokemons = transform(pokemons=extracted_pokemons)
# load
load(pokemons=transformed_pokemons)
dag = pokemonsflow_dag()
Testano o DAG
Já podemos testar se nosso DAG está funcionando conforme o esperado. Para isso, vá até o terminal que está aberto na raiz do diretório e digite os seguintes comandos:
$ docker-compose exec airflow-worker bash
$ airflow dags test pokemonsflow_dag 2022-01-01
Quando a execução do DAG for finalizada, vá até o diretório /dags/data/ e confira o arquivo pokemons_dataset.csv. Todos os vinte pokemons estarão ordenados conforme a coluna base_experience.
Considerações finais
Como vimos, o Apache Airflow é uma poderosa ferramenta para a orquestração de tarefas de uma pipeline de dados. Não esqueça de conferir as outras inúmeras funcionalidades do Airflow 😉.
Neste artigo ensinei como desenvolver um DAG para extração de vinte pokemons da API PokeAPI. Foi aplicado algumas transformações bem simples nos dados antes de serem salvos localmente em formato CSV.
Se você gostou desse artigo, não esqueça de curtir e compartilhar nas redes sociais 💚.
Até a próxima!
Top comments (0)